1#!/usr/bin/env python3
2
3"""Read and write to the JOBS table."""
4
5import time
6import logging
7from datetime import datetime
8from typing import Tuple
9
10from chart.common.traits import is_listlike
11from chart.common.traits import name_of_thing
12from chart.backend.job import JobStatus
13from chart.db.connection import db_connect
14from chart.db.func import ANY
15from chart.db.func import DONT_CHANGE
16from chart.db.func import Func
17from chart.db.settings import DatabaseEngine
18from chart.db.exceptions import DatabaseError
19from chart.db.exceptions import SQLError
20from chart.project import settings
21from chart.project import SID
22from chart.backend.activity import Activity
23from chart.backend.activity import DEFAULT_PRIORITY
24from chart.backend.job import Job
25from chart.db.func import Reversed
26
# Connection object created once, at import time, by db_connect('JOBS'). The functions in this
# module use it as their default `db_conn` unless the caller passes a different connection.
default_connection = db_connect('JOBS')

# NOTE(review): getLogger() with no name returns the root logger, not a module logger
# (logging.getLogger(__name__)); confirm this is intentional.
logger = logging.getLogger()
30
31
def _find_jobs_fields_sql(fields):
    """Render the SELECT list for find_jobs(): plain strings as-is, Func objects via .sql()."""
    return ','.join(f.sql() if isinstance(f, Func) else f for f in fields)


def _find_jobs_activity_clause(activity, bindvars, bind_name, negate=False):
    """Return one SQL clause matching (or, if `negate`, excluding) `activity`.

    `activity` may be an Activity, a str (containing '%' makes it a LIKE pattern) or a
    sequence of those. Bind values are added to `bindvars`: a scalar uses `bind_name`
    itself, a sequence uses `<bind_name>_1`, `<bind_name>_2`, ...

    For a sequence, the individual tests are ORed together (or ANDed when `negate`).
    Returns None for an empty sequence, meaning "no restriction".
    """
    if negate:
        like_op, eq_op, in_op, joiner = 'NOT LIKE', '!=', 'NOT IN', ' AND '

    else:
        like_op, eq_op, in_op, joiner = 'LIKE', '=', 'IN', ' OR '

    if isinstance(activity, str):
        bindvars[bind_name] = activity
        if '%' in activity:
            return 'ACTIVITY {op} :{bind}'.format(op=like_op, bind=bind_name)

        return 'ACTIVITY{op}:{bind}'.format(op=eq_op, bind=bind_name)

    if isinstance(activity, Activity):
        bindvars[bind_name] = activity.name
        return 'ACTIVITY{op}:{bind}'.format(op=eq_op, bind=bind_name)

    bind_count = 0

    def bind(value):
        """Register `value` under the next numbered bind name and return its placeholder."""
        nonlocal bind_count
        bind_count += 1
        name = '{base}_{n}'.format(base=bind_name, n=bind_count)
        bindvars[name] = value
        return ':' + name

    sub_clauses = []
    # Non-wildcard names are collected and emitted as a single (NOT) IN / (!)= test
    plain_names = []
    for item in activity:
        if isinstance(item, Activity):
            plain_names.append(item.name)

        elif '%' in item:
            sub_clauses.append('ACTIVITY {op} {bind}'.format(op=like_op, bind=bind(item)))

        else:
            plain_names.append(item)

    if len(plain_names) > 1:
        sub_clauses.append('ACTIVITY {op} ({binds})'.format(
            op=in_op, binds=','.join(bind(name) for name in plain_names)))

    elif len(plain_names) == 1:
        sub_clauses.append('ACTIVITY{op}{bind}'.format(op=eq_op, bind=bind(plain_names[0])))

    if not sub_clauses:
        return None

    if len(sub_clauses) == 1:
        # avoid unnecessary brackets around a single test
        return sub_clauses[0]

    return '({parts})'.format(parts=joiner.join(sub_clauses))


