#!/usr/bin/env python3

"""Allow the `worker` to construct job chains by telling it which new jobs are triggered by
existing completed jobs.

This is a swappable module and different projects can use their own implementation.

This particular module is tuned for the EPS project and is based around ingestion or orbital jobs
triggering orbital jobs.

By changing `settings.WORKER_JOB_CHAIN_FN` to point somewhere else a project may use its own
mechanism, or disable job chains by setting it to None.
"""

import os
import re
import logging
from datetime import datetime, timedelta

from chart.products.fdf.orbit import get_orbit_times
from chart.products.fdf.orbit import NoSuchOrbit
from chart.products.fdf.orbit import find_orbit
from chart.common.decorators import cached
from chart.common.decorators import lra_cache
from chart.common.decorators import memoized
from chart.products.eps.gpfs import EPSFilename
from chart.products.sf00 import sf
from chart.backend.job import Job
from chart.backend.activity import Activity
from chart.db.model.table import TableInfo
from chart.db.model.table import StatsStorage
from chart.db.model.table import TableType
from chart.db.connection import db_connect
from chart.project import SID

# delay between the time when the first PDU of an orbit arrives
# and the time when the per-orbit jobs can be executed
ORBITAL_DELAY = timedelta(minutes=150)

# used by the hourly-processing derived jobs computation
HOUR = timedelta(hours=1)


class SBandFilename:
    """Decode the name of an MCS sband file to extract sid, start and stop times.

    >> sb = SBandFilename('2010080185059_2010080190258.G1MCSS01.m02s_nom.gz')
    >> print(sb.sid, sb.sensing_start, sb.sensing_stop)
    M02 2010-03-22 18:50:59 2010-03-22 19:02:58

    >> sb = SBandFilename('2010080185059_2010080190258.m02s_nom.gz')
    >> print(sb.sid, sb.sensing_start, sb.sensing_stop)
    M02 2010-03-22 18:50:59 2010-03-22 19:02:58

    (note: the doctests are disabled because this class uses SID, so it is not
    available standalone with no project configured)
    """

    matcher = re.compile(
        r'(?P<start_year>[0-9]{4})'
        r'(?P<start_doy>[0-9]{3})'
        r'(?P<start_hour>[0-9]{2})'
        r'(?P<start_minute>[0-9]{2})'
        r'(?P<start_second>[0-9]{2})'
        r'_'
        r'(?P<stop_year>[0-9]{4})'
        r'(?P<stop_doy>[0-9]{3})'
        r'(?P<stop_hour>[0-9]{2})'
        r'(?P<stop_minute>[0-9]{2})'
        r'(?P<stop_second>[0-9]{2})'
        r'(\.[A-Za-z0-9]{8})?'
        r'\.'
        r'(?P<scid>[a-zA-Z0-9]{3})'  # scidfree
        r'.*\.gz')

    def __init__(self, filename):
        # 2007261075013_2007261080212.m02s_nom.gz
        match = SBandFilename.matcher.match(os.path.basename(filename))
        if match is None:
            raise ValueError('Bad sband filename ' + filename)

        self.sid = SID(match.group('scid').upper())  # scidfree
        # sensing times: year plus day-of-year offset from 1 January, plus time of day
        self.sensing_start = (datetime(int(match.group('start_year')), 1, 1,
                                       int(match.group('start_hour')),
                                       int(match.group('start_minute')),
                                       int(match.group('start_second'))) +
                              timedelta(days=int(match.group('start_doy'))))
        self.sensing_stop = (datetime(int(match.group('stop_year')), 1, 1,
                                      int(match.group('stop_hour')),
                                      int(match.group('stop_minute')),
                                      int(match.group('stop_second'))) +
                             timedelta(days=int(match.group('stop_doy'))))
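
# The disabled doctests above can be exercised by hand inside a configured
# project, for example:
#
#     sb = SBandFilename('2010080185059_2010080190258.m02s_nom.gz')
#     print(sb.sid, sb.sensing_start, sb.sensing_stop)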


@memoized
def affected_tables(assembly_id):
    """Return a list of TableInfo objects for the timeseries tables that might be
    written to when snacks from `assembly_id` are ingested.
    """

    # not sure how well generator functions work with the memoized decorator

    res = []
    for table_info in TableInfo.all_ts():
        if table_info.source_assembly_id == assembly_id:
            # yield table_info
            res.append(table_info)

    return res
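
# Illustrative use; assembly ID 1 is the HKTM assembly used by the S-band
# branch of find_derived_jobs() below:
#
#     for table_info in affected_tables(1):
#         print(table_info.name)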


@memoized
def assemblies_in_dtype(dtype):
    """For a given dtype, return the SF00 assembly ID(s) that can
    be produced from PDUs of that type.
    """

    # not sure how well generator functions work with the memoized decorator

    res = []
    for assy_id, assy_info in sf.assemblies.items():
        if 'eps_type' in assy_info and dtype in assy_info['eps_type']:
            # yield assy_id
            res.append(assy_id)

    return res
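
# Illustrative use, chaining the two lookups above to go from a PDU dtype to
# the tables its ingestion can touch (the dtype value here is hypothetical):
#
#     for assy_id in assemblies_in_dtype('MHSx'):
#         for table_info in affected_tables(assy_id):
#             print(assy_id, table_info.name)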


def days_in_range(start_time, stop_time):
    """Yield a date object for each day in the input time range."""

    while start_time < stop_time:
        yield start_time.date()
        start_time += timedelta(days=1)
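
# Example. Note a day touched only by the tail of the range is not yielded,
# because the cursor advances in whole days from `start_time`:
#
#     >>> list(days_in_range(datetime(2010, 3, 22, 18, 0), datetime(2010, 3, 24, 1, 0)))
#     [datetime.date(2010, 3, 22), datetime.date(2010, 3, 23)]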


@cached  # don't return generators from a cached function
def orbits_in_range(sid, start_time, stop_time, limit):
    """Return a list of orbit dictionaries, one for each orbit in the input time range.

    Each dictionary consists of:

     * `number` :: the orbit number
     * `start` :: orbit start time (fractional seconds removed)
     * `stop` :: orbit stop time (fractional seconds removed)

    Note, this might be more efficient moved into orbits.py and using
    a single query against FDF_EVENTS.
    """

    # to try and avoid problems where orbital stats don't get recomputed when they should,
    # we introduce BORDER which, after data has been written close to an orbit boundary,
    # causes neighbouring orbits to be recomputed as well
    BORDER = timedelta(seconds=1)

    # TBD: cache just the last result; don't use memoized as its cache will grow unbounded
    first_orbit = find_orbit(sid, start_time - BORDER)
    # logging.info('find_orbit ' + sid + ' ' + str(start_time) + ' returns ' + str(first_orbit))
    last_orbit = find_orbit(sid, stop_time + BORDER)
    # print('orbits in range', sid, start_time, stop_time, limit, first_orbit, last_orbit)
    if first_orbit is None or last_orbit is None:
        logging.debug('Cannot find orbits for times {sid} {start} {stop}'.format(
            sid=sid, start=start_time, stop=stop_time))
        return []

    if limit is not None and limit != 0 and (last_orbit - first_orbit) > limit:
        logging.critical('Too many orbits for {sid} from {start} to {stop} '
                         '({first} - {last})'.format(
                             sid=sid, start=start_time, stop=stop_time,
                             first=first_orbit, last=last_orbit))
        return []

    res = []
    for o in range(first_orbit, last_orbit + 1):
        try:
            times = get_orbit_times(sid, o)
        except NoSuchOrbit:
            # raise Exception('cannot locate orbit ' + str(o))
            logging.error('Could not find orbit times for {sid} for orbit {orbit}'.format(
                sid=sid, orbit=o))
            continue

        # yield {'number': o, 'start': times[0], 'stop': times[1]}
        res.append({'number': o, 'start': times[0], 'stop': times[1]})

    # logging.info('orbits in range res ' + str(res))

    return res
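
# The result can be consumed as, for example (variables illustrative):
#
#     for o in orbits_in_range(sid, start_time, stop_time, limit=60):
#         print(o['number'], o['start'], o['stop'])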


@lra_cache(maxsize=20)
def find_daynight_regions(sid, start_time, stop_time):
    """Return a list of the day and night regions intersecting the given time window."""

    regions = []  # {'type': 'day'/'night', 'start':, 'stop':}

    # look for events within 2 hours of our time window. Superfluous regions
    # completely outside the required time range are removed at the end of the function.
    WINDOW = timedelta(hours=2)

    # first find times where we have either a PENUMBRA_STR start time + 60s
    # or a PENUMBRA_END start time - 60s within our time range
    db_conn = db_connect('FDF_EVENTS')
    for name, region_time, orbit in db_conn.query(
        "SELECT name, time, orbit FROM ("
        "  SELECT"
        "    name,"
        "    CASE WHEN name='PENUMBRA_STR' THEN start_time+INTERVAL '60' SECOND ELSE"
        "                                       start_time-INTERVAL '60' SECOND END AS time,"
        "    orbit"
        "  FROM fdf_events"
        "  WHERE scid=:scid"  # scidfree
        "  AND target='EARTH'"
        "  AND ((name='PENUMBRA_STR' AND"
        "        (start_time+INTERVAL '60' SECOND) BETWEEN :start_time AND :stop_time)"
        "       OR (name='PENUMBRA_END' AND"
        "        (start_time-INTERVAL '60' SECOND) BETWEEN :start_time AND :stop_time))) q "
        "ORDER BY time",
        scid=sid.scid,  # scidfree
        start_time=start_time - WINDOW,
        stop_time=stop_time + WINDOW):
        # print(name, region_time)

        if name == 'PENUMBRA_STR':
            night_start = region_time
            if len(regions) == 0:
                regions.append({'type': 'day', 'stop': night_start, 'orbit': orbit})
            else:
                regions[-1]['stop'] = night_start

            regions.append({'type': 'night', 'start': night_start, 'orbit': orbit})

        if name == 'PENUMBRA_END':
            day_start = region_time
            if len(regions) == 0:
                regions.append({'type': 'night', 'stop': day_start, 'orbit': orbit})
            else:
                regions[-1]['stop'] = day_start

            regions.append({'type': 'day', 'start': day_start, 'orbit': orbit})

    return [r for r in regions if ('start' in r and
                                   'stop' in r and
                                   r['stop'] >= start_time and
                                   r['start'] <= stop_time)]
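
# The result is a chronological list of alternating regions; partial regions at
# either end which are missing a 'start' or 'stop' are filtered out. E.g.
# (values illustrative):
#
#     [{'type': 'night', 'start': ..., 'stop': ..., 'orbit': 1234},
#      {'type': 'day', 'start': ..., 'stop': ..., 'orbit': 1234}]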


def derived_jobs_table_timerange(category,
                                 table_info,
                                 sid,
                                 orbit=None,
                                 start_time=None,
                                 stop_time=None,
                                 dups=None):
    """Low level function to find derived jobs arising from changes to a single
    table for a specific time range (either an orbit or some other duration).
    This will include orbital stats, other orbital algorithms and daily algorithms.
    This function only works on all-points tables and doesn't compute jobs derived
    from writing to stats tables.

    TBD: This function incorrectly assumes that the timestamp of the last
    MDR/snack in the PDU/SF00 file is the stop time of the file. The module
    should be modified to compute a true stop time or at least take a guess.
    To do it perfectly would mean modifying the ingester algorithm so it returns
    true stop times for each job performed.
    """

    if dups is None:
        # callers may share a dups set across calls; default to a fresh one
        dups = set()

    # logging.debug('derived jobs table timerange ' + sid + ' ' + str(orbit) + ' ' +
    #               str(start_time) + ' ' + str(stop_time))

    def periods():
        """Yield all orbits/hours for our timerange (if only `start_time` and `stop_time`
        were given), or just the one orbit if `orbit` was given.
        """

        if orbit is not None:
            yield {'number': orbit,
                   'start': start_time,
                   'stop': stop_time}

        elif sid.satellite.orbit_duration is not None:
            # sometimes something goes wrong and we could end up creating thousands of jobs,
            # hence the limit.
            # SSR_POWER can run for 3 days so allow for sufficient orbits
            for o in orbits_in_range(sid, start_time, stop_time, 60):
                yield o

        else:
            # hourly processing.
            # We don't actually use the <processing-frequency> from satellites.xml
            # but this is where it would be used if we did
            acc = start_time.replace(minute=0, second=0, microsecond=0)
            while acc <= stop_time:
                yield {'number': None,
                       'start': acc,
                       'stop': acc + HOUR}

                acc += HOUR

    # if sid == 'N19':
    #     logging.debug('N19 orbits for ' + str(start_time) + ' to ' + str(stop_time))
    #     for o in periods():
    #         logging.debug('    ' + str(o))

    # if the table has any associated stats tables, create jobs to (re)compute
    # them
    if table_info.stats_storage is not StatsStorage.NONE:
        # add a new job entry per period (orbit or hour)
        for o in periods():
            if o['number'] is not None:
                yield Job(category=category,
                          activity='ORBITAL_STATS',
                          sid=sid,
                          tablename=table_info.name,
                          orbit=o['number'],
                          sensing_start=o['start'],
                          sensing_stop=o['stop'])

            else:
                yield Job(category=category,
                          activity='HOURLY_STATS',
                          sid=sid,
                          tablename=table_info.name,
                          sensing_start=o['start'],
                          sensing_stop=o['stop'])

    # day/night stats table jobs
    if table_info.has_daynight_stats:
        for r in find_daynight_regions(sid, start_time, stop_time):
            yield Job(category=category,
                      activity='DAY_STATS' if r['type'] == 'day' else 'NIGHT_STATS',
                      sid=sid,
                      orbit=r['orbit'],
                      tablename=table_info.name,
                      sensing_start=r['start'],
                      sensing_stop=r['stop'])

    # look for any db views with orbital stats tables and schedule them
    # for (re)computation
    for t in TableInfo.all_ts():
        if t.table_type is TableType.VIEW and table_info in t.source_tables:
            for o in periods():
                yield Job(category=category,
                          activity='ORBITAL_STATS',
                          sid=sid,
                          tablename=t.name,
                          orbit=o['number'],
                          sensing_start=o['start'],
                          sensing_stop=o['stop'])

    # next build a list of activities with orbital or daily triggers
    # watching this table
    for a in Activity.all():
        if a.name in dups:
            # each activity (except ORBITAL_STATS, handled via a special case)
            # can only trigger once per source table write
            continue

        if not a.enabled:
            continue

        if not a.match_sid(sid):
            # this activity is not valid for this sid
            continue

        # make sure we catch any permutation of input orbital/daily job triggering
        # derived orbital/daily job
        for trigger in a.triggers:
            # see if any of the input triggers to this activity match the table which has just been
            # modified.
            if trigger['table'] == table_info:
                if trigger['type'] == 'orbital':
                    for o in periods():
                        yield Job(category=category,
                                  activity=a.name,
                                  sid=sid,
                                  orbit=o['number'],
                                  sensing_start=o['start'],
                                  sensing_stop=o['stop'])

                elif trigger['type'] == 'daily':
                    for d in days_in_range(start_time,
                                           stop_time):
                        yield Job(category=category,
                                  activity=a.name,
                                  sid=sid,
                                  sensing_start=d,
                                  sensing_stop=d + timedelta(days=1))

                dups.add(a.name)
                break  # each derived activity can only be added once
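
# Typical call, mirroring how find_derived_jobs() below drives this generator
# (variables illustrative):
#
#     dups = set()
#     for derived in derived_jobs_table_timerange('SCHEDULER', table_info, sid,
#                                                 start_time=start, stop_time=stop,
#                                                 dups=dups):
#         print(derived)  # a Job object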


def find_derived_jobs(job):
    """Given a source job, yield all the derived jobs that it triggers."""

    # first pass - we build a list of affected tables.
    # list consists of dictionaries of:
    #  type :: orbital or ???
    #  tablename ::
    #  orbit :: only if it's orbital
    #  start_time
    #  stop_time

    dups = set()

    if job.activity.name == 'PDU_INGESTER':
        # PDU_INGESTER is a special case. It requires some logic that cannot be put into the
        # XML Activity file because it converts filenames into affected tables
        # and so into per-orbit and per-day jobs.

        # This could be removed since the result.xml format now allows a list of affected
        # tables to be stored. So if all ingestion code sets this properly, this rather
        # ugly special case would not be needed.
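
        # A result.xml <tables> section as consumed by the final branch of this
        # function might look like the sketch below; the element and attribute
        # names are assumed from the descriptions in this module, not taken
        # from the schema:
        #
        #     <tables>
        #       <table name="MHS" sensing-start="..." sensing-stop="..."/>
        #     </tables>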

        eps_filename = EPSFilename(job.filename)

        # if 'N19' in job['filename']:
        #     logging.debug('N19 affected tables')
        #     for assy_id in assemblies_in_dtype(eps_filename.dtype):
        #         logging.debug('    ' + str(assy_id))
        #         for t in affected_tables(assy_id):
        #             logging.debug('       ' + str(t.name))

        #     logging.debug('N19 done')

        # find the list of tablenames that could be modified by this file
        for assy_id in assemblies_in_dtype(eps_filename.dtype):
            # logging.debug('affected assy id ' + str(assy_id))
            # Now find all raw tables that could have data written to them from this
            # SF00 assembly ID
            for t in affected_tables(assy_id):
                # logging.debug('affected table ' + str(t))
                # Generate all potential jobs created by a write to table 't' for the
                # given timerange
                for j in derived_jobs_table_timerange(
                    job.category,
                    t,
                    eps_filename.sid,
                    None,
                    eps_filename.sensing_start,
                    eps_filename.sensing_stop,
                    dups):

                    # logging.debug('Fresh job cat ' + j['category'])
                    if j.category == 'SCHEDULER':
                        # For SCHEDULER (NRT) jobs only we may defer processing, to make sure
                        # that per-orbit or per-day jobs are not executable until we have all
                        # data for that period
                        # logging.debug('It is scheduler, activity is ' + j['activity'] +
                        #               ' tablename is ' + str(j.get('tablename')) +
                        #               ' orbit is ' + str(j.get('orbit', 'SETTONONE')) +
                        #               ' whole job ' + str(j))
                        if j.orbit is not None:
                            # For orbital jobs, delay processing by ORBITAL_DELAY
                            earliest_execution_time = min(
                                datetime.utcnow() + ORBITAL_DELAY,
                                eps_filename.sensing_start + timedelta(days=1))
                            # logging.debug('NRT orbital job exec ' + str(earliest_execution_time))
                            j.earliest_execution_time = earliest_execution_time

                        elif j.sensing_start is not None and j.sensing_stop is not None:
                            # For daily jobs, defer processing until midnight
                            # (this should actually test if the trigger is a daily trigger)
                            earliest_execution_time = min(
                                datetime.utcnow() + timedelta(days=1),
                                eps_filename.sensing_start + timedelta(days=1))
                            # logging.debug('non orbital, time based job start ' +
                            #               str(j['sensing_start']) +
                            #               ' stop ' + str(j['sensing_stop']))
                            if (j.sensing_stop - j.sensing_start) == timedelta(days=1):
                                # logging.debug('Daily job starting ' +
                                #               str(earliest_execution_time))
                                j.earliest_execution_time = earliest_execution_time

                    yield j

    # elif job['activity'] == 'SF00_INGESTER':
    #     sf00_file = SFReader(os.path.join(job['dirname'], job['filename']))
    #     # this happens if we have an archive product with no snacks
    #     if sf00_file.sensing_start is None and sf00_file.sensing_stop is None:
    #         return

    #     # find the list of tablenames that could be modified by this file
    #     for t in affected_tables(sf00_file.assembly_id):
    #         for j in derived_jobs_table_timerange(
    #             job['category'],
    #             t,
    #             sf00_file.sid,
    #             None,
    #             sf00_file.sensing_start,
    #             sf00_file.sensing_stop,
    #             dups):
    #             if job['category'] == 'SCHEDULER':
    #                 j['earliest_execution_time'] = datetime.utcnow() + ORBITAL_DELAY

    #             yield j

    elif job.activity.name == 'SBAND_INGESTER':
        sband_filename = SBandFilename(job.filename)
        # find the list of tablenames that could be modified by this file
        for t in affected_tables(1):  # 1 = assembly ID for HKTM
            for j in derived_jobs_table_timerange(
                job.category,
                t,
                sband_filename.sid,
                None,
                sband_filename.sensing_start,
                sband_filename.sensing_stop,
                dups):

                yield j

    elif job.activity.name == 'ORBITAL_STATS':
        # disabled for now: orbital stats jobs cannot trigger further jobs.
        pass

        # This is also a special case. Stats jobs use the `tablename` field so we know which
        # tables they modify; we just look for explicit triggers from those tables.
        # Note the MHS report uses stats tables and triggers on MHS, MHS_MINS, etc.
        # so this is probably needed.
        # Technically it could be skipped so long as the worker favours ingestion,
        # then per-orbit, then per-day jobs, except that MHS_EVENT uses stats tables
        # for performance.
        # This probably doesn't work very well unless it's an orbital algorithm being triggered.
        # Also, duplicates are not taken care of.

        # As with PDU_INGESTER this ugly special case could be avoided by patching
        # *_stats activity files to include a list of affected tables in their result.xml files.

        # for s in TableInfo(job['tablename']).stats_tables():
        #     # for each stats table associated with source `job['tablename']` ...
        #     for a in Activity.all():
        #         # look through all activities ...
        #         for t in a.triggers:
        #             # see if the activity is triggered by this associated table ...
        #             if t['table'].name == s['tablename']:
        #                 # if so create a job
        #                 yield {'category': job['category'],
        #                        'activity': a.name,
        #                        'sid': job['sid'],
        #                        'orbit': job['orbit'],
        #                        'sensing_start': job['sensing_start'],
        #                        'sensing_stop': job['sensing_stop']}

        # create a LIMITS job
        # yield {'category': job['category'],
        #        'activity': 'LIMITS',
        #        'tablename': job['tablename'],
        #        'sid': job['sid'],
        #        'orbit': job['orbit'],
        #        'sensing_start': job['sensing_start'],
        #        'sensing_stop': job['sensing_stop']}

    else:
        # We have 2 systems for algorithms to tell the worker which tables they
        # have modified:
        # - output-tables elements in the Activity file
        # - table elements in the result file

        # For all other activities we look in the activity file; if it's an orbital/daily
        # activity which modifies tables then it must have one or more <modifies> tags,
        # in which case we look for other activities triggered by that table
        # (note an orbital algorithm can trigger a daily one and vice-versa)
        # output_tables = set(activity.output_tables)

        # we cannot distinguish between an activity that has not been upgraded
        # to generate result.xml <table> elements, and old jobs that don't bother
        if job.tables is None or len(job.tables) == 0:
            logging.debug('Creating automatic derived jobs')
            # this is the old system that guesses which tables were written based on
            # activity files
            for output_table in job.activity.output_tables:
                for j in derived_jobs_table_timerange(job.category,
                                                      output_table,
                                                      job.sid,
                                                      job.orbit,
                                                      job.sensing_start,
                                                      job.sensing_stop,
                                                      dups):
                    yield j

        else:
            logging.debug('Creating manual derived jobs')
            # this is the new system using the <tables> section of the results file
            # logging.debug('we have tables')
            for t in job.tables:
                # logging.debug('Creating derived jobs from {t}'.format(t=name_of_thing(t)))
                # we adjust the times slightly, otherwise a full orbit job can trigger
                # processing of neighbouring orbits
                ORBIT_DEOVERLAP = timedelta(minutes=1)
                for j in derived_jobs_table_timerange(
                    category=job.category,
                    table_info=t['table'],
                    sid=job.sid,
                    # job.orbit,
                    # None,
                    start_time=t['sensing_start'] + ORBIT_DEOVERLAP,
                    stop_time=t['sensing_stop'] - ORBIT_DEOVERLAP,
                    dups=dups):
                    # logging.debug('Derived job ' + str(j))
                    yield j
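
# Example driver loop, a sketch of how a worker might invoke this module's
# entry point once a job completes (the job_queue API is hypothetical):
#
#     for derived in find_derived_jobs(completed_job):
#         job_queue.add(derived)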