#!/usr/bin/env python3

"""Allow the `worker` to construct job chains by telling it which new jobs are triggered by
existing completed jobs.

This is a swappable module and different projects can use their own implementation.

In the future we may have a fully XML-controlled system, but for now the job chaining has to
be hard-coded here.

The rules we have are:

- After TM_INGESTER completes, create 20-min (short-term) stats jobs for the time range written
  to in TM and a 1-day (long-term) stats job for the full day, set to execute at 6am the next day
- After a job writes to TM, APEX_LOG, MANOEUVRE_PREDICTION or SCHEDULE_ACTIVITIES, create the
  corresponding events job (ONBOARD_EVENTS, APEX_EVENTS, MANO_PRED_EVENTS, SCHEDULE_EVENTS)
  over the written time range
- After GEOEVENTS_INGESTER completes, run GEOEVENTS_EVENTS over the time period written to / specified

This particular module is tuned for the Chart S3 project and is based around ingestion or orbital jobs
triggering orbital jobs.

By changing `settings.WORKER_JOB_CHAIN` to point somewhere else a project may use its own
mechanism, or disable chaining entirely by setting it to None.
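
A minimal sketch of how a worker might pick this module up (illustrative only: it assumes
`settings.WORKER_JOB_CHAIN` holds a dotted module path, and `completed_job` / `submit` stand
in for whatever the real worker uses):

    import importlib

    chain_name = settings.WORKER_JOB_CHAIN
    chain = importlib.import_module(chain_name) if chain_name else None
    if chain is not None:
        for derived in chain.find_derived_jobs(completed_job):
            submit(derived)  # hand the derived job back to the scheduler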
"""

import logging
from datetime import timedelta

from chart.backend.job import Job

logger = logging.getLogger()

# Duration of long term stats jobs
DAY = timedelta(days=1)

# Delay processing long term stats jobs for a few hours to allow all data to arrive
LATENCY = timedelta(hours=6)


def find_derived_jobs(job):
    """Given a source job, yield the derived jobs that it triggers."""

    # `job.tables` records each table the source job wrote to. Each entry is a
    # dictionary of:
    #   table         :: the table that was written to
    #   sid           :: the source id the written data belongs to
    #   sensing_start :: start of the time range written
    #   sensing_stop  :: end of the time range written
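    # An illustrative entry (field values are made up for the example):
    #   {"table": <Table TM>, "sid": <SID S3A>,
    #    "sensing_start": datetime(2031, 6, 2, 14, 0),
    #    "sensing_stop": datetime(2031, 6, 2, 14, 20)}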

    logger.info("Looking for derived jobs from {j}".format(j=job))

    # We can either trigger GEOEVENTS_EVENTS by detecting that the GEOEVENTS_INGESTER
    # activity ran, or more generally by checking for any jobs that write to the
    # GEO_EVENTS table.
    # A special case for making EVENTS from the geo_events table:
    # if job.activity.name == 'GEOEVENTS_INGESTER':
    #     yield Job(category=job.category,
    #               activity='GEOEVENTS_EVENTS',
    #               sid=job.sid,
    #               sensing_start=job.sensing_start,
    #               sensing_stop=job.sensing_stop)

    # Scan the tables this job recorded writing to. A well-behaved algorithm has to
    # record these writes explicitly when it marks a job as complete
    for t in job.tables:
        logger.info(
            "Detected a write to {t} from {strt} to {stop} looking for reactions".format(
                t=t["table"].name, strt=t["sensing_start"], stop=t["sensing_stop"]
            )
        )
        if t["table"].name == "GEO_EVENTS":
            yield Job(
                category=job.category,
                activity="GEOEVENTS_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )
        # if t["table"].name == "TC":
        #     yield Job(
        #         category=job.category,
        #         activity="TC_EVENTS",
        #         sid=t["sid"],
        #         sensing_start=t["sensing_start"],
        #         sensing_stop=t["sensing_stop"],
        #     )

        if t["table"].name == "TM":
            yield Job(
                category=job.category,
                activity="ONBOARD_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        if t["table"].name == "APEX_LOG":
            yield Job(
                category=job.category,
                activity="APEX_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        if t["table"].name == "MANOEUVRE_PREDICTION":
            yield Job(
                category=job.category,
                activity="MANO_PRED_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        if t["table"].name == "SCHEDULE_ACTIVITIES":
            yield Job(
                category=job.category,
                activity="SCHEDULE_EVENTS",
                sid=t["sid"],
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

        # Generic stats handling for any table with statistics
        if t["table"].has_stats:
            # First we make short term stats. These are for statistics which are generally the
            # same length as, or shorter than, our input files, so it makes sense to trigger
            # them immediately after ingestion
            yield Job(
                category=job.category,
                activity="SHORT_TERM_STATS",
                sid=t["sid"],
                tablename=t["table"].name,
                sensing_start=t["sensing_start"],
                sensing_stop=t["sensing_stop"],
            )

            # Now handle long term stats such as daily stats. These cover multiple input products,
            # so it makes sense to delay them until many input products have been ingested,
            # to avoid recomputing big stats jobs too often.
            # Here, we assume that daily generation is reasonable, and raise daily stats
            # jobs delayed until 6am the next day.
            # Note the worker will automatically squash duplicate jobs, so it's safe
            # to create multiple long term stats jobs (from multiple input files) -
            # only one will be executed.
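            # Worked example (times are illustrative): a write with sensing_start
            # 2031-06-02 14:32 produces a LONG_TERM_STATS job covering
            # 2031-06-02 00:00 to 2031-06-03 00:00, held back until 2031-06-03 06:00.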
            start = t["sensing_start"].replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            yield Job(
                category=job.category,
                activity="LONG_TERM_STATS",
                sid=t["sid"],
                tablename=t["table"].name,
                sensing_start=start,
                sensing_stop=start + DAY,
                earliest_execution_time=start + DAY + LATENCY,
            )