1#!/usr/bin/env python3
  2
  3"""Generic job chaining function."""
  4
  5import logging
  6from datetime import datetime
  7from datetime import timedelta
  8
  9from chart.backend.activity import Activity
 10from chart.backend.job import Job
 11from chart.backend.activity import ActivityTriggerType
 12
# Module-wide logger.
# NOTE(review): this grabs the ROOT logger; logging.getLogger(__name__) is the
# usual convention — confirm the root logger is intended here.
logger = logging.getLogger()

# Duration of long term stats jobs
DAY = timedelta(days=1)

# Handling hourly trigger type
HOUR = timedelta(hours=1)

# Delay processing long term stats jobs for a few hours to allow all data to arrive
# NOTE(review): not referenced anywhere in this module — triggers appear to carry
# their own `latency` attribute instead; confirm before removing.
LATENCY = timedelta(hours=6)
 23
 24
 25def threeday_start_time(s: datetime):
 26    """Round a date down to the start of the 3-day period it's in.
 27
 28    For computing 3-day stats.
 29    """
 30    reftime = datetime(2000, 1, 1)
 31    result = reftime + timedelta(days=(s - reftime).days // 3) * 3
 32    return result
 33
 34
 35def find_derived_jobs(job):
 36    """Given a source job, return a list of all the derived jobs that it triggers."""
 37    # Look for table based triggering
 38    # Examine every table this job wrote to ...
 39    # logger.debug('Looking for jobs derived from {j}'.format(j=job))
 40    for table in job.tables:
 41        # logger.debug('It wrote table {t}'.format(t=table))
 42        # ... now check through every activity ...
 43        for activity in Activity.all():
 44            # logger.debug('Checking if activity {a} will fire'.format(a=activity))
 45            # ... and see if it has any trigger matching this table
 46            for trigger in activity.triggers:
 47                # logger.debug('Examining trigger {t}'.format(t=trigger))
 48                # triggers can use wildcards like <table>*</table>
 49                # if not fnmatch(table.table.name.lower(), trigger.table.name.lower()):
 50                    # logger.debug('Trigger name does not match')
 51                    # continue
 52                if trigger.table is not None and table.table != trigger.table:
 53                    continue
 54
 55                # logger.debug('trigger with stats ' + str(trigger.with_stats))
 56                # If it's a stats trigger but this table lacks stats, skip over it
 57                if trigger.with_stats and not table.table.has_stats:
 58                    # logger.debug('    skipping')
 59                    continue
 60
 61                # Similarly, if it's a stats trigger but the table has a storage
 62                # table which lacks stats, skip it
 63                if trigger.with_stats and table.table.storage_table is not None and\
 64                   not table.table.storage_table.has_stats:
 65                    continue
 66
 67                # Raise a derived job
 68                # result = Job(category=job.category,
 69                             # activity=activity,
 70                             # sid=table.sid or job.sid,
 71                             # table=table.table,
 72                             # sensing_start=table.sensing_start,
 73                             # sensing_stop=table.sensing_stop)
 74                # logger.debug('Made derived job {d}'.format(d=result))
 75
 76                # If the trigger specifies a job time split (daily, hourly, 20min) we
 77                # potentially make a series of jobs not just one
 78
 79                # If the trigger <type> is daily, expand the derived job to cover full day
 80                if trigger.trigger_type is ActivityTriggerType.DAILY:
 81                    # result.sensing_start = result.sensing_start.date()
 82                    series_start = table.sensing_start.replace(
 83                        hour=0, minute=0, second=0, microsecond=0)
 84                    # result.sensing_stop = result.sensing_start + DAY
 85                    series_block = DAY
 86                    # logger.debug('Expanded timerange to {d}'.format(d=result))
 87
 88                # If the trigger <type> is hourly, expand the derived job to cover full hour
 89                elif trigger.trigger_type is ActivityTriggerType.HOURLY:
 90                    series_start = table.sensing_start.replace(
 91                        minute=0, second=0, microsecond=0)
 92                    # result.sensing_stop = result.sensing_stop.replace(
 93                        # minute=0, second=0, microsecond=0) + HOUR
 94                    series_block = HOUR
 95
 96                elif trigger.trigger_type is ActivityTriggerType.TWENTYMINUTE:
 97                    series_start = table.sensing_start.replace(
 98                        minute=table.sensing_start.minute - table.sensing_start.minute % 20,
 99                        second=0,
100                        microsecond=0)
101                    series_block = timedelta(minutes=20)
102
103                # If the trigger <type> is 3-day, expand the derived job to a 3-day range
104                elif trigger.trigger_type is ActivityTriggerType.THREEDAILY:
105                    series_start = threeday_start_time(table.sensing_start)
106                    # result.sensing_stop = result.sensing_start + timedelta(days=3)
107                    series_block = timedelta(days=3)
108
109                else:
110                    # otherwise just make a single job
111                    series_start = table.sensing_start
112                    series_block = None
113
114                while series_start < table.sensing_stop:
115                    result = Job(
116                        category=job.category,
117                        activity=activity,
118                        sid=table.sid or job.sid,
119                        table=table.table,
120                        sensing_start=series_start,
121                        sensing_stop=(series_start + series_block) if series_block is not None
122                        else table.sensing_stop)
123                    # Add latency if present and we're doing NRT processing
124                    if trigger.latency and\
125                       result.sensing_stop > (datetime.utcnow()-timedelta(days=3)):
126                        result.earliest_execution_time = datetime.utcnow() + trigger.latency
127                        # logger.debug('Set earliest execution time to {e}'.format(
128                        # e=result.earliest_execution_time))
129
130                    yield result
131
132                    if series_block is None:
133                        break
134
135                    series_start += series_block
136
137                # yield result
138
139                # if a daily job crosses the day boundary we need to create extra jobs for the
140                # next day
141                # if trigger.trigger_type is ActivityTriggerType.DAILY and\
142                   # table.sensing_start.day != table.sensing_stop.day:
143                    # result.sensing_start += DAY
144                    # result.sensing_stop += DAY
145                    # if result.earliest_execution_time is not None:
146                        # result.earliest_execution_time += DAY
147
148                    # logger.debug('Yielding extra job due to day barrier crossing {d}'.format(
149                        # d=result))
150                    # yield result
151
152    # More things we could handle:
153    # - Event based triggering
154    # - Allow job result.xml files to include a <derived-jobs> section explicitly triggering
155    #   more jobs
156    # - Parallel running like:
157
158    #         - Job B -
159    #        /         \
160    # Job A --- Job C --- Job E
161    #        \         /
162    #         - Job D -
163
164    # If this ever comes up...
165    # It could be done by yielding jobs B, C, D, E here when A completes
166    # but crucially writing the job IDs (obtained somehow) of B, C and D
167    # into the proposed PRECURSOR list-of-int column of the JOBS table
168    # for E so it won't be run until ready