1#!/usr/bin/env python3
   2
   3"""Main CHART scheduler process.
   4Builds the list of tasks to run from xml files.
   5In an infinite loop checks what task to run next and when and sleeps until then.
   6"""
   7
import calendar
import fnmatch
import importlib
import logging
import os
import signal
import time
from datetime import datetime, timedelta

from chart.backend import worker
from chart.backend.activity import CallingConvention
from chart.backend.job import Job
from chart.backend.jobcreator import JobCreator
from chart.backend.jobcreator import ProductAttributes
from chart.backend.jobs import find_jobs
from chart.common import notifications
from chart.common.args import ArgumentParser
from chart.common.env import set_env
from chart.common.exceptions import ConfigError
from chart.common.exceptions import UnknownSIDError
from chart.common.log import init_log
from chart.common.prettyprint import Table
from chart.common.prettyprint import show_time
from chart.common.prettyprint import show_timedelta
from chart.common.xml import parsechilddatetime
from chart.common.xml import parsechildstr
from chart.common.xml import xml_to_timedelta
from chart.db.connection import db_connect
from chart.db.func import Count
from chart.db.func import Reversed
from chart.project import SID
from chart.project import settings
  39
# Connection to the JOBS database, shared across the whole scheduler process.
jobs_conn = db_connect('JOBS')

# ELEM_DAY = 'day'
# ELEM_HOUR = 'hour'
# ELEM_MINUTE = 'minute'
# ELEM_OFFSET = 'offset'
# ELEM_DURATION = 'duration'
# ELEM_MINUTE = 'minute'

# Module-wide logger (the root logger).
logger = logging.getLogger()


