1#!/usr/bin/env python3
2
3"""Each time the `worker` wakes up, it has to select the next job to execute."""
4
5import logging
6from datetime import datetime
7
8from chart.project import settings
9from chart.backend.job import job_retrieve_fields
10from chart.backend.job import Job
11from chart.backend.jobs import find_jobs
12from chart.backend.jobs import prepare_find_jobs
13from chart.backend.job import JobStatus
14
# Module-level logger; named after the module so records show where they came from
logger = logging.getLogger(__name__)
16
17
def prepare_find_jobs(fields,
                      category_bind,
                      status=None,
                      sid_bind=False,
                      activity=None,
                      activity_bind=None,
                      job_id_ne_bind=None,
                      earliest_execution_time_null_le_bind=None,
                      tablename_bind=None,
                      tablename_ne_bind=None,
                      filename_bind=None,
                      order_by=None,
                      limit=None,
                      limit_bind=None,
                      db_conn=default_connection):
    """Construct a prepared cursor for reading `fields` from the JOBS table.

    Some clauses in the query are represented using bind variables (set by the _bind
    parameters) and other clauses are fixed at query compilation time.
    This `def` rebinds the name `prepare_find_jobs` imported from `chart.backend.jobs`,
    so within this module this version is the one that is used.

    Args:
        `fields` (str or list of str): Field(s) to select
        `category_bind` (str): Name of bind variable for the "CATEGORY=:bind" clause
        `status` (JobStatus): If set, must be JobStatus.PENDING; selects rows whose
            STATUS is NULL
        `sid_bind` (bool): If true, add the clause built by SID.sql_sys_where_bind('JOBS')
        `activity` (str): If set, hard-wire an "ACTIVITY='<activity>'" clause at
            compile time
        `activity_bind` (str): Name of bind variable for the "ACTIVITY=:bind" clause
        `job_id_ne_bind` (str): Name of bind variable for the "ID!=:bind" clause
        `earliest_execution_time_null_le_bind` (str): Name of bind variable for the
            "(EARLIEST_EXECUTION_TIME IS NULL OR EARLIEST_EXECUTION_TIME<=:bind)" clause
        `tablename_bind` (str): Name of bind variable for the "TABLENAME=:bind" clause
        `tablename_ne_bind` (str): Name of bind variable for the "TABLENAME!=:bind" clause
        `filename_bind` (str): Name of bind variable for the "FILENAME=:bind" clause
        `order_by` (str or list of str): Sort field(s)
        `limit` (int or str): Hard-wired maximum number of rows; takes precedence over
            `limit_bind`
        `limit_bind` (str): Name of bind variable holding the maximum number of rows
            (a LIMIT clause, or a row_number() filter on Oracle)
        `db_conn` (Connection): Connection used to prepare the cursor

    Returns:
        The prepared cursor returned by `db_conn.prepared_cursor`.

    Raises:
        ValueError: If `status` is set to anything other than JobStatus.PENDING.
    """
    if not is_listlike(fields):
        fields = (fields, )

    fields_sql = ','.join(fields)

    clauses = ['category=:{bind}'.format(bind=category_bind)]

    if status is not None:
        # Raise rather than assert so the check is not stripped under `python -O`
        if status is not JobStatus.PENDING:
            raise ValueError('Only PENDING jobs can be requested in this function')

        # PENDING is represented in the table by a NULL status
        clauses.append('STATUS IS NULL')

    if activity is not None:
        # Hard-wired literal: double any single quotes so the SQL string stays well-formed
        clauses.append('ACTIVITY=\'{activity}\''.format(
            activity=str(activity).replace("'", "''")))

    if activity_bind is not None:
        clauses.append('ACTIVITY=:{bind}'.format(bind=activity_bind))

    if sid_bind:
        # The bind variable name(s) in this clause are chosen by SID.sql_sys_where_bind;
        # callers must supply matching values when executing (presumably `scid`)
        clauses.append(SID.sql_sys_where_bind('JOBS'))

    if job_id_ne_bind is not None:
        clauses.append('ID!=:{bind}'.format(bind=job_id_ne_bind))

    if earliest_execution_time_null_le_bind is not None:
        clauses.append('(EARLIEST_EXECUTION_TIME IS NULL OR '
                       'EARLIEST_EXECUTION_TIME<=:{bind})'.format(
                           bind=earliest_execution_time_null_le_bind))

    if tablename_bind is not None:
        clauses.append('TABLENAME=:{bind}'.format(bind=tablename_bind))

    if tablename_ne_bind is not None:
        clauses.append('TABLENAME!=:{bind}'.format(bind=tablename_ne_bind))

    if filename_bind is not None:
        clauses.append('FILENAME=:{bind}'.format(bind=filename_bind))

    where_sql = ' AND '.join(clauses)

    # Comma-separated sort columns, or None if no ordering was requested
    if order_by is None:
        order_cols = None

    elif is_listlike(order_by):
        order_cols = ','.join(order_by)

    else:
        order_cols = order_by

    order_by_sql = '' if order_cols is None else ' ORDER BY {o}'.format(o=order_cols)

    if limit is None and limit_bind is None:
        return db_conn.prepared_cursor('SELECT {fields} '
                                       'FROM jobs '
                                       'WHERE {where}{order}'.format(
                                           fields=fields_sql,
                                           where=where_sql,
                                           order=order_by_sql))

    # Either a reference to a bind variable, or a hard-wired actual limit
    limit_sql = ':{bind}'.format(bind=limit_bind) if limit is None else limit

    if db_conn.engine is DatabaseEngine.ORACLE:
        # Oracle has no LIMIT clause, so number the rows with row_number() and filter on
        # that. The window needs an ORDER BY, hence the filename fallback. The outer query
        # re-sorts on rn because Oracle does not guarantee that a subquery's ordering is
        # preserved in the outer result.
        order_row_number = ' ORDER BY {o}'.format(
            o='filename' if order_cols is None else order_cols)
        return db_conn.prepared_cursor(
            'SELECT {fields} FROM ('
            ' SELECT {fields}, row_number() OVER ({order_rownum}) rn '
            ' FROM jobs '
            ' WHERE {where}{order}) q '
            'WHERE rn<={limit} ORDER BY rn'.format(
                fields=fields_sql,
                where=where_sql,
                order=order_by_sql,
                order_rownum=order_row_number,
                limit=limit_sql))

    return db_conn.prepared_cursor('SELECT {fields} '
                                   'FROM jobs '
                                   'WHERE {where}{order} LIMIT {limit}'.format(
                                       fields=fields_sql,
                                       where=where_sql,
                                       order=order_by_sql,
                                       limit=limit_sql))
