1#!/usr/bin/env python3
2
3"""Generic job chaining function."""
4
5import logging
6from datetime import datetime
7from datetime import timedelta
8
9from chart.backend.activity import Activity
10from chart.backend.job import Job
11from chart.backend.activity import ActivityTriggerType
12
# Module-level logger. Using __name__ (instead of the bare root logger the
# original grabbed) keeps log records attributable to this module while still
# propagating to any handlers configured on the root logger.
logger = logging.getLogger(__name__)

# Duration of a daily long-term-stats job
DAY = timedelta(days=1)

# Duration of an hourly-trigger job
HOUR = timedelta(hours=1)

# Delay processing long term stats jobs for a few hours to allow all data to arrive
LATENCY = timedelta(hours=6)
23
24
def threeday_start_time(s: datetime):
    """Round *s* down to the start (midnight) of the 3-day period containing it.

    Periods are anchored at 2000-01-01, i.e. 2000-01-01, 2000-01-04,
    2000-01-07, ... Used when expanding 3-day statistics jobs.
    """
    epoch = datetime(2000, 1, 1)
    # Whole days elapsed since the anchor (floor division also handles
    # dates before the anchor correctly)
    elapsed_days = (s - epoch).days
    period_index = elapsed_days // 3
    return epoch + timedelta(days=3 * period_index)
33
34
def _trigger_matches(trigger, table):
    """Return True if `trigger` should fire for the written `table`."""
    # Trigger may be restricted to a specific table
    if trigger.table is not None and table.table != trigger.table:
        return False

    # A stats trigger needs the table (and its storage table, if any) to
    # actually carry stats
    if trigger.with_stats:
        if not table.table.has_stats:
            return False

        storage = table.table.storage_table
        if storage is not None and not storage.has_stats:
            return False

    return True


def _series_parameters(trigger, table):
    """Return (series_start, series_block) for expanding `trigger` over `table`.

    `series_start` is `table.sensing_start` rounded down to the trigger's
    time-split boundary; `series_block` is the block duration, or None when
    the trigger produces a single job covering the table's own time range.
    """
    start = table.sensing_start

    # Daily: round down to midnight, one-day blocks
    if trigger.trigger_type is ActivityTriggerType.DAILY:
        return start.replace(hour=0, minute=0, second=0, microsecond=0), DAY

    # Hourly: round down to the hour, one-hour blocks
    if trigger.trigger_type is ActivityTriggerType.HOURLY:
        return start.replace(minute=0, second=0, microsecond=0), HOUR

    # 20-minute: round down to :00/:20/:40, 20-minute blocks
    if trigger.trigger_type is ActivityTriggerType.TWENTYMINUTE:
        return (start.replace(minute=start.minute - start.minute % 20,
                              second=0,
                              microsecond=0),
                timedelta(minutes=20))

    # 3-day: round down to the containing 3-day period
    if trigger.trigger_type is ActivityTriggerType.THREEDAILY:
        return threeday_start_time(start), timedelta(days=3)

    # No time split: a single job over the table's own sensing range
    return start, None


def find_derived_jobs(job):
    """Given a source `job`, yield all the derived jobs that it triggers.

    Uses table-based triggering: every table this job wrote is checked
    against every trigger of every activity. Matching triggers raise one
    derived Job, or — when the trigger specifies a time split (daily,
    hourly, 20-minute, 3-day) — a series of jobs covering the table's
    sensing range in aligned blocks.
    """
    # Examine every table this job wrote to ...
    for table in job.tables:
        # ... against every trigger of every activity
        for activity in Activity.all():
            for trigger in activity.triggers:
                if not _trigger_matches(trigger, table):
                    continue

                series_start, series_block = _series_parameters(trigger, table)

                # Walk the table's sensing range in series blocks, yielding
                # one derived job per block (or a single job if unsplit)
                while series_start < table.sensing_stop:
                    result = Job(
                        category=job.category,
                        activity=activity,
                        sid=table.sid or job.sid,
                        table=table.table,
                        sensing_start=series_start,
                        sensing_stop=(series_start + series_block
                                      if series_block is not None
                                      else table.sensing_stop))

                    # Add latency if present and we're doing NRT processing
                    # (data from the last 3 days). Capture "now" once so the
                    # cutoff test and the execution time use the same instant.
                    now = datetime.utcnow()
                    if trigger.latency and \
                            result.sensing_stop > now - timedelta(days=3):
                        result.earliest_execution_time = now + trigger.latency

                    yield result

                    if series_block is None:
                        # Unsplit trigger: exactly one job per table
                        break

                    series_start += series_block
151
152 # More things we could handle:
153 # - Event based triggering
154 # - Allow job result.xml files to include a <derived-jobs> section explicitly triggering
155 # more jobs
156 # - Parallel running like:
157
158 # - Job B -
159 # / \
160 # Job A --- Job C --- Job E
161 # \ /
162 # - Job D -
163
164 # If this ever comes up...
165 # It could be done by yielding jobs B, C, D, E here when A completes
166 # but crucially writing the job IDs (obtained somehow) of B, C and D
167 # into the proposed PRECURSOR list-of-int column of the JOBS table
168 # for E so it won't be run until ready