def _find_jobs_order_by(order_by, limit):
    """Return the ' ORDER BY ...' suffix for find_jobs(), or '' for no ordering.

    An explicit `order_by` (a str, a callable returning SQL, or a list of those) always
    wins. Otherwise, when `limit` is set, rows are ordered by activity priority (highest
    first), then by ID.
    """
    if order_by is not None:
        order_reqs = order_by if is_listlike(order_by) else (order_by, )
        return ' ORDER BY {orders}'.format(orders=','.join(
            req if isinstance(req, str) else req() for req in order_reqs))

    if limit is None:
        return ''

    non_default = [a for a in Activity.all() if a.priority != DEFAULT_PRIORITY]
    if len(non_default) == 0:
        return ' ORDER BY ID'

    # Priorities are negated so that the (ascending) sort puts higher priorities first.
    # The default is negated as well so it ranks consistently against explicit priorities.
    # The negation happens in Python: a negative priority rendered as "-{priority}" would
    # give "--n", which SQL treats as the start of a comment.
    whens = ''.join(' WHEN \'{name}\' THEN {rank} '.format(name=a.name, rank=-a.priority)
                    for a in non_default)
    return ' ORDER BY CASE activity {whens} ELSE {default} END, ID'.format(
        whens=whens, default=-DEFAULT_PRIORITY)