# If a directory has not been scanned due to mtime delta after RESCAN_TIMEOUT,
# we rescan it anyway (in case the directory mtime has not been updated).
# Not sure if this is actually useful
RESCAN_TIMEOUT = timedelta(minutes=15)
  56
  57
  58def module(module_name):
  59    """Cached load of `module_name` from settings."""
  60    result = module.cache.get(module_name)
  61    if result is not None:
  62        return result
  63
  64    try:
  65        mod = importlib.import_module(module_name)
  66    except ImportError as e:
  67        # we rework the error as the default message lacks path i.e. No module named fileattr
  68        logger.error(e)
  69        raise ValueError('Could not import {mod}'.format(mod=module_name))
  70
  71    module.cache[module_name] = mod
  72    return mod
  73
  74
  75module.cache = {}
  76
  77
  78class TriggerRegisterer(type):
  79    """We probably don't really need metaclasses here."""
  80    triggers = {}
  81
  82    def __init__(cls, name, bases, members):
  83        if name != 'Trigger':
  84            assert hasattr(cls, 'trigger_name')
  85            TriggerRegisterer.triggers[cls.trigger_name] = cls
  86            # logger.debug('Registering trigger class ' + name + ' with element tag ' + \
  87            # cls.trigger_name)
  88
  89        super(TriggerRegisterer, cls).__init__(name, bases, members)
  90
  91
  92class Trigger(metaclass=TriggerRegisterer):
  93    """Base for individual Trigger classes."""
  94
  95    @classmethod
  96    def factory(cls, category, trigger):
  97        """Generate an instance of a derived *Trigger* class from a trigger
  98        dictionary from a JobCreator."""
  99        name = trigger['type']
 100        # logger.debug('Building trigger name ' + name)
 101        return TriggerRegisterer.triggers[name](category, trigger)
 102
 103
 104class Now(Trigger):
 105    """Class for triggering event ever schedule pass i.e. every SLEEP_TIME period."""
 106
 107    trigger_name = 'now'
 108
 109    def __init__(self, _, trigger):  # category
 110        super(Now, self).__init__()
 111        self.dependency = trigger.get('dependency', 0)
 112
 113    def __str__(self):
 114        return 'When activity {act} is executed'.format(act=self.dependency)
 115
 116    def check(self, current_time):
 117        """Raise a job if our time window has expired."""
 118        executions = []
 119
 120         # check for dependencies
 121        if self.dependency is None:
 122            # No dependency
 123            jobs = 1
 124
 125        else:
 126            # only run if dependent job has been executed in meantime
 127            gen_time = (datetime.utcnow() -
 128                        timedelta(seconds=settings.SCHEDULER_SLEEP.total_seconds() * 1.5))
 129            dependency = self.dependency
 130
 131            jobs = find_jobs(fields=(Count(), ),
 132                             activity=dependency,
 133                             gen_time_gt=gen_time).fetchone()[0]
 134
 135            # logger.debug('Number of dependent ' + dependency + ' jobs performed since ' +
 136            # str(gen_time) + ' is = ' + str(jobs))
 137
 138        if jobs >= 1:
 139            executions.append({'sensing_start': current_time -
 140                               timedelta(hours=current_time.hour,
 141                                         minutes=current_time.minute,
 142                                         seconds=current_time.second,
 143                                         microseconds=current_time.microsecond),
 144                               'sensing_stop': datetime.utcnow()})
 145            # 'sensing_stop': current_time})
 146        return executions
 147
 148
 149class Hourly(Trigger):
 150    """Class for triggering hours events."""
 151
 152    trigger_name = 'hourly'
 153    period = timedelta(hours=1)
 154
 155    def __init__(self, _, trigger):
 156        super(Hourly, self).__init__()
 157        self.minute = trigger.get('minute', 0)
 158        self.next_run_time = None
 159
 160    def __str__(self):
 161        return 'Once per hour at {min} minutes past the hour'.format(min=self.minute)
 162
 163    def check(self, current_time):
 164        """Given the time now is `current_time`, compute when we should next
 165        trigger."""
 166        if self.next_run_time is None:
 167            self.next_run_time = (current_time + timedelta(hours=1) -
 168                                  timedelta(minutes=(current_time.minute - self.minute) % 60,
 169                                            seconds=current_time.second,
 170                                            microseconds=current_time.microsecond))
 171
 172            # print 'next run time ', self.next_run_time
 173            return []
 174
 175        executions = []
 176        if self.next_run_time < current_time:
 177            while self.next_run_time <= current_time:
 178                # executions.append({'sensing_stop':self.next_run_time})
 179                # next_run_time is the theoretical stop time for the algorithm run.
 180                # the scheduler actually runs 6 hours behind to allow for data latencies.
 181                # The event raised should include sensing_start and sensing_stop
 182                executions.append({'sensing_start': self.next_run_time - Hourly.period,
 183                                   'sensing_stop': self.next_run_time})
 184                self.next_run_time += Hourly.period
 185
 186        return executions
 187
 188
 189class Daily(Trigger):
 190    """Class for events which trigger daily."""
 191
 192    trigger_name = 'daily'
 193    period = timedelta(days=1)
 194
 195    def __init__(self, _, trigger):  # category
 196        super(Daily, self).__init__()
 197        self.hour = trigger['hour']  # parsechildint(elem, ELEM_HOUR, 0)
 198        self.next_run_time = None
 199        self.offset = trigger.get('offset', None)
 200        self.duration = trigger['duration']
 201
 202    def __str__(self):
 203        res = 'Once each day at hour {hour}'.format(hour=self.hour)
 204
 205        if self.offset is not None:
 206            res += ' applying offset of {offset}'.format(
 207                offset=show_timedelta(self.offset))
 208
 209        if self.duration is not Daily.period:
 210            res += ' for {dur}'.format(dur=show_timedelta(self.duration))
 211
 212        return res
 213
 214    def check(self, current_time):
 215        """Given the time now is `current_time`, compute when we should next
 216        trigger.
 217        """
 218        # print 'current_time ', current_time, ' next run time ', self.next_run_time
 219        if self.next_run_time is None:
 220            self.next_run_time = current_time + timedelta(days=1) - \
 221                timedelta(hours=(current_time.hour - self.hour) % 24,
 222                          minutes=current_time.minute,
 223                          seconds=current_time.second,
 224                          microseconds=current_time.microsecond)
 225
 226            # print 'next run time ', self.next_run_time
 227            return []
 228
 229        executions = []
 230        if self.next_run_time < current_time:
 231            while self.next_run_time <= current_time:
 232                job_start = self.next_run_time - self.duration
 233                job_stop = self.next_run_time
 234
 235                if self.offset is not None:
 236                    job_start += self.offset
 237                    job_stop += self.offset
 238
 239                executions.append({'sensing_start': job_start,
 240                                   'sensing_stop': job_stop})
 241                self.next_run_time += Daily.period
 242
 243        return executions
 244
 245
 246class Weekly(Trigger):
 247    """Class for events which should trigger each week."""
 248
 249    trigger_name = 'weekly'
 250    period = timedelta(days=7)
 251
 252    def __init__(self, _, trigger):
 253        super(Weekly, self).__init__()
 254        self.day = trigger['day']
 255        self.hour = trigger['hour']
 256        self.offset = trigger['offset']
 257        self.next_run_time = None
 258
 259    def __str__(self):
 260        res = 'Once each week on day {day} at {hour}:00'.format(
 261            day=self.day, hour=self.hour)
 262
 263        if self.offset is not None:
 264            res += ' applying offset of {offset}'.format(offset=show_timedelta(self.offset))
 265
 266        return res
 267
 268    def check(self, current_time):
 269        """Given the time now is `current_time`, compute when we should next
 270        trigger.
 271        """
 272
 273        # print 'current_time ', current_time, ' next run time ', self.next_run_time
 274        if self.next_run_time is None:
 275            # self.next_run_time = (
 276                # current_time  # start now
 277                # - timedelta(
 278                # timedelta(days=7) -  # add a week
 279                                # timedelta(days=(current_time.weekday() - self.day) % 7,
 280                                        # hours=(current_time.hour - self.hour) % 60,
 281                                        # minutes=(current_time.minute - self.minute) % 60,
 282                                        # seconds=current_time.second,
 283                                        # microseconds=current_time.microsecond)
 284
 285            self.next_run_time = current_time
 286            self.next_run_time -= timedelta(days=current_time.weekday(),
 287                                            hours=current_time.hour,
 288                                            minutes=current_time.minute,
 289                                            seconds=current_time.second,
 290                                            microseconds=current_time.microsecond)
 291            # logger.debug('    back to ' + str(self.next_run_time) + \
 292            # ' bzos ' + str(current_time.weekday()))
 293            self.next_run_time += timedelta(days=self.day,
 294                                            hours=self.hour)
 295            if self.next_run_time < current_time:
 296                self.next_run_time += timedelta(days=7)
 297
 298            # logger.debug('Weekly set initial next_run_time to {next} based on {now}'.format(
 299            # next=self.next_run_time, now=current_time))
 300            return []
 301
 302        executions = []
 303        if self.next_run_time < current_time:
 304            while self.next_run_time <= current_time:
 305                # yield a new job, applying our offset (normally negative, if used)
 306
 307                job_start = self.next_run_time - Weekly.period
 308                job_stop = self.next_run_time
 309
 310                if self.offset is not None:
 311                    job_start += self.offset
 312                    job_stop += self.offset
 313
 314                executions.append({'sensing_start': job_start,
 315                                   'sensing_stop': job_stop})
 316
 317                self.next_run_time += Weekly.period
 318
 319        return executions
 320
 321
 322class Monthly(Trigger):
 323    """Class for events which are triggered each month."""
 324
 325    trigger_name = 'monthly'
 326
 327    def __init__(self, _, trigger):
 328        super(Monthly, self).__init__()
 329        self.day = trigger['day']
 330        self.hour = trigger['hour']
 331        self.offset = trigger['offset']
 332        self.next_run_time = None
 333
 334    def __str__(self):
 335        res = 'Once each month on day {day} hour {hour}'.format(
 336            day=self.day, hour=self.hour)
 337
 338        if self.offset is not None:
 339            res += ' applying offset of {offset}'.format(offset=show_timedelta(self.offset))
 340
 341        return res
 342
 343    def check(self, current_time):
 344        """Given the time now is `current_time`, compute when we should next
 345        trigger.
 346        """
 347
 348        # print 'current_time ', current_time, ' next run time ', self.next_run_time
 349        if self.next_run_time is None:
 350            acc = datetime(current_time.year, current_time.month,
 351                           current_time.day, current_time.hour)
 352            # print 'acc starts at ', acc
 353            while not (acc.day == self.day and acc.hour == self.hour):
 354                acc -= timedelta(hours=1)
 355                # print 'acc becomes ', acc
 356
 357            # print 'acc ends up as ', acc
 358
 359            hour = acc.hour
 360            day = acc.day
 361            month = acc.month
 362            year = acc.year
 363
 364            month += 1
 365            if month == 13:
 366                year += 1
 367                month = 1
 368
 369            self.next_run_time = datetime(year, month, day, hour)
 370            # print 'next run time ', self.next_run_time
 371            return []
 372
 373        executions = []
 374        if self.next_run_time < current_time:
 375            while self.next_run_time <= current_time:
 376                # executions.append({'sensing_stop':self.next_run_time})
 377                event = {'sensing_stop': self.next_run_time}
 378                hour = self.next_run_time.hour
 379                day = self.next_run_time.day
 380                month = self.next_run_time.month
 381                year = self.next_run_time.year
 382                month -= 1
 383                if month == 0:
 384                    year -= 1
 385                    month = 12
 386
 387                event['sensing_start'] = datetime(year, month, day, hour)
 388                if self.offset is not None:
 389                    event['sensing_start'] += self.offset
 390                    event['sensing_stop'] += self.offset
 391
 392                executions.append(event)
 393                month = self.next_run_time.month
 394                year = self.next_run_time.year
 395                month += 1
 396                if month == 13:
 397                    year += 1
 398                    month = 1
 399
 400                self.next_run_time = datetime(year, month, day, hour)
 401
 402        return executions
 403
 404
 405class Annual(Trigger):
 406    """Class for events which are triggered each month."""
 407
 408    trigger_name = 'annual'
 409
 410    def __init__(self, _, trigger):
 411        super(Annual, self).__init__()
 412        self.month = trigger['month']
 413        self.day = trigger['day']
 414        self.hour = trigger['hour']
 415        self.offset = trigger['offset']
 416        self.next_run_time = None
 417
 418    def __str__(self):
 419        res = 'Once each year on month {month} day {day} hour {hour}'.format(
 420            month=self.month, day=self.day, hour=self.hour)
 421
 422        if self.offset is not None:
 423            res += ' applying offset of {offset}'.format(offset=show_timedelta(self.offset))
 424
 425        return res
 426
 427    def check(self, current_time):
 428        """Given the time now is `current_time`, compute when we should next
 429        trigger.
 430        """
 431        # First time around, compute when we should have last run
 432        if self.next_run_time is None:
 433            acc = datetime(current_time.year, current_time.month,
 434                           current_time.day, current_time.hour)
 435            # print 'acc starts at ', acc
 436            while not (acc.month == self.month and acc.day == self.day and acc.hour == self.hour):
 437                acc -= timedelta(hours=1)
 438
 439            hour = acc.hour
 440            day = acc.day
 441            month = acc.month
 442            year = acc.year
 443
 444            # now increment by a year
 445            year += 1
 446
 447            self.next_run_time = datetime(year, month, day, hour)
 448            return []
 449
 450        executions = []
 451        if self.next_run_time < current_time:
 452            while self.next_run_time <= current_time:
 453                # executions.append({'sensing_stop':self.next_run_time})
 454                event = {'sensing_stop': self.next_run_time}
 455                hour = self.next_run_time.hour
 456                day = self.next_run_time.day
 457                month = self.next_run_time.month
 458                year = self.next_run_time.year
 459                month -= 1
 460                if month == 0:
 461                    year -= 1
 462                    month = 12
 463
 464                event['sensing_start'] = datetime(year, month, day, hour)
 465                if self.offset is not None:
 466                    event['sensing_start'] += self.offset
 467                    event['sensing_stop'] += self.offset
 468
 469                executions.append(event)
 470                month = self.next_run_time.month
 471                year = self.next_run_time.year
 472                month += 1
 473                if month == 13:
 474                    year += 1
 475                    month = 1
 476
 477                self.next_run_time = datetime(year, month, day, hour)
 478
 479        return executions
 480
 481
 482class Periodic(Trigger):
 483    """Schedule periodic runs.
 484    Correctly handle cases when system was restarted - try to
 485    run the missed scheduling ocurrances.
 486    Absolute start of periodic sequence is specified in xml.
 487    """
 488
 489    trigger_name = 'periodic'
 490
 491    def __init__(self, _, elem):  # category
 492        super(Periodic, self).__init__()
 493        period = parsechildstr(elem, 'period')
 494        self.delta = xml_to_timedelta(period)
 495        self.start = parsechilddatetime(elem, 'start')
 496        self.last_time = None
 497        self.acc = None
 498
 499    def __str__(self):
 500        return 'from {start} run every {delta}'.format(start=self.start, delta=self.delta)
 501
 502    def check(self, current_time):
 503        """Given the time now is `current_time`, compute when we should next
 504        trigger.
 505        """
 506
 507        # print 'periodic check current_time ', current_time
 508        if self.last_time is None:
 509            self.last_time = current_time
 510            return None
 511
 512        if self.acc is None:
 513            self.acc = self.start
 514
 515        result = []
 516        while self.acc < current_time:
 517            # print 'testing self.acc ', self.acc, ' last_time ', self.last_time
 518            if self.acc >= self.last_time:
 519                # print '    MATCH'
 520                result.append({'sensing_start': self.acc - self.delta,
 521                               'sensing_stop': self.acc})
 522
 523            self.acc += self.delta
 524
 525        return result
 526
 527
