1#!/usr/bin/env python3
  2
  3"""Read and write to the JOBS table."""
  4
  5import time
  6import logging
  7from datetime import datetime
  8from typing import Tuple
  9
 10from chart.common.traits import is_listlike
 11from chart.common.traits import name_of_thing
 12from chart.backend.job import JobStatus
 13from chart.db.connection import db_connect
 14from chart.db.func import ANY
 15from chart.db.func import DONT_CHANGE
 16from chart.db.func import Func
 17from chart.db.settings import DatabaseEngine
 18from chart.db.exceptions import DatabaseError
 19from chart.db.exceptions import SQLError
 20from chart.project import settings
 21from chart.project import SID
 22from chart.backend.activity import Activity
 23from chart.backend.activity import DEFAULT_PRIORITY
 24from chart.backend.job import Job
 25from chart.db.func import Reversed
 26
# Module-wide default connection. db_connect('JOBS') is called once, at import time,
# and the result is used as the default `db_conn` argument of the functions below.
default_connection = db_connect('JOBS')

# logging.getLogger() with no name returns the root logger, so records from this
# module are not attributed to a module-specific logger name.
logger = logging.getLogger()
 30
 31
 32def find_jobs(fields,  # (too many args) pylint: disable=R0913
 33              sid=ANY,
 34              job_id=None,
 35              job_id_ne=None,
 36              category=None,
 37              activity=None,
 38              activity_ne=None,
 39              status=None,
 40              filename=ANY,
 41              filename_ne=ANY,
 42              filename_like=None,
 43              tablename=None,
 44              tablename_like=None,
 45              tablename_ne=None,
 46              sensing_start=None,
 47              sensing_start_ge=None,
 48              sensing_start_lt=None,
 49              sensing_stop=None,
 50              earliest_execution_time_null_le=None,
 51              gen_time_ge=None,
 52              gen_time_gt=None,
 53              gen_time_lt=None,
 54              sensing_start_or_gen_time_ge=None,
 55              sensing_start_or_gen_time_lt=None,
 56              orbit=None,
 57              orbit_ne=None,
 58              process_id=None,
 59              parent=None,
 60              parent_ne=ANY,
 61              group_by=None,
 62              order_by=None,
 63              limit=None,
 64              clauses=None,
 65              bindvars=None,
 66              db_conn=default_connection,
 67              lock=False):
 68    """Return a cursor of rows from the JOBS table.
 69
 70    All parameters mean look for jobs where field equals parameter value, unless the
 71    parameter ends in '_ne' (not equal), '_like' (SQL like), '_ge' (greater than or equal),
 72    '_lt' (less than).
 73
 74    Args:
 75      - `activity` (str or list of str): Literal strings to search for unless they contain
 76          '%' in which case they become LIKE clauses.
 77      - `clauses` (list of str): Raw SQL clauses
 78      - `bindvars` (dict): Bind variables to go with `clauses`
 79
 80    Everything else is a single item of type str, int or datetime as appropriate.
 81
 82    Returns:
 83        Cursor
 84    """
 85
 86    if bindvars is None:
 87        bindvars = {}
 88
 89    if clauses is None:
 90        clauses = []
 91
 92    if job_id is not None:
 93        clauses.append('id=:jobid')
 94        bindvars['jobid'] = job_id
 95
 96    if job_id_ne is not None:
 97        clauses.append('id!=:jobidne')
 98        bindvars['jobidne'] = job_id_ne
 99