138
139
def _first_pending_job(category, **criteria):
    """Return the first PENDING job of `category` matching `criteria` as a Job, or None.

    `criteria` are passed straight through to `find_jobs` as extra keyword arguments.
    """
    row = find_jobs(fields=job_retrieve_fields(),
                    category=category,
                    status=JobStatus.PENDING,
                    limit=1,
                    **criteria).fetchone()
    return None if row is None else Job(row=row)


def select_next_job(category,
                    now=None,
                    only_activities=None,
                    exclude_activities=None,
                    only_ingestion=False,
                    exclude_ingestion=False,
                    only_filenames=None,
                    db_conn=None,
                    lock=False):
    """Pick the next job to process for `category`.

    Look for ingestions first (FDF_INGESTER jobs ahead of any other ingestion), then
    per-orbit algorithms, then others.
    For some data types the order of ingestion counts, so ingestions are selected in
    filename order.
    If `only_activities` / `exclude_activities` are given, they restrict the
    non-ingestion searches only.

    Note, all these parameters can be safely ignored by alternate implementations of this
    function. They are only needed if you want to run multiple `worker`s with each one
    handling different types of jobs.

    We use the seemingly-unnecessary only/exclude_ingestion parameters because it makes the
    implementation of this function much easier and also makes it easier to set up special
    ingestion-only workers and processing-only workers.

    Args:
        `category` (str): Category to process
        `now` (datetime): Assume the current time is `now`. Defaults to the current UTC
            time, evaluated on each call.
        `only_activities` (list of str): Only consider activities in this list
        `exclude_activities` (list of str): Ignore these activities
        `only_ingestion` (bool): Only look at ingestion jobs
        `exclude_ingestion` (bool): Ignore all ingestion activities
        `only_filenames` (str): If set, only ingestion jobs whose filename matches this
            pattern (passed to `find_jobs` as `filename_like`) are considered.
        `db_conn` (optional, Connection): Used by non-EPS implementations to lock the JOBS table
        `lock` (bool): Used by non-EPS implementations to lock the JOBS table

    Returns:
        A Job object, or None if no suitable pending job was found.
    """
    if now is None:
        # Resolved per call: a `datetime.utcnow()` default would be frozen at import time,
        # so jobs with a future EARLIEST_EXECUTION_TIME would never become eligible
        now = datetime.utcnow()

    logger.debug('Find next job, exclude ingestion is %s', exclude_ingestion)

    if not exclude_ingestion:
        if only_filenames is None:
            # FDF ingestions take priority over all other ingestion jobs
            job = _first_pending_job(category,
                                     activity='FDF_INGESTER',
                                     order_by='FILENAME')
            if job is None:
                # any other job that has a filename is an ingestion
                job = _first_pending_job(category,
                                         filename_ne=None,
                                         order_by='FILENAME')

        else:
            job = _first_pending_job(category,
                                     filename_like=only_filenames,
                                     order_by='FILENAME')

        if job is not None:
            return job

        if only_ingestion:
            return None

    # Per-orbit algorithms
    job = _first_pending_job(category,
                             orbit_ne=None,
                             filename=None,
                             earliest_execution_time_null_le=now,
                             activity=only_activities,
                             activity_ne=exclude_activities)
    if job is not None:
        return job

    # Anything else that is due
    return _first_pending_job(category,
                              filename=None,
                              earliest_execution_time_null_le=now,
                              orbit=None,
                              activity=only_activities,
                              activity_ne=exclude_activities)