class DirectoryMonitor(Trigger):
    """Periodically scans directory for new files.

    check() is a generator yielding one event dict per new file found;
    its bare ``return`` statements simply end the scan early.
    """

    trigger_name = 'directory-monitor'

    def __init__(self, category, trigger):
        super(DirectoryMonitor, self).__init__()
        # job category, used to seed the initial PDU list from the jobs table
        self.category = category

        # name of directory to scan
        # (appears to be a pathlib.Path - it is used with .is_dir()/.stat()/
        # .iterdir() below; confirm against the JobCreator)
        self.directory = trigger['directory']

        # glob pattern to scan for
        self.pattern = trigger['pattern']

        # number of PDUs to hold in a queue before ingesting them,
        # to allow out-of-order PDUs to be reordered first
        # (needed for HKTM and NOAA GAC distillation)
        self.queue_length = trigger['pdu_delay_queue']
        # maximum time with no new files before a NO_INGESTION notification
        # is raised, or None to disable the timeout check
        self.timeout = trigger['timeout']

        # if not os.path.isdir(self.directory):
            # raise IOError('Cannot open directory ' + self.directory)

        # directory handle
        # self.dir_fd = None

        # directory modification time (None until the first scan)
        self.mtime = None
        # callable mapping a product path to either a ProductAttributes or a
        # (sid, sensing_start, ...) tuple; may raise UnknownSIDError/ValueError
        self.parser = trigger['parser']

        if self.queue_length is not None:
            self.queue = []

        # timestamp of last directory scan.
        self.last_scan_time = datetime.utcnow()

        # timestamp of most recent received file,
        # or None for startup or when we are in active notification state
        # self.last_new_file_time = self.last_scan_time
        self.timeout_deadline = None
        # set of filenames seen in the previous scan (None until first check())
        self.old_files = None

    def __str__(self):
        result = 'Scan directory {dir} for {pattern}'.format(
            dir=self.directory, pattern=self.pattern)

        if self.queue_length is not None:
            result += ' with delay queue of {queue}'.format(queue=self.queue_length)

        return result

    @staticmethod
    def db_scan(category):
        """Perform an initial scan of the JOBS table to build a list of all previously
        detected PDUs.

        Stores the result in the DirectoryMonitor.initial_pdus class attribute,
        shared by every DirectoryMonitor instance."""

        logger.info('Building initial PDUs list from database')
        DirectoryMonitor.initial_pdus = set(row[0] for row in find_jobs(
                category=category,
                fields=('FILENAME',),
                filename_ne=None,
                sensing_start_ge=datetime.utcnow() - settings.FILE_SCAN_CUTOFF))
        logger.info('Found {cc} ingested filenames from jobs table'.format(
                cc=len(DirectoryMonitor.initial_pdus)))

    def check(self, current_time):  # (unused argument) pylint: disable=W0613
        """Search for new products (PDUs) in self.directory.

        Generator yielding dicts with 'filename', 'sid' and 'sensing_start'
        keys, suitably sorted.
        """
        if not hasattr(DirectoryMonitor, 'initial_pdus'):
            # lazily build the shared list of already-processed files
            DirectoryMonitor.db_scan(self.category)

        if self.old_files is None:
            # DirectoryMonitor.initial_pdus is shared across all DirectoryMonitors,
            # and cleared by the __main__ loop after the first iteration.
            self.old_files = DirectoryMonitor.initial_pdus

        if self.directory is None:
            # Special scheduler files used by the generic ingester tool
            return

        if not self.directory.is_dir():
            logger.warning('No such directory: {d}'.format(d=self.directory))
            return

        mtime = self.directory.stat().st_mtime
        # logger.debug(self.directory+' mtime '+str(mtime))

        # If it's been less than RESCAN_TIMEOUT (15 minutes) since the last
        # full scan and the directory mtime hasn't changed, do nothing
        if (self.mtime and self.mtime == mtime and
            (datetime.utcnow() - self.last_scan_time) < RESCAN_TIMEOUT):
            return

        if self.mtime is None:
            logger.info('Scanning {dir} first pass'.format(dir=self.directory))

        elif self.mtime != mtime:
            logger.info('Scanning {dir} due to changed directory timestamp'.format(
                    dir=self.directory))

        else:
            logger.info('Scanning {dir} due to timeout'.format(dir=self.directory))

        self.mtime = mtime
        self.last_scan_time = datetime.utcnow()

        # Read the current set of files, and subtract the known processed files
        logger.debug('Scan {d} for {p}'.format(d=self.directory, p=self.pattern))
        current_files = set(
            fnmatch.filter(
                (str(f.name) for f in self.directory.iterdir()),
                self.pattern))
        new_files = current_files - self.old_files
        logger.debug('Found {total} files of which {new} are not in initial or previous scan'.format(
                total=len(current_files), new=len(new_files)))

        if self.queue_length is not None:
            assert self.queue_length != 0  # we don't handle this properly
            # the queue is needed for PDU types where multiple sequential MDRs are required
            # to build a snack (esp. Metop HKTM and NOAA GAC). The ftp transfer to MET TCE
            # does not always deliver PDUs in creation order so we hold back some in an
            # ordered queue
            self.queue.extend(new_files)
            self.queue.sort()
            # release everything except the newest `queue_length` entries,
            # which stay queued until the next scan
            new_files = self.queue[:-self.queue_length]
            self.queue = self.queue[-self.queue_length:]
            # logger.debug('Taken from queue: '+str(result))

        # for the first pass only, examine all files but discard if older than
        # settings.FILE_SCAN_CUTOFF
        # for all other passes process all files
        # Possible problem: if someone inserts a really old product into the incoming
        # directory while the system is running, it'll get processed even though it's
        # too old
        cutoff_time = datetime.utcnow() - settings.FILE_SCAN_CUTOFF
        cutoff_count = 0
        for new_file in new_files:
            path = self.directory.joinpath(new_file)
            try:
                parsed = self.parser(path)
                if parsed is None:
                    # clean rejection by product attribute function
                    continue

                if isinstance(parsed, ProductAttributes):
                    sid = parsed.sid
                    sensing_start = parsed.sensing_start

                else:
                    # legacy parsers return a (sid, sensing_start, ...) tuple
                    sid, sensing_start = self.parser(path)[:2]
            except UnknownSIDError as e:
                logger.warning('Rejecting product {e}'.format(e=e))
                continue
            except ValueError as e:
                logger.warning('Rejecting {name} as filename cannot be parsed: {e}'.format(
                    name=new_file, e=e))
                continue

            if self.old_files is DirectoryMonitor.initial_pdus:
                # first pass only: apply the age cutoff
                if sensing_start is None or sensing_start < cutoff_time:
                    cutoff_count += 1
                    continue

            yield {'filename': path,
                   'sid': sid,
                   'sensing_start': sensing_start}

        if cutoff_count > 0:
            logger.info('Rejected {cc} files for being older than cutoff of {cutoff}'.format(
                cc=cutoff_count, cutoff=cutoff_time))

        self.old_files = current_files

        # if we have had no ingestions of a particular rule for a while, raise a notification
        # if configured
        if self.timeout is not None:
            # There is a bug here. If the scheduler raises a notification due to lack of data,
            # then the scheduler is restarted, then the data flow starts again, the notification
            # will not be cleared.
            # One solution is to delete all notification state files after 24 hours.
            now = datetime.utcnow()
            if len(new_files) > 0 or self.timeout_deadline is None:
                # if we had ingestion for this directory in this cycle, reset the timeout
                # deadline
                # if self.last_new_file_time is None:
                dtype = os.path.basename(str(self.directory))
                notifications.denotify('NO_INGESTION', parameter=dtype)

                self.timeout_deadline = now + self.timeout
                logger.debug('Ingestion or first pass in {dir} so timeout reset to {now} + '
                              '{timeout} = {deadline}'.format(
                        dir=self.directory,
                        now=show_time(now),
                        timeout=self.timeout,
                        deadline=show_time(self.timeout_deadline)))

            else:
                # if there was no ingestion this cycle, check if we are past deadline
                # if self.last_new_file_time is not None:
                    # time since we last spotted a new PDU
                    # duration = self.last_scan_time - self.last_new_file_time
                if now > self.timeout_deadline:
                    logger.debug('Timeout deadline for {dir} is {deadline}, now is {now} so we '
                                  'throw a notification'.format(
                            dir=self.directory,
                            deadline=show_time(self.timeout_deadline),
                            now=show_time(now)))

                    dtype = os.path.basename(str(self.directory))
                    notifications.notify('NO_INGESTION',
                                         parameter=dtype,
                                         message='No ingestion on {dtype} for over {dur}'.format(
                            dtype=dtype,
                            dur=show_timedelta(self.timeout)))

                else:
                    logger.debug('Timeout deadline for {dir} is {deadline}, now is {now} so no '
                                  'notification'.format(
                            dir=self.directory,
                            deadline=show_time(self.timeout_deadline),
                            now=show_time(now)))

                    # prevent the message from being raised again
                    # self.last_new_file_time = None

            # logger.debug('dir {dur} last scan {ls} last file {lf} delta {d}'.format(
                    # dur=self.directory,
                    # ls=self.last_scan_time,
                    # lf=self.last_new_file_time,
                    # d=self.last_scan_time - self.last_new_file_time if \
                        # self.last_new_file_time is not None else None))
 763
 764
 765def run_activity(category,
 766                 activity,
 767                 event,
 768                 simulate):
 769    """Schedule new job(s) for `activity` to be dispatched based on the `event` which just
 770    occurred."""
 771    if simulate:
 772        logger.debug('Simulating job {activity} {job}'.format(
 773            activity=activity.name, job=event))
 774        if activity.convention is CallingConvention.SID_TIMERANGE and 'sid' not in event:
 775            raise ValueError('No SID in SID_TIMERANGE event')
 776
 777        return
 778
 779    if activity.convention is CallingConvention.FILENAME:
 780        worker.add_job(Job(category=category,
 781                           activity=activity.name,
 782                           sid=event['sid'],
 783                           sensing_start=event['sensing_start'],
 784                           filename=event['filename']))
 785
 786    elif activity.convention is CallingConvention.SID_TIMERANGE:
 787        worker.add_job(Job(category=category,
 788                           activity=activity.name,
 789                           sensing_start=event['sensing_start'],
 790                           sensing_stop=event['sensing_stop'],
 791                           sid=event['sid']))
 792
 793    elif activity.convention is CallingConvention.SID_TIMERANGE_TABLE:
 794        # if run manually or from worker chains the client would need to specify the
 795        # tablename. However when run from a timed job we allow this to be omitted.
 796        worker.add_job(Job(category=category,
 797                           activity=activity.name,
 798                           sensing_start=event['sensing_start'],
 799                           sensing_stop=event['sensing_stop'],
 800                           # tablename won't be there in timed jobs
 801                           sid=event.get('sid')))
 802
 803    elif activity.convention is CallingConvention.TIMERANGE:
 804        worker.add_job(Job(category=category,
 805                           activity=activity.name,
 806                           sensing_start=event['sensing_start'],
 807                           sensing_stop=event['sensing_stop']))
 808
 809    elif activity.convention is CallingConvention.NONE:
 810        worker.add_job(Job(category=category,
 811                           activity=activity.name))
 812
 813    else:
 814        raise ConfigError('Unhandled calling convention {c}'.format(c=activity.convention.value))
 815
 816
 817def scan(category,  # (dangerous argument []) pylint: disable=W0102
 818         allowed_activities=None,
 819         excluded_activities=[],
 820         exclude_directory_monitoring=False):
 821    """Scan the "schedule" directory and pull scheduling info from xml files.
 822    Build the control list of tasks. List consists of tuples:
 823    (<scheduling-type>, <action-type>), which are instances of, respectively,
 824
 825    * Trigger: Hourly, Weekly, Periodic, DirectoryMonitor
 826    * Response: List of named Activities
 827
 828    Args:
 829        `alg_dir` (Path): Path to the directory containing task desciption xml files
 830            (usually ".../schedule")
 831    """
 832    schedule = []
 833
 834    for job_creator in JobCreator.all():
 835        if not job_creator.enabled:
 836            continue
 837
 838        response = list(job_creator.responses(
 839            allowed_activities=allowed_activities,
 840            excluded_activities=excluded_activities,
 841            exclude_directory_monitoring=exclude_directory_monitoring))
 842
 843        if len(response) == 0:
 844            continue
 845
 846        schedule_obj = {'name': job_creator.name,
 847                        'response': response,
 848                        'trigger': Trigger.factory(category, job_creator.trigger)}
 849        schedule.append(schedule_obj)
 850
 851    return schedule
 852
 853
 854def show_schedule(schedule):
 855    """Display all configured activities."""
 856    for task in schedule:
 857        responses = ' and '.join('run {name} ({sids})'.format(
 858            name=x['activity'].name,
 859            sids=','.join(str(s) for s in x['sids'])) for x in task['response'])
 860        print('{trigger} {responses}'.format(trigger=task['trigger'], responses=responses))
 861
 862
 863def signal_handler(sig, _):
 864    """Try to handle control-C cleanly."""
 865    if sig == signal.SIGINT:  # Sent by ctrl-C or 'kill -SIGINT'
 866        logger.info('SIGINT received')
 867
 868    elif sig == signal.SIGTERM:  # Default signal sent by 'kill'
 869        logger.info('SIGTERM received')
 870
 871    else:
 872        logger.info('Signal {sig} received'.format(sig=sig))
 873
 874    raise SystemExit
 875
 876
