1#!/usr/bin/env python3
  2
  3"""Each time the `worker` wakes up, it has to select the next job to execute."""
  4
  5import logging
  6from datetime import datetime
  7
  8from chart.project import settings
  9from chart.backend.job import job_retrieve_fields
 10from chart.backend.job import Job
 11from chart.backend.jobs import find_jobs
 12from chart.backend.jobs import prepare_find_jobs
 13from chart.backend.job import JobStatus
 14
 15logger = logging.getLogger()
 16
 17
 18def prepare_find_jobs(fields,
 19                      category_bind,
 20                      status=None,
 21                      sid_bind=False,
 22                      activity=None,
 23                      activity_bind=None,
 24                      job_id_ne_bind=None,
 25                      earliest_execution_time_null_le_bind=None,
 26                      tablename_bind=None,
 27                      tablename_ne_bind=None,
 28                      filename_bind=None,
 29                      order_by=None,
 30                      limit=None,
 31                      limit_bind=None,
 32                      db_conn=default_connection):
 33    """Construct a prepared cursor for reading `fields` from the JOBS table.
 34    Some clauses in the query are represented using bind variables (set by the _bind
 35    parameters) and other clauses may be set at query compilation time.
 36
 37    Args:
 38        `fields` (list of str): Fields to select
 39        `category_bind` (str): Name of bind variable to use for "CATEGORY=:bind" clause
 40        `status` (str): If set, search for this STATUS only.
 41        `sid_bind` (bool): If true include SID determined bind variables for the SID
 42        `activity_bind` (str): Name of bind variable to use for "ACTIVITY=:bind" clause
 43        `jobid_ne_bind` (str): Name of bind variable for "JOBID!=:bind" clause
 44        `earliest_execution_time_null_le_bind` (str): Bind variable to use for
 45            "(EARLIEST_EXECUTION_TIME IS NOT NULL AND EARLIEST_EXECUTION_TIME<:bind)" clause
 46        `tablename_bind`: (str): Bind variable to use in "TABLENAME=:bind" clause
 47        `order_by` (str): Sort field
 48        `limit` (str): Limit results to absolute number
 49        `limit_bind` (str): Bind variable to use in LIMIT clause
 50            (or LIMIT-like clause for Oracle db)
 51    """
 52
 53    if not is_listlike(fields):
 54        fields = (fields, )
 55
 56    clauses = ['category=:{bind}'.format(bind=category_bind)]
 57
 58    if status is not None:
 59        assert status is JobStatus.PENDING, 'Only PENDING jobs can be requested in this function'
 60        if status is JobStatus.PENDING:
 61            clauses.append('STATUS IS NULL')
 62
 63    if activity is not None:
 64        clauses.append('ACTIVITY=\'{activity}\''.format(activity=activity))
 65
 66    if activity_bind is not None:
 67        clauses.append('ACTIVITY=:{bind}'.format(bind=activity_bind))
 68
 69    if sid_bind:
 70        # clauses.append('SID=:{bind}'.format(bind=sid_bind))
 71        clauses.append(SID.sql_sys_where_bind('JOBS'))
 72
 73    if job_id_ne_bind is not None:
 74        clauses.append('ID!=:{bind}'.format(bind=job_id_ne_bind))
 75
 76    if earliest_execution_time_null_le_bind is not None:
 77        clauses.append('(EARLIEST_EXECUTION_TIME IS NULL OR '
 78                       'EARLIEST_EXECUTION_TIME<=:{bind})'.format(
 79                bind=earliest_execution_time_null_le_bind))
 80
 81    if tablename_bind is not None:
 82        clauses.append('TABLENAME=:{bind}'.format(bind=tablename_bind))
 83
 84    if tablename_ne_bind is not None:
 85        clauses.append('TABLENAME!=:{bind}'.format(bind=tablename_ne_bind))
 86
 87    if filename_bind is not None:
 88        clauses.append('FILENAME=:{bind}'.format(bind=filename_bind))
 89
 90    if order_by is None:
 91        order_by_sql = ''
 92
 93    elif is_listlike(order_by):
 94        order_by_sql = ' ORDER BY {o}'.format(o=','.join(order_by))
 95
 96    else:
 97        order_by_sql = ' ORDER BY {o}'.format(o=order_by)
 98
 99    if order_by is None:
100        order_row_number = ' ORDER BY filename'
101
102    elif is_listlike(order_by):
103        order_row_number = ' ORDER BY {o}'.format(o=','.join(order_by))
104
105    else:
106        order_row_number = ' ORDER BY {o}'.format(o=order_by)
107
108    if limit is None and limit_bind is None:
109        return db_conn.prepared_cursor('SELECT {fields} '
110                                       'FROM jobs '
111                                       'WHERE {where}{order}'.format(
112                fields=','.join(fields),
113                where=' AND '.join(clauses),
114                order=order_by_sql))
115
116    # make either a reference to a bind variable, or hard wire an actual limit
117    limit_sql = ':{bind}'.format(bind=limit_bind) if limit is None else limit
118    if db_conn.engine is DatabaseEngine.ORACLE:
119        return db_conn.prepared_cursor(
120            'SELECT {fields} FROM ('
121            '  SELECT {fields}, row_number() OVER ({order_rownum}) rn '
122            '  FROM jobs '
123            '  WHERE {where}{order}) q '
124            'WHERE rn<={limit}'.format(
125                fields=','.join(fields),
126                where=' AND '.join(clauses),
127                order=order_by_sql,
128                order_rownum=order_row_number,
129                limit=limit_sql))
130
131    return db_conn.prepared_cursor('SELECT {fields} '
132                                   'FROM jobs '
133                                   'WHERE {where}{order} LIMIT {limit}'.format(
134                                       fields=','.join(fields),
135                                       where=' AND '.join(clauses),
136                                       order=order_by_sql,
137                                       limit=limit_sql))
138
139
def select_next_job(category,
                    now=None,
                    only_activities=None,
                    exclude_activities=None,
                    only_ingestion=False,
                    exclude_ingestion=False,
                    only_filenames=None,
                    db_conn=None,
                    lock=False):
    """Pick the next pending job to process for `category`.

    Search order:

    1. Ingestion (skipped if `exclude_ingestion`): pending FDF_INGESTER jobs first, then
       any pending job that has a FILENAME. If `only_filenames` is given, the FDF_INGESTER
       pass is skipped and only jobs whose filename matches `only_filenames` are considered.
    2. Per-orbit jobs (ORBIT set, no FILENAME) that are due at `now`.
    3. Any other due job with neither FILENAME nor ORBIT.

    Steps 2 and 3 are skipped if `only_ingestion` is set.

    NOTE(review): the order of ingestion matters for some data types, but no `order_by`
    is passed to find_jobs here - confirm find_jobs orders ingestions by filename itself.

    Note, all these parameters can be safely ignored by alternate implementations of this
    function. They are only needed if you want to run multiple `worker`s with each one
    handling different types of jobs. The only/exclude_ingestion parameters keep this
    implementation simple and make it easy to set up ingestion-only and processing-only
    workers.

    Args:
        `category` (str): Category to process
        `now` (datetime): Assume the current time is `now`. Defaults to
            `datetime.utcnow()` evaluated at call time.
        `only_activities` (list of str): Only consider these activities (steps 2 and 3 only)
        `exclude_activities` (list of str): Ignore these activities (steps 2 and 3 only)
        `only_ingestion` (bool): Only look at ingestion jobs
        `exclude_ingestion` (bool): Ignore all ingestion jobs
        `only_filenames` (str): Restrict ingestion jobs to filenames matching this pattern
            (passed to find_jobs as `filename_like`). Not applied to steps 2 and 3.
        `db_conn` (optional, Connection): Unused here; for non-EPS implementations that lock
            the JOBS table
        `lock` (bool): Unused here; for non-EPS implementations that lock the JOBS table

    Returns:
        A Job object, or None if no suitable job is pending.
    """

    if now is None:
        # Resolved per call: a `datetime.utcnow()` default would be evaluated once at import,
        # so a long-running worker would compare against a frozen time and never pick up
        # jobs whose earliest execution time falls after the module was loaded.
        now = datetime.utcnow()

    logger.debug('Find next job, exclude ingestion is {e}'.format(e=exclude_ingestion))

    if not exclude_ingestion:
        if only_filenames is None:
            # FDF ingestion jobs take priority over every other ingestion
            job = find_jobs(
                fields=job_retrieve_fields(),
                category=category,
                status=JobStatus.PENDING,
                activity='FDF_INGESTER',
                limit=1).fetchone()

            if job is not None:
                return Job(row=job)

            # Any other pending job that has a filename
            job = find_jobs(fields=job_retrieve_fields(),
                            category=category,
                            status=JobStatus.PENDING,
                            filename_ne=None,
                            limit=1).fetchone()

        else:
            job = find_jobs(fields=job_retrieve_fields(),
                            category=category,
                            status=JobStatus.PENDING,
                            filename_like=only_filenames,
                            limit=1).fetchone()

        if job is not None:
            return Job(row=job)

    if only_ingestion:
        return None

    # Per-orbit jobs that are due
    job = find_jobs(fields=job_retrieve_fields(),
                    category=category,
                    status=JobStatus.PENDING,
                    orbit_ne=None,
                    filename=None,
                    earliest_execution_time_null_le=now,
                    activity=only_activities,
                    activity_ne=exclude_activities,
                    limit=1).fetchone()
    if job is not None:
        return Job(row=job)

    # Remaining due jobs with neither a filename nor an orbit
    job = find_jobs(fields=job_retrieve_fields(),
                    category=category,
                    status=JobStatus.PENDING,
                    filename=None,
                    earliest_execution_time_null_le=now,
                    orbit=None,
                    activity=only_activities,
                    activity_ne=exclude_activities,
                    limit=1).fetchone()

    return None if job is None else Job(row=job)