246
247
def _prepare_similar_jobs_cursors():
    """Prepare the cursors used by `find_similar_jobs` and cache them on that function.

    The cursors are stored as attributes of `find_similar_jobs` and `made_cursors` is set
    only after all of them were created, so a failure part way through is retried on the
    next call.
    """
    fields = job_retrieve_fields()

    # locate all other ingestion jobs
    find_similar_jobs.pdu_cur = prepare_find_jobs(
        fields=fields,
        status=JobStatus.PENDING,
        activity_bind='activity',
        category_bind='category',
        job_id_ne_bind='jobid',
        limit_bind='count',
        order_by='filename')

    # locate other orbital stats jobs based on the same table
    find_similar_jobs.same_table_cur = prepare_find_jobs(
        fields=fields,
        status=JobStatus.PENDING,
        activity_bind='activity',
        tablename_bind='tablename',
        earliest_execution_time_null_le_bind='now',
        category_bind='category',
        job_id_ne_bind='jobid',
        order_by=('SCID', 'ORBIT'),
        limit_bind='count')

    # locate other orbital stats jobs from different tables (test not needed?)
    find_similar_jobs.other_table_cur = prepare_find_jobs(
        fields=fields,
        status=JobStatus.PENDING,
        activity_bind='activity',
        earliest_execution_time_null_le_bind='now',
        tablename_ne_bind='tablename',
        category_bind='category',
        job_id_ne_bind='jobid',
        order_by=('TABLENAME', 'SCID', 'ORBIT'),
        limit_bind='count')

    # locate other jobs with the same activity (and SID)
    find_similar_jobs.non_orbital_stats_cur = prepare_find_jobs(
        fields=fields,
        status=JobStatus.PENDING,
        earliest_execution_time_null_le_bind='now',
        activity_bind='activity',
        sid_bind=True,
        category_bind='category',
        job_id_ne_bind='jobid',
        order_by='ORBIT',
        limit_bind='count')

    # locate other day/night stats jobs
    find_similar_jobs.day_night_stats_cur = prepare_find_jobs(
        fields=fields,
        status=JobStatus.PENDING,
        earliest_execution_time_null_le_bind='now',
        activity_bind='activity',
        category_bind='category',
        job_id_ne_bind='jobid',
        limit_bind='count')

    find_similar_jobs.made_cursors = True