100    if settings.DATABASE_PROJECT_ID is not None:
101        clauses.append('project=:project')
102        bindvars['project'] = settings.DATABASE_PROJECT_ID
103
104    if category is not None:
105        clauses.append('category=:category')
106        bindvars['category'] = category.upper()
107
108    elif job_id is None and process_id is None:
109        logger.warning('find_jobs called with no CATEGORY set')
110
111    if activity is not None:
112        # activity could be an Activity object, a simple string or a string with a '%' char
113        # for a LIKE clause, or a list of all these things.
114        if isinstance(activity, str):
115            # activity is a simple string, test for wildcards
116            if '%' in activity:
117                clauses.append('ACTIVITY LIKE :activity')
118
119            else:
120                clauses.append('activity=:activity')
121
122            bindvars['activity'] = activity
123
124        elif isinstance(activity, Activity):
125            # activity is a simple Activity object
126            clauses.append('activity=:activity')
127            bindvars['activity'] = activity.name
128
129        elif len(activity) > 0:
130            # activity is a list. Build a set of activity specific clauses.
131            # activity_clauses is used to accumulate separate activity LIKE clauses
132            activity_clauses = []
133            # plain activities are bunched up and used to build a single activity in list clause
134            # at the end
135            plain_activities = []
136
137            # bind variable count.
138            cc = [1]  # mutable in inner function
139
140            def act_bind(value):
141                """Add a new bind variable just for activities."""
142                new_name = 'activity_{cc}'.format(cc=cc[0])
143                cc[0] += 1
144                bindvars[new_name] = value
145                return ':{name}'.format(name=new_name)
146
147            for a in activity:
148                if isinstance(a, Activity):
149                    plain_activities.append(a.name)
150
151                elif '%' in a:
152                    activity_clauses.append('ACTIVITY LIKE {a}'.format(a=act_bind(a)))
153
154                else:
155                    plain_activities.append(a)
156
157            # assemble the non-wildcard requests into a single clause (either activity in list
158            # or activity = x)
159            # The combined plain activity(s) clause is appended to the wildcard activities clauses
160            if len(plain_activities) > 1:
161                activity_clauses.append('ACTIVITY IN ({acts})'.format(
162                        acts=','.join(act_bind(a) for a in plain_activities)))
163
164            elif len(plain_activities) == 1:
165                activity_clauses.append('ACTIVITY={a}'.format(a=act_bind(plain_activities[0])))
166
167            # this is just to avoid create unnecessary brackets if there is only one clause...
168            if len(activity_clauses) > 1:
169                clauses.append('({all})'.format(all=' OR '.join(activity_clauses)))
170
171            else:
172                clauses.append(activity_clauses[0])
173
174    if activity_ne is not None:
175        # activity could be an Activity object, a simple string or a string with a '%' char
176        # for a LIKE clause, or a list of all these things.
177        if isinstance(activity, str):
178            if '%' in activity:
179                clauses.append('ACTIVITY NOT LIKE :activity')
180
181            else:
182                clauses.append('activity!=:activity')
183
184            bindvars['activity'] = activity
185
186        elif isinstance(activity, Activity):
187            clauses.append('activity!=:activity')
188            bindvars['activity'] = activity.name
189
190        else:
191            activity_ne_clauses = []
192            plain_activities = []
193
194            cc = [1]  # mutable in inner function
195
196            def act_ne_bind(value):
197                """Add a new bind variable."""
198                new_name = 'activity_ne_{cc}'.format(cc=cc[0])
199                cc[0] += 1
200                bindvars[new_name] = value
201                return ':{name}'.format(name=new_name)
202
203            for a in activity_ne:
204                if isinstance(a, Activity):
205                    plain_activities.append(a.name)
206
207                elif '%' in a:
208                    activity_ne_clauses.append('ACTIVITY NOT LIKE {a}'.format(a=act_ne_bind(a)))
209
210                else:
211                    plain_activities.append(a)
212
213            if len(plain_activities) > 1:
214                activity_ne_clauses.append('ACTIVITY NOT IN ({acts})'.format(
215                        acts=','.join(act_ne_bind(a) for a in plain_activities)))
216
217            elif len(plain_activities) == 1:
218                activity_ne_clauses.append('ACTIVITY!={a}'.format(
219                        a=act_ne_bind(plain_activities[0])))
220
221            if len(activity_ne_clauses) > 1:
222                clauses.append('({all})'.format(all=' AND '.join(activity_ne_clauses)))
223
224            elif len(activity_ne_clauses) == 1:
225                clauses.append(activity_ne_clauses[0])
226
227    if sid is not ANY:
228        # may be SIDLESS
229        clauses.append(SID.sql_sys_where('JOBS', sid))
230
231    if filename is not ANY:
232        if filename is None:
233            clauses.append('FILENAME IS NULL')
234
235        else:
236            clauses.append('FILENAME=:filename')
237            bindvars['filename'] = filename
238
239    if filename_ne is not ANY:
240        if filename_ne is None:
241            clauses.append('FILENAME IS NOT NULL')
242
243        else:
244            clauses.append('FILENAME!=:filename_ne')
245            bindvars['filename_ne'] = filename_ne
246
247    if filename_like is not None:
248        clauses.append('FILENAME LIKE :filename_like')
249        bindvars['filename_like'] = filename_like
250
251    if tablename is not None:
252        clauses.append('TABLENAME=:tablename')
253        bindvars['tablename'] = name_of_thing(tablename)
254
255    if tablename_like is not None:
256        clauses.append('TABLENAME LIKE :tablename_like')
257        bindvars['tablename_like'] = name_of_thing(tablename_like)
258
259    if tablename_ne is not None:
260        clauses.append('TABLENAME!=:tablename_ne')
261        bindvars['tablename_ne'] = name_of_thing(tablename_ne)
262
263    if sensing_start is not None:
264        clauses.append('SENSING_START=:sens')
265        bindvars['sens'] = sensing_start
266
267    if sensing_start_ge is not None:
268        clauses.append('SENSING_START>=:min_sens')
269        bindvars['min_sens'] = sensing_start_ge
270
271    if sensing_start_lt is not None:
272        clauses.append('SENSING_START<:max_sens')
273        bindvars['max_sens'] = sensing_start_lt
274
275    if sensing_stop is not None:
276        clauses.append('SENSING_STOP=:sens_stop')
277        bindvars['sens_stop'] = sensing_stop
278
279    if earliest_execution_time_null_le is not None:
280        clauses.append('(EARLIEST_EXECUTION_TIME IS NULL OR '
281                       'EARLIEST_EXECUTION_TIME<=:early_exe_time)')
282        bindvars['early_exe_time'] = earliest_execution_time_null_le
283
284    if gen_time_ge is not None:
285        clauses.append('GEN_TIME>=:min_gen')
286        bindvars['min_gen'] = gen_time_ge
287
288    if gen_time_gt is not None:
289        clauses.append('GEN_TIME>:min_gen2')
290        bindvars['min_gen2'] = gen_time_gt
291
292    if gen_time_lt is not None:
293        clauses.append('GEN_TIME<:max_gen')
294        bindvars['max_gen'] = gen_time_lt
295
296    if sensing_start_or_gen_time_ge is not None:
297        clauses.append('((SENSING_START IS NOT NULL AND SENSING_START>=:sens_gen_ge) OR '
298                       '(GEN_TIME>=:sens_gen_ge))')
299        bindvars['sens_gen_ge'] = sensing_start_or_gen_time_ge
300
301    if sensing_start_or_gen_time_lt is not None:
302        clauses.append('((SENSING_START IS NOT NULL AND SENSING_START<:sens_gen_lt) OR '
303                       '(GEN_TIME<:sens_gen_lt))')
304        bindvars['sens_gen_lt'] = sensing_start_or_gen_time_lt
305
306    if sensing_start_lt is not None:
307        clauses.append('SENSING_START<:max_sens')
308        bindvars['max_sens'] = sensing_start_lt
309
310    if settings.ORBIT_IN_JOBS_TABLE:
311        if orbit is not None:
312            clauses.append('ORBIT=:orbit')
313            bindvars['orbit'] = orbit
314
315        if orbit_ne is not None:
316            clauses.append('ORBIT!=:orbit_ne')
317            bindvars['orbit_ne'] = orbit_ne
318
319    if process_id is not None:
320        clauses.append('PROCESS_ID=:process_id')
321        bindvars['process_id'] = process_id
322
323    if status is not None:
324        if status is JobStatus.PENDING:
325            clauses.append('STATUS IS NULL')
326
327        else:
328            clauses.append('STATUS=:status')
329            bindvars['status'] = name_of_thing(status)
330
331    if parent is not None:
332        clauses.append('PARENT=:parent')
333        bindvars['parent'] = parent
334
335    if parent_ne is not ANY:
336        if parent_ne is None:
337            clauses.append('PARENT is NULL')
338
339        else:
340            raise NotImplementedError()
341
342    if group_by is not None:
343        if not is_listlike(group_by):
344            group_by = (group_by, )
345
346        group_by = ' GROUP BY {groups}'.format(groups=','.join(group_by))
347
348    else:
349        group_by = ''
350
351    if order_by is not None:
352        if is_listlike(order_by):
353            order_reqs = order_by
354
355        else:
356            order_reqs = (order_by, )
357
358        order_clauses = []
359        for order_req in order_reqs:
360            if isinstance(order_req, str):
361                order_clauses.append(order_req)
362
363            else:
364                order_clauses.append(order_req())
365
366        order_by = ' ORDER BY {orders}'.format(orders=','.join(order_clauses))
367
368    elif limit is None:
369        order_by = ''
370
371    else:
372        non_default_priorities = []
373        for a in Activity.all():
374            if a.priority != DEFAULT_PRIORITY:
375                non_default_priorities.append(a)
376
377        if len(non_default_priorities) == 0:
378            order_by = ' ORDER BY ID'
379
380        else:
381            order_by = ' ORDER BY CASE activity {clauses} ELSE {default} END, ID'.format(
382                clauses=''.join(' WHEN \'{name}\' THEN -{priority} '.format(
383                    name=a.name, priority=a.priority) for a in non_default_priorities),
384                default=DEFAULT_PRIORITY)
385
386    # logger.debug('find_jobs lock {l}'.format(l=lock))
387    if lock is True and db_conn.engine is not DatabaseEngine.ORACLE:
388        sql_lock = ' FOR UPDATE'
389
390    else:
391        sql_lock = ''
392
393    # Each element of `fields` can be a plain string column name or a Func object
394    # with an sql() member to return it's actual SQL code
395
396    if limit is None:
397        return db_conn.query('SELECT {fields} FROM JOBS WHERE {where}{group}{order}{lock}'.format(
398            fields=','.join(f.sql() if isinstance(f, Func) else f for f in fields),
399            where=' AND '.join(clauses),
400            group=group_by,
401            order=order_by,
402            lock=sql_lock), **bindvars)
403
404    bindvars['limit'] = limit
405    if db_conn.engine is DatabaseEngine.ORACLE:
406        # oracle doesn't have 'LIMIT'
407        # mysql doesn't have ROW_NUMBER
408        # postgres accepts both
409        return db_conn.query(
410            'SELECT {fields} FROM ('
411            'SELECT {fields},ROW_NUMBER() OVER ({order}) rn '
412            'FROM JOBS '
413            'WHERE {where}{group}{order}) q '
414            'WHERE rn<=:limit'.format(
415                fields=','.join(f.sql() if isinstance(f, Func) else f for f in fields),
416                where=' AND '.join(clauses),
417                group=group_by,
418                order=order_by),
419            **bindvars)
420
421    return db_conn.query('SELECT {fields} '
422                         'FROM JOBS '
423                         'WHERE {where}{group}{order} LIMIT :limit{lock}'.format(
424                             fields=','.join(f.sql() if isinstance(f, Func) else f for f in fields),
425                             where=' AND '.join(clauses),
426                             group=group_by,
427                             order=order_by,
428                             lock=sql_lock),
429                         **bindvars)
430
431
def count_jobs(**kwargs):
    """Count the number of jobs, using the same criteria as find_jobs().

    Args:
        **kwargs: Any keyword arguments accepted by find_jobs() except `fields`.

    Returns:
        The count(*) value for the matching rows.
    """
    cursor = find_jobs(fields=('count(*)', ), **kwargs)
    try:
        return cursor.fetchone()[0]

    finally:
        # close even if fetchone() raises, so the cursor is never leaked
        cursor.close()