246
247
def _prepare_similar_job_cursors():
    """Prepare the cursors used by `find_similar_jobs` and cache them as its attributes.

    `find_similar_jobs.made_cursors` is set to True only after every cursor has been
    prepared, so a failure part-way through is retried on the next call.
    """
    # locate all other ingestion jobs with the same activity, in filename order
    find_similar_jobs.pdu_cur = prepare_find_jobs(
        fields=job_retrieve_fields(),
        status=JobStatus.PENDING,
        activity_bind='activity',
        category_bind='category',
        job_id_ne_bind='jobid',
        limit_bind='count',
        order_by='filename')

    # locate other orbital stats jobs based on the same table
    find_similar_jobs.same_table_cur = prepare_find_jobs(
        fields=job_retrieve_fields(),
        status=JobStatus.PENDING,
        activity_bind='activity',
        tablename_bind='tablename',
        earliest_execution_time_null_le_bind='now',
        category_bind='category',
        job_id_ne_bind='jobid',
        order_by=('SCID', 'ORBIT'),
        limit_bind='count')

    # locate other orbital stats jobs from different tables
    find_similar_jobs.other_table_cur = prepare_find_jobs(
        fields=job_retrieve_fields(),
        status=JobStatus.PENDING,
        activity_bind='activity',
        earliest_execution_time_null_le_bind='now',
        tablename_ne_bind='tablename',
        category_bind='category',
        job_id_ne_bind='jobid',
        order_by=('TABLENAME', 'SCID', 'ORBIT'),
        limit_bind='count')

    # locate other jobs with the same activity (and SID clause), ordered by orbit
    find_similar_jobs.non_orbital_stats_cur = prepare_find_jobs(
        fields=job_retrieve_fields(),
        status=JobStatus.PENDING,
        earliest_execution_time_null_le_bind='now',
        activity_bind='activity',
        sid_bind=True,
        category_bind='category',
        job_id_ne_bind='jobid',
        order_by='ORBIT',
        limit_bind='count')

    # locate other day/night stats jobs
    find_similar_jobs.day_night_stats_cur = prepare_find_jobs(
        fields=job_retrieve_fields(),
        status=JobStatus.PENDING,
        earliest_execution_time_null_le_bind='now',
        activity_bind='activity',
        category_bind='category',
        job_id_ne_bind='jobid',
        limit_bind='count')

    # Previously this flag was never set, so every call re-prepared all cursors.
    # NOTE(review): caching assumes the connection the cursors were prepared on stays
    # usable for the life of the process - confirm for workers that reconnect.
    find_similar_jobs.made_cursors = True


