#!/usr/bin/env python3

"""Allow the `worker` to construct job chains by telling it which new jobs are triggered
by existing completed jobs.

This is a swappable module and different projects can use their own implementation.

In future we may have a fully XML-controlled system, but for now the job chaining has to
be hard-coded here.

The rules we have are:

- After TM_INGESTER completes, create short-term (20-minute) stats jobs for the time
  range written to TM, and a long-term (1-day) stats job for the full day, set to
  execute at 6am.
- After GEOEVENTS_INGESTER completes, run GEOEVENTS_EVENTS over the time period
  written to / specified.

This particular module is tuned for the Chart S3 project and is based around ingestion
and orbital jobs triggering further orbital jobs.

By changing `settings.WORKER_JOB_CHAIN` to point somewhere else a project may use its
own mechanism, or disable chaining entirely by setting it to None.
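
For example, a project's settings might contain something like this (the module path
shown is illustrative only):

    WORKER_JOB_CHAIN = 'chart.backend.jobchain'

or `WORKER_JOB_CHAIN = None` to disable chaining.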
24"""

import logging
from datetime import timedelta

from chart.backend.job import Job

logger = logging.getLogger(__name__)

# Duration of long term stats jobs
DAY = timedelta(days=1)

# Delay processing long term stats jobs for a few hours to allow all data to arrive
LATENCY = timedelta(hours=6)
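# For example, a long term stats job covering the whole of 2024-01-01 will not be
# executed before 2024-01-02 06:00 (see earliest_execution_time below).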


def find_derived_jobs(job):
    """Given a source job, yield all the derived jobs that it triggers."""

    # First pass - scan the list of affected tables recorded against the job.
    # Each entry of `job.tables` is a dictionary of:
    #   type          :: orbital or ???
    #   table         :: the table object that was written to
    #   sid           :: source identifier
    #   orbit         :: only present if the table is orbital
    #   sensing_start :: start of the time range written
    #   sensing_stop  :: stop of the time range written
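    #
    # As an illustration (field values are hypothetical), one entry might look like:
    #
    #     {"table": <TM table>, "sid": "S3A",
    #      "sensing_start": datetime(2024, 1, 1, 10, 0),
    #      "sensing_stop": datetime(2024, 1, 1, 10, 20)}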

    logger.info("Looking for derived jobs from {j}".format(j=job))

    # We can either trigger GEOEVENTS_EVENTS by detecting that the GEOEVENTS_INGESTER
    # activity ran, or more generally by checking for any jobs that write to the
    # GEO_EVENTS table.
    # A special case for making EVENTS from the geo_events table:
    # if job.activity.name == 'GEOEVENTS_INGESTER':
    #     yield Job(category=job.category,
    #               activity='GEOEVENTS_EVENTS',
    #               sid=job.sid,
    #               sensing_start=job.sensing_start,
    #               sensing_stop=job.sensing_stop)

    # Scan for jobs which recorded writes to tables. Writes have to be recorded
    # explicitly by a well-behaved algorithm when it marks a job as complete.
    for t in job.tables:
        logger.info(
            "Detected a write to {t} from {strt} to {stop}, looking for reactions".format(
                t=t["table"].name, strt=t["sensing_start"], stop=t["sensing_stop"]
            )
        )
        if t["table"].name == "GEO_EVENTS":
            yield Job(
                category=job.category,
                activity="GEOEVENTS_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        # if t["table"].name == "TC":
        #     yield Job(
        #         category=job.category,
        #         activity="TC_EVENTS",
        #         sid=t["sid"],
        #         sensing_start=t["sensing_start"],
        #         sensing_stop=t["sensing_stop"],
        #     )

        if t["table"].name == "TM":
            yield Job(
                category=job.category,
                activity="ONBOARD_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        if t["table"].name == "APEX_LOG":
            yield Job(
                category=job.category,
                activity="APEX_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        if t["table"].name == "MANOEUVRE_PREDICTION":
            yield Job(
                category=job.category,
                activity="MANO_PRED_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        if t["table"].name == "SCHEDULE_ACTIVITIES":
            yield Job(
                category=job.category,
                activity="SCHEDULE_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        # Generic stats handling.
        if t["table"].has_stats:
            # First we make short term stats. These are for statistics which are
            # generally the same length as, or shorter than, our input files, so it
            # makes sense to trigger them immediately after ingestion.
            yield Job(
                category=job.category,
                activity="SHORT_TERM_STATS",
                sid=t["sid"],
                tablename=t["table"].name,
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

            # Now handle long term stats such as daily stats. These cover multiple
            # input products, so it makes sense to delay them until many input
            # products have been ingested, to avoid recomputing big stats jobs too
            # often. Here we assume that daily generation is reasonable, and raise
            # daily stats jobs delayed until 6am the next day.
            # Note the worker will automatically squash duplicate jobs, so it is
            # safe to create multiple long term stats jobs (from multiple input
            # files); only one will be executed.
            start = t["sensing_start"].replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            yield Job(
                category=job.category,
                activity="LONG_TERM_STATS",
                sid=t["sid"],
                tablename=t["table"].name,
                sensing_start=start,
                sensing_stop=start + DAY,
                earliest_execution_time=start + DAY + LATENCY,
            )
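

# A minimal sketch of how a worker might consume this generator (illustrative
# only; the real worker loop lives elsewhere and `queue.submit` is a
# hypothetical API):
#
#     for derived in find_derived_jobs(completed_job):
#         queue.submit(derived)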