440
441
def delete_jobs(category,
                gen_time_lt,
                commit=True,
                db_conn=default_connection):
    """Delete jobs of a given category generated before a given time.

    If settings.DATABASE_PROJECT_ID is set, only jobs belonging to that project are
    deleted. This matches the project scoping applied by find_jobs() and the PROJECT
    value written by insert_job(), so a shared JOBS table never loses other projects' jobs.

    Args:
        `category` (str): Category
        `gen_time_lt` (datetime): Delete jobs whose gen_time is lower than this value.
        `commit` (bool): Commit db changes
        `db_conn`: Database connection to use.

    Returns:
        Number of jobs deleted.
    """
    clauses = ['category=:category', 'gen_time<:gen_time_lt']
    bindvars = {'category': category, 'gen_time_lt': gen_time_lt}
    if settings.DATABASE_PROJECT_ID is not None:
        clauses.append('project=:project')
        bindvars['project'] = settings.DATABASE_PROJECT_ID

    cur = db_conn.query('DELETE FROM jobs WHERE {where}'.format(where=' AND '.join(clauses)),
                        **bindvars)
    if commit:
        db_conn.commit()

    return cur.rowcount
464
465
def update_jobs(job=None,
                job_id=None,
                process_id=None,
                category=None,
                status=None,
                activity=None,
                table=None,
                sid=None,
                sensing_start_ge=None,
                sensing_start_lt=None,
                filename=None,
                orbit=None,
                set_status=DONT_CHANGE,
                set_process_id=DONT_CHANGE,
                set_critical_count=None,
                set_error_count=None,
                set_warning_count=None,
                set_info_count=None,
                commit=True,
                db_conn=default_connection):
    """Change JOBS table entries matching args.

    Every search filter that is not None is AND'd into the WHERE clause. `set_status` and
    `set_process_id` are only written if they are not DONT_CHANGE; the log count values
    only if they are not None.

    Args:
        `job` (Job or list of Job): Jobs to modify. An empty list matches no rows.
        `job_id` (int or list of int): Job id(s) to change. An empty list matches no rows.
        `process_id` (int): Search for any existing job of this ID
        `category` (str): Category search filter
        `status` (JobStatus): Status search filter; JobStatus.PENDING matches NULL status
        `activity` (Activity or str or list of Activity or str): Activity search filter.
            An empty list adds no filter.
        `table` (str): Table name to filter for
        `sid` (SID): Filter for source ID
        `sensing_start_ge` (datetime): Min start time
        `sensing_start_lt` (datetime): Max start time
        `filename` (str): Filter for filename field
        `orbit` (int): Filter for orbit
        `set_process_id` (int): New process id
        `set_status` (JobStatus|str|None): New job status. JobStatus.PENDING and None are
            written as NULL.
        `set_critical_count`, `set_error_count`, `set_warning_count`, `set_info_count`
            (int): New log file analysis counts
        `commit` (bool): Commit db changes
        `db_conn`: Database connection to use.

    Returns:
        Number of rows changed. If the UPDATE raises SQLError it is logged and 0 is
        returned, except that an error mentioning LOG_CRITICAL_COUNT triggers one retry
        without the log file analysis columns.

    Raises:
        ValueError: if `job_id` is neither an int nor list-like, or a listed id is not
            convertible to int.
    """
    wheres = []
    updates = []
    bindvars = {}

    def id_in(ids):
        """Return an SQL test for ID being one of `ids`; an empty collection matches nothing."""
        # int() guarantees only plain integers are pasted into the SQL text
        id_text = ','.join(str(int(i)) for i in ids)
        return 'ID IN ({ids})'.format(ids=id_text) if id_text else '1=0'

    # search clauses
    if job_id is not None:
        if is_listlike(job_id):
            wheres.append(id_in(job_id))

        elif isinstance(job_id, int):
            wheres.append('ID=:job_id')
            bindvars['job_id'] = job_id

        else:
            raise ValueError('job_id must be an integer')

    if job is not None:
        if is_listlike(job):
            wheres.append(id_in(j.job_id for j in job))

        else:
            wheres.append('ID=:job_id')
            bindvars['job_id'] = job.job_id

    if sid is not None:
        wheres.append(SID.sql_sys_where('JOBS', sid))

    if process_id is not None:
        wheres.append('PROCESS_ID=:process_id')
        bindvars['process_id'] = process_id

    if category is not None:
        wheres.append('CATEGORY=:category')
        bindvars['category'] = category

    if filename is not None:
        wheres.append('FILENAME=:filename')
        bindvars['filename'] = filename

    if activity is not None:
        if is_listlike(activity):
            if len(activity) > 0:
                # Bind each name rather than quoting it directly into the SQL text
                placeholders = []
                for index, act in enumerate(activity, start=1):
                    bind_name = 'activity_{i}'.format(i=index)
                    bindvars[bind_name] = name_of_thing(act)
                    placeholders.append(':' + bind_name)

                wheres.append('ACTIVITY IN ({bits})'.format(bits=','.join(placeholders)))

        else:
            wheres.append('ACTIVITY=:activity')
            bindvars['activity'] = name_of_thing(activity)

    if sensing_start_ge is not None:
        wheres.append('SENSING_START>=:ge')
        bindvars['ge'] = sensing_start_ge

    if sensing_start_lt is not None:
        wheres.append('SENSING_START<:lt')
        bindvars['lt'] = sensing_start_lt

    if table is not None:
        wheres.append('TABLENAME=:tablename')
        bindvars['tablename'] = name_of_thing(table)

    if orbit is not None:
        wheres.append('ORBIT=:orbit')
        bindvars['orbit'] = orbit

    if status is not None:
        if status is JobStatus.PENDING:
            wheres.append('STATUS IS NULL')

        else:
            wheres.append('STATUS=:status')
            bindvars['status'] = name_of_thing(status)

    # change clauses
    if set_status is not DONT_CHANGE:
        updates.append('status=:set_status')
        if set_status is JobStatus.PENDING:
            bindvars['set_status'] = None

        elif isinstance(set_status, JobStatus):
            # isinstance rather than `in JobStatus`: membership tests with non-members
            # (None, plain strings) raise TypeError on some Python versions
            bindvars['set_status'] = set_status.name

        else:
            bindvars['set_status'] = set_status

    if set_process_id is not DONT_CHANGE:
        updates.append('process_id=:set_process_id')
        bindvars['set_process_id'] = set_process_id

    if set_critical_count is not None:
        updates.append('LOG_CRITICAL_COUNT=:set_critical_count')
        bindvars['set_critical_count'] = set_critical_count

    if set_error_count is not None:
        updates.append('LOG_ERROR_COUNT=:set_error_count')
        bindvars['set_error_count'] = set_error_count

    if set_warning_count is not None:
        updates.append('LOG_WARNING_COUNT=:set_warning_count')
        bindvars['set_warning_count'] = set_warning_count

    if set_info_count is not None:
        updates.append('LOG_INFO_COUNT=:set_info_count')
        bindvars['set_info_count'] = set_info_count

    try:
        cur = db_conn.query('UPDATE JOBS SET {updates} WHERE {where}'.format(
            updates=','.join(updates), where=' AND '.join(wheres)),
                            **bindvars)
        rowcount = cur.rowcount
    except SQLError as e:
        logger.warning('Could not write to JOBS table: {e}'.format(e=e))
        rowcount = 0
        # Compare case-insensitively: the database may quote the column name in
        # upper or lower case depending on the engine
        if 'log_critical_count' in str(e).lower():
            # As a courtesy, since it's easy to miss breaking changes warnings
            # in the Changelog file, we allow a partial write to the JOBS table
            # even if the log file analysis columns are missing
            logger.info('Attempting to write to JOBS table again without log file analysis')
            rowcount = update_jobs(job=job,
                                   job_id=job_id,
                                   process_id=process_id,
                                   category=category,
                                   status=status,
                                   activity=activity,
                                   table=table,
                                   sid=sid,
                                   sensing_start_ge=sensing_start_ge,
                                   sensing_start_lt=sensing_start_lt,
                                   filename=filename,
                                   orbit=orbit,
                                   set_status=set_status,
                                   set_process_id=set_process_id,
                                   commit=commit,
                                   db_conn=db_conn)

    if commit:
        db_conn.commit()

    return rowcount