def find_similar_jobs(job, count, now, only_filenames=None):
    """Yield pending jobs that can be run in the same algorithm call as `job`.

    Given a primary `job`, identify similar jobs that can be performed as a group by a
    single call to the algorithm. At most `count` additional jobs are yielded and `job`
    itself is never included.
    Nothing is yielded for activities that do not allow multiple jobs, or for reports,
    since they are never batched together.

    Args:
        `job` (Job): The primary job.
        `count` (int): Maximum number of additional jobs to yield.
        `now` (datetime): Current time; non-ingestion jobs whose EARLIEST_EXECUTION_TIME
            is later than this are skipped.
        `only_filenames` (str): If set, similar ingestion jobs are restricted to filenames
            matching this pattern (passed to `find_jobs` as `filename_like`).

    Yields:
        Job objects.
    """
    if not getattr(find_similar_jobs, 'made_cursors', False):
        _prepare_similar_jobs_cursors()

    # we expect to have an 'id' member in `job`

    if not job.activity.allow_multiple or job.activity.classname == 'report':
        # If this is an old-style algorithm with <allow-multiple>false</>
        # or if it is a report, we only run a single job per process
        return

    activity_name = job.activity.name

    # Handle EPS and SAPHIRE ingesters (containing '_INGESTER')
    # Handle PAROT ingesters ('FDF_GEOEVENT', 'WIMPY', 'STEPMSG', 'DUMPSYNC')
    if '_INGESTER' in activity_name or \
       activity_name in ('FDF_GEOEVENT', 'WIMPY', 'STEPMSG', 'DUMPSYNC'):
        # order by filename as this matters for HKTM and NOAA GAC
        if only_filenames is not None:
            # same activity as the original, with filenames matching only_filenames.
            # Pass the activity name, as every other query here does.
            rows = find_jobs(fields=job_retrieve_fields(),
                             status=JobStatus.PENDING,
                             activity=activity_name,
                             category=job.category,
                             job_id_ne=job.job_id,
                             filename_like=only_filenames,
                             limit=count,
                             order_by='FILENAME')

        else:
            rows = find_similar_jobs.pdu_cur.execute(
                None,
                activity=activity_name,
                jobid=job.job_id,
                category=job.category,
                count=count)

        for row in rows:
            yield Job(row=row)

    elif activity_name in ('ORBITAL_STATS', 'HOURLY_STATS'):
        # pass 1 - yield other stats jobs with the same tablename.
        # Counted by hand: with enumerate() the counter would be unbound when
        # nothing is found.
        found = 0
        for row in find_similar_jobs.same_table_cur.execute(
                None,
                now=now,
                jobid=job.job_id,
                category=job.category,
                tablename=job.tablename,
                activity=activity_name,
                count=count):
            found += 1
            yield Job(row=row)

        # pass 2 - fill any remaining slots with stats jobs from other tables
        if found < count:
            for row in find_similar_jobs.other_table_cur.execute(
                    None,
                    now=now,
                    jobid=job.job_id,
                    category=job.category,
                    tablename=job.tablename,
                    activity=activity_name,
                    count=count - found):
                yield Job(row=row)

    elif activity_name in ('DAY_STATS', 'NIGHT_STATS'):
        for row in find_similar_jobs.day_night_stats_cur.execute(
                None,
                now=now,
                jobid=job.job_id,
                category=job.category,
                activity=activity_name,
                count=count):
            yield Job(row=row)

    elif job.orbit is not None:
        # We filter on scid here as it is awkward to handle multiple scids in algorithms that
        # raise events.
        # Also, ordering by orbit is probably safer
        for row in find_similar_jobs.non_orbital_stats_cur.execute(
                None,
                now=now,
                jobid=job.job_id,
                category=job.category,
                activity=activity_name,
                scid=job.scid,
                count=count):
            yield Job(row=row)


# Cursors are prepared lazily on first use (see _prepare_similar_jobs_cursors)
find_similar_jobs.made_cursors = False