#!/usr/bin/env python3

"""Tool for manually examining or modifying the JOBS table, which contains
pending, in-progress and failed jobs."""
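
# Typical invocations (illustrative only -- the script name and the category,
# activity and SID values are placeholders; see main() for the full option
# list):
#
#   jobcontrol.py --all-summary
#   jobcontrol.py --summary --category OPS
#   jobcontrol.py --list FAILED --category OPS
#   jobcontrol.py --add --activity EXAMPLE_REPORT --sid SAT1 --daily \
#       --start 2020-01-01 --stop 2020-02-01
#   jobcontrol.py --reschedule 1234 5678
#   jobcontrol.py --cleanup --category OPS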

import os
import sys
import fnmatch
import logging
from datetime import datetime, timedelta
from copy import copy

from chart.common.path import Path
from chart.alg.settings import LOG_FILENAME
from chart.project import settings
from chart.common.args import ArgumentParser
from chart.backend.activity import Activity
from chart.common.prettyprint import Table
from chart.common.prettyprint import show_time_m
from chart.common.prettyprint import show_timedelta_relaxed
from chart.backend import worker
from chart.backend.worker import make_working_directory
from chart.common.exceptions import ConfigError
from chart.backend.jobs import delete_jobs
from chart.backend.jobs import find_jobs
from chart.backend.job import job_retrieve_fields
from chart.backend.job import Job
from chart.db.func import ANY
from chart.db.func import Min, Max, Count
from chart.db.connection import db_connect
from chart.common.traits import name_of_thing
from chart.common.xml import xml_to_timedelta
from chart.backend.processes import find_processes
from chart.backend.job import JobStatus
from chart.backend.jobs import update_jobs
from chart.backend.jobcreator import JobCreator
from chart.common.timerange import TimeRange
from chart.backend.jobs import add_job
from chart.project import SID
from chart.db.model.table import TableInfo
from chart.db.db_tool import keyboard_confirm
from chart.products.fdf.orbit import NoSuchOrbit
from chart.backend.result import Result
from chart.backend.workorder import WorkOrder
from chart.alg import settings as alg_settings
from chart.products.fdf.orbit import get_orbit_times
from chart.common.xml import datetime_to_xml
from chart.backend.jobcreator import ProductAttributes

db_conn = db_connect()  # JOBS, PROCESSES


# Colouring for the jobs table
STATUS_COLOURS = {
    'FAILED': 'red',
    'IN_PROGRESS': 'blue',
    'TIMEOUT': 'magenta',
    'RETRY': 'magenta',
    'COMPLETED': 'cyan',
    None: 'yellow',
}

# Jobs marked IN_PROGRESS but which have been running for more than
# OLD_THRESHOLD are considered dead
OLD_THRESHOLD = timedelta(hours=1)
# OLD_THRESHOLD = timedelta(days=1)


def draw_jobs_table(title, jobs, target=sys.stdout):
    """Draw a nicely formatted table of a list of jobs.

    Jobs may come from the database, in which case they have `id` and
    `gen_time` attributes, or may be built in code.
    """
    jobs = list(jobs)  # in case it's a generator
    headings = ['Status', 'Activity', 'Attributes']
    if len(jobs) > 0 and jobs[0].job_id is not None:
        headings = ['Id', 'GenTime'] + headings

    t = Table(
        title=title,
        headings=headings,
        header_underline='=',
        header_blanklines=1,
        cross=' ',
        column_split=' ',
        header_split=' ',
    )
    # border_fg_col='cyan')

    for job in jobs:
        if job.filename is not None:
            att = str(job.filename)

        else:
            att = ''
            if job.sid is not None:
                att += '{s} '.format(s=job.sid)

            if job.tablename is not None:
                att += job.tablename + ' '

            if job.orbit is not None:
                att += 'orb {orbit} from {range}'.format(
                    orbit=job.orbit,
                    range=TimeRange(job.sensing_start, job.sensing_stop),
                )

            elif job.sensing_start is not None:
                att += '{start} to {stop}'.format(
                    start=job.sensing_start, stop=job.sensing_stop
                )

        if job.earliest_execution_time is not None:
            att += ' after ' + show_time_m(job.earliest_execution_time)

        row = [
            {'text': job.status.name, 'markup': job.status.name},
            {'text': job.activity.name, 'markup': 'activity'},
            att,
        ]
        if job.job_id is not None:
            row = [
                {'text': job.job_id, 'markup': 'id'},
                show_time_m(job.gen_time),
            ] + row

        t.append(row)

    t.write(target)


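# Example (illustrative only): preview an in-memory job without touching the
# database. The keyword arguments follow the Job(...) calls in make_jobs()
# below; 'OPS' and 'EXAMPLE_REPORT' are placeholder names.
#
#   job = Job(category='OPS',
#             sid=None,
#             sensing_start=datetime(2020, 1, 1),
#             sensing_stop=datetime(2020, 1, 2),
#             status=JobStatus.PENDING)
#   job.activity = Activity('EXAMPLE_REPORT')
#   draw_jobs_table('Preview', [job])

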
def list_jobs(category,
              activities,
              start_time,
              stop_time,
              sid=None,
              status=None,
              table=None,
              filename=None,
              orbit=None,
              only_missing_files=False):
    """Display JOBS table entries.

    If `only_missing_files` is set then only display jobs with a filename that is missing.
    """
    jobs = []
    for row in find_jobs(fields=job_retrieve_fields(),
                         status=status,
                         category=category,
                         activity=activities,
                         sid=sid,
                         # gen_time_ge=start_time,
                         # gen_time_lt=stop_time,
                         sensing_start_ge=start_time,
                         sensing_start_lt=stop_time,
                         # don't use gen_time tests otherwise --list and --reschedule
                         # operate on different jobs
                         # sensing_start_or_gen_time_ge=start_time,
                         # sensing_start_or_gen_time_lt=stop_time,
                         tablename=table,
                         filename_like=filename,
                         orbit=orbit,
                         order_by=('activity', 'sensing_start', 'id')):
        job = Job(row)
        if only_missing_files and (job.filename is None or job.filename.exists()):
            continue

        jobs.append(job)

    draw_jobs_table('Jobs', jobs)


def list_jobs_by_process(execute_start=None,
                         execute_stop=None,
                         activity=None):
    """A rather basic display showing the jobs performed across all workers for a time range."""
    print('Jobs by process')
    last_worker = None
    # Find all processes in time range
    for p in find_processes(fields=('id',
                                    'worker',
                                    'execute_start',
                                    'execute_stop',
                                    'status',
                                    'working_dir'),
                            execute_start_ge=execute_start,
                            execute_stop_le=execute_stop,
                            ordering=('worker', 'execute_start')):
        # Print a divider line between different workers
        if p[1] != last_worker:
            last_worker = p[1]
            print(last_worker)

        # Print process summary
        print(' ', datetime_to_xml(p[2]), datetime_to_xml(p[3]))
        # Find jobs in the batch, filtering by activity type
        for j in find_jobs(fields=('activity', 'sensing_start', 'sensing_stop', 'status'),
                           process_id=p[0],
                           activity=activity):
            # Basic display of job
            print('  ', j[0], j[1], j[2], j[3])


def add_or_sim_job(job, simulate=False):  # , return_id=False):
    """Utility function to either add a job to the database or just display it.

    Returns:
        Tuple of (added, job_id): `added` is False if the job was discarded
        for being a duplicate of existing job `job_id`, True if it was added
        (or simulated, in which case `job_id` is None).
    """
    if simulate:
        logging.info(
            'Simulating {cat} {act} {params}'.format(
                cat=job.category,
                act=job.activity,
                params=', '.join(
                    '{k}:{v}'.format(k=k, v=v)
                    for k, v in job.items()
                    if k not in ('category', 'activity') and v is not None
                ),
            )
        )
        return True, None

    else:
        return add_job(job)  # , return_id=return_id)


def find_activity_and_parser(filename):
    """For a given filename, try to find a suitable Activity to ingest it.

    Returns a tuple of (activity, parser function), or None if no job creator
    trigger pattern matches. The last successful match is cached since callers
    tend to pass many similar filenames in a row.
    """
    if hasattr(find_activity_and_parser, 'last_pattern') and fnmatch.fnmatch(
        filename.name, find_activity_and_parser.last_pattern
    ):
        return find_activity_and_parser.last_activity

    for jobcreator in JobCreator.all():
        trigger = jobcreator.trigger
        if 'pattern' not in trigger:
            continue

        pattern = trigger['pattern']
        if fnmatch.fnmatch(filename.name, pattern + '*'):
            activity = list(jobcreator.responses())[0]['activity']
            parser = trigger['parser']
            result = (activity, parser)
            # cache both the pattern and the result, otherwise the fast path
            # above can never trigger
            find_activity_and_parser.last_pattern = pattern + '*'
            find_activity_and_parser.last_activity = result
            return result

    return None


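# Example (illustrative only): given a job creator whose trigger pattern is
# 'EXAMPLE_PROD' (a made-up name), a file called 'EXAMPLE_PROD_20200101.DAT'
# would resolve to that creator's first response activity plus its parser:
#
#   a_p = find_activity_and_parser(Path('EXAMPLE_PROD_20200101.DAT'))
#   if a_p is not None:
#       activity, parser = a_p

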
def make_jobs(
    category,  # (too many arguments) pylint: disable=R0913
    activities=None,
    filenames=None,
    sid=None,
    orbit=None,
    start_time=None,
    stop_time=None,
    table=None,
    hourly=False,
    orbital=False,
    daily=False,
    weekly=False,
    monthly=False,
    daynight=False,
    tod=None,
    status=JobStatus.PENDING,
    earliest_execution_time=None,
):
    """Insert one (or multiple, if ingestion) jobs into the JOBS table.

    This is only used for manual adding of jobs, never by the scheduler or worker.
    It is a high-level function that can do some parameter substitution and
    may actually create multiple jobs.
    """
    res = []

    if filenames is not None:
        if orbit is not None or start_time is not None or stop_time is not None:
            raise ValueError(
                'If a filename job is added do not specify --orbit, '
                '--start-time or --stop-time'
            )

        for f in filenames:
            f = Path(f)
            norm_f = f.absolute()
            p = None  # file parser function, if one can be found
            if activities is None or len(activities) == 0:
                a_p = find_activity_and_parser(f)
                if a_p is None:
                    raise ConfigError(
                        'Activity cannot be guessed for filename {filename}'.format(
                            filename=f
                        )
                    )
                a, p = a_p

            elif len(activities) == 1:
                a = activities[0]

            else:
                raise ValueError('Must specify activity for ingestion jobs')

            # if it's a PDU run it through the distiller orbit extraction code to
            # insert entries in the ORBITS table.
            # if a == 'PDU_INGESTER':
            #     prod = eps.EPSReader(f)
            #     distiller.orbit_check(prod)

            # Call the scheduler file parser function to read SID and sensing time
            # from the file. If the activity was given explicitly we have no
            # parser and the file is not examined.
            job_sid = sid
            sensing_start = None
            if p is not None:
                attributes = p(f)
                if isinstance(attributes, ProductAttributes):
                    if job_sid is None:
                        job_sid = attributes.sid

                    sensing_start = attributes.sensing_start

            res.append(
                Job(
                    category=category,
                    activity=a,
                    sensing_start=sensing_start,
                    filename=norm_f,
                    sid=job_sid,
                    status=status,
                )
            )

        return res

    if activities is None:
        raise ValueError(
            'Activity name must be specified unless adding an ingestion job'
        )

    # activity = Activity(activityname)

    if orbit is not None:
        # adding a single orbit for an orbital activity
        if start_time is not None or stop_time is not None:
            raise ValueError(
                'Cannot use either --start or --stop if an --orbit is specified'
            )

        if orbital or daynight:
            raise ValueError('Cannot use --orbit together with --orbital or --daynight')

        if sid is None:
            raise ValueError('Must specify SID when adding an orbital algorithm')

        times = get_orbit_times(sid, orbit)
        if times is None:
            raise ValueError('Unknown orbit')

        job = Job(
            category=category,
            sid=sid,
            orbit=orbit,
            sensing_start=times[0],
            sensing_stop=times[1],
            status=status,
        )

        if table is not None:
            job.tablename = table

        res.append(job)

    else:
        if hourly + orbital + daily + daynight + weekly + monthly > 1:
            raise ValueError(
                'Can only use one of --hourly, --orbital, --daily, --weekly, '
                '--monthly and --daynight'
            )

        if orbital:
            if sid is None:
                raise ValueError('SID must be specified for per-orbit jobs')

            # add one job per orbit within the given timerange
            start_orbit = sid.orbit.find_orbit(start_time)
            stop_orbit = sid.orbit.find_orbit(stop_time)

            if start_orbit is None or stop_orbit is None:
                raise ValueError('No orbits known for {sid}'.format(sid=sid))

            for o in range(start_orbit, stop_orbit + 1):
                try:
                    start, stop = sid.orbit.get_orbit_times(o)
                except NoSuchOrbit:
                    logging.error('Orbit {o} not found'.format(o=o))
                    continue

                job = Job(
                    category=category,
                    sid=sid,
                    orbit=o,
                    sensing_start=start,
                    sensing_stop=stop,
                    status=status,
                )

                if table is not None:
                    job.tablename = table

                res.append(job)

        elif hourly:
            while start_time < stop_time:
                job = Job(
                    category=category,
                    sid=sid,
                    sensing_start=start_time,
                    sensing_stop=start_time + timedelta(hours=1),
                    status=status,
                )

                if table is not None:
                    job.tablename = table

                res.append(job)

                start_time += timedelta(hours=1)

        elif daily:
            # clip start time to midnight at the start of that day
            start_time = datetime(start_time.year, start_time.month, start_time.day)
            if tod is not None:
                start_time += tod

            while start_time < stop_time:
                job = Job(
                    category=category,
                    sid=sid,
                    sensing_start=start_time,
                    sensing_stop=start_time + timedelta(days=1),
                    status=status,
                )

                if table is not None:
                    job.tablename = table

                res.append(job)

                start_time += timedelta(days=1)

        elif weekly:
            DAYS_PER_WEEK = 7
            # advance to the following Monday if not already at the start of a week
            if start_time.weekday() != 0:
                start_time += timedelta(days=DAYS_PER_WEEK - start_time.weekday())

            while True:
                report_end_time = start_time + timedelta(days=DAYS_PER_WEEK)

                if report_end_time > stop_time:
                    break

                job = Job(
                    category=category,
                    sid=sid,
                    sensing_start=start_time,
                    sensing_stop=report_end_time,
                    status=status,
                )

                if table is not None:
                    job.tablename = table

                res.append(job)

                start_time = report_end_time

        elif monthly:
            while True:
                if start_time.month < 12:
                    report_stop_time = start_time.replace(month=start_time.month + 1)

                else:
                    report_stop_time = start_time.replace(
                        month=1, year=start_time.year + 1
                    )
                # print(start_time, report_stop_time)

                if report_stop_time > stop_time:
                    break

                job = Job(
                    category=category,
                    sid=sid,
                    sensing_start=start_time,
                    sensing_stop=report_stop_time,
                    status=status,
                )

                if table is not None:
                    job.tablename = table

                res.append(job)

                start_time = report_stop_time

        elif daynight:
            raise NotImplementedError()
            # logging.info('activity ignored')
            # for region in worker.find_daynight_regions(sid, start_time, stop_time):
            #     job = Job(category=category,
            #               activity='DAY_STATS' if region['type'] == 'day' else 'NIGHT_STATS',
            #               sid=sid,
            #               tablename=table.name,
            #               orbit=region['orbit'],
            #               sensing_start=region['start'],
            #               sensing_stop=region['stop'])

            #     res.append(job)

            # return res

        else:
            # just add a single job for the given time range
            job = Job(
                category=category,
                sid=sid,
                sensing_start=start_time,
                sensing_stop=stop_time,
                status=status,
            )

            if table is not None:
                job.tablename = table

            res.append(job)

    # now we have a basic list of jobs with no activities set; create one copy
    # per requested activity
    jobs_with_activities = []
    for job in res:
        for a in activities:
            job_with_activity = copy(job)
            job_with_activity.activity = a
            job_with_activity.earliest_execution_time = earliest_execution_time
            jobs_with_activities.append(job_with_activity)

    return jobs_with_activities


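# Example (illustrative only): build, without inserting, one PENDING job per
# day for January 2020. The category, activity and table names here are
# placeholders.
#
#   jobs = make_jobs(category='OPS',
#                    activities=expand_activities(['EXAMPLE_STATS']),
#                    start_time=datetime(2020, 1, 1),
#                    stop_time=datetime(2020, 2, 1),
#                    table='EXAMPLE_TABLE',
#                    daily=True)
#   for j in jobs:
#       add_or_sim_job(j, simulate=True)  # log what would be created

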
def retrieve_job(jobid):
    """Retrieve a single Job from the JOBS table by ID."""
    row = find_jobs(fields=job_retrieve_fields(), job_id=jobid).fetchone()

    return Job(row=row)


def show_single_job(jobid, show_job, show_process):
    """Display information from the JOBS and PROCESSES tables for `jobid`.

    Also display the process log if available.
    If `show_job` is set show the JOBS table fields only.
    """
    row = find_jobs(fields=job_retrieve_fields(), job_id=jobid).fetchone()
    if row is None:
        raise ValueError('No such job {id}'.format(id=jobid))

    job = Job(row=row)

    if show_job or not show_process:
        t = Table(title='Single job')
        for f in (
            'Job_ID',
            'Category',
            'Status',
            'Gen_time',
            'Activity',
            'Filename',
            'SID',
            'Orbit',
            'Sensing_start',
            'Sensing_stop',
            'Earliest_execution_time',
            'Process_ID',
            'Parent',
            'Tablename',
        ):
            t.append((f, name_of_thing(getattr(job, f.lower()))))

        t.write()

    # if job.status is None:
    #     print('')
    #     draw_jobs_table('Related jobs',
    #                     worker.find_similar_jobs(job, 10, datetime(2000, 1, 1)))

    if job.process_id is not None and (show_process or not show_job):
        print('')
        show_proc(proc_id=job.process_id, show_log=not show_job and not show_process)


def show_proc(proc_id, show_log):
    """Show all information for process `proc_id`."""
    t = Table(title='Process ' + str(proc_id), headings=('Attribute', 'Value'))

    row = find_processes(
        fields=(
            'WORKER',
            'PID',
            'EXECUTE_START',
            'EXECUTE_STOP',
            'STATUS',
            'WORKING_DIR',
        ),
        process_id=proc_id,
    ).fetchone()

    if row is None:
        raise ValueError('No such process id {id}'.format(id=proc_id))

    for kv in zip(
        ('Worker', 'PID', 'Execute start', 'Execute stop', 'Status', 'Work dir'), row
    ):
        t.append(kv)

    jobs_brief = find_jobs(fields=('ID', 'ACTIVITY'), process_id=proc_id).fetchall()

    if len(jobs_brief) > 0:
        t.append(
            (
                'Computed work dir',
                os.path.join(
                    row[2].date().strftime('%Y%m%d'),
                    jobs_brief[0][1] + '_' + str(proc_id),
                ),
            )
        )

    t.write()

    if not show_log:
        return

    print('')
    draw_jobs_table('Jobs', [retrieve_job(job[0]) for job in jobs_brief])

    print('\nLog\n===\n')
    if row[5] is None:
        print('Working dir not set')

    else:
        wd_path = Path(row[5])
        log_path = wd_path.child(LOG_FILENAME)
        if log_path.exists():
            with open(str(log_path), 'r') as log_file:
                print(log_file.read())

        else:
            print('Log file {f} not found'.format(f=log_path))


def show_summary(
    category,
    # activity,
    status,
    sid,
    start_time,
    stop_time,
):
    """Display a summary table for a range of jobs showing counts of jobs
    grouped by status, activity and sid."""
    t = Table(
        title='Summary of jobs',
        headings=(
            'Activity',
            'SID',
            'Status',
            'Count',
            'Max gen time',
            'Max earliest exec time',
        ),
    )

    last_activity = None

    for row in find_jobs(
        fields=[
            'ACTIVITY',
            'STATUS',
            'count(*)',
            'max(GEN_TIME)',
            'max(EARLIEST_EXECUTION_TIME)',
        ]
        + SID.sql_sys_select('JOBS'),
        # fields=('ACTIVITY', 'SCID', 'STATUS', 'count(*)', 'max(GEN_TIME)',
        #         'max(EARLIEST_EXECUTION_TIME)'),
        category=category,
        status=status,
        sid=sid,
        sensing_start_or_gen_time_ge=start_time,
        sensing_start_or_gen_time_lt=stop_time,
        order_by=['ACTIVITY'] + SID.sql_sys_select('JOBS') + ['STATUS'],
        group_by=['ACTIVITY'] + SID.sql_sys_select('JOBS') + ['STATUS'],
    ):
        activity = row[0]
        row_status = row[1]
        count = row[2]
        max_gen_time = row[3]
        max_early = row[4]
        row_sid = SID.from_sys_select('JOBS', row[5:])

        # only print each activity name on its first row; compare against the
        # raw name, not the markup cell, otherwise the grouping never matches
        if activity == last_activity:
            activity_cell = ''

        else:
            activity_cell = {'text': activity, 'markup': 'activity'}

        last_activity = activity

        status_cell = {'text': row_status, 'markup': row_status}
        # print_row[2] = {'colour': STATUS_COLOURS.get(print_row[2]), 'text': print_row[2]}

        t.append((activity_cell, row_sid, status_cell, count, max_gen_time, max_early))

    t.write()


def show_global_summary():
    """Show brief summary of all categories."""
    for category, count in find_jobs(
        fields=('CATEGORY', Count()), group_by='CATEGORY', order_by='CATEGORY'
    ):
        t = Table(title='Category {cat}'.format(cat=category))
        t.append(('Total jobs', count))
        for status, count in find_jobs(
            fields=('STATUS', Count()),
            category=category,
            group_by='STATUS',
            order_by='STATUS',
        ):
            if status is None:
                status = JobStatus.PENDING
                count = find_jobs(
                    fields=(Count(),), category=category, status=status,
                ).fetchone()[0]

            elif status == JobStatus.IN_PROGRESS.name:
                old_cutoff = datetime.utcnow() - OLD_THRESHOLD
                old_count = db_conn.query(
                    "SELECT count(*) "
                    "FROM jobs, processes "
                    "WHERE jobs.process_id=processes.id "
                    "AND jobs.category=:category "
                    "AND jobs.status='IN_PROGRESS' "
                    "AND processes.execute_start<:cutoff",
                    category=category,
                    cutoff=old_cutoff,
                ).fetchone()[0]
                count = '{count} ({old_count} jobs are probably orphaned)'.format(
                    count=count, old_count=old_count
                )

            t.append(('{status} jobs'.format(status=status), count))

        early, late = find_jobs(
            fields=(Min('GEN_TIME'), Max('GEN_TIME')), category=category
        ).fetchone()
        t.append(('Earliest gen_time', early))
        t.append(('Latest gen_time', late))
        t.write()


def cleanup(category):
    """Convert TIMEOUT, RETRY and old IN_PROGRESS jobs to PENDING (null status)."""
    retry = update_jobs(category=category, status=JobStatus.RETRY, set_status=None)
    if retry > 0:
        logging.info('Changed {cc} jobs from RETRY to PENDING'.format(cc=retry))

    timeout = update_jobs(category=category, status=JobStatus.TIMEOUT, set_status=None)
    if timeout > 0:
        logging.info('Changed {cc} jobs from TIMEOUT to PENDING'.format(cc=timeout))

    total_in_progress = find_jobs(
        fields=(Count(),), category=category, status=JobStatus.IN_PROGRESS
    ).fetchone()[0]
    in_progress = 0
    for (jobid,) in db_conn.query(
        "SELECT jobs.id "
        "FROM jobs, processes "
        "WHERE category=:category "
        "AND jobs.process_id=processes.id "
        "AND jobs.status='IN_PROGRESS' "
        "AND processes.execute_start<:cutoff",
        category=category,
        cutoff=datetime.utcnow() - OLD_THRESHOLD,
    ):
        # reset each orphaned job; the changes are committed in one go below
        update_jobs(job_id=jobid, set_status=None, commit=False)
        in_progress += 1

    if (total_in_progress - in_progress) > 0:
        logging.info(
            '{cc} jobs are IN_PROGRESS and started recently (within {threshold})'.format(
                cc=total_in_progress - in_progress,
                threshold=show_timedelta_relaxed(OLD_THRESHOLD),
            )
        )

    if in_progress > 0:
        logging.info(
            'Changed {cc} jobs from IN_PROGRESS to PENDING'.format(cc=in_progress)
        )

    if retry == 0 and timeout == 0 and in_progress == 0:
        logging.info('No jobs to be cleaned up')

    failed = find_jobs(
        fields=(Count(),), category=category, status=JobStatus.FAILED
    ).fetchone()[0]
    if failed > 0:
        logging.info('{cc} FAILED jobs were not modified'.format(cc=failed))

    db_conn.commit()


def purge(category, min_time):
    """Delete all jobs in `category` older than `min_time`."""
    count = delete_jobs(category=category, gen_time_lt=min_time)
    logging.info(
        'Deleted {cc} jobs older than {cutoff}'.format(cc=count, cutoff=min_time)
    )


def show_next_job(category, batch=False, target=sys.stdout):
    """Display the next job [batch] that the worker would run."""
    target.write('Next job to be executed in category {cat}:\n'.format(cat=category))
    job = worker.select_next_job(category)
    if job is None:
        target.write('No upcoming jobs\n')
        return

    target.write('{job}\n'.format(job=job))

    if batch:
        draw_jobs_table(
            'Batch',
            worker.find_similar_jobs(job, count=worker.DEFAULT_BATCH_SIZE),
            target,
        )


def reschedule(
    job_ids,
    category=None,
    status=None,
    activity=None,
    table=None,
    sid=None,
    start_time=None,
    stop_time=None,
    filename=None,
    orbit=None,
):
    """Reset job statuses to null (PENDING) so the worker will pick them up again."""
    # `activity` is a list of Activity objects (possibly empty) from expand_activities()
    if activity and table is None and any(
            name_of_thing(a).endswith('_STATS') for a in activity):
        print('Warning: STATS job selected but no table filter')

    # if is_listlike(activity):
    #     if len(activity) == 1:
    #         activity = activity[0]
    #     else:
    #         raise ValueError('Use only one activity for reschedule')
    # if activity is not None:
    #     activity = Activity(activity)

    if len(job_ids) > 0:
        cc = update_jobs(job_id=job_ids, set_status=JobStatus.PENDING, commit=False)

    else:
        cc = update_jobs(
            category=category,
            activity=activity,
            status=status,
            table=table,
            sid=sid,
            sensing_start_ge=start_time,
            sensing_start_lt=stop_time,
            filename=filename,
            orbit=orbit,
            set_status=JobStatus.PENDING,
            commit=False,
        )

    print('This command will update {cc} jobs'.format(cc=cc))
    if keyboard_confirm():
        print('Committing')
        db_conn.commit()

    else:
        print('Not committing')


def expand_activities(activities):
    """Expand a wildcard such as --activity *_REPORT to a list of Activity objects."""
    if activities is None:
        return []

    result = set()
    for name in activities:
        if '%' in name:
            # raise ValueError('Use "*" for wildcards')
            # allow SQL-style wildcards
            name = name.replace('%', '*')

        if '-' in name:
            print('Found \'-\' character in activity name, did you mean \'_\' instead?')

        if '*' in name or '?' in name or '[' in name:
            matches = 0
            for a in Activity.all():
                if a.enabled and fnmatch.fnmatch(a.name, name.upper()):
                    result.add(a)
                    matches += 1

            if matches == 0:
                raise ValueError('No matches found for {name}'.format(name=name))

        else:
            try:
                activity = Activity(name)
            except Activity.UnknownActivity:
                raise ValueError('Unknown activity {act}'.format(act=name))

            result.add(activity)

    return list(result)


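# Examples (illustrative only; the activity names are placeholders):
#
#   expand_activities(['*_REPORT'])   # every enabled activity ending _REPORT
#   expand_activities(['%_STATS'])    # SQL-style wildcard, converted to '*_STATS'
#   expand_activities(['EXAMPLE'])    # exact name, validated via Activity()

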
def show_derived_jobs(working_dir, target=sys.stdout):
    """Compute and show, but do not run, derived jobs."""
    if not working_dir.is_dir():
        raise ValueError('Input value must be a working directory containing '
                         'work order and result XML files')

    result_file = Result(working_dir.joinpath(alg_settings.RESULT_FILENAME))
    wo_file = WorkOrder(working_dir.joinpath(alg_settings.WORKORDER_FILENAME))
    wo_jobs = {}
    for wo_job in wo_file.read_jobs():
        print('wo job', wo_job)
        wo_jobs[wo_job.job_id] = wo_job

    for job in result_file.read_jobs():
        print('result job', job)
        # recover the filename from the matching work order entry, if any
        wo_job = wo_jobs.get(job.job_id)
        if wo_job is not None:
            job.filename = wo_job.filename

        target.write('Examining job {j} for derived:\n'.format(j=job))
        for derived in worker.find_derived_jobs(job):
            target.write(' {d}\n'.format(d=derived))


def main():
    """Command line entry point."""
    parser = ArgumentParser()
    category = parser.add_argument_group('Category')
    category.add_argument(
        '--category',
        default=settings.DEFAULT_CATEGORY,
        help='Job category. All other categories will be ignored.',
    )

    actions = parser.add_argument_group('Actions', 'Actions to perform')
    actions.add_argument(
        '--list',
        '-l',
        const=None,
        nargs='?',
        metavar='STATUS',
        help='Display current jobs, optionally filtering by STATUS',
    )
    actions.add_argument('--show', '--info', type=int, help='Display a single job')
    actions.add_argument(
        '--show-derived',
        type=ArgumentParser.input_dirname,
        metavar='RESULT',
        help=(
            'Compute and display derived jobs from work order and result xml file '
            'within working directory'
        ),
    )
    # actions.add_argument('--delete', '--delete-job',
    #                      type=int,
    #                      nargs='*',
    #                      help='Delete one or more jobs')
    actions.add_argument(
        '--summary',
        action='store_true',
        help='Show a summary view of a single category',
    )
    actions.add_argument(
        '--all-summary',
        action='store_true',
        help='Show a global summary over all categories',
    )
    # actions.add_argument('--global-summary', '--all-summary',
    #                      action='store_true',
    #                      help='Show general status for all categories. No other flags can be '
    #                      'used.')
    actions.add_argument(
        '--cleanup',
        action='store_true',
        help='Convert RETRY, TIMEOUT and IN_PROGRESS jobs more than {dur} old to '
        'PENDING'.format(dur=OLD_THRESHOLD),
    )
    actions.add_argument(
        '--db', metavar='CONNECTION', help='Use database connection CONNECTION'
    )
    actions.add_argument('--project-id',
                         type=int,
                         metavar='ID',
                         help='Force PROJECT for multi-project JOBS table function')
    actions.add_argument(
        '--purge',
        metavar='AGE',
        help='Delete all jobs in category where generation time is older than AGE',
        type=ArgumentParser.datetime,
    )

    filter_group = parser.add_argument_group('Filter', 'Filtering results')
    filter_group.add_argument('--execute-start',
                              type=ArgumentParser.start_time,
                              help='Filter for executed jobs')
    filter_group.add_argument('--execute-stop',
                              type=ArgumentParser.stop_time,
                              help='Filter for executed jobs')

    add = parser.add_argument_group('Add', 'Adding jobs')
    add.add_argument(
        '--add',
        # action='store_true',
        const=True,
        nargs='?',
        help='Create new job(s)',
    )

    add.add_argument(
        '--hourly',
        action='store_true',
        help='If a time range is specified add one job per hour',
    )
    add.add_argument(
        '--orbital',
        action='store_true',
        help='If a time range is specified add one job per orbit',
    )
    add.add_argument(
        '--daily',
        action='store_true',
        help='If a time range is specified add one job per day',
    )
    add.add_argument(
        '--tod',
        # type=timedelta,
        help='For daily only specify time of day, if not midnight',
    )
    add.add_argument(
        '--weekly',
        action='store_true',
        help='If a time range is specified add one job per week at Monday 00:00',
    )
    add.add_argument(
        '--monthly',
        action='store_true',
        help='If a time range is specified add one job per month on the 1st, 00:00',
    )
    add.add_argument(
        '--daynight', action='store_true', help='Insert jobs for day/night regions'
    )
    add.add_argument(
        '--no-prompt', action='store_true', help='Do not prompt before creating jobs'
    )
    add.add_argument(
        '--earliest-execution-time',
        help='Set the earliest execution time for the jobs',
    )
    run = parser.add_argument_group('Run', 'Options for executing jobs')
    run.add_argument(
        '--run',
        type=int,
        metavar='JOBID',
        nargs='*',
        help='Process JOBID. Derived jobs will be added to the jobs table '
        'but not processed',
    )
    # run.add_argument('--run-all',
    #                  action='store_true',
    #                  help='Process all jobs of the current category, including any '
    #                  'derived jobs')
    # run.add_argument('--batch-size',
    #                  type=int,
    #                  default=DEFAULT_BATCH_SIZE,
    #                  help='Number of jobs to batch together for --run-all option')
    run.add_argument(
        '--outdir',
        '-o',
        type=ArgumentParser.output_dir,
        help='Working directory, if processing is done',
    )
    run.add_argument(
        '--worker-name',
        '--worker',
        default='manual',
        help="Worker name to be placed into the 'worker' field "
        "of the processes table",
    )
    run.add_argument(
        '--replace',
        action='store_true',
        help='Tell the ingester process to delete existing data before insertion',
    )
    run.add_argument(
        '--reschedule',
        type=int,
        nargs='*',
        # const=True,
        metavar='JOBID',
        help='Clear the status of JOBID(s) so they will be re-run by the worker',
    )
    run.add_argument(
        '--next', action='store_true', help='Just show the next job to be performed'
    )
    run.add_argument(
        '--batch',
        action='store_true',
        help=(
            'If --next is selected, show the entire batch instead of a single '
            'job'
        ),
    )
    run.add_argument(
        '--reingest',
        help='Force some algorithms to delete data before ingesting',
        action='store_true',
    )
    properties = parser.add_argument_group(
        'Properties',
        'Job properties when adding or showing jobs')
    properties.add_argument('--sid', '-s',
                            type=ArgumentParser.sid,
                            # default=SID.SIDLESS,
                            help='Specify SID when adding a job')
    properties.add_argument('--filename',
                            # type=ArgumentParser.input_filename,
                            nargs='*',
                            help='Specify one or more filenames when adding a job, or as a '
                            'search string')
    properties.add_argument('--activity',
                            nargs='+',
                            help='Specify activity name when adding a job')
    properties.add_argument('--orbit',
                            type=int,
                            help='Orbit number if adding a job for a per-orbit activity')
    properties.add_argument('--start-time', '--start',
                            type=ArgumentParser.start_time,
                            metavar='START',
                            help=('Start time if adding a time-based job. When filtering, '
                                  'START is applied to job sensing_start if set, otherwise '
                                  'gen_time'))
    properties.add_argument('--stop-time', '--stop',
                            metavar='STOP',
                            type=ArgumentParser.stop_time,
                            help='Stop time if adding a time-based job if needed')
    properties.add_argument('--table', '--tablename',
                            # type=ArgumentParser.table,
                            help='Destination tablename if adding an orbital stats job')
    properties.add_argument('--status',
                            metavar='STATUS',
                            default=None,
                            help='Only show jobs with status of STATUS '
                            '(use "none" for pending jobs)')
    properties.add_argument('--missing',
                            action='store_true',
                            help='When listing jobs show only those where the job file '
                            'is missing')

    ctrl_display = parser.add_argument_group(
        'Display',
        'Control output display')
    ctrl_display.add_argument('--job',
                              action='store_true',
                              help='When displaying a single job show the JOBS table entries')
    ctrl_display.add_argument('--process',
                              action='store_true',
                              help='When displaying a single job show the PROCESSES '
                              'table entries')
    args = parser.parse_args()

    if args.db:
        settings.set_db_name(args.db)

    if args.project_id:
        settings.DATABASE_PROJECT_ID = args.project_id

    if args.table is not None and args.table[0] != '%':
        # throw an error if the table name is invalid
        TableInfo(args.table)

    if args.replace:
        os.environ['CHART_REPLACE'] = '1'

    if args.reingest:
        from chart.common.env import set_env

        set_env('{ENV_PREFIX}REINGEST', True)

    # TBD change to proper data type
    if args.tod is not None:
        args.tod = xml_to_timedelta(args.tod)

    if args.all_summary:
        # no category needed
        show_global_summary()
        parser.exit()

    # should come before category sanity checks
    if args.show_derived:
        show_derived_jobs(args.show_derived)
        parser.exit()

    if (
        int(args.show is not None)
        + int(args.summary)
        + int(args.list is not None)
        + int(args.add is not None)
        + int(args.reschedule is not None)
        > 1
    ):
        parser.error('Multiple actions specified')

    if args.status is None:
        status = None

    else:
        status = JobStatus[args.status.upper()]

    if args.category:
        args.category = args.category.upper()

    if args.reschedule is not None:
        reschedule(
            job_ids=args.reschedule,
            category=args.category,
            status=status,
            table=args.table,
            activity=expand_activities(args.activity),
            sid=args.sid,
            start_time=args.start_time,
            stop_time=args.stop_time,
            filename=args.filename[0] if args.filename is not None else None,
            orbit=args.orbit,
        )
        parser.exit()

    if args.next:
        show_next_job(args.category, args.batch)
        parser.exit()

    if args.summary:
        show_summary(
            category=args.category,
            status=status,
            sid=args.sid,
            start_time=args.start_time,
            stop_time=args.stop_time,
        )

        parser.exit()

    if args.cleanup:
        if args.category is None:
            parser.error('Cleanup option requires a category')

        cleanup(args.category)
        parser.exit()

    if args.purge:
        if args.category is None:
            parser.error('Purge option requires a category')

        if args.category == settings.DEFAULT_CATEGORY:
            parser.error(
                'Cannot purge category {cat}'.format(cat=settings.DEFAULT_CATEGORY)
            )

        purge(args.category, args.purge)
        parser.exit()

    if args.outdir and str(args.outdir).isdigit():
        parser.error(
            'Note the -o option is used to specify the output directory name. This '
            'cannot be a number since the option could be confused with '
            'the --orbit option'
        )

    if args.list:
        if args.filename is not None and len(args.filename) > 1:
            parser.error('Can only search for a maximum of 1 filename')

        activity = args.activity

        if args.list is not ANY:
            # the user said --list <x>

            if args.list.lower() == 'pending':
                status = None

            else:
                try:
                    # see if it was --list <status>
                    status = JobStatus[args.list.upper()]
                except KeyError:
                    # see if it was --list <activity>; this raises if the name
                    # is unknown (we don't allow --list <activity wildcard>)
                    Activity(args.list)
                    activity = [args.list]

        if isinstance(status, str) and status.lower() == 'none':
            status = None

        # Show jobs based on execute time from processes table
        if args.execute_start or args.execute_stop:
            list_jobs_by_process(activity=expand_activities(activity),
                                 execute_start=args.execute_start,
                                 execute_stop=args.execute_stop)
            parser.exit()

        # Show jobs based on jobs table
        list_jobs(category=args.category,
                  status=status,
                  activities=expand_activities(activity),
                  sid=args.sid,
                  start_time=args.start_time,
                  stop_time=args.stop_time,
                  table=args.table,
                  filename=args.filename[0] if args.filename is not None else None,
                  orbit=args.orbit,
                  only_missing_files=args.missing)
        parser.exit()

    if args.show:
        show_single_job(jobid=args.show, show_job=args.job, show_process=args.process)
        parser.exit()

    if args.run and args.outdir is None:
        args.outdir = make_working_directory('jobcontrol')

    if args.add is not None:
        if args.category is None:
            parser.error('Category must be specified')

        if args.add is not True:
            # user wrote --add xxx
            activity = [args.add]

        else:
            # user wrote --add --activity xxx
            activity = args.activity

        if args.outdir is not None:
            parser.error('Cannot specify output directory if no jobs are being run')

        jobs = make_jobs(
            category=args.category,
            activities=expand_activities(activity),
            filenames=args.filename,
            sid=args.sid,
            orbit=args.orbit,
            start_time=args.start_time,
            stop_time=args.stop_time,
            table=args.table,
            orbital=args.orbital,
            daynight=args.daynight,
            hourly=args.hourly,
            daily=args.daily,
            weekly=args.weekly,
            monthly=args.monthly,
            tod=args.tod,
            status=status,
            earliest_execution_time=args.earliest_execution_time,
        )
        logging.info('Will attempt to create {cc} jobs'.format(cc=len(jobs)))

        if args.no_prompt or keyboard_confirm('Continue?'):
            dups = 0
            for j in jobs:
                added, job_id = add_or_sim_job(j)  # , return_id=True)
                if not added:
                    dups += 1
                    logging.info('Duplicate of job {jobid}'.format(jobid=job_id))

                else:
                    logging.info('Created job {jobid}'.format(jobid=job_id))

            logging.info(
                'Created {new} new jobs ({dups} duplicates skipped)'.format(
                    new=len(jobs) - dups, dups=dups
                )
            )
            db_conn.commit()

        parser.exit()

    if args.run:
        # a working directory was created above if the user did not give one
        jobs = []
        for jobid in args.run:
            row = find_jobs(fields=job_retrieve_fields(), job_id=jobid).fetchone()
            if row is None:
                parser.error('Job {id} not found'.format(id=jobid))

            jobs.append(Job(row=row))

        worker.process_jobs(args.worker_name, jobs, args.outdir)
        parser.exit()

    parser.error(
        'No action specified (try using --summary, --show, --list, --add, --run, '
        '--reschedule or --cleanup)'
    )


if __name__ == '__main__':
    main()