665
666
def insert_job(job,
               db_conn=default_connection):
    """Insert a single job, returning its job ID. No duplicate checks are performed.

    The INSERT statement is built on first use and cached in `insert_job.sql`. Setting that
    attribute back to None forces it to be rebuilt on the next call.

    Args:
        job (Job): Job to insert.
        db_conn: Database connection to use.

    Returns:
        The new row's ID, as returned by db_conn.query_insert_with_return().
    """
    # Optional columns. The same flags decide both the column list and the bind values,
    # so the INSERT never references a bind variable that was not supplied.
    with_orbit = settings.ORBIT_IN_JOBS_TABLE
    with_project = settings.DATABASE_PROJECT_ID is not None
    with_parent = bool(settings.DATABASE_JOBS_TABLE_PARENT)

    if insert_job.sql is None:
        sid_fields, sid_binds = SID.sql_sys_insert('JOBS')
        insert_columns = ['CATEGORY',
                          'ACTIVITY',
                          'FILENAME',
                          'DIRNAME',
                          'SENSING_START',
                          'SENSING_STOP',
                          'TABLENAME',
                          'EARLIEST_EXECUTION_TIME',
                          'STATUS']
        if with_orbit:
            insert_columns.append('ORBIT')

        if with_project:
            insert_columns.append('PROJECT')

        if with_parent:
            insert_columns.append('PARENT')

        # bind variable names are the lower-cased column names
        binds = [':' + c.lower() for c in insert_columns]
        insert_job.sql = ('INSERT INTO JOBS ({columns}{sid_columns}) '
                          'VALUES ({values}{sid_binds})').format(
                              columns=','.join(insert_columns),
                              sid_columns=sid_fields,
                              values=','.join(binds),
                              sid_binds=sid_binds)

    values = {'category': job.category,
              'activity': job.activity.name,
              'filename': None if job.filename is None else job.filename.name,
              'dirname': None if job.filename is None else str(job.filename.parent),
              'sensing_start': job.sensing_start,
              'sensing_stop': job.sensing_stop,
              'tablename': job.tablename,
              'earliest_execution_time': job.earliest_execution_time,
              # pending jobs are stored with a NULL status
              'status': (None if job.status is None or job.status is JobStatus.PENDING
                         else job.status.name)}
    if with_orbit:
        values['orbit'] = job.orbit

    if with_project:
        values['project'] = settings.DATABASE_PROJECT_ID

    if with_parent:
        values['parent'] = job.parent

    values.update(SID.bind_sys_insert('JOBS', job.sid))

    return db_conn.query_insert_with_return(insert_job.sql, 'ID', **values)