def find_jobs(fields,  # (too many args) pylint: disable=R0913
              sid=ANY,
              job_id=None,
              job_id_ne=None,
              category=None,
              activity=None,
              activity_ne=None,
              status=None,
              filename=ANY,
              filename_ne=ANY,
              filename_like=None,
              tablename=None,
              tablename_like=None,
              tablename_ne=None,
              sensing_start=None,
              sensing_start_ge=None,
              sensing_start_lt=None,
              sensing_stop=None,
              earliest_execution_time_null_le=None,
              gen_time_ge=None,
              gen_time_gt=None,
              gen_time_lt=None,
              sensing_start_or_gen_time_ge=None,
              sensing_start_or_gen_time_lt=None,
              orbit=None,
              orbit_ne=None,
              process_id=None,
              parent=None,
              parent_ne=ANY,
              group_by=None,
              order_by=None,
              limit=None,
              clauses=None,
              bindvars=None,
              db_conn=default_connection,
              lock=False):
    """Return a cursor of rows from the JOBS table.

    All parameters mean look for jobs where field equals parameter value, unless the
    parameter ends in '_ne' (not equal), '_like' (SQL like), '_ge' (greater than or equal),
    '_gt' (greater than), '_lt' (less than) or '_null_le' (NULL, or less than or equal).
    The `sensing_start_or_gen_time_*` filters match if either a non-NULL SENSING_START or
    GEN_TIME satisfies the bound. All filters are ANDed together.

    Args:
        - `fields` (list of str or Func): Columns to select.
        - `sid`: Source ID filter. ANY (the default) applies no filter.
        - `category` (str): Upper-cased before comparison. A warning is logged if neither
          `category`, `job_id` nor `process_id` is given.
        - `activity` (str, Activity or list of these): Literal strings to search for unless
          they contain '%' in which case they become LIKE clauses. List entries are ORed.
        - `activity_ne` (str, Activity or list of these): As `activity` but excluding;
          list entries are ANDed.
        - `status` (JobStatus): JobStatus.PENDING matches a NULL status.
        - `filename`, `filename_ne`: ANY applies no filter; None means IS NULL / IS NOT NULL.
        - `orbit`, `orbit_ne`: Only applied if settings.ORBIT_IN_JOBS_TABLE is set.
        - `parent_ne`: Only None is supported, and it selects rows where PARENT IS NULL.
        - `group_by` (str or list of str): GROUP BY columns.
        - `order_by` (str, callable or list of these): ORDER BY terms; callables are called
          to obtain their SQL.
        - `limit` (int): Maximum number of rows. When set without `order_by`, rows are
          ordered by activity priority then ID.
        - `clauses` (list of str): Raw SQL clauses (the caller's list is not modified).
        - `bindvars` (dict): Bind variables to go with `clauses` (not modified).
        - `db_conn`: Database connection to query.
        - `lock` (bool): Append FOR UPDATE. Never applied for the Oracle engine.

    Everything else is a single item of type str, int or datetime as appropriate.

    Returns:
        Cursor

    Raises:
        NotImplementedError: If `parent_ne` is anything other than ANY or None.
    """
    # Work on copies so the caller's clause list / bind dict are never mutated
    clauses = [] if clauses is None else list(clauses)
    bindvars = {} if bindvars is None else dict(bindvars)

    if job_id is not None:
        clauses.append('id=:jobid')
        bindvars['jobid'] = job_id

    if job_id_ne is not None:
        clauses.append('id!=:jobidne')
        bindvars['jobidne'] = job_id_ne

    if settings.DATABASE_PROJECT_ID is not None:
        clauses.append('project=:project')
        bindvars['project'] = settings.DATABASE_PROJECT_ID

    if category is not None:
        clauses.append('category=:category')
        bindvars['category'] = category.upper()

    elif job_id is None and process_id is None:
        logger.warning('find_jobs called with no CATEGORY set')

    if activity is not None:
        activity_clause = _find_jobs_activity_clause(activity, bindvars, 'activity')
        if activity_clause is not None:
            clauses.append(activity_clause)

    if activity_ne is not None:
        # Separate bind names so the exclusion never overwrites the `activity` binds
        activity_ne_clause = _find_jobs_activity_clause(
            activity_ne, bindvars, 'activity_ne', negate=True)
        if activity_ne_clause is not None:
            clauses.append(activity_ne_clause)

    if sid is not ANY:
        # may be SIDLESS
        clauses.append(SID.sql_sys_where('JOBS', sid))

    if filename is not ANY:
        if filename is None:
            clauses.append('FILENAME IS NULL')

        else:
            clauses.append('FILENAME=:filename')
            bindvars['filename'] = filename

    if filename_ne is not ANY:
        if filename_ne is None:
            clauses.append('FILENAME IS NOT NULL')

        else:
            clauses.append('FILENAME!=:filename_ne')
            bindvars['filename_ne'] = filename_ne

    if filename_like is not None:
        clauses.append('FILENAME LIKE :filename_like')
        bindvars['filename_like'] = filename_like

    if tablename is not None:
        clauses.append('TABLENAME=:tablename')
        bindvars['tablename'] = name_of_thing(tablename)

    if tablename_like is not None:
        clauses.append('TABLENAME LIKE :tablename_like')
        bindvars['tablename_like'] = name_of_thing(tablename_like)

    if tablename_ne is not None:
        clauses.append('TABLENAME!=:tablename_ne')
        bindvars['tablename_ne'] = name_of_thing(tablename_ne)

    if sensing_start is not None:
        clauses.append('SENSING_START=:sens')
        bindvars['sens'] = sensing_start

    if sensing_start_ge is not None:
        clauses.append('SENSING_START>=:min_sens')
        bindvars['min_sens'] = sensing_start_ge

    if sensing_start_lt is not None:
        clauses.append('SENSING_START<:max_sens')
        bindvars['max_sens'] = sensing_start_lt

    if sensing_stop is not None:
        clauses.append('SENSING_STOP=:sens_stop')
        bindvars['sens_stop'] = sensing_stop

    if earliest_execution_time_null_le is not None:
        clauses.append('(EARLIEST_EXECUTION_TIME IS NULL OR '
                       'EARLIEST_EXECUTION_TIME<=:early_exe_time)')
        bindvars['early_exe_time'] = earliest_execution_time_null_le

    if gen_time_ge is not None:
        clauses.append('GEN_TIME>=:min_gen')
        bindvars['min_gen'] = gen_time_ge

    if gen_time_gt is not None:
        clauses.append('GEN_TIME>:min_gen2')
        bindvars['min_gen2'] = gen_time_gt

    if gen_time_lt is not None:
        clauses.append('GEN_TIME<:max_gen')
        bindvars['max_gen'] = gen_time_lt

    if sensing_start_or_gen_time_ge is not None:
        clauses.append('((SENSING_START IS NOT NULL AND SENSING_START>=:sens_gen_ge) OR '
                       '(GEN_TIME>=:sens_gen_ge))')
        bindvars['sens_gen_ge'] = sensing_start_or_gen_time_ge

    if sensing_start_or_gen_time_lt is not None:
        clauses.append('((SENSING_START IS NOT NULL AND SENSING_START<:sens_gen_lt) OR '
                       '(GEN_TIME<:sens_gen_lt))')
        bindvars['sens_gen_lt'] = sensing_start_or_gen_time_lt

    if settings.ORBIT_IN_JOBS_TABLE:
        if orbit is not None:
            clauses.append('ORBIT=:orbit')
            bindvars['orbit'] = orbit

        if orbit_ne is not None:
            clauses.append('ORBIT!=:orbit_ne')
            bindvars['orbit_ne'] = orbit_ne

    if process_id is not None:
        clauses.append('PROCESS_ID=:process_id')
        bindvars['process_id'] = process_id

    if status is not None:
        if status is JobStatus.PENDING:
            clauses.append('STATUS IS NULL')

        else:
            clauses.append('STATUS=:status')
            bindvars['status'] = name_of_thing(status)

    if parent is not None:
        clauses.append('PARENT=:parent')
        bindvars['parent'] = parent

    if parent_ne is not ANY:
        if parent_ne is None:
            # NOTE(review): unlike filename_ne=None (IS NOT NULL) this selects rows with
            # PARENT IS NULL; kept as-is because callers may rely on it.
            clauses.append('PARENT is NULL')

        else:
            raise NotImplementedError()

    if group_by is None:
        group_sql = ''

    else:
        groups = group_by if is_listlike(group_by) else (group_by, )
        group_sql = ' GROUP BY {groups}'.format(groups=','.join(groups))

    order_sql = _find_jobs_order_by(order_by, limit)

    if lock is True and db_conn.engine is not DatabaseEngine.ORACLE:
        sql_lock = ' FOR UPDATE'

    else:
        sql_lock = ''

    # Each element of `fields` can be a plain string column name or a Func object
    # with an sql() member to return its actual SQL code
    select_sql = _find_jobs_fields_sql(fields)
    # An empty WHERE is invalid SQL, so fall back to an always-true condition
    where_sql = ' AND '.join(clauses) if clauses else '1=1'

    if limit is None:
        return db_conn.query('SELECT {fields} FROM JOBS WHERE {where}{group}{order}{lock}'.format(
            fields=select_sql,
            where=where_sql,
            group=group_sql,
            order=order_sql,
            lock=sql_lock), **bindvars)

    bindvars['limit'] = limit
    if db_conn.engine is DatabaseEngine.ORACLE:
        # oracle doesn't have 'LIMIT'
        # mysql doesn't have ROW_NUMBER
        # postgres accepts both
        return db_conn.query(
            'SELECT {fields} FROM ('
            'SELECT {fields},ROW_NUMBER() OVER ({order}) rn '
            'FROM JOBS '
            'WHERE {where}{group}{order}) q '
            'WHERE rn<=:limit'.format(
                fields=select_sql,
                where=where_sql,
                group=group_sql,
                order=order_sql),
            **bindvars)

    return db_conn.query('SELECT {fields} '
                         'FROM JOBS '
                         'WHERE {where}{group}{order} LIMIT :limit{lock}'.format(
                             fields=select_sql,
                             where=where_sql,
                             group=group_sql,
                             order=order_sql,
                             lock=sql_lock),
                         **bindvars)
