#!/usr/bin/env python3

"""Main CHART scheduler process.

Builds the list of tasks to run from XML files then, in an infinite loop,
works out which task is due next, sleeps until it is due, and runs it.
"""

import os
import time
import signal
import fnmatch
import logging
import importlib
from datetime import datetime, timedelta

from chart.project import settings
from chart.project import SID
from chart.backend import worker
from chart.backend.activity import CallingConvention
from chart.backend.job import Job
from chart.backend.jobs import find_jobs
from chart.backend.jobcreator import JobCreator, ProductAttributes
from chart.common import notifications
from chart.common.args import ArgumentParser
from chart.common.env import set_env
from chart.common.exceptions import ConfigError, UnknownSIDError
from chart.common.log import init_log
from chart.common.prettyprint import show_time, show_timedelta, Table
from chart.common.xml import parsechilddatetime, parsechildstr, xml_to_timedelta
from chart.db.connection import db_connect
from chart.db.func import Count, Reversed

jobs_conn = db_connect('JOBS')

logger = logging.getLogger()


# If a directory's mtime is unchanged we normally skip rescanning it; after
# RESCAN_TIMEOUT we rescan anyway, in case the directory mtime was not updated.
# Not sure if this is actually useful.
RESCAN_TIMEOUT = timedelta(minutes=15)


def module(module_name):
    """Cached load of `module_name` from settings."""
    result = module.cache.get(module_name)
    if result is not None:
        return result

    try:
        mod = importlib.import_module(module_name)
    except ImportError as e:
        # Rework the error because the default message lacks the dotted path,
        # i.e. just "No module named fileattr"
        logger.error(e)
        raise ValueError('Could not import {mod}'.format(mod=module_name))

    module.cache[module_name] = mod
    return mod


module.cache = {}
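
# Minimal usage sketch (the dotted path here is hypothetical):
#
#     fileattr = module('chart.products.fileattr')  # imported and cached
#     fileattr = module('chart.products.fileattr')  # served from the cache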


class TriggerRegisterer(type):
    """Metaclass recording each Trigger subclass, keyed by its trigger_name.

    We probably don't really need metaclasses here.
    """

    triggers = {}

    def __init__(cls, name, bases, members):
        if name != 'Trigger':
            assert hasattr(cls, 'trigger_name')
            TriggerRegisterer.triggers[cls.trigger_name] = cls
            # logger.debug('Registering trigger class ' + name + ' with element tag ' +
            #              cls.trigger_name)

        super(TriggerRegisterer, cls).__init__(name, bases, members)


class Trigger(metaclass=TriggerRegisterer):
    """Base for individual Trigger classes."""

    @classmethod
    def factory(cls, category, trigger):
        """Generate an instance of a derived Trigger class from a trigger
        dictionary supplied by a JobCreator."""
        name = trigger['type']
        # logger.debug('Building trigger name ' + name)
        return TriggerRegisterer.triggers[name](category, trigger)
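

# Triggers are built from plain dicts supplied by JobCreator. An illustrative
# sketch (the keys shown are assumptions based on the trigger classes below):
#
#     hourly = Trigger.factory('TIMED', {'type': 'hourly', 'minute': 15})
#     events = hourly.check(datetime.utcnow())  # -> list of execution dicts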


class Now(Trigger):
    """Trigger an event on every scheduler pass, i.e. every SCHEDULER_SLEEP
    period, optionally gated on a dependent activity having just run."""

    trigger_name = 'now'

    def __init__(self, _, trigger):  # category
        super(Now, self).__init__()
        # default to None (no dependency) so check() can fire unconditionally
        self.dependency = trigger.get('dependency')

    def __str__(self):
        if self.dependency is None:
            return 'On every scheduler pass'

        return 'When activity {act} is executed'.format(act=self.dependency)

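    # Dependency window sketch: with SCHEDULER_SLEEP of, say, 2 minutes, a
    # dependent job must have a gen_time within the last 3 minutes (1.5 sleep
    # periods) for this trigger to fire on the current pass.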
    def check(self, current_time):
        """Raise a job on this pass, unless we have a dependency which has not
        run recently."""
        executions = []

        # check for dependencies
        if self.dependency is None:
            # no dependency; always fire
            jobs = 1

        else:
            # only fire if the dependent job has been executed in the meantime
            gen_time = (datetime.utcnow() -
                        timedelta(seconds=settings.SCHEDULER_SLEEP.total_seconds() * 1.5))
            dependency = self.dependency

            jobs = find_jobs(fields=(Count(),),
                             activity=dependency,
                             gen_time_gt=gen_time).fetchone()[0]

        if jobs >= 1:
            # sensing_start is the start of the current day
            executions.append({'sensing_start': current_time -
                               timedelta(hours=current_time.hour,
                                         minutes=current_time.minute,
                                         seconds=current_time.second,
                                         microseconds=current_time.microsecond),
                               'sensing_stop': datetime.utcnow()})

        return executions


class Hourly(Trigger):
    """Trigger events once per hour."""

    trigger_name = 'hourly'
    period = timedelta(hours=1)

    def __init__(self, _, trigger):
        super(Hourly, self).__init__()
        self.minute = trigger.get('minute', 0)
        self.next_run_time = None

    def __str__(self):
        return 'Once per hour at {min} minutes past the hour'.format(min=self.minute)

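    # Worked example of the first-pass arithmetic below: with minute=15 and
    # current_time 12:47:30, (47 - 15) % 60 = 32, so next_run_time becomes
    # 12:47:30 + 1h - 32m30s = 13:15:00.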
    def check(self, current_time):
        """Given the time now is `current_time`, compute when we should next
        trigger."""
        if self.next_run_time is None:
            self.next_run_time = (current_time + timedelta(hours=1) -
                                  timedelta(minutes=(current_time.minute - self.minute) % 60,
                                            seconds=current_time.second,
                                            microseconds=current_time.microsecond))

            return []

        executions = []
        if self.next_run_time < current_time:
            while self.next_run_time <= current_time:
                # next_run_time is the theoretical stop time for the algorithm run;
                # the event raised includes both sensing_start and sensing_stop
                executions.append({'sensing_start': self.next_run_time - Hourly.period,
                                   'sensing_stop': self.next_run_time})
                self.next_run_time += Hourly.period

        return executions


class Daily(Trigger):
    """Trigger events once per day."""

    trigger_name = 'daily'
    period = timedelta(days=1)

    def __init__(self, _, trigger):  # category
        super(Daily, self).__init__()
        self.hour = trigger['hour']
        self.next_run_time = None
        self.offset = trigger.get('offset', None)
        self.duration = trigger['duration']

    def __str__(self):
        res = 'Once each day at hour {hour}'.format(hour=self.hour)

        if self.offset is not None:
            res += ' applying offset of {offset}'.format(
                offset=show_timedelta(self.offset))

        # compare by value, not identity
        if self.duration != Daily.period:
            res += ' for {dur}'.format(dur=show_timedelta(self.duration))

        return res

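    # Worked example of the first-pass arithmetic below: with hour=3 and
    # current_time 2024-05-10 14:20:00, (14 - 3) % 24 = 11, so next_run_time
    # becomes 14:20 + 1 day - 11h20m = 2024-05-11 03:00:00.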
    def check(self, current_time):
        """Given the time now is `current_time`, compute when we should next
        trigger.
        """
        if self.next_run_time is None:
            self.next_run_time = current_time + timedelta(days=1) - \
                timedelta(hours=(current_time.hour - self.hour) % 24,
                          minutes=current_time.minute,
                          seconds=current_time.second,
                          microseconds=current_time.microsecond)

            return []

        executions = []
        if self.next_run_time < current_time:
            while self.next_run_time <= current_time:
                job_start = self.next_run_time - self.duration
                job_stop = self.next_run_time

                if self.offset is not None:
                    job_start += self.offset
                    job_stop += self.offset

                executions.append({'sensing_start': job_start,
                                   'sensing_stop': job_stop})
                self.next_run_time += Daily.period

        return executions


class Weekly(Trigger):
    """Trigger events once per week."""

    trigger_name = 'weekly'
    period = timedelta(days=7)

    def __init__(self, _, trigger):
        super(Weekly, self).__init__()
        self.day = trigger['day']
        self.hour = trigger['hour']
        self.offset = trigger['offset']
        self.next_run_time = None

    def __str__(self):
        res = 'Once each week on day {day} at {hour}:00'.format(
            day=self.day, hour=self.hour)

        if self.offset is not None:
            res += ' applying offset of {offset}'.format(offset=show_timedelta(self.offset))

        return res

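    # First-pass sketch: rewind current_time to 00:00 on Monday (weekday 0),
    # add self.day days and self.hour hours, then push forward one week if
    # that instant is already in the past.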
    def check(self, current_time):
        """Given the time now is `current_time`, compute when we should next
        trigger.
        """
        if self.next_run_time is None:
            self.next_run_time = current_time
            self.next_run_time -= timedelta(days=current_time.weekday(),
                                            hours=current_time.hour,
                                            minutes=current_time.minute,
                                            seconds=current_time.second,
                                            microseconds=current_time.microsecond)
            self.next_run_time += timedelta(days=self.day,
                                            hours=self.hour)
            if self.next_run_time < current_time:
                self.next_run_time += timedelta(days=7)

            return []

        executions = []
        if self.next_run_time < current_time:
            while self.next_run_time <= current_time:
                # yield a new job, applying our offset (normally negative, if used)
                job_start = self.next_run_time - Weekly.period
                job_stop = self.next_run_time

                if self.offset is not None:
                    job_start += self.offset
                    job_stop += self.offset

                executions.append({'sensing_start': job_start,
                                   'sensing_stop': job_stop})

                self.next_run_time += Weekly.period

        return executions


class Monthly(Trigger):
    """Trigger events once per month."""

    trigger_name = 'monthly'

    def __init__(self, _, trigger):
        super(Monthly, self).__init__()
        self.day = trigger['day']
        self.hour = trigger['hour']
        self.offset = trigger['offset']
        self.next_run_time = None

    def __str__(self):
        res = 'Once each month on day {day} hour {hour}'.format(
            day=self.day, hour=self.hour)

        if self.offset is not None:
            res += ' applying offset of {offset}'.format(offset=show_timedelta(self.offset))

        return res

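    # Caution: the hour-by-hour rewind below assumes self.day exists in every
    # month it may land on; a day greater than 28 can make the re-applied
    # datetime(year, month, day, hour) raise ValueError for short months.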
    def check(self, current_time):
        """Given the time now is `current_time`, compute when we should next
        trigger.
        """
        if self.next_run_time is None:
            # step back hour by hour to the most recent matching day and hour
            acc = datetime(current_time.year, current_time.month,
                           current_time.day, current_time.hour)
            while not (acc.day == self.day and acc.hour == self.hour):
                acc -= timedelta(hours=1)

            hour = acc.hour
            day = acc.day
            month = acc.month
            year = acc.year

            # now increment by a month
            month += 1
            if month == 13:
                year += 1
                month = 1

            self.next_run_time = datetime(year, month, day, hour)
            return []

        executions = []
        if self.next_run_time < current_time:
            while self.next_run_time <= current_time:
                event = {'sensing_stop': self.next_run_time}
                hour = self.next_run_time.hour
                day = self.next_run_time.day
                month = self.next_run_time.month
                year = self.next_run_time.year

                # sensing_start is one month before the stop time
                month -= 1
                if month == 0:
                    year -= 1
                    month = 12

                event['sensing_start'] = datetime(year, month, day, hour)
                if self.offset is not None:
                    event['sensing_start'] += self.offset
                    event['sensing_stop'] += self.offset

                executions.append(event)

                # advance by a month
                month = self.next_run_time.month
                year = self.next_run_time.year
                month += 1
                if month == 13:
                    year += 1
                    month = 1

                self.next_run_time = datetime(year, month, day, hour)

        return executions


class Annual(Trigger):
    """Trigger events once per year."""

    trigger_name = 'annual'

    def __init__(self, _, trigger):
        super(Annual, self).__init__()
        self.month = trigger['month']
        self.day = trigger['day']
        self.hour = trigger['hour']
        self.offset = trigger['offset']
        self.next_run_time = None

    def __str__(self):
        res = 'Once each year on month {month} day {day} hour {hour}'.format(
            month=self.month, day=self.day, hour=self.hour)

        if self.offset is not None:
            res += ' applying offset of {offset}'.format(offset=show_timedelta(self.offset))

        return res

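    # As with Monthly, the rewind below assumes (month, day) is valid in the
    # years it may land on; month=2 day=29 only matches in leap years, so the
    # loop could rewind a long way.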
    def check(self, current_time):
        """Given the time now is `current_time`, compute when we should next
        trigger.
        """
        # First time around, compute when we should have last run
        if self.next_run_time is None:
            acc = datetime(current_time.year, current_time.month,
                           current_time.day, current_time.hour)
            while not (acc.month == self.month and acc.day == self.day and
                       acc.hour == self.hour):
                acc -= timedelta(hours=1)

            # now increment by a year
            self.next_run_time = datetime(acc.year + 1, acc.month, acc.day, acc.hour)
            return []

        executions = []
        if self.next_run_time < current_time:
            while self.next_run_time <= current_time:
                event = {'sensing_stop': self.next_run_time}

                # sensing_start is one year before the stop time
                event['sensing_start'] = datetime(self.next_run_time.year - 1,
                                                  self.next_run_time.month,
                                                  self.next_run_time.day,
                                                  self.next_run_time.hour)
                if self.offset is not None:
                    event['sensing_start'] += self.offset
                    event['sensing_stop'] += self.offset

                executions.append(event)

                # advance by a year
                self.next_run_time = datetime(self.next_run_time.year + 1,
                                              self.next_run_time.month,
                                              self.next_run_time.day,
                                              self.next_run_time.hour)

        return executions


class Periodic(Trigger):
    """Schedule periodic runs.

    Correctly handles the case where the system was restarted, by trying to
    run the missed scheduling occurrences.
    The absolute start of the periodic sequence is specified in XML.
    """

    trigger_name = 'periodic'

    def __init__(self, _, elem):  # category
        super(Periodic, self).__init__()
        period = parsechildstr(elem, 'period')
        self.delta = xml_to_timedelta(period)
        self.start = parsechilddatetime(elem, 'start')
        self.last_time = None
        self.acc = None

    def __str__(self):
        return 'from {start} run every {delta}'.format(start=self.start, delta=self.delta)

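    # Replay sketch: with start 2024-01-01T00:00 and a 6h period, the first
    # check() only records last_time; later checks walk acc through 00:00,
    # 06:00, 12:00, ... and emit every slot in [last_time, current_time), so
    # slots which fall due while the scheduler is running are never lost.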
    def check(self, current_time):
        """Given the time now is `current_time`, compute when we should next
        trigger.
        """
        if self.last_time is None:
            # first pass: just record the time; callers iterate over the result
            self.last_time = current_time
            return []

        if self.acc is None:
            self.acc = self.start

        result = []
        while self.acc < current_time:
            if self.acc >= self.last_time:
                result.append({'sensing_start': self.acc - self.delta,
                               'sensing_stop': self.acc})

            self.acc += self.delta

        return result


class DirectoryMonitor(Trigger):
    """Periodically scan a directory for new files.

    check() yields an event for each new file found, suitably sorted.
    """

    trigger_name = 'directory-monitor'

    def __init__(self, category, trigger):
        super(DirectoryMonitor, self).__init__()
        self.category = category

        # name of directory to scan
        self.directory = trigger['directory']

        # glob pattern to scan for
        self.pattern = trigger['pattern']

        # number of PDUs to hold in a queue before ingesting them,
        # to allow out-of-order PDUs to be reordered first
        # (needed for HKTM and NOAA GAC distillation)
        self.queue_length = trigger['pdu_delay_queue']
        self.timeout = trigger['timeout']

        # directory modification time
        self.mtime = None
        self.parser = trigger['parser']

        if self.queue_length is not None:
            self.queue = []

        # timestamp of last directory scan
        self.last_scan_time = datetime.utcnow()

        # time after which a NO_INGESTION notification is raised,
        # or None at startup
        self.timeout_deadline = None
        self.old_files = None

    def __str__(self):
        result = 'Scan directory {dir} for {pattern}'.format(
            dir=self.directory, pattern=self.pattern)

        if self.queue_length is not None:
            result += ' with delay queue of {queue}'.format(queue=self.queue_length)

        return result

    @staticmethod
    def db_scan(category):
        """Perform an initial scan of the JOBS table to build a list of all previously
        detected PDUs."""
        logger.info('Building initial PDUs list from database')
        DirectoryMonitor.initial_pdus = set(row[0] for row in find_jobs(
            category=category,
            fields=('FILENAME',),
            filename_ne=None,
            sensing_start_ge=datetime.utcnow() - settings.FILE_SCAN_CUTOFF))
        logger.info('Found {cc} ingested filenames from jobs table'.format(
            cc=len(DirectoryMonitor.initial_pdus)))

    def check(self, current_time):  # (unused argument) pylint: disable=W0613
        """Search for new products (PDUs) in self.directory.

        Yields suitably sorted filename events.
        """
        if not hasattr(DirectoryMonitor, 'initial_pdus'):
            DirectoryMonitor.db_scan(self.category)

        if self.old_files is None:
            # DirectoryMonitor.initial_pdus is shared across all DirectoryMonitors,
            # and cleared by the __main__ loop after the first iteration.
            self.old_files = DirectoryMonitor.initial_pdus

        if self.directory is None:
            # Special scheduler files used by the generic ingester tool
            return

        if not self.directory.is_dir():
            logger.warning('No such directory: {d}'.format(d=self.directory))
            return

        mtime = self.directory.stat().st_mtime

        # If it's been less than RESCAN_TIMEOUT since the last full scan and the
        # directory mtime hasn't changed, do nothing
        if (self.mtime and self.mtime == mtime and
                (datetime.utcnow() - self.last_scan_time) < RESCAN_TIMEOUT):
            return

        if self.mtime is None:
            logger.info('Scanning {dir} first pass'.format(dir=self.directory))

        elif self.mtime != mtime:
            logger.info('Scanning {dir} due to changed directory timestamp'.format(
                dir=self.directory))

        else:
            logger.info('Scanning {dir} due to timeout'.format(dir=self.directory))

        self.mtime = mtime
        self.last_scan_time = datetime.utcnow()

        # Read the current set of files, and subtract the known processed files
        logger.debug('Scan {d} for {p}'.format(d=self.directory, p=self.pattern))
        current_files = set(
            fnmatch.filter(
                (str(f.name) for f in self.directory.iterdir()),
                self.pattern))
        new_files = current_files - self.old_files
        logger.debug('Found {total} files of which {new} are not in initial or '
                     'previous scan'.format(total=len(current_files), new=len(new_files)))

        if self.queue_length is not None:
            assert self.queue_length != 0  # we don't handle this properly
            # The queue is needed for PDU types where multiple sequential MDRs are
            # required to build a complete product (esp. Metop HKTM and NOAA GAC).
            # The FTP transfer to MET TCE does not always deliver PDUs in creation
            # order so we hold some back in an ordered queue.
            self.queue.extend(new_files)
            self.queue.sort()
            new_files = self.queue[:-self.queue_length]
            self.queue = self.queue[-self.queue_length:]
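
            # Queue sketch: with queue_length=2 and a sorted queue of
            # [a, b, c, d], [a, b] are released as new_files while [c, d]
            # are held back until a later scan.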
            # logger.debug('Taken from queue: ' + str(new_files))

        # For the first pass only, examine all files but discard any older than
        # settings.FILE_SCAN_CUTOFF; on all other passes process all files.
        # Possible problem: if someone inserts a really old product into the incoming
        # directory while the system is running, it'll get processed even though it's
        # too old
        cutoff_time = datetime.utcnow() - settings.FILE_SCAN_CUTOFF
        cutoff_count = 0
        for new_file in new_files:
            path = self.directory.joinpath(new_file)
            try:
                parsed = self.parser(path)
                if parsed is None:
                    # clean rejection by product attribute function
                    continue

                if isinstance(parsed, ProductAttributes):
                    sid = parsed.sid
                    sensing_start = parsed.sensing_start

                else:
                    sid, sensing_start = parsed[:2]
            except UnknownSIDError as e:
                logger.warning('Rejecting product {e}'.format(e=e))
                continue
            except ValueError as e:
                logger.warning('Rejecting {name} as filename cannot be parsed: {e}'.format(
                    name=new_file, e=e))
                continue

            if self.old_files is DirectoryMonitor.initial_pdus:
                if sensing_start is None or sensing_start < cutoff_time:
                    cutoff_count += 1
                    continue

            yield {'filename': path,
                   'sid': sid,
                   'sensing_start': sensing_start}

        if cutoff_count > 0:
            logger.info('Rejected {cc} files for being older than cutoff of {cutoff}'.format(
                cc=cutoff_count, cutoff=cutoff_time))

        self.old_files = current_files

        # If we have had no ingestions for a particular rule for a while, raise a
        # notification if configured
        if self.timeout is not None:
            # There is a bug here. If the scheduler raises a notification due to lack
            # of data, then the scheduler is restarted, then the data flow starts
            # again, the notification will not be cleared.
            # One solution is to delete all notification state files after 24 hours.
            now = datetime.utcnow()
            if len(new_files) > 0 or self.timeout_deadline is None:
                # if we had ingestion for this directory in this cycle, reset the
                # timeout deadline
                dtype = os.path.basename(str(self.directory))
                notifications.denotify('NO_INGESTION', parameter=dtype)

                self.timeout_deadline = now + self.timeout
                logger.debug('Ingestion or first pass in {dir} so timeout reset to {now} + '
                             '{timeout} = {deadline}'.format(
                                 dir=self.directory,
                                 now=show_time(now),
                                 timeout=self.timeout,
                                 deadline=show_time(self.timeout_deadline)))

            else:
                # if there was no ingestion this cycle, check if we are past the deadline
                if now > self.timeout_deadline:
                    logger.debug('Timeout deadline for {dir} is {deadline}, now is {now} so we '
                                 'throw a notification'.format(
                                     dir=self.directory,
                                     deadline=show_time(self.timeout_deadline),
                                     now=show_time(now)))

                    dtype = os.path.basename(str(self.directory))
                    notifications.notify('NO_INGESTION',
                                         parameter=dtype,
                                         message='No ingestion on {dtype} for over {dur}'.format(
                                             dtype=dtype,
                                             dur=show_timedelta(self.timeout)))

                else:
                    logger.debug('Timeout deadline for {dir} is {deadline}, now is {now} so no '
                                 'notification'.format(
                                     dir=self.directory,
                                     deadline=show_time(self.timeout_deadline),
                                     now=show_time(now)))


def run_activity(category,
                 activity,
                 event,
                 simulate):
    """Schedule new job(s) for `activity` to be dispatched, based on the `event` which
    just occurred."""
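    # Event dicts come from the Trigger classes above. Illustrative shapes
    # (keys as consumed below):
    #   timed triggers:     {'sensing_start': dt, 'sensing_stop': dt} (+ 'sid')
    #   directory monitor:  {'filename': path, 'sid': sid, 'sensing_start': dt}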
    if simulate:
        logger.debug('Simulating job {activity} {job}'.format(
            activity=activity.name, job=event))
        if activity.convention is CallingConvention.SID_TIMERANGE and 'sid' not in event:
            raise ValueError('No SID in SID_TIMERANGE event')

        return

    if activity.convention is CallingConvention.FILENAME:
        worker.add_job(Job(category=category,
                           activity=activity.name,
                           sid=event['sid'],
                           sensing_start=event['sensing_start'],
                           filename=event['filename']))

    elif activity.convention is CallingConvention.SID_TIMERANGE:
        worker.add_job(Job(category=category,
                           activity=activity.name,
                           sensing_start=event['sensing_start'],
                           sensing_stop=event['sensing_stop'],
                           sid=event['sid']))

    elif activity.convention is CallingConvention.SID_TIMERANGE_TABLE:
        # If run manually or from worker chains the client would need to specify the
        # tablename, but when run from a timed job we allow it to be omitted.
        worker.add_job(Job(category=category,
                           activity=activity.name,
                           sensing_start=event['sensing_start'],
                           sensing_stop=event['sensing_stop'],
                           # sid won't be there in timed jobs
                           sid=event.get('sid')))

    elif activity.convention is CallingConvention.TIMERANGE:
        worker.add_job(Job(category=category,
                           activity=activity.name,
                           sensing_start=event['sensing_start'],
                           sensing_stop=event['sensing_stop']))

    elif activity.convention is CallingConvention.NONE:
        worker.add_job(Job(category=category,
                           activity=activity.name))

    else:
        raise ConfigError('Unhandled calling convention {c}'.format(c=activity.convention.value))


def scan(category,  # (dangerous default value []) pylint: disable=W0102
         allowed_activities=None,
         excluded_activities=[],
         exclude_directory_monitoring=False):
    """Scan the "schedule" directory and pull scheduling info from XML files.

    Build the control list of tasks. Each entry pairs:

    * Trigger: Now, Hourly, Daily, Weekly, Monthly, Annual, Periodic or DirectoryMonitor
    * Response: list of named Activities

    Args:
        `category` (str): Category given to jobs raised by these triggers
        `allowed_activities` (list or None): If given, only schedule these activities
        `excluded_activities` (list): Never schedule these activities
        `exclude_directory_monitoring` (bool): Skip directory-monitor triggers
    """
    schedule = []

    for job_creator in JobCreator.all():
        if not job_creator.enabled:
            continue

        response = list(job_creator.responses(
            allowed_activities=allowed_activities,
            excluded_activities=excluded_activities,
            exclude_directory_monitoring=exclude_directory_monitoring))

        if len(response) == 0:
            continue

        schedule_obj = {'name': job_creator.name,
                        'response': response,
                        'trigger': Trigger.factory(category, job_creator.trigger)}
        schedule.append(schedule_obj)

    return schedule


def show_schedule(schedule):
    """Display all configured activities."""
    for task in schedule:
        responses = ' and '.join('run {name} ({sids})'.format(
            name=x['activity'].name,
            sids=','.join(str(s) for s in x['sids'])) for x in task['response'])
        print('{trigger} {responses}'.format(trigger=task['trigger'], responses=responses))


def signal_handler(sig, _):
    """Try to handle control-C cleanly."""
    if sig == signal.SIGINT:  # sent by ctrl-C or 'kill -SIGINT'
        logger.info('SIGINT received')

    elif sig == signal.SIGTERM:  # default signal sent by 'kill'
        logger.info('SIGTERM received')

    else:
        logger.info('Signal {sig} received'.format(sig=sig))

    raise SystemExit


def run_forever(category, schedule, simulate, start, sleep, one_shot=False):
    """Main loop. Constantly check all triggers and run responses.

    Args:
        `schedule` (list of dicts): Job schedule read from XML files in the scheduler dir
        `simulate` (timedelta or None): Increase the internal time value by this amount
            instead of real time and don't actually create any jobs
        `start` (datetime or None): Force a different start time
        `sleep` (timedelta or None): Sleep time between iterations
        `one_shot` (bool): Terminate after a single pass
    """
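    # Per-pass sketch: advance `now` (wall clock, or now + simulate), collect
    # due events from every trigger, fan each event out across the configured
    # SIDs, then commit and sleep.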
    # used to send hourly-ish notifications as a test
    next_hour = datetime.utcnow()

    if start is not None:
        now = start

    else:
        now = datetime.utcnow()

    if simulate is None:
        logger.info('Scheduling jobs with category {c}'.format(c=category))

    else:
        logger.info('Simulating schedule with time jump of {s}'.format(s=simulate))
        sleep = timedelta(seconds=1)

    while True:
        if simulate is None:
            now = datetime.utcnow()
            logger.info('Wall clock time is now {now}'.format(now=now))

        else:
            now += simulate
            logger.info('Synthetic time now {now}'.format(now=now))

        for task in schedule:
            events = task['trigger'].check(now)

            if start:
                # if a time range is specified, only run timed events
                events = [x for x in events if 'sensing_stop' in x]

            for event in events:
                for response in task['response']:
                    if len(response['sids']) > 0:
                        for sid in response['sids']:
                            event['sid'] = sid
                            run_activity(category, response['activity'], event, simulate)

                    else:
                        run_activity(category, response['activity'], event, simulate)

        # the initial PDU list is large and we don't need it in memory after the
        # first iteration
        DirectoryMonitor.initial_pdus = None

        jobs_conn.commit()

        if one_shot:
            return

        logger.debug('Sleeping for {delay}'.format(delay=sleep))
        time.sleep(sleep.total_seconds())

        if datetime.utcnow() > next_hour:
            notifications.notify('HOURLY_NOTIFICATION', 'Test hourly notification')
            next_hour = datetime.utcnow() + timedelta(hours=1)


def show_scheduler_status(category: str, limit: int):
    """Display recent jobs created by the scheduler."""
    table = Table(headings=('Generation', 'ID', 'Activity', 'Start', 'Filename / Table',
                            'Source'))
    for row in find_jobs(fields=['GEN_TIME', 'ID', 'ACTIVITY', 'SENSING_START',
                                 'FILENAME', 'TABLENAME'] + SID.sql_sys_select('JOBS'),
                         category=category,
                         parent_ne=None,
                         order_by=Reversed('gen_time'),
                         limit=limit):
        gen_time, job_id, activity, sensing_start, filename, tablename = row[:6]
        sid = SID.from_sys_select('JOBS', row[6:])
        table.append((gen_time,
                      job_id,
                      activity,
                      sensing_start,
                      filename if filename is not None else tablename,
                      sid))

    table.write()


def main():
    """Command line entry point."""
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    parser = ArgumentParser()
    parser.add_argument('--log-file',
                        help='Single logfile location')
    parser.add_argument('--rotating-log-file',
                        help='Rotating logfile location')
    parser.add_argument('--db', '--conn', '-d',
                        metavar='CONN',
                        help='Use database connection CONN')
    parser.add_argument('--project-id',
                        type=int,
                        metavar='ID',
                        help='Force project ID for the multi-project JOBS table function')
    parser.add_argument('--category',
                        default=settings.DEFAULT_CATEGORY,
                        help='Category to use when creating new jobs')
    parser.add_argument('--simulate',
                        type=ArgumentParser.timedelta,
                        metavar='SPEED',
                        help='Increase the internal clock by SPEED each iteration instead of '
                        'following real time. Also deactivates directory monitoring, so only '
                        'timed scheduling is simulated')
    parser.add_argument('--show-schedule', '--show', '--list', '-l',
                        action='store_true',
                        help='Just display the configured schedule then exit')
    parser.add_argument('--start',
                        type=ArgumentParser.datetime,
                        metavar='TIME',
                        help='Begin scheduling at TIME')
    parser.add_argument('--only-activities', '--activities',
                        nargs='*',
                        help='Only call activities from this list')
    parser.add_argument('--exclude-activities', '--exclude',
                        nargs='*',
                        help='Exclude activities from this list')
    parser.add_argument('--scan-period',
                        type=ArgumentParser.timedelta,
                        default=settings.FILE_SCAN_CUTOFF,
                        help='Configure how far back the file scanner should look')
    parser.add_argument('--sleep-period',
                        type=ArgumentParser.timedelta,
                        metavar='SLEEP',
                        default=settings.SCHEDULER_SLEEP,
                        help='Wait for SLEEP between iterations')
    parser.add_argument('--one-shot',
                        action='store_true',
                        help='Just loop once then terminate')
    parser.add_argument('--status',
                        action='store_true',
                        help='Show the most recently created jobs')
    parser.add_argument('--limit',
                        type=int,
                        help='Number of jobs to display if --status is used',
                        default=10)

    # postpone log file init until we do it explicitly later
    args = parser.parse_args(no_log_init=True)

    if args.rotating_log_file:
        settings.ROTATING_LOG_FILE = args.rotating_log_file

    if args.log_file:
        settings.SINGLE_LOG_FILE = args.log_file

    init_log()

    if args.db:
        settings.set_db_name(args.db)

    if args.project_id:
        settings.DATABASE_PROJECT_ID = args.project_id

    if args.status:
        show_scheduler_status(args.category, limit=args.limit)
        parser.exit()

    if not settings.SCHEDULE_DIR.is_dir():
        parser.error('Schedule directory {p} not found'.format(
            p=settings.SCHEDULE_DIR))

    if args.start and not args.simulate:
        parser.error('--start option can only be used if --simulate is used')

    # the scheduler requires the state file dir if notifications are raised
    if not settings.DEBUG and not settings.STATEFILE_DIR.is_dir():
        raise ValueError('Statefile directory {name} not found'.format(
            name=settings.STATEFILE_DIR))

    if args.scan_period:
        settings.FILE_SCAN_CUTOFF = args.scan_period

    category = args.category.upper()

    # The notification module only sends emails if it is run from inside the
    # dispatcher, to prevent superfluous emails being sent when running command
    # line tools manually. So we trick it here into thinking we are running from
    # the dispatcher. This is messy and should be done better.
    os.environ['{ENV_PREFIX}DISPATCHER'.format(ENV_PREFIX=settings.ENV_PREFIX)] = '1'
    schedule = scan(category=args.category,
                    allowed_activities=args.only_activities,
                    excluded_activities=args.exclude_activities,
                    exclude_directory_monitoring=args.simulate is not None)

    if args.show_schedule:
        show_schedule(schedule)
        parser.exit()

    notifications.notify('SCHEDULER_STARTUP',
                         message='This is a test notification sent on scheduler startup')

    # Tell the notification module it is being run by the dispatcher,
    # otherwise notification emails are disabled
    set_env('{ENV_PREFIX}DISPATCHER', '1')

    run_forever(category,
                schedule,
                simulate=args.simulate,
                start=args.start,
                sleep=args.sleep_period,
                one_shot=args.one_shot)


if __name__ == '__main__':
    main()