insert_job.sql = None
723
724
def add_job(job:Job,
            commit:bool=False,
            db_conn:object=default_connection) -> Tuple[bool, int]:
    """Higher level function than insert_job that also checks for duplicates.

    Ingestion jobs (those with a filename) are always inserted without a duplicate check.
    For other jobs, an existing PENDING job counts as a duplicate if it has the same
    category, source ID, activity, orbit and tablename, and a NULL filename. Sensing
    start/stop are also compared, unless the job has an orbit and the JOBS table stores
    orbits (settings.ORBIT_IN_JOBS_TABLE). earliest_execution_time and dirname are not
    compared.

    Args:
        job (Job): Job to add.
        commit (bool): Commit after inserting.
        db_conn: Database connection to use.

    Returns:
        Tuple of (added, job_id). `added` is True if a new job was inserted and False if
        a duplicate was already present. `job_id` is the ID of the existing duplicate, or
        of the newly created job.
    """
    if job.filename is not None:
        job_id = insert_job(job, db_conn=db_conn)
        if commit:
            db_conn.commit()

        logger.info('Added ingestion job {jid} {filename}'.format(
            jid=job_id,
            filename=job.filename.name))

        return True, job_id

    # time how long the duplicate check takes
    dup_start_time = time.time()

    # Don't bother searching on times if we can search on orbit instead. find_jobs only
    # filters on orbit when the column exists, so otherwise the times are still needed.
    find_kwargs = {}
    if job.orbit is None or not settings.ORBIT_IN_JOBS_TABLE:
        for field in ('sensing_start', 'sensing_stop'):
            value = getattr(job, field)
            if value is None:
                continue

            # note, sensing times can be date objects w/ daily algorithms; only full
            # datetimes have microseconds to strip
            if isinstance(value, datetime):
                value = value.replace(microsecond=0)

            find_kwargs[field] = value

    # Note we don't check for matches on earliest_execution_time, since we don't touch existing
    # jobs if that is the only difference
    # Nor do we check dirname since filename should be unique
    logger.debug('jobs checking for existing jobs prior to inserting {j}'.format(j=job))
    # A single query both detects a duplicate and yields its ID, so there is no window
    # between a count and a separate lookup. job.filename is always None here because
    # ingestion jobs returned above.
    cursor = find_jobs(fields=('ID',),
                       category=job.category,
                       status=JobStatus.PENDING,
                       sid=job.sid,
                       activity=job.activity.name,
                       filename=None,
                       orbit=job.orbit,
                       tablename=job.tablename,
                       db_conn=db_conn,
                       **find_kwargs)
    try:
        existing = cursor.fetchone()

    finally:
        cursor.close()

    if existing is not None:
        logger.debug('Duplicate job {job}'.format(job=job))
        return False, existing[0]

    logger.debug('Not a duplicate')

    dup_dur = time.time() - dup_start_time
    if dup_dur > 1.0:
        logger.debug('Duplicate test took {dur:.3f}s'.format(dur=dup_dur))

    job_id = insert_job(job, db_conn=db_conn)
    if commit:
        db_conn.commit()

    print_params = ('filename', 'sid', 'orbit', 'sensing_start', 'sensing_stop', 'tablename',
                    'earliest_execution_time')
    logger.info('Added job {job} {cat} {act} {params}'.format(
        job=job_id,
        cat=job.category,
        act=name_of_thing(job.activity),
        params=' '.join('{k}:{v}'.format(k=k, v=getattr(job, k))
                        for k in print_params if getattr(job, k) is not None)))
    return True, job_id