430
431
def count_jobs(**kwargs):
    """Count the number of jobs, using the same criteria as find_jobs().

    Args:
        **kwargs: Any find_jobs() keyword argument other than `fields`.

    Returns:
        The COUNT(*) value for the matching rows.
    """
    cursor = find_jobs(fields=('count(*)', ),
                       **kwargs)
    try:
        return cursor.fetchone()[0]

    finally:
        # close even if fetchone() raises so the cursor is never leaked
        cursor.close()
440
441
def delete_jobs(category,
                gen_time_lt,
                commit=True,
                db_conn=default_connection):
    """Remove JOBS rows of one category whose GEN_TIME is before a cut-off.

    Args:
        `category` (str): Only rows with exactly this CATEGORY are deleted.
        `gen_time_lt` (datetime): Rows with GEN_TIME strictly earlier than this are deleted.
        `commit` (bool): Commit the transaction after the DELETE.
        `db_conn`: Database connection to run the statement on.

    Returns:
        The DELETE cursor's rowcount, i.e. the number of jobs removed.
    """
    # NOTE(review): unlike find_jobs() this does not restrict on PROJECT when
    # settings.DATABASE_PROJECT_ID is set; confirm that cross-project deletion is intended.
    statement = 'DELETE FROM jobs WHERE category=:category AND gen_time<:gen_time_lt'
    deletion = db_conn.query(statement,
                             category=category,
                             gen_time_lt=gen_time_lt)

    if commit:
        db_conn.commit()

    return deletion.rowcount
