#!/usr/bin/env python3

"""Allow the `worker` to construct job chains by telling it which new jobs are triggered by
existing completed jobs.

This is a swappable module and different projects can use their own implementation.

This particular module is tuned for the EPS project and is based around ingestion or orbital jobs
triggering further orbital jobs.

By changing `settings.WORKER_JOB_CHAIN_FN` to point somewhere else a project may use its own
mechanism, or disable job chains by setting it to None.
"""

import os
import re
import logging
from datetime import datetime, timedelta

from chart.products.fdf.orbit import get_orbit_times
from chart.products.fdf.orbit import NoSuchOrbit
from chart.common.decorators import cached
from chart.common.decorators import lra_cache
from chart.products.eps.gpfs import EPSFilename
from chart.common.decorators import memoized
from chart.products.sf00 import sf
from chart.backend.job import Job
from chart.backend.activity import Activity
from chart.db.model.table import TableInfo
from chart.db.model.table import StatsStorage
from chart.products.fdf.orbit import find_orbit
from chart.db.connection import db_connect
from chart.db.model.table import TableType
from chart.project import SID

# delay between the time when the first PDU of an orbit arrives
# and the per-orbit jobs can be executed
ORBITAL_DELAY = timedelta(minutes=150)

# used by the hourly processing derived jobs computation
HOUR = timedelta(hours=1)


class SBandFilename:
    """Decode the name of an MCS sband file to extract sid, start and stop times.

    >> sb = SBandFilename('2010080185059_2010080190258.G1MCSS01.m02s_nom.gz')
    >> print(sb.sid, sb.sensing_start, sb.sensing_stop)
    M02 2010-03-22 18:50:59 2010-03-22 19:02:58

    >> sb = SBandFilename('2010080185059_2010080190258.m02s_nom.gz')
    >> print(sb.sid, sb.sensing_start, sb.sensing_stop)
    M02 2010-03-22 18:50:59 2010-03-22 19:02:58

    (note, the doctests are disabled because this class uses SID, which is not available
    standalone with no project)
    """

    matcher = re.compile(
        r'(?P<start_year>[0-9]{4})'
        r'(?P<start_doy>[0-9]{3})'
        r'(?P<start_hour>[0-9]{2})'
        r'(?P<start_minute>[0-9]{2})'
        r'(?P<start_second>[0-9]{2})'
        r'_'
        r'(?P<stop_year>[0-9]{4})'
        r'(?P<stop_doy>[0-9]{3})'
        r'(?P<stop_hour>[0-9]{2})'
        r'(?P<stop_minute>[0-9]{2})'
        r'(?P<stop_second>[0-9]{2})'
        r'(\.[A-Za-z0-9]{8})?'
        r'\.'
        r'(?P<scid>[a-zA-Z0-9]{3})'  # scidfree
        r'.*\.gz')

    def __init__(self, filename):
        # 2007261075013_2007261080212.m02s_nom.gz
        match = SBandFilename.matcher.match(os.path.basename(filename))
        if match is None:
            raise ValueError('Bad sband filename ' + filename)

        self.sid = SID(match.group('scid').upper())  # scidfree
        self.sensing_start = datetime(int(match.group('start_year')),
                                      1,
                                      1,
                                      int(match.group('start_hour')),
                                      int(match.group('start_minute')),
                                      int(match.group('start_second'))) + \
            timedelta(days=int(match.group('start_doy')))
        self.sensing_stop = datetime(int(match.group('stop_year')),
                                     1,
                                     1,
                                     int(match.group('stop_hour')),
                                     int(match.group('stop_minute')),
                                     int(match.group('stop_second'))) + \
            timedelta(days=int(match.group('stop_doy')))


@memoized
def affected_tables(assembly_id):
    """Return a list of TableInfo objects for the timeseries tables that might
    be written to when snacks from `assembly_id` are ingested.
    """

    # not sure how well generator functions work with the memoized decorator

    res = []
    for table_info in TableInfo.all_ts():
        if table_info.source_assembly_id == assembly_id:
            # yield table_info
            res.append(table_info)

    return res


@memoized
def assemblies_in_dtype(dtype):
    """For a given dtype, return the SF00 assembly ID(s) that can
    be produced from PDUs of that type.
    """

    # not sure how well generator functions work with the memoized decorator

    res = []
    for assy_id, assy_info in sf.assemblies.items():
        if 'eps_type' in assy_info and dtype in assy_info['eps_type']:
            # yield assy_id
            res.append(assy_id)

    return res


def days_in_range(start_time, stop_time):
    """Yield a date object for each day in the input time range."""

    while start_time < stop_time:
        yield start_time.date()
        start_time += timedelta(days=1)
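
# Illustrative behaviour of days_in_range() (not an enabled doctest; days are
# stepped in whole 24h increments from `start_time`):
#
#     >>> list(days_in_range(datetime(2010, 3, 22, 18, 0), datetime(2010, 3, 24, 2, 0)))
#     [datetime.date(2010, 3, 22), datetime.date(2010, 3, 23)]
#
# note the trailing partial day (2010-03-24) is not yielded because stepping
# from 18:00 overshoots `stop_time`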


@cached  # don't return generators from a cached function
def orbits_in_range(sid, start_time, stop_time, limit):
    """Return a list of orbit dictionaries including one for each
    orbit in the input time range.

    Each dictionary consists of:

    * `number` :: the orbit number
    * `start` :: orbit start time (fractional seconds removed)
    * `stop` :: orbit stop time (fractional seconds removed)

    Note, this might be more efficient moved into orbits.py and using
    a single query against FDF_EVENTS.
    """

    # to try and avoid problems where orbital stats don't get recomputed when they should,
    # we introduce BORDER which will cause, after data has been written close to an orbit
    # boundary, neighbouring orbits to be computed
    BORDER = timedelta(seconds=1)

    # TBD: cache the last result; don't use memoized as it will grow
    first_orbit = find_orbit(sid, start_time - BORDER)
    # logging.info('find_orbit ' + sid + ' ' + str(start_time) + ' returns ' + str(first_orbit))
    last_orbit = find_orbit(sid, stop_time + BORDER)
    # print('orbits in range', sid, start_time, stop_time, limit, first_orbit, last_orbit)
    # logging.debug('orbits_in_range start {start} orbit {first_orbit} stop {stop} '
    #               'orbit {last_orbit}'.format(start=start_time, first_orbit=first_orbit,
    #                                           stop=stop_time, last_orbit=last_orbit))
    if first_orbit is None or last_orbit is None:
        logging.debug('Cannot find orbits for times {sid} {start} {stop}'.format(
            sid=sid, start=start_time, stop=stop_time))
        return []

    if limit is not None and limit != 0 and (last_orbit - first_orbit) > limit:
        logging.critical('Too many orbits for {sid} from {start} to {stop} '
                         '({first} - {last})'.format(
                             sid=sid, start=start_time, stop=stop_time,
                             first=first_orbit, last=last_orbit))
        return []

    res = []
    for o in range(first_orbit, last_orbit + 1):
        try:
            times = get_orbit_times(sid, o)
        except NoSuchOrbit:
            # raise Exception('cannot locate orbit ' + str(o))
            logging.error('Could not find orbit times for {sid} for orbit {orbit}'.format(
                sid=sid, orbit=o))
            continue

        # yield {'number': o, 'start': times[0], 'stop': times[1]}
        res.append({'number': o, 'start': times[0], 'stop': times[1]})

    # logging.info('orbits in range res ' + str(res))

    return res
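
# Illustrative orbits_in_range() result (orbit numbers and times are made up):
#
#     [{'number': 45120, 'start': datetime(2010, 3, 22, 18, 50), 'stop': datetime(2010, 3, 22, 20, 31)},
#      {'number': 45121, 'start': datetime(2010, 3, 22, 20, 31), 'stop': datetime(2010, 3, 22, 22, 12)}]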


@lra_cache(maxsize=20)
def find_daynight_regions(sid, start_time, stop_time):
    """Return a list of days and nights intersecting the given time window."""

    regions = []  # {'type': 'day'/'night', 'start':, 'stop':}

    # look for events within 2 hours of our time window. Superfluous regions
    # completely outside the required time range are removed at the end of the function.
    WINDOW = timedelta(hours=2)

    # first find times when we have either a penumbra_str stop time + 60s
    # or a penumbra_end start time - 60s within our time range
    db_conn = db_connect('FDF_EVENTS')
    for name, region_time, orbit in db_conn.query(
            "SELECT name, time, orbit FROM ("
            " SELECT"
            "  name,"
            "  CASE WHEN name='PENUMBRA_STR' THEN start_time+INTERVAL '60' SECOND ELSE"
            "   start_time-INTERVAL '60' SECOND END AS time,"
            "  orbit"
            " FROM fdf_events"
            " WHERE scid=:scid"  # scidfree
            " AND target='EARTH'"
            " AND ((name='PENUMBRA_STR' AND"
            "   (start_time+INTERVAL '60' SECOND) BETWEEN :start_time AND :stop_time)"
            "  OR (name='PENUMBRA_END' AND"
            "   (start_time-INTERVAL '60' SECOND) BETWEEN :start_time AND :stop_time))) q "
            "ORDER BY time",
            scid=sid.scid,  # scidfree
            start_time=start_time - WINDOW,
            stop_time=stop_time + WINDOW):
        # print(name, region_time)

        if name == 'PENUMBRA_STR':
            night_start = region_time
            if len(regions) == 0:
                regions.append({'type': 'day', 'stop': night_start, 'orbit': orbit})
            else:
                regions[-1]['stop'] = night_start

            regions.append({'type': 'night', 'start': night_start, 'orbit': orbit})

        if name == 'PENUMBRA_END':
            day_start = region_time
            if len(regions) == 0:
                regions.append({'type': 'night', 'stop': day_start, 'orbit': orbit})
            else:
                regions[-1]['stop'] = day_start

            regions.append({'type': 'day', 'start': day_start, 'orbit': orbit})

    return [r for r in regions if ('start' in r and
                                   'stop' in r and
                                   r['stop'] >= start_time and
                                   r['start'] <= stop_time)]
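
# Illustrative find_daynight_regions() result (times and orbit numbers are made
# up); only regions with both 'start' and 'stop' set, and which intersect the
# requested window, survive the final filter:
#
#     [{'type': 'night', 'start': datetime(...), 'stop': datetime(...), 'orbit': 45120},
#      {'type': 'day', 'start': datetime(...), 'stop': datetime(...), 'orbit': 45120}]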


def derived_jobs_table_timerange(category,
                                 table_info,
                                 sid,
                                 orbit=None,
                                 start_time=None,
                                 stop_time=None,
                                 dups=None):
    """Low level function to find derived jobs arising from changes to a single
    table for a specific time range (either an orbit or some other duration).

    This will include orbital stats, other orbital algorithms and daily algorithms.
    This function only works on all-points tables and doesn't compute jobs derived
    from writes to stats tables.

    TBD: This function incorrectly assumes that the timestamp of the last
    MDR/snack in the PDU/SF00 file is the stop time of the file. The module
    should be modified to compute a true stop time or at least take a guess.
    To do it perfectly would mean modifying the ingester algorithm so it returns
    true stop times for each job performed.
    """

    # logging.debug('derived jobs table timerange ' + sid + ' ' + str(orbit) + ' ' +
    #               str(start_time) + ' ' + str(stop_time))

    def periods():
        """Yield all orbits/hours for our timerange (if only `start_time` and `stop_time`
        were given), or just one orbit if `orbit` was given.
        """

        if orbit is not None:
            yield {'number': orbit,
                   'start': start_time,
                   'stop': stop_time}

        elif sid.satellite.orbit_duration is not None:
            # sometimes something goes wrong and we could end up creating thousands of jobs,
            # hence the limit.
            # SSR_POWER can run for 3 days so allow for sufficient orbits
            for o in orbits_in_range(sid, start_time, stop_time, 60):
                yield o

        else:
            # hourly processing.
            # We don't actually use the <processing-frequency> from satellites.xml
            # but this is where it would be used if we did
            acc = start_time.replace(minute=0, second=0, microsecond=0)
            while acc <= stop_time:
                yield {'number': None,
                       'start': acc,
                       'stop': acc + HOUR}

                acc += HOUR

    # if sid == 'N19':
    #     logging.debug('N19 orbits for ' + str(start_time) + ' to ' + str(stop_time))
    #     for o in periods():
    #         logging.debug('  ' + str(o))

    # if the table has any associated stats tables, create jobs to (re)compute
    # them
    if table_info.stats_storage is not StatsStorage.NONE:
        # add a new job entry per orbit
        for o in periods():
            if o['number'] is not None:
                yield Job(category=category,
                          activity='ORBITAL_STATS',
                          sid=sid,
                          tablename=table_info.name,
                          orbit=o['number'],
                          sensing_start=o['start'],
                          sensing_stop=o['stop'])

            else:
                yield Job(category=category,
                          activity='HOURLY_STATS',
                          sid=sid,
                          tablename=table_info.name,
                          sensing_start=o['start'],
                          sensing_stop=o['stop'])

    # day/night stats table jobs
    if table_info.has_daynight_stats:
        for r in find_daynight_regions(sid, start_time, stop_time):
            yield Job(category=category,
                      activity='DAY_STATS' if r['type'] == 'day' else 'NIGHT_STATS',
                      sid=sid,
                      orbit=r['orbit'],
                      tablename=table_info.name,
                      sensing_start=r['start'],
                      sensing_stop=r['stop'])

    # look for any db views with orbital stats tables and schedule them
    # for (re)computation
    for t in TableInfo.all_ts():
        if t.table_type is TableType.VIEW and table_info in t.source_tables:
            for o in periods():
                yield Job(category=category,
                          activity='ORBITAL_STATS',
                          sid=sid,
                          tablename=t.name,
                          orbit=o['number'],
                          sensing_start=o['start'],
                          sensing_stop=o['stop'])

    # next build a list of activities with orbital or daily triggers
    # watching this table
    for a in Activity.all():
        if a.name in dups:
            # each activity (except ORBITAL_STATS, handled via a special case)
            # can only trigger once per source table write
            continue

        if not a.enabled:
            continue

        if not a.match_sid(sid):
            # this activity is not valid for this sid
            continue

        # make sure we catch any permutation of input orbital/daily job triggering
        # derived orbital/daily job
        for trigger in a.triggers:
            # see if any of the input triggers to this activity match the table which
            # has just been modified
            if trigger['table'] == table_info:
                if trigger['type'] == 'orbital':
                    for o in periods():
                        yield Job(category=category,
                                  activity=a.name,
                                  sid=sid,
                                  orbit=o['number'],
                                  sensing_start=o['start'],
                                  sensing_stop=o['stop'])

                elif trigger['type'] == 'daily':
                    for d in days_in_range(start_time, stop_time):
                        yield Job(category=category,
                                  activity=a.name,
                                  sid=sid,
                                  sensing_start=d,
                                  sensing_stop=d + timedelta(days=1))

                dups.add(a.name)

                break  # each derived activity can only be added once


def find_derived_jobs(job):
    """Given a source job, return a list of all the derived jobs that it triggers."""

    # first pass - we build a list of affected tables.
    # list consists of dictionaries of:
    #   type :: orbital or ???
    #   tablename ::
    #   orbit :: only if it's orbital
    #   start_time
    #   stop_time

    dups = set()

    if job.activity.name == 'PDU_INGESTER':
        # PDU_INGESTER is a special case. It requires some logic that cannot be put into the
        # XML Activity file because it converts filenames into affected tables
        # and from those into per-orbit and per-day jobs.

        # This could be removed since the result.xml format now allows a list of affected
        # tables to be stored.
        # So if all ingestion code sets this properly, this rather ugly special case would not be
        # needed.
        eps_filename = EPSFilename(job.filename)

        # if 'N19' in job['filename']:
        #     logging.debug('N19 affected tables')
        #     for assy_id in assemblies_in_dtype(eps_filename.dtype):
        #         logging.debug('  ' + str(assy_id))
        #         for t in affected_tables(assy_id):
        #             logging.debug('    ' + str(t.name))

        #     logging.debug('N19 done')

        # find the list of tablenames that could be modified by this file
        for assy_id in assemblies_in_dtype(eps_filename.dtype):
            # logging.debug('affected assy id ' + str(assy_id))
            # Now find all raw tables that could have data written to them from this
            # SF00 assembly ID
            for t in affected_tables(assy_id):
                # logging.debug('affected table ' + str(t))
                # Generate all potential jobs created by a write to table 't' for
                # the given timerange
                for j in derived_jobs_table_timerange(
                        job.category,
                        t,
                        eps_filename.sid,
                        None,
                        eps_filename.sensing_start,
                        eps_filename.sensing_stop,
                        dups):

                    # logging.debug('Fresh job cat ' + j['category'])
                    if j.category == 'SCHEDULER':
                        # For SCHEDULER (NRT) jobs only we may defer processing, to make sure
                        # that per-orbit or per-day jobs are not executable until we have all
                        # data for that period
                        # logging.debug('It is scheduler, activity is ' + j['activity'] +
                        #               ' tablename is ' + str(j.get('tablename')) + ' orbit is ' +
                        #               str(j.get('orbit', 'SETTONONE')) + ' whole job ' + str(j))
                        if j.orbit is not None:
                            # For orbital jobs, delay processing by ORBITAL_DELAY
                            earliest_execution_time = min(
                                datetime.utcnow() + ORBITAL_DELAY,
                                eps_filename.sensing_start + timedelta(days=1))
                            # logging.debug('NRT orbital job exec ' + str(earliest_execution_time))
                            j.earliest_execution_time = earliest_execution_time

                        elif j.sensing_start is not None and j.sensing_stop is not None:
                            # For daily jobs, defer processing until midnight
                            # (this should actually test if the trigger is a daily trigger)
                            earliest_execution_time = min(
                                datetime.utcnow() + timedelta(days=1),
                                eps_filename.sensing_start + timedelta(days=1))
                            # logging.debug('non orbital, time based job start ' +
                            #               str(j['sensing_start']) +
                            #               ' stop ' + str(j['sensing_stop']))
                            if (j.sensing_stop - j.sensing_start) == timedelta(days=1):
                                # logging.debug('Daily job starting ' + str(earliest_execution_time))
                                j.earliest_execution_time = earliest_execution_time

                    yield j

    # elif job['activity'] == 'SF00_INGESTER':
    #     sf00_file = SFReader(os.path.join(job['dirname'], job['filename']))
    #     # this happens if we have an archive product with no snacks
    #     if sf00_file.sensing_start is None and sf00_file.sensing_stop is None:
    #         return

    #     # find the list of tablenames that could be modified by this file
    #     for t in affected_tables(sf00_file.assembly_id):
    #         for j in derived_jobs_table_timerange(
    #                 job['category'],
    #                 t,
    #                 sf00_file.sid,
    #                 None,
    #                 sf00_file.sensing_start,
    #                 sf00_file.sensing_stop,
    #                 dups):
    #             if job['category'] == 'SCHEDULER':
    #                 j['earliest_execution_time'] = datetime.utcnow() + ORBITAL_DELAY

    #             yield j

    elif job.activity.name == 'SBAND_INGESTER':
        sband_filename = SBandFilename(job.filename)
        # find the list of tablenames that could be modified by this file
        for t in affected_tables(1):  # 1 = assembly ID for HKTM
            for j in derived_jobs_table_timerange(
                    job.category,
                    t,
                    sband_filename.sid,
                    None,
                    sband_filename.sensing_start,
                    sband_filename.sensing_stop,
                    dups):

                yield j

    elif job.activity.name == 'ORBITAL_STATS':
        # disabled for now. Orbital stats jobs cannot trigger further jobs.
        pass

    # This is also a special case. Stats jobs use the `tablename` field so we know which
    # tables they modify. We just look for explicit triggers from those tables.
    # Note the MHS report uses stats tables and triggers on MHS, MHS_MINS, etc.
    # so this is probably needed.
    # Although technically it can be skipped so long as the worker favours
    # ingestion, then per-orbit, then per-day jobs.
    # Except that MHS_EVENT uses stats tables for performance.
    # This probably doesn't work very well unless it's an orbital alg being triggered.
    # Also duplicates are not taken care of.

    # As with PDU_INGESTER this ugly special case could be avoided by patching
    # *_stats activity files to include a list of affected tables in their result.xml files.

    # for s in TableInfo(job['tablename']).stats_tables():
    #     # for each stats table associated with source `job['tablename']` ...
    #     for a in Activity.all():
    #         # look through all activities ...
    #         for t in a.triggers:
    #             # see if the activity is triggered by this associated table...
    #             if t['table'].name == s['tablename']:
    #                 # if so create a job
    #                 yield {'category': job['category'],
    #                        'activity': a.name,
    #                        'sid': job['sid'],
    #                        'orbit': job['orbit'],
    #                        'sensing_start': job['sensing_start'],
    #                        'sensing_stop': job['sensing_stop']}

    # create a LIMIT job
    # yield {'category': job['category'],
    #        'activity': 'LIMITS',
    #        'tablename': job['tablename'],
    #        'sid': job['sid'],
    #        'orbit': job['orbit'],
    #        'sensing_start': job['sensing_start'],
    #        'sensing_stop': job['sensing_stop']}

    else:
        # We have 2 systems for algorithms to tell the worker which tables they
        # have modified:
        #  - output-tables elements in the Activity file
        #  - table elements in the result file

        # For all other activities we look in the activity file; if it's an orbital/daily
        # activity which modifies tables then it must have one or more <modifies> tags,
        # in which case we look for other activities triggered by that table
        # (note an orbital algorithm can trigger a daily one and vice-versa)
        # output_tables = set(activity.output_tables)

        # we cannot distinguish between an activity that has not been upgraded
        # to generate result.xml <table> elements, and old jobs that don't bother
        if job.tables is None or len(job.tables) == 0:
            logging.debug('Creating automatic derived jobs')
            # this is the old system that guesses which tables were written based on
            # activity files
            for output_table in job.activity.output_tables:
                for j in derived_jobs_table_timerange(job.category,
                                                      output_table,
                                                      job.sid,
                                                      job.orbit,
                                                      job.sensing_start,
                                                      job.sensing_stop,
                                                      dups):
                    yield j

        else:
            logging.debug('Creating manual derived jobs')
            # this is the new system using the <tables> section of the results file
            # logging.debug('we have tables')
            for t in job.tables:
                # logging.debug('Creating derived jobs from {t}'.format(t=name_of_thing(t)))
                # we adjust the times slightly otherwise a full orbit job can trigger
                # processing of neighbouring orbits
                ORBIT_DEOVERLAP = timedelta(minutes=1)
                for j in derived_jobs_table_timerange(
                        category=job.category,
                        table_info=t['table'],
                        sid=job.sid,
                        # orbit=job.orbit,
                        # orbit=None,
                        start_time=t['sensing_start'] + ORBIT_DEOVERLAP,
                        stop_time=t['sensing_stop'] - ORBIT_DEOVERLAP,
                        dups=dups):
                    # logging.debug('Derived job ' + str(j))
                    yield j
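
# Minimal usage sketch (hypothetical; the real worker wiring lives elsewhere,
# and the queuing helper below is an assumption, not part of this module):
#
#     def queue_derived_jobs(completed_job):
#         """Queue every job chained from a just-completed job."""
#         for derived in find_derived_jobs(completed_job):
#             enqueue(derived)  # assumption: some project-specific enqueue helper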