1#!/usr/bin/env python3
  2
  3"""Command line interface to dispatcher module."""
  4
  5import os
  6from copy import copy
  7import logging
  8import subprocess
  9from datetime import datetime, timedelta
 10
 11from pathlib import Path
 12
 13from chart.common.args import ArgumentParser
 14from chart.project import settings
 15from chart.backend.dispatcher import dispatch_work_order
 16from chart.backend.activity import Activity
 17from chart.backend.job import Job
 18from chart.common.util import ensure_dir_exists
 19from chart.backend.result import Result
 20from chart.project import SID
 21from chart.backend.dispatcher import dispatch
 22from chart.backend.processes import ProcessStatus
 23from chart.common.xml import xml_to_timedelta
 24from chart.backend.activity import MissingAttribute
 25from chart.backend.worker import make_working_directory
 26from chart.alg.settings import REPORT_FILENAME
 27import imp
 28
 29logger = logging.getLogger()
 30
 31
 32def show_derived(path):
 33    """Given a result file `path`, compute and display all the derived jobs it would trigger."""
 34    from chart.backend.worker import find_derived_jobs
 35    resultfile = Result(path)
 36    print('Showing derived jobs for {path}'.format(path=path))
 37    for job in resultfile.read_jobs():
 38        print('  Examining job {id}'.format(
 39                id=job.job_id if job.job_id is not None else '<manual>'))
 40        for derived_job in find_derived_jobs(job):
 41            print('    {job}'.format(job=derived_job))
 42
 43    print('All done')
 44
 45
 46def main():
 47    """Command line entry point."""
 48    parser = ArgumentParser(__doc__)
 49
 50    conn_group = parser.add_argument_group(
 51        'Connection',
 52        'Options used to choose db connections')
 53    conn_group.add_argument('--db', '--conn', '-d',
 54                            metavar='CONNECTION',
 55                            help='Use database connection CONNECTION')
 56    conn_group.add_argument('--category',
 57                            default='MANUAL',
 58                            help='Category to use for job')
 59    conn_group.add_argument('--debug',
 60                            default=False,
 61                            action='store_true',
 62                            help='Instruct algorithm to enable extra diagnostic outputs')
 63    conn_group.add_argument('--fake-writes',
 64                            action='store_true',
 65                            help='Write to CSV file instead of real tables')
 66    conn_group.add_argument('--use-subprocess',
 67                            action='store_true',
 68                            help='Run algorithm in subprocess')
 69
 70    filter_group = parser.add_argument_group('Filtering',
 71                                             'Options used to specify job attributes')
 72    filter_group.add_argument('--sensing-start', '--start',
 73                              type=ArgumentParser.start_time,
 74                              metavar='TIME',
 75                              help='Start of processing.')
 76    filter_group.add_argument('--sensing-stop', '--stop',
 77                              type=ArgumentParser.stop_time,
 78                              metavar='TIME',
 79                              help='Nominal trigger time (stop of processing)')
 80    filter_group.add_argument('--period', '-p',
 81                        type=ArgumentParser.period)
 82    filter_group.add_argument('--sid', '-s',
 83                               type=ArgumentParser.sid,
 84                               help='Satellite ID')
 85    filter_group.add_argument('--filename', '-f',
 86                              type=ArgumentParser.input_filename,
 87                              help='Filename for ingestion jobs')
 88    filter_group.add_argument('--orbit',
 89                              type=int,
 90                              help='Specify orbit number')
 91    filter_group.add_argument('--tablename', '--table', '-t',
 92                              nargs='+',
 93                              help='Specify tablename for running orbital stats')
 94
 95    action_group = parser.add_argument_group('Actions',
 96                                             'Options used to select which action to perform')
 97    action_group.add_argument('--activity', '-a',
 98                              metavar='ACT',
 99                              help='Run activity ACT')
