#!/usr/bin/env python3

"""Job processing code. The worker reads entries from the JOBS table
and executes them.
Each worker only processes jobs from a single category and ignores all others.
Jobs are selected based on a hard-coded priority (ingestion first, then
per-orbit algorithms, then daily, then weekly, then other time-based jobs).

After each job is completed it may trigger other jobs, which are placed in the
JOBS table.

Jobs are batched according to hard-coded rules, with similar ingestion
and per-orbit algorithm jobs run together.

The worker can run in single-job mode, completing an individual specified job;
it can keep rescanning the JOBS table until all work is done and then exit;
or it can loop indefinitely.

Statistics handling
-------------------

Orbital and daily statistics are handled as a special case, although they also
have activity files.

Stats handling hooks into the job control mechanism in the following ways:

 * The `find_similar_jobs()` function knows that multiple stats jobs with the
   same `tablename` attribute can be processed as a single invocation of the activity.
"""
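
# Illustrative invocations (a sketch only; "worker" stands for however this module is
# launched in a given deployment, and the category/activity names are hypothetical
# examples - the flags themselves are defined in main() below):
#
#   worker --category SCHEDULER --loop
#   worker --category INGESTION --only-ingestion --batch-size 50
#   worker --list-activities --exclude-activities 'STATS_%'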

import os
import time
import logging
import importlib
from datetime import datetime
from fnmatch import fnmatch

import django

# This should not be needed but it is to make worker.log use UTC time
# when run under the supervisor on the TST environment on tcprotos
# os.environ['TZ'] = 'UTC'
# time.tzset()

from chart.db.settings import DatabaseEngine
from chart.backend.dispatcher import dispatch
from chart.common.util import ensure_dir_exists
from chart.backend.activity import Activity
from chart.backend.activity import CallingConvention
from chart.backend.activity import DEFAULT_WEIGHT
from chart.common.args import ArgumentParser
from chart.backend.job import JobStatus
from chart.backend.jobs import update_jobs
from chart.common.log import log_exception
from chart.common.prettyprint import Table
from chart.backend.processes import ProcessStatus
from chart.backend.processes import find_processes
from chart.backend.processes import insert_process
from chart.backend.processes import update_process
from chart import settings
from chart.backend.jobs import add_job
from chart.db.connection import db_connect
from chart.common.log import init_log

logger = logging.getLogger()

# Every worker needs a name for the PROCESSES table. Here we set a default name
# which can be overridden with a command line flag
DEFAULT_NAME = 'worker'

# if there is a database problem pause for this many seconds
RETRY_PAUSE = 900

# Number of jobs to be batched together into a single subprocess
DEFAULT_BATCH_SIZE = 100


class BadJob(Exception):
    """Exception if client code tries to create an invalid job dict."""

    def __init__(self, activity, message):
        super(BadJob, self).__init__()
        self.activity = activity
        self.message = message

    def __str__(self):
        return '{act}: {message}'.format(act=self.activity.name, message=self.message)


def module(module_name):
    """Cached load of `module_name` from settings."""
    result = module.cache.get(module_name)
    if result is not None:
        return result

    mod = importlib.import_module(module_name)
    module.cache[module_name] = mod
    return mod

module.cache = {}
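# For example, module(settings.WORKER_JOB_CHAIN) imports that module on first use
# and returns the cached module object on every later call.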


def make_working_directory(program):
    """Create a working directory inside settings.WORK_AREA."""
    basedir = settings.WORK_AREA.child(program)
    prefix = str(os.getpid())
    # if WORK_AREA/<pid> exists, try WORK_AREA/<pid>.<n>
    if basedir.child(prefix).exists():
        cc = 1
        while basedir.child('{prefix}.{cc}'.format(prefix=prefix, cc=cc)).exists():
            cc += 1

        prefix = '{prefix}.{cc}'.format(prefix=prefix, cc=cc)

    working_dir = basedir.child(prefix)
    ensure_dir_exists(working_dir)

    logger.info('Using {d} as working directory'.format(d=working_dir))
    return working_dir
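
# Example (hypothetical values): with settings.WORK_AREA of /work and PID 1234,
# make_working_directory('ingest') would return /work/ingest/1234, or
# /work/ingest/1234.1 if that directory already exists.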


def find_derived_jobs(job):
    """Run the per-project job chain function."""
    return module(settings.WORKER_JOB_CHAIN).find_derived_jobs(job)
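
# The project module named by settings.WORKER_JOB_CHAIN is assumed to expose a
# find_derived_jobs(job) function returning an iterable of follow-on jobs;
# process_jobs() below iterates over that result and queues each one with add_job().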


def disallowed_activities(exclude_activities,
                          max_weight):
    """Return a list of activities not allowed under the given exclusion and max weight.

    The input list `exclude_activities` is modified by this function.

    This is a bit ugly but is done this way to avoid introducing changes that break any projects.

    The combination of options only_activities, exclude_activities, exclude_ingestion and weighting
    could all be handled more neatly by channelling them all through this function.

    Also list_activities() should really use this function instead of having its own algorithm.
    """
    # This is all a bit ugly.
    # An activity will end up being listed twice if it's in exclude_activities and also too heavy
    # but that doesn't really matter
    if exclude_activities is None:
        exclude_activities = []

    for a in Activity.all():
        if max_weight is not None and a.weight > max_weight:
            exclude_activities.append(a)

    return exclude_activities
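
# Illustrative use: disallowed_activities([], max_weight=5.0) returns the same list
# object, extended with every Activity whose weight exceeds 5.0.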


def select_next_job(category,
                    now=None,
                    only_activities=None,
                    exclude_activities=None,
                    only_ingestion=False,
                    exclude_ingestion=False,
                    only_filenames=None,
                    db_conn=None,
                    lock=False):
    """Decide what, if anything, to do now."""
    if db_conn is None:
        db_conn = db_connect('JOBS')

    if now is None:
        now = datetime.utcnow()

    return module(settings.WORKER_JOB_PRIORITY).select_next_job(
        category,
        now,
        only_activities,
        exclude_activities,
        only_ingestion,
        exclude_ingestion,
        only_filenames,
        db_conn,
        lock)


def find_similar_jobs(job, count, now=None, only_filenames=None):
    """Assemble a batch of jobs to be executed together."""
    if now is None:
        now = datetime.utcnow()

    return module(settings.WORKER_JOB_PRIORITY).find_similar_jobs(
        job, count, now, only_filenames)
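
# The project module named by settings.WORKER_JOB_PRIORITY is assumed to provide
# select_next_job() and find_similar_jobs() with the argument lists shown in the two
# wrappers above; select_next_job() returns a single Job (or None when nothing is
# runnable) and find_similar_jobs() an iterable of further Jobs, as used by run() below.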


def process_jobs(worker_name,
                 jobs,
                 work_dir_base,
                 jobs_lock=None,
                 sendmails=False,
                 db_conn=None):
    """Perform `jobs`, doing the immediate work, adding derived jobs, then
    recording the original jobs as complete.
    Jobs must all be similar, so must have the same category and activity, and
    the same tablename if they are statistics jobs.
    The working directory used will be a subdirectory of `work_dir_base`.

    Args:

        `worker_name` (str): Name of the worker process for the PROCESSES table
        `jobs` (list of Job): Jobs to be executed as a batch
        `work_dir_base` (Path): Parent of any working directories to be made
        `jobs_lock` (Lock): If in use, mutex lock for touching the JOBS table
        `sendmails` (bool): Send notification emails for job failures or recoveries
        `db_conn`: Connection to the PROCESSES table (a new one is made if None)

    Returns:
        None.
    """
    # Remove circular import
    from chart.users.role import RoleManager

    if db_conn is None:
        db_conn = db_connect('PROCESSES')

    # Insert an entry to the PROCESSES table
    proc_id = insert_process(worker_name, db_conn=db_conn, commit=False)

    work_dir = work_dir_base.absolute().child(
        '{act}_{proc}'.format(act=jobs[0].activity.name, proc=proc_id))

    update_process(process_id=proc_id, set_working_dir=work_dir, db_conn=db_conn, commit=False)
    logging.info('Processing {cc} jobs with activity {act} assigning process ID {id}'.format(
            cc=len(jobs),
            act=jobs[0].activity.name,
            id=proc_id))

    # mark the jobs we are about to run as being in progress
    update_jobs(job=jobs, set_process_id=proc_id, set_status=JobStatus.IN_PROGRESS, db_conn=db_conn)

    if jobs_lock is not None:
        jobs_lock.release()

    old_dir = os.getcwd()

    # Run the jobs
    # if settings.DEBUG:
        # process_res = dispatch(
            # jobs, work_dir=work_dir, ingest=True, use_subprocess=True)

    # else:
    try:
        process_res = dispatch(
            jobs, work_dir=work_dir, ingest=True, use_subprocess=True, sendmails=True)
    except Exception as e:
        logging.error('Worker failed to execute jobs {e}'.format(e=e))
        # Something went badly wrong, mark all jobs as failed
        os.chdir(old_dir)  # ?
        log_exception(logging.error)

        if isinstance(e, IOError):
            # sometimes /leo claims it is full when it isn't
            new_status = JobStatus.RETRY

        else:
            new_status = JobStatus.FAILED

        update_jobs(job=jobs, set_status=new_status, db_conn=db_conn)

        # writing JobStatus into PROCESS.STATUS
        update_process(process_id=proc_id,
                       set_status=new_status,
                       set_execute_stop=datetime.utcnow())

        logging.info('{jobs} jobs and process {proc_id} marked as {new_status}'.format(
                jobs=len(jobs),
                proc_id=proc_id,
                new_status=new_status.name))
        return

    os.chdir(old_dir)

    # Count how many non-successful jobs there were so we can send notifications,
    # and compute and queue derived jobs
    non_complete = 0
    completed = 0
    new_jobs = 0
    dup_jobs = 0
    for job in jobs:
        if job.status not in (JobStatus.COMPLETED,
                              JobStatus.FAILED,
                              JobStatus.RETRY,
                              JobStatus.TIMEOUT):
            raise ValueError('Bad status {status}'.format(status=job.status.name))

        if job.status is JobStatus.COMPLETED:
            # if job.get('sid')=='N19':
            # logging.debug('Searching for jobs derived from '+str(job['id']))
            # try:
            for derived_job in find_derived_jobs(job):
                # if job.get('sid')=='N19':
                    # logging.debug('    derived job '+str(derived_job))
                # start = time.time()
                if settings.DATABASE_JOBS_TABLE_PARENT:
                    derived_job.parent = job.job_id

                new_jobs += 1
                if add_job(derived_job)[0] is False:
                    dup_jobs += 1
                # note add_job does not commit
                # logging.debug('add_job took {t}'.format(t=time.time()-start))
        # except NoSuchTable as e:
            # if the result file claims it wrote to a table which doesn't exist,
            # trap it here
            # Note, 'FAILED' is not perfect since normally that means the algorithm itself
            # failed. It would be clearer to be able to mark a job as failed due to
            # infrastructure errors
            # logging.warn('Marking job {id} as FAILED since it claims to have written to '
                    # '{table} which does not exist'.format(
                    # id=job.job_id, table=e.table_name))
            # job.status = JobStatus.FAILED.name
            completed += 1

        else:
            non_complete += 1

        # Now all the db updates are finished, handle notification emails for any failures
        # or end of failure state
        RoleManager.instance().handle_job_state(job, sendmails=sendmails, log=True)

        # wait for commit ...
        # update_jobs(job=job, set_status=job.status, commit=False)

        # Assign the full process log file to each job
        # This is a hack and gives bad results with jobs batching
        # The DispatcherResult object should return a dict of job IDs against individual
        # log file analysis dicts to fix this properly
        if process_res.log_file_analysis:
            critical_count = process_res.log_file_analysis['CRITICAL']
            error_count = process_res.log_file_analysis['ERROR']
            warning_count = process_res.log_file_analysis['WARNING']
            info_count = process_res.log_file_analysis['INFO']

        else:
            critical_count = None
            error_count = None
            warning_count = None
            info_count = None

        update_jobs(job=job,
                    set_status=job.status,
                    set_critical_count=critical_count,
                    set_error_count=error_count,
                    set_warning_count=warning_count,
                    set_info_count=info_count,
                    db_conn=db_conn)

    # ... commit here
    # (writing JobStatus into PROCESS.STATUS...)

    update_process(process_id=proc_id,
                   set_status=process_res.process_status,
                   set_execute_stop=datetime.utcnow(),
                   set_critical_count=critical_count,
                   set_error_count=error_count,
                   set_warning_count=warning_count,
                   set_info_count=info_count)
    # insert new products into the PRODUCTS table if we are using filename
    # calling convention

    logging.info('Batch complete: {complete} jobs completed, {new} new jobs raised, '
                 'of which {dups} duplicates were discarded'.format(
            complete=completed, new=new_jobs, dups=dup_jobs))

    # Try to avoid occasional failures in the duplicates test
    db_conn.commit()


def tidy_up(worker_name):
    """Tidy up any records left by a terminated instance."""
    logging.info('Searching for any terminated processes')

    for proc_id, in find_processes(fields=('ID',),
                                   status='RUNNING',
                                   worker=worker_name):
        logging.info('Tidying up terminated process {id}'.format(id=proc_id))
        update_jobs(process_id=proc_id, set_status=JobStatus.PENDING, commit=False)
        update_process(process_id=proc_id, set_status=ProcessStatus.TERMINATED, commit=False)

    db_connect('JOBS').commit()


def run(worker_name,  # (dangerous default value) pylint: disable=W0102
        category,
        work_area_base,
        batch_size=DEFAULT_BATCH_SIZE,
        only_activities=None,
        only_filenames=None,
        exclude_activities=None,
        only_ingestion=False,
        exclude_ingestion=False,
        jobs_lock=None,
        exit_on_idle=False):
    """Complete all processable JOBS including derived jobs, then return.

    Args:
        `worker_name` (str): Name to insert into the `worker` field of the PROCESSES table.
        `category` (str): Only process jobs from `category`
        `work_area_base` (Path): Base directory for working directories
        `batch_size` (int): Maximum number of jobs to execute in a single batch
        `only_activities` (list of str): Activities to process
        `only_filenames` (list of str): When running ingestion jobs only ingest matching files
        `exclude_activities` (list of str): Ignore given activities
        `only_ingestion` (bool): Only process ingestion jobs
        `exclude_ingestion` (bool): Ignore ingestion jobs
        `jobs_lock` (Lock): Optional database-wide mutex for the JOBS table
        `exit_on_idle` (bool): Return when no more jobs can be selected instead of sleeping

    Returns:
        None.

    Raises:
        None.
    """

    logging.info('Worker process entering main loop')
    while True:
        work_area = work_area_base.child(datetime.utcnow().date().strftime('%Y%m%d'))

        while True:
            now = datetime.utcnow()

            db_conn = db_connect('JOBS')

            # jobs lock is used only for Oracle projects
            if jobs_lock is not None:
                res = None
                from chart.db.drivers import oracle_lock
                while res != oracle_lock.Lock.OK:
                    res = jobs_lock.acquire(60)
                    if res == oracle_lock.Lock.TIMEOUT:
                        logging.info('Jobs lock timeout')

            logging.info('Seeking next job')
            job = select_next_job(category=category,
                                  now=now,
                                  only_activities=only_activities,
                                  exclude_activities=exclude_activities,
                                  only_filenames=only_filenames,
                                  only_ingestion=only_ingestion,
                                  exclude_ingestion=exclude_ingestion,
                                  db_conn=db_conn,
                                  lock=True)

            if job is None:
                logging.info('No pending jobs')
                if jobs_lock is not None:
                    jobs_lock.release()

                break

            logging.info('Selected {id} as next primary job'.format(id=job.job_id))
            jobs = [job] + list(find_similar_jobs(job, batch_size - 1, now, only_filenames))
            # jobs.sort(key=operator.itemgetter('filename', 'sensing_start'))

            # logging.info('Batch of {cc} jobs created'.format(cc=len(jobs)))

            # process_jobs() drops the jobs table lock
            process_jobs(worker_name=worker_name,
                         jobs=jobs,
                         work_dir_base=work_area,
                         jobs_lock=jobs_lock,
                         sendmails=category == 'SCHEDULER',
                         db_conn=db_conn)

        if exit_on_idle:
            break

        else:
            logging.info('Sleeping for {delay}s'.format(delay=settings.WORKER_SLEEP))
            time.sleep(settings.WORKER_SLEEP)


def list_activities(only_activities,
                    exclude_activities,
                    only_ingestion,
                    exclude_ingestion,
                    max_weight):
    """`only_ingestion` and `exclude_ingestion` are bools.
    `only_activities` and `exclude_activities` are lists of wildcard strings,
    with any '*' preconverted to '%' and all chars in upper case.
    Unfortunately this is not the same code that actually decides whether to schedule
    jobs so it's not really guaranteed to be accurate.
    """
    if only_activities is not None and not exclude_ingestion:
        logger.warning('Set --exclude-ingestion to exclude ingestion jobs. Omitting them from '
                       '--only-activity is not enough')

    # if only_activities is not None:
        # only_activities = [o.replace('%', '*') for o in only_activities]

    # if exclude_activities is not None:
        # exclude_activities = [e.replace('%', '*') for e in exclude_activities]

    t = Table(headings=('Activity', 'Priority', 'Status'))
    for a in Activity.all():
        status = None

        if a.convention is CallingConvention.FILENAME:
            if only_activities is not None:
                for o in only_activities:
                    if fnmatch(a.name, o):
                        logging.warning(
                            'Activity {a} is matched by --only-activity {o} but is an ingestion '
                            'activity - use --only-ingestion instead'.format(a=a.name, o=o))

            # if exclude_activities is not None:
                # for e in exclude_activities:
                    # if fnmatch(a.name, e):
                        # logging.warning(
                            # 'Activity {a} is matched by --exclude-activity {e} but is an ingestion '
                            # 'activity - use --exclude-ingestion instead'.format(
                                            # a=a.name, e=e))

            if only_ingestion:
                status = 'active (from --only-ingestion)'

            elif not exclude_ingestion:
                status = 'active'

            else:
                status = 'excluded (by --exclude-ingestion)'

        else:
            if only_ingestion:
                status = 'excluded (by --only-ingestion)'

            if only_activities is not None:
                if any(fnmatch(a.name, o) for o in only_activities):
                    if only_ingestion:
                        logger.warning('Activity {a} is disabled by --only-ingestion but enabled '
                                       'with --only-activities'.format(a=a.name))

                    status = 'active (included in --only-activities)'

                else:
                    status = 'excluded (not in --only-activities)'

            if exclude_activities is not None:
                if any(fnmatch(a.name, e) for e in exclude_activities):
                    # if status is not None:
                        # logger.warning('Activity {a} is both included and excluded')
                    status = 'excluded (from --exclude-activities {e})'.format(
                        e=exclude_activities)

            if status is None:
                status = 'active'

        if max_weight is not None and a.weight > max_weight:
            status = 'excluded since weight {a} is over limit {m}'.format(
                a=a.weight, m=max_weight)

        t.append((a.name, a.priority, status))

    t.write()


def active_workers(category: str):
    """Yield names of all workers with jobs in `category`."""
    db_conn = db_connect('JOBS')
    for row in db_conn.query(('SELECT DISTINCT p.worker FROM processes p, jobs j WHERE p.id=j.process_id AND j.category=:category ORDER BY p.worker'),
                             category=category):
        yield row[0]


def worker_activity(category: str, worker: str, limit: int):
    """Yield information about the most recent jobs a worker ran."""
    db_conn = db_connect('JOBS')
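    # NB: the "rownum" filter in the query below is Oracle syntax for limiting rows;
    # a different engine would need an equivalent clause such as LIMIT.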
    for row in db_conn.query((
            'SELECT * FROM (SELECT j.id, j.status, j.activity, j.filename, j.tablename, p.execute_start, p.execute_stop '
            'FROM processes p, jobs j '
            'WHERE p.id=j.process_id AND '
            'j.category=:category '
            'AND p.worker=:worker '
            'ORDER BY p.execute_start DESC) q WHERE rownum<:limit'),
                             category=category,
                             worker=worker,
                             limit=limit):
        yield row


def show_worker_status(category: str, limit: int):
    """Display recent worker activity."""
    table = Table(headings=('Worker', 'Status', 'ID', 'Activity', 'Filename or table', 'Exe start', 'Exe stop'))
    prev_worker = None
    for worker in active_workers(category=category):
        # print('Worker {w}'.format(w=worker))
        for job_id, status, activity, filename, tablename, execute_start, execute_stop in worker_activity(
                category=category, worker=worker, limit=limit):
            table.append((
                worker if worker != prev_worker else '',
                status,
                job_id,
                activity,
                filename if filename is not None else tablename,
                execute_start,
                execute_stop))
            prev_worker = worker

    table.write()


def main():
    """Command line entry point."""
    # os.environ['TZ'] = 'UTC'
    # time.tzset()
    parser = ArgumentParser()
    parser.add_argument('--log-file',
                        help='Single logfile location')
    parser.add_argument('--rotating-log-file',
                        help='Rotating logfile location')
    parser.add_argument('--db',
                        metavar='CONN',
                        help='Use database connection CONN')
    parser.add_argument('--category',
                        default=settings.DEFAULT_CATEGORY,
                        # required=True,
                        help='Job category. All other categories will be ignored.')
    parser.add_argument('--work-area', '-o',
                        type=ArgumentParser.output_dir,
                        help='Base for working directories. If not given, a temporary directory '
                        'will be assigned')
    parser.add_argument('--worker-name', '--name',
                        help=('Worker name. Should be unique otherwise the tidy up code '
                              'may do the wrong thing. A unique name will be assigned if not '
                              'specified'))
                        # default=DEFAULT_NAME)
    # parser.add_argument('--one-shot', '--exit-on-idle', '--once',
                        # action='store_true',
                        # help='Exit when there are no jobs to be run. Otherwise we wait for more')
    parser.add_argument('--loop',
                        action='store_true',
                        help='Keep looking for more jobs after all current jobs are completed')
    parser.add_argument(
        '--only-activities',
        nargs='*',
        # type=ArgumentParser.activity,
        # make this type Activity
        help=('Only execute the listed activities. This does not affect scheduling of ingestion '
              'jobs so use --exclude-ingestion to disable those. Use \'%%\' as a wildcard'))
    parser.add_argument(
        '--exclude-activities',
        nargs='*',
        help='Do not execute jobs in the listed activities. This has no effect on ingestion '
        'jobs. Use "%%" as a wildcard')
    parser.add_argument('--exclude-ingestion',
                        action='store_true',
                        help='Ignore ingestion jobs')
    parser.add_argument('--only-ingestion',
                        action='store_true',
                        help='Only process ingestion jobs')
    parser.add_argument('--batch-size',
                        type=int,
                        default=DEFAULT_BATCH_SIZE,
                        metavar='SIZE',
                        help='Process SIZE jobs at a time')
    parser.add_argument('--only-filenames', '--filenames',
                        # nargs='*',
                        metavar='FILE',
                        help='Only run activities whose filenames match FILE (wildcard)')
    parser.add_argument('--list-activities', '--list', '-l',
                        action='store_true',
                        help='Show status of all activity files including disabled and excluded '
                        'activities')
    parser.add_argument('--timeout',
                        type=ArgumentParser.timedelta,
                        help='Force a different timeout value for activities (default {default})'.
                        format(default=settings.DISPATCHER_TIMEOUT))
    parser.add_argument('--reingest',
                        help='Force some algorithms to delete data before ingesting',
                        action='store_true')
    parser.add_argument('--show-first-job',
                        action='store_true',
                        help=('Just show the highest priority job to run, according to all other '
                              'args'))
    parser.add_argument('--max-weight',
                        type=float,
                        help=('Maximum weight of activities this worker will attempt'))
    parser.add_argument('--project-id',
                        type=int,
                        metavar='ID',
                        help='Force PROJECT for multi-project JOBS table function')
    parser.add_argument('--status',
                        action='store_true',
                        help='Show current status of known workers')
    parser.add_argument('--limit',
                        type=int,
                        help='Number of jobs to display if --status is used',
                        default=10)

    # (note this argument cannot be used to exclude ingestion activities (PDU_INGESTER etc.).
    # Use --exclude-ingestion for this.
    # (note --only-filenames requires --only-activities)
    # --exclude-ingestion
    # --only-ingestion

    # postpone log file init until we do it explicitly later
    args = parser.parse_args(no_log_init=True)

    if args.rotating_log_file:
        settings.ROTATING_LOG_FILE = args.rotating_log_file

    if args.log_file:
        settings.SINGLE_LOG_FILE = args.log_file

    init_log()
    # os.environ['TZ'] = 'UTC'
    # time.tzset()

    # initialise django templating otherwise we can get a crash on sending emails
    django.setup()

    # switch databases (must be first option handled)
    if args.db:
        settings.set_db_name(args.db)

    if args.status:
        show_worker_status(args.category, args.limit)
        parser.exit()

    if args.project_id:
        settings.DATABASE_PROJECT_ID = args.project_id

    if args.timeout is not None:
        settings.DISPATCHER_TIMEOUT = args.timeout

    if args.reingest:
        from chart.common.env import set_env
        set_env('{ENV_PREFIX}REINGEST', True)

    if args.only_ingestion and args.exclude_ingestion:
        parser.error('Cannot set both --only-ingestion and --exclude-ingestion')

    if args.only_activities and not args.exclude_ingestion:
        print('Warning: If you set --only-activities you probably also want to use '
              '--exclude-ingestion too otherwise you will still get ingestion jobs running.')

    if args.worker_name is None:
        args.worker_name = 'worker.{pid}'.format(pid=os.getpid())
        logging.info('Using {name} as worker name'.format(name=args.worker_name))

    # don't construct Activity objects here as they could be wildcards
    if args.only_activities is not None:
        only_activities = [o.upper().replace('*', '%') for o in args.only_activities]

    else:
        only_activities = None

    if args.exclude_activities is not None:
        exclude_activities = [e.upper().replace('*', '%') for e in args.exclude_activities]

    else:
        exclude_activities = None

    if len(settings.EXCLUDE_ACTIVITIES) > 0:
        if exclude_activities is None:
            exclude_activities = []

        for a in settings.EXCLUDE_ACTIVITIES:
            if a not in exclude_activities:
                exclude_activities.append(a)

    if args.list_activities:
        list_activities(only_activities=only_activities,
                        exclude_activities=exclude_activities,
                        only_ingestion=args.only_ingestion,
                        exclude_ingestion=args.exclude_ingestion,
                        max_weight=args.max_weight)
        parser.exit()

    exclude_activities = disallowed_activities(exclude_activities, args.max_weight)

    # Jobs lock is a database-wide mutex used to lock the JOBS and PROCESSES tables when one
    # worker is using them, to prevent other workers from interfering.
    # It can be set to None if only one worker is in use.
    if db_connect('JOBS').engine is DatabaseEngine.ORACLE:
        from chart.db.drivers import oracle_lock
        locker = oracle_lock.Lock('CHART.jobs_table_lock')

    else:
        locker = None

    tidy_up(worker_name=args.worker_name)

    if args.category is None:
        parser.error('No --category specified')

    if args.show_first_job:
        print(select_next_job(category=args.category.upper(),
                              only_activities=only_activities,
                              exclude_activities=exclude_activities,
                              only_filenames=args.only_filenames,
                              only_ingestion=args.only_ingestion,
                              exclude_ingestion=args.exclude_ingestion))
        parser.exit()

    if args.work_area:
        work_area = args.work_area.expand()

    elif settings.WORK_AREA:
        work_area = settings.WORK_AREA

    else:
        work_area = make_working_directory('worker')

    ensure_dir_exists(work_area)

    run(args.worker_name,
        args.category.upper(),
        work_area_base=work_area,
        batch_size=args.batch_size,
        only_activities=only_activities,
        only_filenames=args.only_filenames,
        exclude_activities=exclude_activities,
        only_ingestion=args.only_ingestion,
        exclude_ingestion=args.exclude_ingestion,
        jobs_lock=locker,
        exit_on_idle=not args.loop)

    # run_forever(worker_name=args.worker_name,
    #             category=args.category.upper(),
    #             work_area_base=args.work_area,
    #             batch_size=args.batch_size,
    #             only_activities=
    #             only_filenames=None,  # args.only_filenames,
    #             only_ingestion=args.only_ingestion,
    #             exclude_ingestion=args.exclude_ingestion,
    #             exclude_activities=args.exclude_activities,
    #             jobs_lock=db.Lock('CHART.jobs_table_lock'))


if __name__ == '__main__':
    main()