def find_similar_jobs(job, count, now, only_filenames=None):
    """Yield pending jobs similar to the primary `job` that can be processed as a group
    by a single call to the algorithm.

    Nothing is yielded if the job's activity does not allow multiple jobs or is a report.
    Otherwise the search depends on the activity:

    - Ingesters: other pending jobs with the same activity, ordered by filename
      (restricted to filenames matching `only_filenames` if given).
    - ORBITAL_STATS / HOURLY_STATS: jobs on the same table first, then other tables.
    - DAY_STATS / NIGHT_STATS: other jobs with the same activity.
    - Other jobs with an orbit: jobs with the same activity and scid, ordered by orbit.

    Args:
        `job` (Job): Primary job; must expose `activity`, `job_id`, `category`, and for
            some activities `tablename`, `orbit` and `scid`.
        `count` (int): Maximum number of jobs requested from each query.
            NOTE(review): the original intent was "up to `count` total jobs" - confirm
            whether this is meant to include the primary job.
        `now` (datetime): Only jobs whose earliest execution time is unset or not after
            `now` are considered (for the queries that apply that clause).
        `only_filenames` (str): Optional filename pattern for ingestion jobs.

    Yields:
        Job objects.
    """

    if not job.activity.allow_multiple or job.activity.classname == 'report':
        # If this is an old-style algorithm with <allow-multiple>false</>
        # or if it is a report, we only run a single job per process.
        # Checked before preparing cursors so no database work is done for these jobs.
        return

    if not find_similar_jobs.made_cursors:
        _prepare_similar_job_cursors()

    # Handle EPS and SAPHIRE ingesters (containing '_INGESTER')
    # Handle PAROT ingesters ('FDF_GEOEVENT', 'WIMPY', 'STEPMSG', 'DUMPSYNC')
    if '_INGESTER' in job.activity.name or \
            job.activity.name in ('FDF_GEOEVENT', 'WIMPY', 'STEPMSG', 'DUMPSYNC'):
        if only_filenames is not None:
            # select only jobs with the same Activity as the original and filenames matching
            # only_filenames.
            # NOTE(review): passes the Activity object, whereas other queries pass
            # `job.activity.name` - confirm find_jobs accepts both.
            for newjob in find_jobs(fields=job_retrieve_fields(),
                                    status=JobStatus.PENDING,
                                    activity=job.activity,
                                    category=job.category,
                                    job_id_ne=job.job_id,
                                    filename_like=only_filenames,
                                    limit=count,
                                    order_by='FILENAME'):
                yield Job(row=newjob)

        else:
            # order by filename as this matters for HKTM and NOAA GAC
            for newjob in find_similar_jobs.pdu_cur.execute(
                    None,
                    activity=job.activity.name,
                    jobid=job.job_id,
                    category=job.category,
                    count=count):
                yield Job(row=newjob)

    elif job.activity.name in ('ORBITAL_STATS', 'HOURLY_STATS'):
        # pass 1 - yield other orbital stats jobs with the same tablename
        cc = 0
        for newjob in find_similar_jobs.same_table_cur.execute(
                None,
                now=now,
                jobid=job.job_id,
                category=job.category,
                tablename=job.tablename,
                activity=job.activity.name,
                count=count):
            cc += 1  # dont try using enumerate instead
            yield Job(row=newjob)

        # pass 2 - top up with orbital stats jobs on other tables.
        # NOTE(review): pass 1 may already yield `count` jobs, yet pass 2 is skipped once
        # cc reaches count - 1; confirm the intended total.
        if cc < (count - 1):
            for newjob in find_similar_jobs.other_table_cur.execute(
                    None,
                    now=now,
                    jobid=job.job_id,
                    category=job.category,
                    tablename=job.tablename,
                    activity=job.activity.name,
                    count=count - cc):
                yield Job(row=newjob)

    elif job.activity.name in ('DAY_STATS', 'NIGHT_STATS'):
        for newjob in find_similar_jobs.day_night_stats_cur.execute(
                None,
                now=now,
                jobid=job.job_id,
                category=job.category,
                activity=job.activity.name,
                count=count):
            yield Job(row=newjob)

    elif job.orbit is not None:
        # We filter on scid here as it is awkward to handle multiple scids in algorithms that
        # raise events.
        # Also, ordering by orbit is probably safer
        for newjob in find_similar_jobs.non_orbital_stats_cur.execute(
                None,
                now=now,
                jobid=job.job_id,
                category=job.category,
                activity=job.activity.name,
                scid=job.scid,
                count=count):
            yield Job(row=newjob)

# Cursors are prepared lazily on the first call that needs them
find_similar_jobs.made_cursors = False