464
465
def update_jobs(job=None,
                job_id=None,
                process_id=None,
                category=None,
                status=None,
                activity=None,
                table=None,
                sid=None,
                sensing_start_ge=None,
                sensing_start_lt=None,
                filename=None,
                orbit=None,
                set_status=DONT_CHANGE,
                set_process_id=DONT_CHANGE,
                set_critical_count=None,
                set_error_count=None,
                set_warning_count=None,
                set_info_count=None,
                commit=True,
                db_conn=default_connection):
    """Change JOBS table entries matching args.

    Every filter argument that is not None is ANDed into the WHERE clause.

    Args:
        `job` (Job or list of Job): Jobs to modify (matched on their job_id)
        `job_id` (int or list of int): Job id(s) to change
        `process_id` (int): Search for any existing job of this ID
        `category` (str): Category filter (compared as given)
        `status` (JobStatus): Status search filter; JobStatus.PENDING matches NULL status
        `activity` (Activity or str or list of Activity or str): Activity search filter.
            An empty list applies no activity filter.
        `table` (str): Table name to filter for
        `sid` (SID): Filter for source ID
        `sensing_start_ge` (datetime): Min start time
        `sensing_start_lt` (datetime): Max start time
        `filename` (str): Filter for filename field
        `orbit` (int): Filter for orbit
        `set_status` (JobStatus|None): New job status; None or JobStatus.PENDING store NULL
        `set_process_id` (int): New process id
        `set_critical_count`, `set_error_count`, `set_warning_count`, `set_info_count` (int):
            New log file analysis counts. If the UPDATE fails with an error mentioning
            LOG_CRITICAL_COUNT it is retried once without these columns.
        `commit` (bool): Commit afterwards (this happens even if nothing was updated)
        `db_conn`: Database connection to use

    Returns:
        Number of rows changed. 0 if the statement failed, if there was nothing to set, if
        no filter was given, or if `job` / `job_id` was an empty list.

    Raises:
        ValueError: If `job_id` is neither an int nor list-like.
    """
    wheres = []
    updates = []
    bindvars = {}
    # Set when a filter cannot match anything (an empty job / job_id list); the statement is
    # then skipped rather than sending invalid 'ID IN ()' SQL
    match_nothing = False

    # search clauses
    if job_id is not None:
        if is_listlike(job_id):
            ids = list(job_id)
            if ids:
                # int() guarantees only numbers are spliced into the SQL text
                wheres.append('ID IN ({ids})'.format(ids=','.join(str(int(j)) for j in ids)))

            else:
                match_nothing = True

        elif isinstance(job_id, int):
            wheres.append('ID=:job_id')
            bindvars['job_id'] = job_id

        else:
            raise ValueError('job_id must be an integer')

    if job is not None:
        if is_listlike(job):
            jobs = list(job)
            if jobs:
                wheres.append('ID IN ({ids})'.format(
                    ids=','.join(str(int(j.job_id)) for j in jobs)))

            else:
                match_nothing = True

        else:
            # distinct bind name so a scalar `job_id` filter is not overwritten
            wheres.append('ID=:job_ref_id')
            bindvars['job_ref_id'] = job.job_id

    if sid is not None:
        wheres.append(SID.sql_sys_where('JOBS', sid))

    if process_id is not None:
        wheres.append('PROCESS_ID=:process_id')
        bindvars['process_id'] = process_id

    if category is not None:
        wheres.append('CATEGORY=:category')
        bindvars['category'] = category

    if filename is not None:
        wheres.append('FILENAME=:filename')
        bindvars['filename'] = filename

    if activity is not None:
        if is_listlike(activity):
            names = [name_of_thing(a) for a in activity]
            if names:
                # bind each name rather than quoting it into the SQL text
                placeholders = []
                for index, name in enumerate(names, start=1):
                    bind_name = 'activity_{index}'.format(index=index)
                    bindvars[bind_name] = name
                    placeholders.append(':' + bind_name)

                wheres.append('ACTIVITY IN ({binds})'.format(binds=','.join(placeholders)))

        else:
            wheres.append('ACTIVITY=:activity')
            bindvars['activity'] = name_of_thing(activity)

    if sensing_start_ge is not None:
        wheres.append('SENSING_START>=:ge')
        bindvars['ge'] = sensing_start_ge

    if sensing_start_lt is not None:
        wheres.append('SENSING_START<:lt')
        bindvars['lt'] = sensing_start_lt

    if table is not None:
        wheres.append('TABLENAME=:tablename')
        bindvars['tablename'] = name_of_thing(table)

    if orbit is not None:
        wheres.append('ORBIT=:orbit')
        bindvars['orbit'] = orbit

    if status is not None:
        if status is JobStatus.PENDING:
            wheres.append('STATUS IS NULL')

        else:
            wheres.append('STATUS=:status')
            bindvars['status'] = name_of_thing(status)

    # change clauses
    if set_status is not DONT_CHANGE:
        updates.append('status=:set_status')
        if set_status is None or set_status is JobStatus.PENDING:
            bindvars['set_status'] = None

        else:
            try:
                is_member = set_status in JobStatus
            except TypeError:
                # some Python versions raise TypeError for `non_member in EnumClass`
                is_member = False

            bindvars['set_status'] = set_status.name if is_member else set_status

    if set_process_id is not DONT_CHANGE:
        updates.append('process_id=:set_process_id')
        bindvars['set_process_id'] = set_process_id

    if set_critical_count is not None:
        updates.append('LOG_CRITICAL_COUNT=:set_critical_count')
        bindvars['set_critical_count'] = set_critical_count

    if set_error_count is not None:
        updates.append('LOG_ERROR_COUNT=:set_error_count')
        bindvars['set_error_count'] = set_error_count

    if set_warning_count is not None:
        updates.append('LOG_WARNING_COUNT=:set_warning_count')
        bindvars['set_warning_count'] = set_warning_count

    if set_info_count is not None:
        updates.append('LOG_INFO_COUNT=:set_info_count')
        bindvars['set_info_count'] = set_info_count

    rowcount = 0
    if match_nothing:
        logger.debug('update_jobs called with an empty job list; nothing to update')

    elif not updates:
        logger.warning('update_jobs called with nothing to set; JOBS table not changed')

    elif not wheres:
        logger.warning('update_jobs called with no filter; refusing to update every job')

    else:
        try:
            cur = db_conn.query('UPDATE JOBS SET {updates} WHERE {where}'.format(
                updates=','.join(updates), where=' AND '.join(wheres)),
                                **bindvars)
            rowcount = cur.rowcount

        except SQLError as e:
            logger.warning('Could not write to JOBS table: {e}'.format(e=e))
            log_counts_given = any(count is not None for count in (
                set_critical_count, set_error_count, set_warning_count, set_info_count))
            # The column name's case in the error text depends on the database engine, so
            # compare case-insensitively. Retrying only when counts were requested also
            # stops the retry (which omits them) from recursing again for this reason.
            if log_counts_given and 'log_critical_count' in str(e).lower():
                # As a courtesy, since it's easy to miss breaking changes warnings
                # in the Changelog file, we allow a partial write to the JOBS table
                # even if the log file analysis columns are missing
                # NOTE(review): on engines that abort the transaction after an error, the
                # retry may also fail unless a rollback happens first.
                logger.info('Attempting to write to JOBS table again without log file analysis')
                rowcount = update_jobs(job=job,
                                       job_id=job_id,
                                       process_id=process_id,
                                       category=category,
                                       status=status,
                                       activity=activity,
                                       table=table,
                                       sid=sid,
                                       sensing_start_ge=sensing_start_ge,
                                       sensing_start_lt=sensing_start_lt,
                                       filename=filename,
                                       orbit=orbit,
                                       set_status=set_status,
                                       set_process_id=set_process_id,
                                       commit=False,  # committed once, below
                                       db_conn=db_conn)

    if commit:
        db_conn.commit()

    return rowcount
