#!/usr/bin/env python3

"""Command line interface to dispatcher module."""

import os
import importlib  # used to reload DB modules when --fake-writes is set
from copy import copy
import logging
import subprocess
from datetime import datetime, timedelta
from pathlib import Path

from chart.common.args import ArgumentParser
from chart.common.util import ensure_dir_exists
from chart.common.xml import xml_to_timedelta
from chart.project import SID, settings
from chart.backend.dispatcher import dispatch, dispatch_work_order
from chart.backend.activity import Activity, MissingAttribute
from chart.backend.job import Job
from chart.backend.result import Result
from chart.backend.processes import ProcessStatus
from chart.backend.worker import make_working_directory
from chart.alg.settings import REPORT_FILENAME
logger = logging.getLogger()


def show_derived(path):
    """Given a result file `path`, compute and display all the derived jobs it would trigger."""
    from chart.backend.worker import find_derived_jobs
    resultfile = Result(path)
    print('Showing derived jobs for {path}'.format(path=path))
    for job in resultfile.read_jobs():
        print('  Examining job {id}'.format(
            id=job.job_id if job.job_id is not None else '<manual>'))
        for derived_job in find_derived_jobs(job):
            print('    {job}'.format(job=derived_job))

    print('All done')


def main():
    """Command line entry point."""
    parser = ArgumentParser(__doc__)

    conn_group = parser.add_argument_group(
        'Connection',
        'Options used to choose db connections and control execution')
    conn_group.add_argument('--db', '--conn', '-d',
                            metavar='CONNECTION',
                            help='Use database connection CONNECTION')
    conn_group.add_argument('--category',
                            default='MANUAL',
                            help='Category to use for job')
    conn_group.add_argument('--debug',
                            default=False,
                            action='store_true',
                            help='Instruct algorithm to enable extra diagnostic outputs')
    conn_group.add_argument('--fake-writes',
                            action='store_true',
                            help='Write to CSV file instead of real tables')
    conn_group.add_argument('--use-subprocess',
                            action='store_true',
                            help='Run algorithm in subprocess')

    filter_group = parser.add_argument_group('Filtering',
                                             'Options used to specify job attributes')
    filter_group.add_argument('--sensing-start', '--start',
                              type=ArgumentParser.start_time,
                              metavar='TIME',
                              help='Start of processing')
    filter_group.add_argument('--sensing-stop', '--stop',
                              type=ArgumentParser.stop_time,
                              metavar='TIME',
                              help='Nominal trigger time (stop of processing)')
    filter_group.add_argument('--period', '-p',
                              type=ArgumentParser.period)
    filter_group.add_argument('--sid', '-s',
                              type=ArgumentParser.sid,
                              help='Satellite ID')
    filter_group.add_argument('--filename', '-f',
                              type=ArgumentParser.input_filename,
                              help='Filename for ingestion jobs')
    filter_group.add_argument('--orbit',
                              type=int,
                              help='Specify orbit number')
    filter_group.add_argument('--tablename', '--table', '-t',
                              nargs='+',
                              help='Specify tablename for running orbital stats')

    action_group = parser.add_argument_group('Actions',
                                             'Options used to select which action to perform')
    action_group.add_argument('--activity', '-a',
                              metavar='ACT',
                              help='Run activity ACT')
    action_group.add_argument('--list-activities', '-l',
                              action='store_true',
                              help='Display list of available algorithms')
    action_group.add_argument('--info', '-i',
                              metavar='ACTIVITY',
                              help='Show information about ACTIVITY')
    action_group.add_argument('--ingest',
                              action='store_true',
                              help=('Ingest working directory to database after the algorithm '
                                    'completes'))
    action_group.add_argument('--show-jobs', '--simulate',
                              action='store_true',
                              help=('Instead of running jobs, display a table showing the jobs '
                                    'which would be run'))
    action_group.add_argument('--derived',
                              type=ArgumentParser.input_filename,
                              metavar='RESULT',
                              help='Compute derived jobs for RESULT xml file')
    action_group.add_argument('--outdir', '-o',
                              metavar='DIR',
                              type=ArgumentParser.output_dir,
                              help='Output directory name')
    action_group.add_argument('--browse', '-b',
                              action='store_true',
                              help='Open work directory in $BROWSER after processing')
    action_group.add_argument('--sendmail',
                              action='store_true',
                              help='Send emails to event subscribers')

    batch_group = parser.add_argument_group('Batching',
                                            'Options used to select how jobs are batched together')
    batch_group.add_argument('--hourly',
                             action='store_true',
                             help='Create 1 job per hour in given time range')
    batch_group.add_argument('--orbital',
                             action='store_true',
                             help='Create 1 job per orbit in given time range')
    batch_group.add_argument('--daily',
                             action='store_true',
                             help='Create 1 job per day in given time range')
    batch_group.add_argument('--weekly',
                             action='store_true',
                             help='Create 1 job per week in given time range')
    batch_group.add_argument('--monthly',
                             action='store_true',
                             help='Create 1 job per month in given time range')
    batch_group.add_argument('--single',
                             action='store_true',
                             help='Force jobs to be executed individually even if the activity '
                                  'file says multiple execution is ok')
    batch_group.add_argument('--tod',
                             # type=timedelta,
                             help='For --daily only: time of day to start each job, '
                                  'if not midnight')
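
    # Each batching option slices the requested time range into half-open
    # windows, one job per window; e.g. --daily from 2020-01-01 to 2020-01-04
    # yields three jobs covering 01, 02 and 03 January (--weekly and --monthly
    # first snap the start back to a week/month boundary).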

    # output_group = parser.add_argument_group(
    #     'Output',
    #     'Options used to configure output formats')

    parser.add_argument('alg',
                        nargs='?',
                        # type=ArgumentParser.activity,
                        help='Activity to run')
    args = parser.parse_args()

    if args.alg:
        # compatibility with the old 'proj alg iasi_pd_events ...' style of invocation
        args.activity = args.alg

    if args.db:
        settings.set_db_name(args.db)

    if args.fake_writes:
        settings.FAKE_WRITES = True
        from chart.common.env import set_env
        set_env('{ENV_PREFIX}FAKE_WRITES', '1')
        # reload the time series layer so it picks up the fake writes setting
        import chart.db.ts.dispatch
        importlib.reload(chart.db.ts.dispatch)
        import chart.db.ts
        importlib.reload(chart.db.ts)

    if args.list_activities:
        if args.orbital:
            from chart.backend.activity import list_orbital_activities
            list_orbital_activities()

        else:
            from chart.backend.activity import list_activities
            list_activities()

        parser.exit()

    if args.info:
        from chart.backend.activity import display_activity
        display_activity(args.info)
        parser.exit()

    if args.derived:
        # just show the jobs which would be triggered from this result, without
        # running anything; this can be misleading since jobs may modify their
        # own times when actually executed
        show_derived(args.derived)
        parser.exit()

    if not args.activity:
        parser.error('No activity specified')

    # allow activity to be specified as either an XML activity filename ...
    if Path(args.activity).exists():
        dispatch_work_order(Path(args.activity))
        parser.exit()

    # ... or an activity name from the standard directory
    # else:
    # we allow multiple activities to be run in serial
    # args.activity = list(Activity(a) for a in args.activity)

    activity = Activity(args.activity)

    if args.outdir is None:
        args.outdir = make_working_directory('dispatcher')

    ensure_dir_exists(args.outdir)

    job = Job(category=args.category)

    # Create one (or more) orbital jobs
    if args.orbit is not None:
        job.orbit = args.orbit

        if args.sid is None:
            parser.error('SID must be specified if --orbit is used')

        start, stop = args.sid.orbit.get_orbit_times(args.orbit)
        if args.sensing_start is None:
            args.sensing_start = start

        if args.sensing_stop is None:
            args.sensing_stop = stop

    job.sensing_start = args.sensing_start
    job.sensing_stop = args.sensing_stop
    job.sid = args.sid if args.sid is not None else SID()
    if args.filename is not None:
        job.filename = args.filename.absolute()
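
    # `job` is now a fully populated template; the batching branches below
    # clone it once per time window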

    # switch based on batching settings - single job, or hourly/daily/weekly/monthly/orbital jobs
    if args.orbital:
        if args.sid is None:
            parser.error('Must specify --sid if --orbital is used')

        if args.sensing_start is None or args.sensing_stop is None:
            parser.error('Must specify a time range if --orbital is used')

        from chart.backend.worker_orbital_chains import orbits_in_range
        jobs = []
        for orbit in orbits_in_range(args.sid,
                                     args.sensing_start,
                                     args.sensing_stop,
                                     None):
            new_job = copy(job)
            new_job.orbit = orbit['number']
            new_job.sensing_start = orbit['start']
            new_job.sensing_stop = orbit['stop']
            jobs.append(new_job)

    elif args.hourly:
        if args.sensing_start is None or args.sensing_stop is None:
            parser.error('Must specify time range if --hourly is used')

        HOUR = timedelta(hours=1)
        jobs = []
        while args.sensing_start < args.sensing_stop:
            new_job = copy(job)
            new_job.sensing_start = args.sensing_start
            new_job.sensing_stop = args.sensing_start + HOUR
            jobs.append(new_job)
            args.sensing_start += HOUR

    elif args.daily:
        if args.sensing_start is None or args.sensing_stop is None:
            parser.error('Must specify time range if --daily is used')

        if args.orbit is not None:
            parser.error('Cannot use --daily option if an orbit is specified')

        if args.tod is not None:
            # TBD change to proper data type
            tod = xml_to_timedelta(args.tod)
            sensing_start = datetime(args.sensing_start.year,
                                     args.sensing_start.month,
                                     args.sensing_start.day) + tod

        else:
            sensing_start = args.sensing_start

        DAY = timedelta(days=1)
        jobs = []
        while sensing_start < args.sensing_stop:
            new_job = copy(job)
            new_job.sensing_start = sensing_start
            new_job.sensing_stop = sensing_start + DAY
            jobs.append(new_job)
            sensing_start += DAY

    elif args.weekly:
        if args.sensing_start is None or args.sensing_stop is None:
            parser.error('Must specify time range if --weekly is used')

        # snap the start back to the beginning of the week (Monday)
        sensing_start = args.sensing_start - timedelta(days=args.sensing_start.weekday())
        jobs = []
        WEEK = timedelta(days=7)
        while sensing_start < args.sensing_stop:
            new_job = copy(job)
            new_job.sensing_start = sensing_start
            new_job.sensing_stop = sensing_start + WEEK
            jobs.append(new_job)
            sensing_start += WEEK

    elif args.monthly:
        if args.sensing_start is None or args.sensing_stop is None:
            parser.error('Must specify time range if --monthly is used')

        # snap the start back to the first day of the month
        sensing_start = args.sensing_start.replace(day=1)
        jobs = []
        while sensing_start < args.sensing_stop:
            new_job = copy(job)
            new_job.sensing_start = sensing_start
            # step to the first day of the next month, rolling over the year in December
            if sensing_start.month < 12:
                new_job.sensing_stop = sensing_start.replace(month=sensing_start.month + 1)

            else:
                new_job.sensing_stop = sensing_start.replace(month=1,
                                                             year=sensing_start.year + 1)

            jobs.append(new_job)
            sensing_start = new_job.sensing_stop

    else:
        jobs = [job]
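    # whichever branch ran, `jobs` now holds one Job per processing window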

    if args.show_jobs:
        print('Jobs to be run:')
        for job in jobs:
            print(job)

        parser.exit()

    # if tables were specified, multiply the jobs array giving one actual job
    # per source job per table
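    # (e.g. two source jobs with --tablename A B become four jobs)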
    if args.tablename:
        tabled_jobs = []
        for job in jobs:
            for table in args.tablename:
                tabled_job = copy(job)
                tabled_job.tablename = table.upper()
                tabled_jobs.append(tabled_job)

        jobs = tabled_jobs

    # run jobs singly if --single is given (historically, also if any of the
    # activities did not allow multiple jobs)
    if args.single:
        multiple = False

    else:
        multiple = True
        # for activity in args.activity:
        #     if not activity.allow_multiple:
        #         multiple = False
        #         break

    if multiple:
        # Replicate the `jobs` array giving us one new array
        # per activity
        use_subprocess = args.use_subprocess
        # use_subprocess = (len(args.activity) > 1) or args.use_subprocess
        # for activity in args.activity:

        if not jobs:
            parser.exit('No jobs to run')

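        # validate the call against the first job only; the remaining jobs are
        # copies of the same template so carry the same attribute set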
        try:
            activity.validate_call(sensing_start=jobs[0].sensing_start,
                                   sensing_stop=jobs[0].sensing_stop,
                                   sid=jobs[0].sid,
                                   orbit=jobs[0].orbit,
                                   tablename=jobs[0].tablename,
                                   filename=jobs[0].filename)
        except MissingAttribute as e:
            if args.outdir is not None and str(args.outdir).isdigit():
                print('--outdir was numeric; did you mean to use --orbit?')

            parser.error(e)

        a_jobs = []
        for job in jobs:
            a_job = copy(job)
            a_job.activity = activity
            a_jobs.append(a_job)

        # we could probably get it working inline but using subprocess
        # is easier when it has to import multiple algorithms

        logger.info('Calling dispatch() with use_subprocess={u} and ingest={i}'.format(
            u=use_subprocess, i=args.ingest))
        status = dispatch(jobs=a_jobs,
                          work_dir=args.outdir,
                          ingest=args.ingest,
                          use_subprocess=use_subprocess,
                          sendmails=args.sendmail).process_status
        logger.info('Dispatch returns status {st}'.format(st=status.name))

        if status is ProcessStatus.COMPLETED and args.browse:
            browser = os.environ.get('BROWSER', 'firefox')
            command = ('nohup',
                       browser,
                       '-no-remote',
                       'file://{d}/{f}'.format(d=Path.cwd().joinpath(args.outdir),
                                               f=REPORT_FILENAME))
            logger.info('Running {cmd}'.format(cmd=' '.join(command)))
            # discard browser output; DEVNULL avoids leaking open file handles
            subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    else:
        for job in jobs:
            # for activity in args.activity:
            activity.validate_call(sensing_start=job.sensing_start,
                                   sensing_stop=job.sensing_stop,
                                   sid=job.sid,
                                   orbit=job.orbit,
                                   tablename=job.tablename,
                                   filename=job.filename)
            a_job = copy(job)
            a_job.activity = activity
            dispatch(jobs=[a_job],
                     work_dir=args.outdir,
                     ingest=args.ingest)

    logger.info('Working directory was {d}'.format(d=args.outdir))


if __name__ == '__main__':
    main()