#!/usr/bin/env python3

"""Job processing code. The worker reads entries from the JOBS table
and executes them.
Each worker only processes jobs from a single category and ignores others.
Jobs are selected based on a hard-coded priority (ingestion first, then
per-orbit algorithms, then daily, then weekly, then other time-based jobs).

After each job is completed it may trigger other jobs, which are placed in the JOBS
table.

Jobs are batched up according to hard-coded rules, with similar ingestions
and per-orbit algorithms run together.

The worker can run in single-job mode, completing an individual specified job;
it can keep rescanning the JOBS table until all work is done and then exit;
or it can loop indefinitely.

Statistics handling
-------------------

Orbital and daily statistics are handled as a special case, although they have
activity files as well.

Stats handling hooks into the job control mechanism in the following ways:

 * The `find_similar_jobs()` function knows that multiple stats jobs with the
   same `tablename` attribute can be processed as a single invocation of the activity.
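
Example invocations
-------------------

Illustrative only; the flag names below are defined in `main()` but the exact
script name and installed path may differ per deployment::

    worker.py --list-activities
    worker.py --category SCHEDULER --loop
    worker.py --only-ingestion --batch-size 50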
"""

import os
import time
import logging
import importlib
from datetime import datetime
from fnmatch import fnmatch

import django

# This should not be needed but it is to make worker.log use UTC time
# when run under the supervisor on the TST environment on tcprotos
# os.environ['TZ'] = 'UTC'
# time.tzset()

from chart.db.settings import DatabaseEngine
from chart.backend.dispatcher import dispatch
from chart.common.util import ensure_dir_exists
from chart.backend.activity import Activity
from chart.backend.activity import CallingConvention
from chart.backend.activity import DEFAULT_WEIGHT
from chart.common.args import ArgumentParser
from chart.backend.job import JobStatus
from chart.backend.jobs import update_jobs
from chart.common.log import log_exception
from chart.common.prettyprint import Table
from chart.backend.processes import ProcessStatus
from chart.backend.processes import find_processes
from chart.backend.processes import insert_process
from chart.backend.processes import update_process
from chart import settings
from chart.backend.jobs import add_job
from chart.db.connection import db_connect
from chart.common.log import init_log

logger = logging.getLogger()

# Every worker needs a name for the PROCESSES table. Here we set a default name
# which can be overridden with a command line flag
DEFAULT_NAME = 'worker'

# if there is a database problem pause for this many seconds
RETRY_PAUSE = 900

# Number of jobs to be batched together into a single subprocess
DEFAULT_BATCH_SIZE = 100


class BadJob(Exception):
    """Exception if client code tries to create an invalid job dict."""

    def __init__(self, activity, message):
        super(BadJob, self).__init__()
        self.activity = activity
        self.message = message

    def __str__(self):
        return '{act}: {message}'.format(act=self.activity.name, message=self.message)


def module(module_name):
    """Cached load of `module_name` from settings."""
    result = module.cache.get(module_name)
    if result is not None:
        return result

    mod = importlib.import_module(module_name)
    module.cache[module_name] = mod
    return mod

module.cache = {}
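
# Usage sketch (the dotted module path below is purely illustrative): the first
# call imports the named module, later calls return the cached object:
#
#   chain = module('chart.project.job_chain')   # hypothetical module path
#   assert chain is module('chart.project.job_chain')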


def make_working_directory(program):
    """Create a working directory inside settings.WORK_AREA."""
    basedir = settings.WORK_AREA.child(program)
    prefix = str(os.getpid())
    # if <basedir>/<pid> already exists, try <basedir>/<pid>.1, <pid>.2, ...
    if basedir.child(prefix).exists():
        cc = 1
        while basedir.child('{prefix}.{cc}'.format(prefix=prefix, cc=cc)).exists():
            cc += 1

        prefix = '{prefix}.{cc}'.format(prefix=prefix, cc=cc)

    working_dir = basedir.child(prefix)
    ensure_dir_exists(working_dir)

    logger.info('Using {d} as working directory'.format(d=working_dir))
    return working_dir


def find_derived_jobs(job):
    """Run the per-project job chain function."""
    return module(settings.WORKER_JOB_CHAIN).find_derived_jobs(job)
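
# Note: the module named in settings.WORKER_JOB_CHAIN is expected to expose a
# find_derived_jobs(job) callable yielding new Job objects; process_jobs() below
# queues each of them with add_job() and, where the JOBS table supports it,
# records the originating job as their parent.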


def disallowed_activities(exclude_activities,
                          max_weight):
    """Return a list of activities not allowed under the given exclusions and maximum weight.

    The input list `exclude_activities` is modified by this function.

    This is a bit ugly but is done this way to avoid introducing changes that break any projects.

    The combination of options only_activities, exclude_activities, exclude_ingestion and weighting
    could all be handled more neatly by channelling them all through this function.

    Also list_activities() should really use this function instead of having its own algorithm.
    """
    # This is all a bit ugly.
    # An activity will end up being listed twice if it's in exclude_activities and also too heavy
    # but that doesn't really matter
    if exclude_activities is None:
        exclude_activities = []

    for a in Activity.all():
        if max_weight is not None and a.weight > max_weight:
            exclude_activities.append(a)

    return exclude_activities
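
# For illustration (hypothetical values): with --max-weight 1.0, every activity
# whose declared weight exceeds 1.0 is appended to the exclusion list:
#
#   too_heavy = disallowed_activities([], max_weight=1.0)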


def select_next_job(category,
                    now=None,
                    only_activities=None,
                    exclude_activities=None,
                    only_ingestion=False,
                    exclude_ingestion=False,
                    only_filenames=None,
                    db_conn=None,
                    lock=False):
    """Decide what, if anything, to do now."""
    if db_conn is None:
        db_conn = db_connect('JOBS')

    if now is None:
        now = datetime.utcnow()

    return module(settings.WORKER_JOB_PRIORITY).select_next_job(
        category,
        now,
        only_activities,
        exclude_activities,
        only_ingestion,
        exclude_ingestion,
        only_filenames,
        db_conn,
        lock)


def find_similar_jobs(job, count, now=None, only_filenames=None):
    """Assemble a batch of jobs to be executed together."""
    if now is None:
        now = datetime.utcnow()

    return module(settings.WORKER_JOB_PRIORITY).find_similar_jobs(
        job, count, now, only_filenames)
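
# Note: both select_next_job() and find_similar_jobs() delegate to the
# per-project priority module named in settings.WORKER_JOB_PRIORITY, which is
# expected to provide functions matching the argument lists used above.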


def process_jobs(worker_name,
                 jobs,
                 work_dir_base,
                 jobs_lock=None,
                 sendmails=False,
                 db_conn=None):
    """Perform `jobs`, doing the immediate work, adding derived jobs, then
    recording the original jobs as complete.
    Jobs must all be similar, so must have the same category and activity, and
    the same tablename if they are statistics jobs.
    The working directory used will be a subdirectory of `work_dir_base`.

    Args:

    `worker_name` (str): Name of the worker process for the PROCESSES table
    `jobs` (list of Job): Jobs to be executed as a batch
    `work_dir_base` (Path): Parent of any working directories to be made
    `jobs_lock` (Lock): If in use, mutex lock for touching the JOBS table
    `sendmails` (bool): Allow notification emails to be sent for job failures
    `db_conn`: Database connection to use; defaults to the PROCESSES connection

    Returns:
        None.
    """
    # Imported here to avoid a circular import
    from chart.users.role import RoleManager

    if db_conn is None:
        db_conn = db_connect('PROCESSES')

    # Insert an entry to the PROCESSES table
    proc_id = insert_process(worker_name, db_conn=db_conn, commit=False)

    work_dir = work_dir_base.absolute().child(
        '{act}_{proc}'.format(act=jobs[0].activity.name, proc=proc_id))

    update_process(process_id=proc_id, set_working_dir=work_dir, db_conn=db_conn, commit=False)
    logging.info('Processing {cc} jobs with activity {act} assigning process ID {id}'.format(
        cc=len(jobs),
        act=jobs[0].activity.name,
        id=proc_id))

    # mark the jobs we are about to run as being in progress
    update_jobs(job=jobs, set_process_id=proc_id, set_status=JobStatus.IN_PROGRESS, db_conn=db_conn)

    if jobs_lock is not None:
        jobs_lock.release()

    old_dir = os.getcwd()

    # Run the jobs
    # if settings.DEBUG:
    #     process_res = dispatch(
    #         jobs, work_dir=work_dir, ingest=True, use_subprocess=True)

    # else:
    try:
        process_res = dispatch(
            jobs, work_dir=work_dir, ingest=True, use_subprocess=True, sendmails=True)
    except Exception as e:
        logging.error('Worker failed to execute jobs {e}'.format(e=e))
        # Something went badly wrong, mark all jobs as failed
        os.chdir(old_dir)  # ?
        log_exception(logging.error)

        if isinstance(e, IOError):
            # sometimes /leo claims it is full when it isn't
            new_status = JobStatus.RETRY

        else:
            new_status = JobStatus.FAILED

        update_jobs(job=jobs, set_status=new_status, db_conn=db_conn)

        # writing JobStatus into PROCESS.STATUS
        update_process(process_id=proc_id,
                       set_status=new_status,
                       set_execute_stop=datetime.utcnow())

        logging.info('{jobs} jobs and process {proc_id} marked as {new_status}'.format(
            jobs=len(jobs),
            proc_id=proc_id,
            new_status=new_status.name))
        return

    os.chdir(old_dir)

    # Count how many non-successful jobs there were so we can send notifications,
    # and compute and queue derived jobs
    non_complete = 0
    completed = 0
    new_jobs = 0
    dup_jobs = 0
    for job in jobs:
        if job.status not in (JobStatus.COMPLETED,
                              JobStatus.FAILED,
                              JobStatus.RETRY,
                              JobStatus.TIMEOUT):
            raise ValueError('Bad status {status}'.format(status=job.status.name))

        if job.status is JobStatus.COMPLETED:
            # if job.get('sid')=='N19':
            #     logging.debug('Searching for jobs derived from '+str(job['id']))
            # try:
            for derived_job in find_derived_jobs(job):
                # if job.get('sid')=='N19':
                #     logging.debug('  derived job '+str(derived_job))
                # start = time.time()
                if settings.DATABASE_JOBS_TABLE_PARENT:
                    derived_job.parent = job.job_id

                new_jobs += 1
                if add_job(derived_job)[0] is False:
                    dup_jobs += 1
                # note add_job does not commit
                # logging.debug('add_job took {t}'.format(t=time.time()-start))
            # except NoSuchTable as e:
            #     if the result file claims it wrote to a table which doesn't exist,
            #     trap it here
            #     Note, 'FAILED' is not perfect since normally that means the algorithm itself
            #     failed. It would be clearer to be able to mark a job as failed due to
            #     infrastructure errors
            #     logging.warn('Marking job {id} as FAILED since it claims to have written to '
            #                  '{table} which does not exist'.format(
            #                      id=job.job_id, table=e.table_name))
            #     job.status = JobStatus.FAILED.name
            completed += 1

        else:
            non_complete += 1

        # Now all the db updates are finished, handle notification emails for any failures
        # or end of failure state
        RoleManager.instance().handle_job_state(job, sendmails=sendmails, log=True)

        # wait for commit ...
        # update_jobs(job=job, set_status=job.status, commit=False)

        # Assign the full process log file to each job
        # This is a hack and gives bad results with jobs batching
        # The DispatcherResult object should return a dict of job IDs against individual
        # log file analysis dicts to fix this properly
        if process_res.log_file_analysis:
            critical_count = process_res.log_file_analysis['CRITICAL']
            error_count = process_res.log_file_analysis['ERROR']
            warning_count = process_res.log_file_analysis['WARNING']
            info_count = process_res.log_file_analysis['INFO']

        else:
            critical_count = None
            error_count = None
            warning_count = None
            info_count = None

        update_jobs(job=job,
                    set_status=job.status,
                    set_critical_count=critical_count,
                    set_error_count=error_count,
                    set_warning_count=warning_count,
                    set_info_count=info_count,
                    db_conn=db_conn)

    # ... commit here
    # (writing JobStatus into PROCESS.STATUS...)

    update_process(process_id=proc_id,
                   set_status=process_res.process_status,
                   set_execute_stop=datetime.utcnow(),
                   set_critical_count=critical_count,
                   set_error_count=error_count,
                   set_warning_count=warning_count,
                   set_info_count=info_count)
    # insert new products into the PRODUCTS table if we are using filename
    # calling convention

    logging.info('Batch complete: {complete} jobs completed, {new} new jobs raised '
                 'of which {dups} duplicates were discarded'.format(
                     complete=completed, new=new_jobs, dups=dup_jobs))

    # Try to avoid occasional failures in the duplicates test
    db_conn.commit()


def tidy_up(worker_name):
    """Tidy up any records left by a terminated instance."""
    logging.info('Searching for any terminated processes')

    for proc_id, in find_processes(fields=('ID',),
                                   status='RUNNING',
                                   worker=worker_name):
        logging.info('Tidying up terminated process {id}'.format(id=proc_id))
        update_jobs(process_id=proc_id, set_status=JobStatus.PENDING, commit=False)
        update_process(process_id=proc_id, set_status=ProcessStatus.TERMINATED, commit=False)

    db_connect('JOBS').commit()


def run(worker_name,  # (dangerous default value) pylint: disable=W0102
        category,
        work_area_base,
        batch_size=DEFAULT_BATCH_SIZE,
        only_activities=None,
        only_filenames=None,
        exclude_activities=None,
        only_ingestion=False,
        exclude_ingestion=False,
        jobs_lock=None,
        exit_on_idle=False):
    """Complete all processable JOBS including derived jobs, then return.

    Args:
        `worker_name` (str): Name to insert into the `worker` field of the PROCESSES table.
        `category` (str): Only process jobs from `category`
        `work_area_base` (Path): Base directory for working directories
        `batch_size` (int): Maximum number of jobs to run in a single batch
        `only_activities` (list of str): Activities to process
        `only_filenames` (list of str): When running ingestion jobs only ingest matching files
        `exclude_activities` (list of str): Ignore given activities
        `only_ingestion` (bool): Only process ingestion jobs
        `exclude_ingestion` (bool): Ignore ingestion jobs
        `jobs_lock` (Lock): If in use, mutex lock for the JOBS table
        `exit_on_idle` (bool): Return when no pending jobs remain instead of sleeping

    Returns:
        None.

    Raises:
        None.
    """

    logging.info('Worker process entering main loop')
    while True:
        work_area = work_area_base.child(datetime.utcnow().date().strftime('%Y%m%d'))

        while True:
            now = datetime.utcnow()

            db_conn = db_connect('JOBS')

            # jobs lock is used only for Oracle projects
            if jobs_lock is not None:
                res = None
                from chart.db.drivers import oracle_lock
                while res != oracle_lock.Lock.OK:
                    res = jobs_lock.acquire(60)
                    if res == oracle_lock.Lock.TIMEOUT:
                        logging.info('Jobs lock timeout')

            logging.info('Seeking next job')
            job = select_next_job(category=category,
                                  now=now,
                                  only_activities=only_activities,
                                  exclude_activities=exclude_activities,
                                  only_filenames=only_filenames,
                                  only_ingestion=only_ingestion,
                                  exclude_ingestion=exclude_ingestion,
                                  db_conn=db_conn,
                                  lock=True)

            if job is None:
                logging.info('No pending jobs')
                if jobs_lock is not None:
                    jobs_lock.release()

                break

            logging.info('Selected {id} as next primary job'.format(id=job.job_id))
            jobs = [job] + list(find_similar_jobs(job, batch_size - 1, now, only_filenames))
            # jobs.sort(key=operator.itemgetter('filename', 'sensing_start'))

            # logging.info('Batch of {cc} jobs created'.format(cc=len(jobs)))

            # process_jobs() drops the jobs table lock
            process_jobs(worker_name=worker_name,
                         jobs=jobs,
                         work_dir_base=work_area,
                         jobs_lock=jobs_lock,
                         sendmails=category == 'SCHEDULER',
                         db_conn=db_conn)

        if exit_on_idle:
            break

        else:
            logging.info('Sleeping for {delay}s'.format(delay=settings.WORKER_SLEEP))
            time.sleep(settings.WORKER_SLEEP)


def list_activities(only_activities,
                    exclude_activities,
                    only_ingestion,
                    exclude_ingestion,
                    max_weight):
    """`only_ingestion` and `exclude_ingestion` are bools.
    `only_activities` and `exclude_activities` are lists of wildcard strings,
    with any '*' preconverted to '%' and all chars in upper case.
    Unfortunately this is not the same code that actually decides whether to schedule
    jobs, so it is not guaranteed to be accurate.
    """
    if only_activities is not None and not exclude_ingestion:
        logger.warning('Set --exclude-ingestion to exclude ingestion jobs. Omitting them from '
                       '--only-activity is not enough')

    # if only_activities is not None:
    #     only_activities = [o.replace('%', '*') for o in only_activities]

    # if exclude_activities is not None:
    #     exclude_activities = [e.replace('%', '*') for e in exclude_activities]

    t = Table(headings=('Activity', 'Priority', 'Status'))
    for a in Activity.all():
        status = None

        if a.convention is CallingConvention.FILENAME:
            if only_activities is not None:
                for o in only_activities:
                    if fnmatch(a.name, o):
                        logging.warning(
                            'Activity {a} is matched by --only-activity {o} but is an ingestion '
                            'activity - use --only-ingestion instead'.format(a=a.name, o=o))

            # if exclude_activities is not None:
            #     for e in exclude_activities:
            #         if fnmatch(a.name, e):
            #             logging.warning(
            #                 'Activity {a} is matched by --exclude-activity {e} but is an ingestion '
            #                 'activity - use --exclude-ingestion instead'.format(
            #                     a=a.name, e=e))

            if only_ingestion:
                status = 'active (from --only-ingestion)'

            elif not exclude_ingestion:
                status = 'active'

            else:
                status = 'excluded (by --exclude-ingestion)'

        else:
            if only_ingestion:
                status = 'excluded (by --only-ingestion)'

            if only_activities is not None:
                if any(fnmatch(a.name, o) for o in only_activities):
                    if only_ingestion:
                        logger.warning('Activity {a} is disabled by --only-ingestion but enabled '
                                       'with --only-activities'.format(a=a.name))

                    status = 'active (included in --only-activities)'

                else:
                    status = 'excluded (not in --only-activities)'

            if exclude_activities is not None:
                # if any(fnmatch(a.name, e) for e in exclude_activities):
                #     if status is not None:
                #         logger.warning('Activity {a} is both included and excluded')
                status = 'excluded (from --exclude-activities {e})'.format(
                    e=exclude_activities)

            if status is None:
                status = 'active'

        if max_weight is not None and a.weight > max_weight:
            status = 'excluded since weight {a} is over limit {m}'.format(
                a=a.weight, m=max_weight)

        t.append((a.name, a.priority, status))

    t.write()


def active_workers(category: str):
    """Yield names of all workers with jobs in `category`."""
    db_conn = db_connect('JOBS')
    for row in db_conn.query(
            'SELECT DISTINCT p.worker FROM processes p, jobs j '
            'WHERE p.id=j.process_id AND j.category=:category ORDER BY p.worker',
            category=category):
        yield row[0]


def worker_activity(category: str, worker: str, limit: int):
    """Yield information about the most recent jobs a worker ran."""
    db_conn = db_connect('JOBS')
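    # NOTE: the rownum filter in the query below is Oracle-specific syntax; on
    # another database engine the row limit would presumably need adjusting.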
    for row in db_conn.query((
            'SELECT * FROM (SELECT j.id, j.status, j.activity, j.filename, j.tablename, p.execute_start, p.execute_stop '
            'FROM processes p, jobs j '
            'WHERE p.id=j.process_id AND '
            'j.category=:category '
            'AND p.worker=:worker '
            'ORDER BY p.execute_start DESC) q WHERE rownum<:limit'),
            category=category,
            worker=worker,
            limit=limit):
        yield row


def show_worker_status(category: str, limit: int):
    """Display recent worker activity."""
    table = Table(headings=('Worker', 'Status', 'ID', 'Activity', 'Filename or table',
                            'Exe start', 'Exe stop'))
    prev_worker = None
    for worker in active_workers(category=category):
        # print('Worker {w}'.format(w=worker))
        for job_id, status, activity, filename, tablename, execute_start, execute_stop in worker_activity(
                category=category, worker=worker, limit=limit):
            table.append((
                worker if worker != prev_worker else '',
                status,
                job_id,
                activity,
                filename if filename is not None else tablename,
                execute_start,
                execute_stop))
            prev_worker = worker

    table.write()


def main():
    """Command line entry point."""
    # os.environ['TZ'] = 'UTC'
    # time.tzset()
    parser = ArgumentParser()
    parser.add_argument('--log-file',
                        help='Single logfile location')
    parser.add_argument('--rotating-log-file',
                        help='Rotating logfile location')
    parser.add_argument('--db',
                        metavar='CONN',
                        help='Use database connection CONN')
    parser.add_argument('--category',
                        default=settings.DEFAULT_CATEGORY,
                        # required=True,
                        help='Job category. All other categories will be ignored.')
    parser.add_argument('--work-area', '-o',
                        type=ArgumentParser.output_dir,
                        help='Base for working directories. If not given a temp directory will be '
                             'assigned')
    parser.add_argument('--worker-name', '--name',
                        help=('Worker name. Should be unique otherwise the tidy up code '
                              'may do the wrong thing. A unique name will be assigned if not '
                              'specified'))
    # default=DEFAULT_NAME)
    # parser.add_argument('--one-shot', '--exit-on-idle', '--once',
    #                     action='store_true',
    #                     help='Exit when there are no jobs to be run. Otherwise we wait for more')
    parser.add_argument('--loop',
                        action='store_true',
                        help='Keep looking for more jobs after all current jobs are completed')
    parser.add_argument(
        '--only-activities',
        nargs='*',
        # type=ArgumentParser.activity,
        # make this type Activity
        help=('Only execute the listed activities. This does not affect scheduling of ingestion '
              'jobs so use --exclude-ingestion to disable those. Use \'%%\' as a wildcard'))
    parser.add_argument(
        '--exclude-activities',
        nargs='*',
        help='Do not execute jobs in the listed activities. This has no effect on ingestion '
             'jobs. Use "%%" as a wildcard')
    parser.add_argument('--exclude-ingestion',
                        action='store_true',
                        help='Ignore ingestion jobs')
    parser.add_argument('--only-ingestion',
                        action='store_true',
                        help='Only process ingestion jobs')
    parser.add_argument('--batch-size',
                        type=int,
                        default=DEFAULT_BATCH_SIZE,
                        metavar='SIZE',
                        help='Process SIZE jobs at a time')
    parser.add_argument('--only-filenames', '--filenames',
                        # nargs='*',
                        metavar='FILE',
                        help='Only run activities whose filenames match FILE (wildcard)')
    parser.add_argument('--list-activities', '--list', '-l',
                        action='store_true',
                        help='Show status of all activity files including disabled and excluded '
                             'activities')
    parser.add_argument('--timeout',
                        type=ArgumentParser.timedelta,
                        help='Force a different timeout value for activities (default {default})'.
                        format(default=settings.DISPATCHER_TIMEOUT))
    parser.add_argument('--reingest',
                        help='Force some algorithms to delete data before ingesting',
                        action='store_true')
    parser.add_argument('--show-first-job',
                        action='store_true',
                        help=('Just show the highest priority job to run, according to all other '
                              'args'))
    parser.add_argument('--max-weight',
                        type=float,
                        help='Maximum weight of activities this worker will attempt')
    parser.add_argument('--project-id',
                        type=int,
                        metavar='ID',
                        help='Force PROJECT for multi-project JOBS table function')
    parser.add_argument('--status',
                        action='store_true',
                        help='Show current status of known workers')
    parser.add_argument('--limit',
                        type=int,
                        help='Number of jobs to display if --status is used',
                        default=10)

    # (note this argument cannot be used to exclude ingestion activities (PDU_INGESTER etc.).
    # Use --exclude-ingestion for this.
    # (note --only-filenames requires --only-activities)
    # --exclude-ingestion
    # --only-ingestion

    # postpone log file init until we do it explicitly later
    args = parser.parse_args(no_log_init=True)

    if args.rotating_log_file:
        settings.ROTATING_LOG_FILE = args.rotating_log_file

    if args.log_file:
        settings.SINGLE_LOG_FILE = args.log_file

    init_log()
    # os.environ['TZ'] = 'UTC'
    # time.tzset()

    # initialise django templating otherwise we can get a crash on sending emails
    django.setup()

    # switch databases (must be the first option handled)
    if args.db:
        settings.set_db_name(args.db)

    if args.status:
        show_worker_status(args.category, args.limit)
        parser.exit()

    if args.project_id:
        settings.DATABASE_PROJECT_ID = args.project_id

    if args.timeout is not None:
        settings.DISPATCHER_TIMEOUT = args.timeout

    if args.reingest:
        from chart.common.env import set_env
        set_env('{ENV_PREFIX}REINGEST', True)

    if args.only_ingestion and args.exclude_ingestion:
        parser.error('Cannot set both --only-ingestion and --exclude-ingestion')

    if args.only_activities and not args.exclude_ingestion:
        print('Warning: If you set --only-activities you probably also want to set '
              '--exclude-ingestion, otherwise ingestion jobs will still be run.')

    if args.worker_name is None:
        args.worker_name = 'worker.{pid}'.format(pid=os.getpid())
        logging.info('Using {name} as worker name'.format(name=args.worker_name))

    # don't construct Activity objects here as they could be wildcards
    if args.only_activities is not None:
        only_activities = [o.upper().replace('*', '%') for o in args.only_activities]

    else:
        only_activities = None

    if args.exclude_activities is not None:
        exclude_activities = [e.upper().replace('*', '%') for e in args.exclude_activities]

    else:
        exclude_activities = None

    if len(settings.EXCLUDE_ACTIVITIES) > 0:
        if exclude_activities is None:
            exclude_activities = []

        for a in settings.EXCLUDE_ACTIVITIES:
            if a not in exclude_activities:
                exclude_activities.append(a)

    if args.list_activities:
        list_activities(only_activities=only_activities,
                        exclude_activities=exclude_activities,
                        only_ingestion=args.only_ingestion,
                        exclude_ingestion=args.exclude_ingestion,
                        max_weight=args.max_weight)
        parser.exit()

    exclude_activities = disallowed_activities(exclude_activities, args.max_weight)

    # Jobs lock is a database-wide mutex used to lock the JOBS and PROCESSES tables when one
    # worker is using them, to prevent other workers from interfering.
    # It can be set to None if only one worker is in use.
    if db_connect('JOBS').engine is DatabaseEngine.ORACLE:
        from chart.db.drivers import oracle_lock
        locker = oracle_lock.Lock('CHART.jobs_table_lock')

    else:
        locker = None

    tidy_up(worker_name=args.worker_name)

    if args.category is None:
        parser.error('No --category specified')

    if args.show_first_job:
        print(select_next_job(category=args.category.upper(),
                              only_activities=only_activities,
                              exclude_activities=exclude_activities,
                              only_filenames=args.only_filenames,
                              only_ingestion=args.only_ingestion,
                              exclude_ingestion=args.exclude_ingestion))
        parser.exit()

    if args.work_area:
        work_area = args.work_area.expand()

    elif settings.WORK_AREA:
        work_area = settings.WORK_AREA

    else:
        work_area = make_working_directory('worker')

    ensure_dir_exists(work_area)

    run(args.worker_name,
        args.category.upper(),
        work_area_base=work_area,
        batch_size=args.batch_size,
        only_activities=only_activities,
        only_filenames=args.only_filenames,
        exclude_activities=exclude_activities,
        only_ingestion=args.only_ingestion,
        exclude_ingestion=args.exclude_ingestion,
        jobs_lock=locker,
        exit_on_idle=not args.loop)

    # run_forever(worker_name=args.worker_name,
    #             category=args.category.upper(),
    #             work_area_base=args.work_area,
    #             batch_size=args.batch_size,
    #             only_activities=
    #             only_filenames=None,  # args.only_filenames,
    #             only_ingestion=args.only_ingestion,
    #             exclude_ingestion=args.exclude_ingestion,
    #             exclude_activities=args.exclude_activities,
    #             jobs_lock=db.Lock('CHART.jobs_table_lock'))

if __name__ == '__main__':
    main()