100    action_group.add_argument('--list-activities', '-l',
101                              action='store_true',
102                              help='Display list of available algorithms')
103    action_group.add_argument('--info', '-i',
104                              metavar='ACTIVITY',
105                              help='Show information about ACTIVITY')
106    action_group.add_argument('--ingest',
107                              action='store_true',
108                              help=('Ingest working directory to database after if algorithm '
109                                    'completes'))
110    action_group.add_argument('--show-jobs', '--simulate',
111                              action='store_true',
112                              help=('Instead of running jobs display a table showing jobs which '
113                                    'would be run'))
114    action_group.add_argument('--derived',
115                              type=ArgumentParser.input_filename,
116                              metavar='RESULT',
117                              help='Compute derived jobs for RESULT xml file')
118    action_group.add_argument('--outdir', '-o',
119                              metavar='DIR',
120                              type=ArgumentParser.output_dir,
121                              help='Output directory name')
122    action_group.add_argument('--browse', '-b',
123                              action='store_true',
124                              help='Open work directory in $BROWSER after processing')
125    action_group.add_argument('--sendmail',
126                              action='store_true',
127                              help='Send emails to event subscribers')
128
129    batch_group = parser.add_argument_group('Batching',
130                                            'Options used to select how jobs are batched together')
131    batch_group.add_argument('--hourly',
132                             action='store_true',
133                             help='Create 1 job per hour in given time range')
134    batch_group.add_argument('--orbital',
135                             action='store_true',
136                             help='Create 1 job per orbit in given time range')
137    batch_group.add_argument('--daily',
138                             action='store_true',
139                             help='Create 1 job per day in given time range')
140    batch_group.add_argument('--weekly',
141                             action='store_true',
142                             help='Create 1 job per week in given time range')
143    batch_group.add_argument('--monthly',
144                             action='store_true',
145                             help='Create 1 job per month in given time range')
146    batch_group.add_argument('--single',
147                             action='store_true',
148                             help='Force jobs to be executed individually even if the activity '
149                             'file says multiple execution is ok')
150    batch_group.add_argument('--tod',
151                     # type=timedelta,
152                     help='For daily only specify time of day, if not midnight')
153
154    # output_group = parser.add_argument_group(
155        # 'Output',
156        # 'Options used configure output formats')
157
158    parser.add_argument('alg',
159                        nargs='?',
160                        # type=ArgumentParser.activity,
161                        help='Activity to run')
162    args = parser.parse_args()
163
164    if args.alg:
165        # compatibility to all the old 'proj alg iasi_pd_events ...' style invocation
166        args.activity = args.alg
167
168    if args.db:
169        settings.set_db_name(args.db)
170
171    if args.fake_writes:
172        settings.FAKE_WRITES = True
173        from chart.common.env import set_env
174        set_env('{ENV_PREFIX}FAKE_WRITES', '1')
175        import chart.db.ts.dispatch
176        imp.reload(chart.db.ts.dispatch)
177        import chart.db.ts
178        imp.reload(chart.db.ts)
179
180    if args.list_activities:
181        if args.orbital:
182            from chart.backend.activity import list_orbital_activities
183            list_orbital_activities()
184
185        else:
186            from chart.backend.activity import list_activities
187            list_activities()
188
189        parser.exit()
190
191    if args.info:
192        from chart.backend.activity import display_activity
193        display_activity(args.info)
194        parser.exit()
195
196    if args.derived:
197        # just show jobs which would be triggered from this job, no actions
198        # this could be misleading since when jobs are actually executed they can modify their
199        # times
200        show_derived(args.derived)
201        parser.exit()
202
203    if not args.activity:
204        parser.error('No activity specified')
205
206    # allow activity to be specified as either a XML activity filename ...
207    if Path(args.activity).exists():
208        dispatch_work_order(Path(args.activity[0]))
209        parser.exit()
210
211    # ... or an activity name from the standard directory
212    # else:
213        # we allow multiple activities to be run in serial
214        # args.activity = list(Activity(a) for a in args.activity)
215
216    activity = Activity(args.activity)
217
218    if args.outdir is None:
219        args.outdir = make_working_directory('dispatcher')
220
221    ensure_dir_exists(args.outdir)
222
223    job = Job(category=args.category)
224
225    # Create one (or more) orbital jobs
226    if args.orbit is not None:
227        job.orbit = args.orbit
228
229        if args.sid is None:
230            parser.error('SID must be specified if --orbit is used')
231
232        start, stop = args.sid.orbit.get_orbit_times(args.orbit)
233        if args.sensing_start is None:
234            args.sensing_start = start
235
236        if args.sensing_stop is None:
237            args.sensing_stop = stop
238
239    job.sensing_start = args.sensing_start
240    job.sensing_stop = args.sensing_stop
241    job.sid = args.sid if args.sid is not None else SID()
242    if args.filename is not None:
243        job.filename = args.filename.absolute()
244
245    # switch based on time settings - single job, daily jobs, orbital jobs
246    if args.orbital:
247        if args.sid is None:
248            parser.error('Must specify --sid if --orbital is used')
249
250        if args.sensing_start is None or args.sensing_stop is None:
251            parser.error('Must specify a time range if --orbital is used')
252
253        from chart.backend.worker_orbital_chains import orbits_in_range
254        jobs = []
255        for orbit in orbits_in_range(args.sid,
256                                     args.sensing_start,
257                                     args.sensing_stop,
258                                     None):
259            new_job = copy(job)
260            new_job.orbit = orbit['number']
261            new_job.sensing_start = orbit['start']
262            new_job.sensing_stop = orbit['stop']
263            jobs.append(new_job)
264
265    elif args.hourly:
266        if args.sensing_start is None or args.sensing_stop is None:
267            parser.error('Must specify time range if --hourly is used')
268
269        HOUR = timedelta(hours=1)
270        jobs = []
271        while args.sensing_start < args.sensing_stop:
272            new_job = copy(job)
273            new_job.sensing_start = args.sensing_start
274            new_job.sensing_stop = args.sensing_start + HOUR
275            jobs.append(new_job)
276            args.sensing_start += HOUR
277
278    elif args.daily:
279        if args.sensing_start is None or args.sensing_stop is None:
280            parser.error('Must specify time range if --orbital is used')
281
282        if args.orbit is not None:
283            parser.error('Cannot use --daily option if an orbit if specified')
284
285        if args.tod is not None:
286            # TBD change to proper data type
287            tod = xml_to_timedelta(args.tod)
288            sensing_start = datetime(args.sensing_start.year,
289                                     args.sensing_start.month,
290                                     args.sensing_start.day) + tod
291
292        else:
293            sensing_start = args.sensing_start
294
295        DAY = timedelta(days=1)
296        jobs = []
297        while sensing_start < args.sensing_stop:
298            new_job = copy(job)
299            new_job.sensing_start = sensing_start
300            new_job.sensing_stop = sensing_start + DAY
301            jobs.append(new_job)
302            sensing_start += DAY
303
304    elif args.weekly:
305        sensing_start = args.sensing_start - timedelta(days=args.sensing_start.weekday())
306        jobs = []
307        WEEK = timedelta(days=7)
308        while sensing_start < args.sensing_stop:
309            new_job = copy(job)
310            new_job.sensing_start = sensing_start
311            new_job.sensing_stop = sensing_start + WEEK
312            jobs.append(new_job)
313            sensing_start += WEEK
314
315    elif args.monthly:
316        sensing_start = args.sensing_start.replace(day=1)
317        jobs = []
318        while sensing_start < args.sensing_stop:
319            new_job = copy(job)
320            new_job.sensing_start = sensing_start
321            if sensing_start.month < 12:
322                new_job.sensing_stop = sensing_start.replace(month=sensing_start.month + 1)
323
324            else:
325                new_job.sensing_stop = sensing_start.replace(month=1,
326                                                                year=sensing_start.year + 1)
327
328            jobs.append(new_job)
329            sensing_start = new_job.sensing_stop
330
331    else:
332        jobs = [job]
333
334    if args.show_jobs:
335        print('Jobs to be run:')
336        for job in jobs:
337            print(job)
338
339        parser.exit()
340
341    # if tables were specified multiply the jobs array giving one actual job per source
342    # job per table
343    if args.tablename is not None and len(args.tablename) > 0:
344        tabled_jobs = []
345        for job in jobs:
346            for table in args.tablename:
347                tabled_job = copy(job)
348                tabled_job.tablename = table.upper()
349                tabled_jobs.append(tabled_job)
350
351        jobs = tabled_jobs
352
353    # run jobs singly if any of the activities do not allow multiple jobs
354    if args.single:
355        multiple = False
356
357    else:
358        multiple = True
359        # for activity in args.activity:
360            # if not activity.allow_multiple:
361                # multiple = False
362                # break
363
364    if multiple:
365        # Replicate the `jobs` array giving us one new array
366        # per activity
367        use_subprocess = args.use_subprocess
368        # use_subprocess = (len(args.activity) > 1) or args.use_subprocess
369        # for activity in args.activity:
370
371        if len(jobs) == 0:
372            parser.exit('No jobs to run')
373
374        try:
375            activity.validate_call(sensing_start=jobs[0].sensing_start,
376                                   sensing_stop=jobs[0].sensing_stop,
377                                   sid=jobs[0].sid if jobs[0].sid is not None else None,
378                                   orbit=jobs[0].orbit,
379                                   tablename=jobs[0].tablename,
380                                   filename=jobs[0].filename)
381        except MissingAttribute as e:
382            if args.outdir is not None and str(args.outdir).isdigit():
383                print('--output directory was numeric; did you mean to use --orbit?')
384
385            parser.error(e)
386
387        a_jobs = []
388        for job in jobs:
389            a_job = copy(job)
390            a_job.activity = activity
391            a_jobs.append(a_job)
392
393        # we could probably get it working inline but using subprocess
394        # is easier when it has to import multiple algorithms
395
396        logger.info('Calling dispatch() with use_subprocess True and ingest = {i}'.format(i=args.ingest))
397        status = dispatch(jobs=a_jobs,
398                          work_dir=args.outdir,
399                          ingest=args.ingest,
400                          use_subprocess=use_subprocess,
401                          sendmails=args.sendmail).process_status
402        logger.info('Dispatch returns status {st}'.format(st=status.name))
403
404        if status is ProcessStatus.COMPLETED and args.browse:
405            browser = os.environ.get('BROWSER', 'firefox')
406            command = ('nohup',
407                       browser,
408                       '-no-remote',
409                       'file://{d}/{f}'.format(d=Path.cwd().joinpath(args.outdir),
410                                               f=REPORT_FILENAME))
411            logger.info('Running {cmd}'.format(cmd=' '.join(command)))
412            subprocess.Popen(command, stdout=open('/dev/null', 'w'), stderr=open('/dev/null', 'w'))
413
414    else:
415        for job in jobs:
416            # for activity in args.activity:
417            activity.validate_call(sensing_start=job.sensing_start,
418                                   sensing_stop=job.sensing_stop,
419                                   sid=job.sid if job.sid is not None else SID(),
420                                   orbit=job.orbit,
421                                   tablename=job.tablename,
422                                   filename=job.filename)
423            a_job = copy(job)
424            a_job.activity = activity
425            dispatch(jobs=[a_job],
426                     work_dir=args.outdir,
427                     ingest=args.ingest)
428
429    logger.info('Working directory was {d}'.format(d=args.outdir))
430
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == '__main__':
    main()