def run_forever(category, schedule, simulate, start, sleep, one_shot=False):
    """Main loop. Constantly check all triggers and run responses.
    Args:
        `category` (str): Category assigned to any jobs created via run_activity
        `schedule` (list of dicts): Job schedule read from XML files in scheduler dir
        `simulate` (timedelta or None): Increase the internal time value by this amount instead
            of real time and don't actually create any jobs.
        `start` (datetime or None): Force a different start time
        `sleep` (timedelta or None): Sleep time between iterations
        `one_shot` (bool): Terminate after a single pass
    """
    # Used to send hourly-ish notifications as a test
    next_hour = datetime.utcnow()

    now = None
    if start is not None:
        now = start

    else:
        now = datetime.utcnow()

    # make category local?
    if simulate is None:
        logger.info('Scheduling jobs with category {c}'.format(c=category))

    else:
        logger.info('Simulating schedule with time jump of {s}'.format(s=simulate))
        # in simulation the caller-supplied sleep is deliberately overridden with a
        # short fixed delay so synthetic time advances quickly
        sleep = timedelta(seconds=1)

    while True:
        # advance the clock: wall time normally, or synthetic jumps when simulating
        if simulate is None:
            now = datetime.utcnow()
            logger.info('Wall clock time is now {now}'.format(now=now))

        else:
            now += simulate
            logger.info('Synthetic time now {now}'.format(now=now))

        # logger.debug('Checking for events for '+str(now))

        for task in schedule:
            events = task['trigger'].check(now)

            if start:
                # if a time range is specified, only run timed events
                events = [x for x in events if 'sensing_stop' in x]

            for event in events:
                for response in task['response']:
                    if len(response['sids']) > 0:
                        for sid in response['sids']:
                            # NOTE(review): this mutates the shared event dict, so a
                            # later response with no sids (else branch below) still
                            # sees the last sid assigned here — confirm intentional
                            event['sid'] = sid
                            run_activity(category, response['activity'], event, simulate)

                    else:
                        run_activity(category, response['activity'], event, simulate)

        # the initial PDU list is large and we don't need it in memory after the first
        # iteration
        DirectoryMonitor.initial_pdus = None

        # persist any jobs queued this iteration
        jobs_conn.commit()

        if one_shot:
            return

        logger.debug('Sleeping for {delay}'.format(delay=sleep))
        time.sleep(sleep.total_seconds())

        # logger.debug('hourly test {a} against {b}'.format(a=datetime.utcnow(), b=next_hour))
        if datetime.utcnow() > next_hour:
            notifications.notify('HOURLY_NOTIFICATION', 'Test hourly notification')
            next_hour = datetime.utcnow() + timedelta(hours=1)
 949
 950
 951def show_scheduler_status(category:str, limit:int):
 952    """Display recent jobs created by scheduler."""
 953    table = Table(headings=('Generation', 'ID', 'Activity', 'Start', 'Filename / Table', 'Source'))
 954    for row in find_jobs(
 955            fields=['GEN_TIME', 'ID', 'ACTIVITY', 'SENSING_START', 'FILENAME', 'TABLENAME'] + SID.sql_sys_select('JOBS'),
 956            category=category,
 957            parent_ne=None,
 958            order_by=Reversed('gen_time'),
 959            limit=limit):
 960        gen_time, job_id, activity, sensing_start, filename, tablename = row[:6]
 961        sid = SID.from_sys_select('JOBS', row[6:])
 962        table.append((gen_time,
 963                      job_id,
 964                      activity,
 965                      sensing_start,
 966                      filename if filename is not None else tablename,
 967                      sid))
 968
 969    table.write()
 970
 971
 972def main():
 973    """Command line entry point."""
 974
 975    # Must be done in scheduler as it has to scan these directories
 976    # if 'CHART_INGATE_DIR' not in os.environ:
 977        # os.environ['CHART_INGATE_DIR'] = settings.INGATE_DIR
 978
 979    signal.signal(signal.SIGINT, signal_handler)
 980    signal.signal(signal.SIGTERM, signal_handler)
 981
 982    parser = ArgumentParser()
 983    parser.add_argument('--log-file',
 984                        help='Single logfile location')
 985    parser.add_argument('--rotating-log-file',
 986                        help='Rotating logfile location')
 987    parser.add_argument('--db', '--conn', '-d',
 988                        metavar='CONN',
 989                        help='Use database connection CONNECTION')
 990    parser.add_argument('--project-id',
 991                        type=int,
 992                        metavar='ID',
 993                        help='Force PROJECT for multi-project JOBS table function')
 994    parser.add_argument('--category',
 995                        default=settings.DEFAULT_CATEGORY,
 996                        help='Category to use when creating new jobs')
 997    parser.add_argument('--simulate',
 998                        type=ArgumentParser.timedelta,
 999                        metavar='SPEED',
1000                        help='Increase the internal clock by SPEED each iteration instead of '
1001                        'real time. Also deactivates directory monitoring and only simulated '
1002                        'scheduling')
1003    parser.add_argument('--show-schedule', '--show', '--list', '-l',
1004                        action='store_true',
1005                        help='Just display configured schedule then exit')
1006    parser.add_argument('--start',
1007                        type=ArgumentParser.datetime,
1008                        metavar='TIME',
1009                        help='Begin scheduling at TIME')
1010    # parser.add_argument('--simulate',
1011                        # action='store_true',
1012                        # help='Just show which algorithms would be scheduled instead '
1013                        # 'of running them')
1014    parser.add_argument('--only-activities', '--activities',
1015                        nargs='*',
1016                        help='Only call activities from this list')
1017    parser.add_argument('--exclude-activities', '--exclude',
1018                        nargs='*',
1019                        help='Exclude activities from this list')
1020    parser.add_argument('--scan-period',
1021                        type=ArgumentParser.timedelta,
1022                        default=settings.FILE_SCAN_CUTOFF,
1023                        help='Configure how far back in the file scanner should look')
1024    parser.add_argument('--sleep-period',
1025                        type=ArgumentParser.timedelta,
1026                        metavar='SLEEP',
1027                        default=settings.SCHEDULER_SLEEP,
1028                        help='Wait for SLEEP between iterations')
1029    parser.add_argument('--one-shot',
1030                        action='store_true',
1031                        help='Just loop once then terminate')
1032    parser.add_argument('--status',
1033                        action='store_true',
1034                        help='Show most recently created jobs')
1035    parser.add_argument('--limit',
1036                        type=int,
1037                        help='Number of jobs to display if --status is used',
1038                        default=10)
1039
1040    # postpone log file init until we do it explicitly later
1041    args = parser.parse_args(no_log_init=True)
1042
1043    if args.rotating_log_file:
1044        settings.ROTATING_LOG_FILE = args.rotating_log_file
1045
1046    if args.log_file:
1047        settings.SINGLE_LOG_FILE = args.single_log_file
1048
1049    init_log()
1050
1051    if args.db:
1052        settings.set_db_name(args.db)
1053
1054    if args.project_id:
1055        settings.DATABASE_PROJECT_ID = args.project_id
1056
1057    if args.status:
1058        show_scheduler_status(args.category, limit=args.limit)
1059        parser.exit()
1060
1061    if not settings.SCHEDULE_DIR.is_dir():
1062        parser.error('Schedule directory {p} not found'.format(
1063            p=settings.SCHEDULE_DIR))
1064
1065    if args.start and not args.simulate:
1066        parser.error('--start option can only be used if --simulate is used')
1067
1068    # scheduler requires state file dir if notifications are raised
1069    if not settings.DEBUG and not settings.STATEFILE_DIR.is_dir():
1070        raise ValueError('Statefile directory {name} not found'.format(name=settings.STATEFILE_DIR))
1071
1072    if args.scan_period:
1073        settings.FILE_SCAN_CUTOFF = args.scan_period
1074
1075    category = args.category.upper()
1076
1077    # the notification module only sends emails if being run from inside the dispatcher.
1078    # This is to prevent superflous emails being send when running command line tools manually.
1079    # So we trick it here into thinking we are running from dispatcher.
1080    # This is messy and should be done better.
1081    os.environ['{ENV_PREFIX}DISPATCHER'.format(ENV_PREFIX=settings.ENV_PREFIX)] = '1'
1082    schedule = scan(category=args.category,
1083                    allowed_activities=args.only_activities,
1084                    excluded_activities=args.exclude_activities,
1085                    exclude_directory_monitoring=args.simulate is not None)
1086
1087    if args.show_schedule:
1088        show_schedule(schedule)
1089        parser.exit()
1090
1091    notifications.notify('SCHEDULER_STARTUP',
1092                         message='This is a test notification sent on schedule startup')
1093
1094    # Tell the notification module it is being run by the dispatcher
1095    # otherwise notification emails are disabled
1096    set_env('{ENV_PREFIX}DISPATCHER', '1')
1097
1098    run_forever(category,
1099                schedule,
1100                simulate=args.simulate,
1101                start=args.start,
1102                sleep=args.sleep_period,
1103                one_shot=args.one_shot)
1104
# Script entry point when executed directly (not imported)
if __name__ == '__main__':
    main()