665
666
def insert_job(job,
               db_conn=default_connection):
    """Insert a single job, returning its job ID. No duplicate checks are performed.

    The INSERT statement is built on the first call and cached in `insert_job.sql`, so the
    settings it depends on (ORBIT_IN_JOBS_TABLE, DATABASE_PROJECT_ID,
    DATABASE_JOBS_TABLE_PARENT) are read at that point for the column list.

    Args:
        job (Job): Job to insert. A JobStatus.PENDING or None status is stored as NULL.
        db_conn: Database connection to use.

    Returns:
        The new row's ID, as returned by db_conn.query_insert_with_return().
    """
    if insert_job.sql is None:
        sid_fields, sid_binds = SID.sql_sys_insert('JOBS')
        insert_columns = ['CATEGORY',
                          'ACTIVITY',
                          'FILENAME',
                          'DIRNAME',
                          'SENSING_START',
                          'SENSING_STOP',
                          'TABLENAME',
                          'EARLIEST_EXECUTION_TIME',
                          'STATUS']
        if settings.ORBIT_IN_JOBS_TABLE:
            insert_columns.append('ORBIT')

        if settings.DATABASE_PROJECT_ID is not None:
            insert_columns.append('PROJECT')

        # Must use exactly the same test as the `values` section below, otherwise a
        # falsy-but-not-None setting adds a :parent placeholder with no value bound to it
        if settings.DATABASE_JOBS_TABLE_PARENT:
            insert_columns.append('PARENT')

        binds = [':' + c.lower() for c in insert_columns]
        insert_job.sql = ('INSERT INTO JOBS ({columns}{sid_columns}) '
                          'VALUES ({values}{sid_binds})').format(
                              columns=','.join(insert_columns),
                              sid_columns=sid_fields,
                              values=','.join(binds),
                              sid_binds=sid_binds)

    # Bind names must match the lower-cased column names used for the placeholders above
    values = {'category': job.category,
              'activity': job.activity.name,
              'filename': None if job.filename is None else job.filename.name,
              'dirname': None if job.filename is None else str(job.filename.parent),
              'sensing_start': job.sensing_start,
              'sensing_stop': job.sensing_stop,
              'tablename': job.tablename,
              'earliest_execution_time': job.earliest_execution_time,
              'status': (None if job.status is None or job.status is JobStatus.PENDING
                         else job.status.name)}
    if settings.ORBIT_IN_JOBS_TABLE:
        values['orbit'] = job.orbit

    if settings.DATABASE_PROJECT_ID is not None:
        values['project'] = settings.DATABASE_PROJECT_ID

    if settings.DATABASE_JOBS_TABLE_PARENT:
        values['parent'] = job.parent

    values.update(SID.bind_sys_insert('JOBS', job.sid))

    job_id = db_conn.query_insert_with_return(insert_job.sql, 'ID', **values)
    return job_id

# Cached INSERT statement, built lazily by the first insert_job() call
insert_job.sql = None
723
724
def add_job(job: Job,
            commit: bool = False,
            db_conn: object = default_connection) -> Tuple[bool, int]:
    """Higher level function than insert_job that also checks for duplicates.

    Ingestion jobs (those with a `filename`) are always inserted without a duplicate check.
    For any other job, an existing PENDING job with NULL filename and the same category,
    sid, activity and tablename counts as a duplicate. Orbit is compared too (find_jobs
    only applies it when settings.ORBIT_IN_JOBS_TABLE is set). If the job has no orbit,
    the sensing start/stop are compared instead, truncated to whole seconds.
    earliest_execution_time and dirname are not compared.

    Args:
        job: Job to add.
        commit: Commit after inserting. Nothing is committed when a duplicate is found.
        db_conn: Database connection to use.

    Returns:
        Tuple (added, job_id). On insertion this is True plus the new job's ID; when a
        duplicate was found it is False plus the existing job's ID.
    """
    if job.filename is not None:
        job_id = insert_job(job, db_conn=db_conn)
        if commit:
            db_conn.commit()

        logger.info('Added ingestion job {jid} {filename}'.format(
            jid=job_id,
            filename=job.filename.name))

        return True, job_id

    def without_microseconds(value):
        """Truncate datetimes to whole seconds; other values (e.g. dates) pass through."""
        # note, sensing_time can be a date object w/ daily algorithms
        if isinstance(value, datetime):
            return value.replace(microsecond=0)

        return value

    # time how long the duplicate check takes
    dup_start_time = time.time()
    find_kwargs = {}

    # don't bother searching on times if we have orbit
    if job.orbit is None:
        if job.sensing_start is not None:
            find_kwargs['sensing_start'] = without_microseconds(job.sensing_start)

        if job.sensing_stop is not None:
            find_kwargs['sensing_stop'] = without_microseconds(job.sensing_stop)

    # A single query both detects a duplicate and returns its ID, avoiding the race where
    # a count found a row that had gone by the time a second query fetched it
    logger.debug('jobs checking for existing jobs prior to inserting {j}'.format(j=job))
    cursor = find_jobs(fields=('ID',),
                       category=job.category,
                       status=JobStatus.PENDING,
                       sid=job.sid,
                       activity=job.activity.name,
                       filename=None,  # always None here: jobs with a filename returned above
                       orbit=job.orbit,
                       tablename=job.tablename,
                       db_conn=db_conn,
                       **find_kwargs)
    try:
        existing = cursor.fetchone()

    finally:
        cursor.close()

    dup_dur = time.time() - dup_start_time
    if dup_dur > 1.0:
        logger.debug('Duplicate test took {dur:.3f}s'.format(dur=dup_dur))

    if existing is not None:
        logger.debug('Duplicate job {job}'.format(job=job))
        return False, existing[0]

    logger.debug('Not a duplicate')

    print_params = ('filename', 'sid', 'orbit', 'sensing_start', 'sensing_stop', 'tablename',
                    'earliest_execution_time')

    job_id = insert_job(job, db_conn=db_conn)
    if commit:
        db_conn.commit()

    logger.info('Added job {job} {cat} {act} {params}'.format(
        job=job_id,
        cat=job.category,
        act=name_of_thing(job.activity),
        params=' '.join(('{k}:{v}'.format(
            k=k,
            v=getattr(job, k)) for k in print_params
                         if getattr(job, k) is not None))))
    return True, job_id