1#!/usr/bin/env python3
   2
   3"""Tool for manually examining or modifying the JOBS table, containing pending,
   4in progress or failed jobs."""
   5
   6import os
   7import sys
   8import tty
   9import glob
  10import fnmatch
  11import logging
  12import termios
  13from datetime import datetime, timedelta
  14from copy import copy
  15
  16from chart.common.path import Path
  17from chart.alg.settings import LOG_FILENAME
  18from chart.alg.settings import WORKORDER_FILENAME
  19from chart.alg.settings import RESULT_FILENAME
  20from chart.project import settings
  21from chart.common.xml import load_xml
  22from chart.common.args import ArgumentParser
  23from chart.backend.activity import Activity
  24from chart.common.prettyprint import Table
  25from chart.common.prettyprint import show_time_m
  26from chart.common.prettyprint import show_timedelta_relaxed
  27from chart.backend import worker
  28from chart.backend.worker import make_working_directory
  29from chart.common.exceptions import ConfigError
  30from chart.backend.jobs import delete_jobs
  31from chart.backend.jobs import find_jobs
  32from chart.backend.job import job_retrieve_fields
  33from chart.backend.job import Job
  34from chart.db.func import ANY
  35from chart.db.func import Min, Max, Count
  36from chart.db.connection import db_connect
  37from chart.common.traits import name_of_thing
  38from chart.common.xml import xml_to_timedelta
  39from chart.backend.processes import find_processes
  40from chart.backend.job import JobStatus
  41from chart.backend.jobs import update_jobs
  42from chart.backend.jobcreator import JobCreator
  43from chart.common.timerange import TimeRange
  44from chart.backend.jobs import add_job
  45from chart.project import SID
  46from chart.db.model.table import TableInfo
  47from chart.db.db_tool import keyboard_confirm
  48from chart.products.fdf.orbit import NoSuchOrbit
  49from chart.backend.result import Result
  50from chart.backend.workorder import WorkOrder
  51from chart.alg import settings as alg_settings
  52from chart.products.fdf.orbit import get_orbit_times
  53from chart.common.xml import datetime_to_xml
  54from chart.backend.jobcreator import ProductAttributes
  55
# Module-level database connection. The functions below use it for raw SQL
# queries (`db_conn.query`) and for explicit commits (`db_conn.commit`).
db_conn = db_connect()    # JOBS, PROCESSES


# colouring for the jobs table
# Keys are job status names, values are colour names. The `None` key is
# presumably the colour for jobs with no status set (i.e. pending) - TODO confirm.
STATUS_COLOURS = {
    'FAILED': 'red',
    'IN_PROGRESS': 'blue',
    'TIMEOUT': 'magenta',
    'RETRY': 'magenta',
    'COMPLETED': 'cyan',
    None: 'yellow',
}

# Jobs marked IN_PROGRESS but which have been running for more than
# OLD_THRESHOLD are considered dead
# (compared against the owning process's execute_start in the queries below).
OLD_THRESHOLD = timedelta(hours=1)
# OLD_THRESHOLD = timedelta(days=1)
  73
  74
  75def draw_jobs_table(title, jobs, target=sys.stdout):
  76    """Draw a nicely formatted table of a list of jobs.
  77    Jobs may come from the database in which case they have `id` and
  78    `gen_time` attributes or may come from code.
  79    """
  80
  81    jobs = list(jobs)  # in case its a generator
  82    headings = ['Status', 'Activity', 'Attributes']
  83    if len(jobs) > 0 and jobs[0].job_id is not None:
  84        headings = ['Id', 'GenTime'] + headings
  85
  86    t = Table(
  87        title=title,
  88        headings=headings,
  89        header_underline='=',
  90        header_blanklines=1,
  91        cross=' ',
  92        column_split=' ',
  93        header_split=' ',
  94    )
  95    # border_fg_col='cyan')
  96
  97    for job in jobs:
  98        if job.filename is not None:
  99            att = job.filename
 100
 101        else:
 102            att = ''
 103            if job.sid is not None:
 104                att += '{s} '.format(s=job.sid)
 105
 106            if job.tablename is not None:
 107                att += job.tablename + ' '
 108
 109            if job.orbit is not None:
 110                att += 'orb {orbit} from {range}'.format(
 111                    orbit=job.orbit,
 112                    range=TimeRange(job.sensing_start, job.sensing_stop),
 113                )
 114
 115            elif job.sensing_start is not None:
 116                att += '{start} to {stop}'.format(
 117                    start=job.sensing_start, stop=job.sensing_stop
 118                )
 119
 120        if job.earliest_execution_time is not None:
 121            att += ' after ' + show_time_m(job.earliest_execution_time)
 122
 123        row = [
 124            {'text': job.status.name, 'markup': job.status.name},
 125            {'text': job.activity.name, 'markup': 'activity'},
 126            att,
 127        ]
 128        if job.job_id is not None:
 129            row = [
 130                {'text': job.job_id, 'markup': 'id'},
 131                show_time_m(job.gen_time),
 132            ] + row
 133
 134        t.append(row)
 135
 136    t.write(target)
 137
 138
 139def list_jobs(category,
 140              activities,
 141              start_time,
 142              stop_time,
 143              sid=None,
 144              status=None,
 145              table=None,
 146              filename=None,
 147              orbit=None,
 148              only_missing_files=False):
 149    """Display JOBS table entries.
 150
 151    If `only_missing_files` is set then only display jobs with a filename that is missing.
 152    """
 153    jobs = []
 154    for row in find_jobs(fields=job_retrieve_fields(),
 155                         status=status,
 156                         category=category,
 157                         activity=activities,
 158                         sid=sid,
 159                         # gen_time_ge=start_time,
 160                         # gen_time_lt=stop_time,
 161                         sensing_start_ge=start_time,
 162                         sensing_start_lt=stop_time,
 163                         # don't use gen_time tests otherwise --list and --rescheule
 164                         # operate on different jobs
 165                         # sensing_start_or_gen_time_ge=start_time,
 166                         # sensing_start_or_gen_time_lt=stop_time,
 167                         tablename=table,
 168                         filename_like=filename,
 169                         orbit=orbit,
 170                         order_by=('activity', 'sensing_start', 'id')):
 171        job = Job(row)
 172        if only_missing_files and (job.filename is None or job.filename.exists()):
 173            continue
 174
 175        jobs.append(job)
 176
 177    draw_jobs_table('Jobs', jobs)
 178
 179
 180def list_jobs_by_process(execute_start=None,
 181                         execute_stop=None,
 182                         activity=None):
 183    """A rather basic display showing the jobs performed across all workers for a time range."""
 184    print('Jobs by process')
 185    last_worker = None
 186    # Find all processes in time range
 187    for p in find_processes(fields=('id',
 188                                    'worker',
 189                                    'execute_start',
 190                                    'execute_stop',
 191                                    'status',
 192                                    'working_dir'),
 193                            execute_start_ge=execute_start,
 194                            execute_stop_le=execute_stop,
 195                            ordering=('worker', 'execute_start')):
 196        # Print a divider line between different processes
 197        if p[1] != last_worker:
 198            last_worker = p[1]
 199            print(last_worker)
 200
 201        # Print process submmary
 202        print('    ',datetime_to_xml(p[2]), datetime_to_xml(p[3]))
 203        # Find jobs in the batch, filtering by activity type
 204        for j in find_jobs(fields=('activity', 'sensing_start', 'sensing_stop', 'status'),
 205                           process_id=p[0],
 206                           activity=activity):
 207            # Basic display of job
 208            print('        ',j[0], j[1], j[2], j[3])
 209
 210
 211
 212def add_or_sim_job(job, simulate=False):  # , return_id=False):
 213    """Utility function to either add a job to the database or just display it.
 214
 215    Returns:
 216                    None if job was discarded for being a duplicate, True if added.
 217    """
 218    if simulate:
 219        logging.info(
 220            'Simulating {cat} {act} {params}'.format(
 221                cat=job.category,
 222                act=job.activity,
 223                params=', '.join(
 224                    '{k}:{v}'.format(k=k, v=v)
 225                    for k, v in job.items()
 226                    if k not in ('category', 'activity') and v is not None
 227                ),
 228            )
 229        )
 230        return True, None
 231
 232    else:
 233        return add_job(job)  # , return_id=return_id)
 234
 235
 236def find_activity_and_parser(filename):
 237    """For a given filename, try to find a suitable Activity to ingest it."""
 238    if hasattr(find_activity_and_parser, 'last_pattern') and fnmatch.fnmatch(
 239        filename.name, getattr(find_activity_and_parser, 'last_pattern')
 240    ):
 241        return getattr(find_activity_and_parser, 'last_activity')
 242
 243    for jobcreator in JobCreator.all():
 244        trigger = jobcreator.trigger
 245        if 'pattern' not in trigger:
 246            continue
 247
 248        pattern = trigger['pattern']
 249        if fnmatch.fnmatch(filename.name, pattern + '*'):
 250            activity = list(jobcreator.responses())[0]['activity']
 251            parser = trigger['parser']
 252            result = (activity, parser)
 253            setattr(find_activity_and_parser, 'last_activity', result)
 254            return result
 255
 256    return None
 257
 258
 259def make_jobs(
 260    category,  # (too many arguments) pylint: disable=R0913
 261    activities=None,
 262    filenames=None,
 263    sid=None,
 264    orbit=None,
 265    start_time=None,
 266    stop_time=None,
 267    table=None,
 268    hourly=False,
 269    orbital=False,
 270    daily=False,
 271    weekly=False,
 272    monthly=False,
 273    daynight=False,
 274    tod=None,
 275    status=JobStatus.PENDING,
 276    earliest_execution_time=None,
 277):
 278    """Insert one (or multiple, if ingestion) jobs to the JOBS table.
 279    This is only used for manual adding of jobs, never by the scheduler or worker.
 280    This is a high-level function that can do some parameter substitution and
 281    may actually create multiple jobs.
 282    """
 283    res = []
 284
 285    if filenames is not None:
 286        if orbit is not None or start_time is not None or stop_time is not None:
 287            raise ValueError(
 288                'If a filename job is added do not specify --orbit, '
 289                '--start-time or --stop-time'
 290            )
 291
 292        for f in filenames:
 293            f = Path(f)
 294            norm_f = f.absolute()
 295            if activities is None or len(activities) == 0:
 296                a_p = find_activity_and_parser(f)
 297                if a_p is None:
 298                    raise ConfigError(
 299                        'Activity cannot be guessed for filename {filename}'.format(
 300                            filename=f
 301                        )
 302                    )
 303                a, p = a_p
 304
 305            elif len(activities) == 1:
 306                a = activities[0]
 307
 308            else:
 309                raise ValueError('Must specify activity for ingestion jobs')
 310
 311            # if its a PDU run it through the distiller orbit extraction code to
 312            # insert entries in the ORBITS table.
 313            # if a == 'PDU_INGESTER':
 314            # prod = eps.EPSReader(f)
 315            # distiller.orbit_check(prod)
 316
 317            # Call the scheduler file parser function to read SID and sensing time from
 318            # the file
 319            attributes = p(f)
 320            sensing_start = None
 321            if isinstance(attributes, ProductAttributes):
 322                if sid is None:
 323                    sid = attributes.sid
 324
 325                sensing_start = attributes.sensing_start
 326
 327            res.append(
 328                Job(
 329                    category=category,
 330                    activity=a,
 331                    sensing_start=sensing_start,
 332                    filename=norm_f,
 333                    sid=sid,
 334                    status=status,
 335                )
 336            )
 337
 338        return res
 339
 340    if activities is None:
 341        raise ValueError(
 342            'Activity name must be specified unless adding an ingestion job'
 343        )
 344
 345    # activity = Activity(activityname)
 346
 347    if orbit is not None:
 348        # adding a single orbit for an orbital activity
 349        if start_time is not None or stop_time is not None:
 350            raise ValueError(
 351                'Cannot use either --start or --stop if an --orbit is specified'
 352            )
 353
 354        if orbital or daynight:
 355            raise ValueError('Cannot use --orbit together with --orbital or --daynight')
 356
 357        if sid is None:
 358            raise ValueError('Must specify SID when adding an orbital algorithm')
 359
 360        times = get_orbit_times(sid, orbit)
 361        if times is None:
 362            raise ValueError('Unknown orbit')
 363
 364        job = Job(
 365            category=category,
 366            sid=sid,
 367            orbit=orbit,
 368            sensing_start=times[0],
 369            sensing_stop=times[1],
 370            status=status,
 371        )
 372
 373        if table is not None:
 374            job.tablename = table.name
 375
 376        res.append(job)
 377
 378    else:
 379        if hourly + orbital + daily + daynight + weekly + monthly > 1:
 380            raise ValueError(
 381                'Can only use one of --orbital, --daily, --weekly, '
 382                '--monthly and --daynight'
 383            )
 384
 385        if orbital:
 386            if sid is None:
 387                raise ValueError('SID must be specified for per-orbit jobs')
 388
 389            # add one job per orbit within the given timerange
 390            start_orbit = sid.orbit.find_orbit(start_time)
 391            stop_orbit = sid.orbit.find_orbit(stop_time)
 392
 393            if start_orbit is None or stop_orbit is None:
 394                raise ValueError('No orbits known for {sid}'.format(sid=sid))
 395
 396            for o in range(start_orbit, stop_orbit + 1):
 397                try:
 398                    start, stop = sid.orbit.get_orbit_times(o)
 399                except NoSuchOrbit:
 400                    logging.error('Orbit {o} not found'.format(o=o))
 401                    continue
 402
 403                job = Job(
 404                    category=category,
 405                    sid=sid,
 406                    orbit=o,
 407                    sensing_start=start,
 408                    sensing_stop=stop,
 409                    status=status,
 410                )
 411
 412                if table is not None:
 413                    job.tablename = table
 414
 415                res.append(job)
 416
 417        elif hourly:
 418            while start_time < stop_time:
 419                job = Job(
 420                    category=category,
 421                    sid=sid,
 422                    sensing_start=start_time,
 423                    sensing_stop=start_time + timedelta(hours=1),
 424                    status=status,
 425                )
 426
 427                if table is not None:
 428                    job.tablename = table
 429
 430                res.append(job)
 431
 432                start_time += timedelta(hours=1)
 433
 434        elif daily:
 435            # clip start time to start of that day
 436            start_time = datetime(start_time.year, start_time.month, start_time.day)
 437            if tod is not None:
 438                start_time += tod
 439            while start_time < stop_time:
 440                job = Job(
 441                    category=category,
 442                    sid=sid,
 443                    sensing_start=start_time,
 444                    sensing_stop=start_time + timedelta(days=1),
 445                    status=status,
 446                )
 447
 448                if table is not None:
 449                    job.tablename = table.name
 450
 451                res.append(job)
 452
 453                start_time += timedelta(days=1)
 454
 455        elif weekly:
 456            DAYS_PER_WEEK = 7
 457            if start_time.weekday() != 0:
 458                start_time += timedelta(days=DAYS_PER_WEEK - start_time.weekday())
 459
 460            while True:
 461                report_end_time = start_time + timedelta(days=DAYS_PER_WEEK)
 462
 463                if report_end_time > stop_time:
 464                    break
 465
 466                job = Job(
 467                    category=category,
 468                    sid=sid,
 469                    sensing_start=start_time,
 470                    sensing_stop=report_end_time,
 471                    status=status,
 472                )
 473
 474                if table is not None:
 475                    job.tablename = table.name
 476
 477                res.append(job)
 478
 479                start_time = report_end_time
 480
 481        elif monthly:
 482            while True:
 483                if start_time.month < 12:
 484                    report_stop_time = start_time.replace(month=start_time.month + 1)
 485
 486                else:
 487                    report_stop_time = start_time.replace(
 488                        month=1, year=start_time.year + 1
 489                    )
 490                # print(start_time, report_stop_time)
 491
 492                if report_stop_time > stop_time:
 493                    break
 494
 495                job = Job(
 496                    category=category,
 497                    sid=sid,
 498                    sensing_start=start_time,
 499                    sensing_stop=report_stop_time,
 500                    status=status,
 501                )
 502
 503                if table is not None:
 504                    job.tablename = table.name
 505
 506                res.append(job)
 507
 508                start_time = report_stop_time
 509
 510        elif daynight:
 511            raise NotImplementedError()
 512            # logging.info('activity ignored')
 513            # for region in worker.find_daynight_regions(sid, start_time, stop_time):
 514            # job = Job(category=category,
 515            # activity='DAY_STATS' if region['type'] == 'day' else 'NIGHT_STATS',
 516            # sid=sid,
 517            # tablename=table.name,
 518            # orbit=region['orbit'],
 519            # sensing_start=region['start'],
 520            # sensing_stop=region['stop'])
 521
 522            # res.append(job)
 523
 524            # return res
 525
 526        else:
 527            # just add a single job for the given time range
 528            job = Job(
 529                category=category,
 530                sid=sid,
 531                sensing_start=start_time,
 532                sensing_stop=stop_time,
 533                status=status
 534            )
 535
 536            if table is not None:
 537                job.tablename = table
 538
 539            res.append(job)
 540
 541    # now we have a basic list of jobs, with no activities set
 542    jobs_with_activities = []
 543    for job in res:
 544        for a in activities:
 545            job_with_activity = copy(job)
 546            job_with_activity.activity = a
 547            job_with_activity.earliest_execution_time = earliest_execution_time
 548            jobs_with_activities.append(job_with_activity)
 549
 550    return jobs_with_activities
 551
 552
 553def retrieve_job(jobid):
 554    """Should be unused now."""
 555    row = find_jobs(fields=job_retrieve_fields(), job_id=jobid).fetchone()
 556
 557    return Job(row=row)
 558
 559
 560def show_single_job(jobid, show_job, show_process):
 561    """Display information from the JOBS and PROCESSES tables for `jobid`.
 562    Also display process log if available and computes derived jobs.
 563    If `show_job` is set show the JOBS table fields only.
 564    """
 565
 566    row = find_jobs(fields=job_retrieve_fields(), job_id=jobid).fetchone()
 567    if row is None:
 568        raise ValueError('No such job {id}'.format(id=jobid))
 569
 570    job = Job(row=row)
 571
 572    if show_job or not show_process:
 573        t = Table(title='Single job')
 574        for f in (
 575                'Job_ID',
 576                'Category',
 577                'Status',
 578                'Gen_time',
 579                'Activity',
 580                'Filename',
 581                'SID',
 582                'Orbit',
 583                'Sensing_start',
 584                'Sensing_stop',
 585                'Earliest_execution_time',
 586                'Process_ID',
 587                'Parent',
 588                'Tablename',
 589        ):
 590            t.append((f, name_of_thing(getattr(job, f.lower()))))
 591
 592        t.write()
 593
 594    # if job.status is None:
 595    # print('')
 596    # draw_jobs_table('Related jobs',
 597    # worker.find_similar_jobs(job, 10, datetime(2000, 1, 1)))
 598
 599    if job.process_id is not None and (show_process or not show_job):
 600        print('')
 601        show_proc(proc_id=job.process_id, show_log=not show_job and not show_process)
 602
 603
 604def show_proc(proc_id, show_log):
 605    """Show all information for process `proc_id`."""
 606
 607    t = Table(title='Process ' + str(proc_id), headings=('Attribute', 'Value'))
 608
 609    row = find_processes(
 610        fields=(
 611            'WORKER',
 612            'PID',
 613            'EXECUTE_START',
 614            'EXECUTE_STOP',
 615            'STATUS',
 616            'WORKING_DIR',
 617        ),
 618        process_id=proc_id,
 619    ).fetchone()
 620
 621    if row is None:
 622        raise ValueError('No such process id {id}'.format(id=proc_id))
 623
 624    for kv in zip(
 625        ('Worker', 'PID', 'Execute start', 'Execute stop', 'Status', 'Work dir'), row
 626    ):
 627        t.append(kv)
 628
 629    jobs_brief = find_jobs(fields=('ID', 'ACTIVITY'), process_id=proc_id).fetchall()
 630
 631    if len(jobs_brief) > 0:
 632        t.append(
 633            (
 634                'Computed work dir',
 635                os.path.join(
 636                    row[2].date().strftime('%Y%m%d'),
 637                    jobs_brief[0][1] + '_' + str(proc_id),
 638                ),
 639            )
 640        )
 641
 642    t.write()
 643
 644    if not show_log:
 645        return
 646
 647    print('')
 648    draw_jobs_table('Jobs', [retrieve_job(job[0]) for job in jobs_brief])
 649
 650    print('\nLog\n===\n')
 651    if row[5] is None:
 652        print('Working dir not set')
 653
 654    else:
 655        wd_path = Path(row[5])
 656        log_path = wd_path.child(LOG_FILENAME)
 657        if log_path.exists():
 658            print(open(str(log_path), 'r').read())
 659
 660        else:
 661            print('Log file {f} not found'.format(f=log_path))
 662
 663
 664def show_summary(
 665    category,
 666    # activity,
 667    status,
 668    sid,
 669    start_time,
 670    stop_time,
 671):
 672    """Display a summary table for a range of jobs showing counts of jobs
 673    grouped by status, activity and sid."""
 674    t = Table(
 675        title='Summary of jobs',
 676        headings=(
 677            'Activity',
 678            'SID',
 679            'Status',
 680            'Count',
 681            'Max gen time',
 682            'Max earliest exec time',
 683        ),
 684    )
 685
 686    last_activity = None
 687
 688    for row in find_jobs(
 689        fields=[
 690            'ACTIVITY',
 691            'STATUS',
 692            'count(*)',
 693            'max(GEN_TIME)',
 694            'max(EARLIEST_EXECUTION_TIME)',
 695        ]
 696        + SID.sql_sys_select('JOBS'),
 697        # fields=('ACTIVITY', 'SCID', 'STATUS', 'count(*)', 'max(GEN_TIME)',
 698        # 'max(EARLIEST_EXECUTION_TIME)'),
 699        category=category,
 700        status=status,
 701        sid=sid,
 702        sensing_start_or_gen_time_ge=start_time,
 703        sensing_start_or_gen_time_lt=stop_time,
 704        order_by=['ACTIVITY'] + SID.sql_sys_select('JOBS') + ['STATUS'],
 705        group_by=['ACTIVITY'] + SID.sql_sys_select('JOBS') + ['STATUS'],
 706    ):
 707        activity = row[0]
 708        status = row[1]
 709        count = row[2]
 710        max_gen_time = row[3]
 711        max_early = row[4]
 712        sid = SID.from_sys_select('JOBS', row[5:])
 713
 714        if activity == last_activity:
 715            activity = ''
 716
 717        else:
 718            activity = {'text': activity, 'markup': 'activity'}
 719
 720        status = {'text': status, 'markup': status}
 721        # print_row[2] = {'colour': STATUS_COLOURS.get(print_row[2]), 'text': print_row[2]}
 722
 723        t.append((activity, sid, status, count, max_gen_time, max_early))
 724
 725        last_activity = activity
 726
 727    t.write()
 728
 729
 730def show_global_summary():
 731    """Show brief summary of all categories."""
 732    for category, count in find_jobs(
 733        fields=('CATEGORY', Count()), group_by='CATEGORY', order_by='CATEGORY'
 734    ):
 735
 736        t = Table(title='Category {cat}'.format(cat=category))
 737        t.append(('Total jobs', count))
 738        for status, count in find_jobs(
 739            fields=('STATUS', Count()),
 740            category=category,
 741            group_by='STATUS',
 742            order_by='STATUS',
 743        ):
 744
 745            if status is None:
 746                status = JobStatus.PENDING
 747                count = find_jobs(
 748                    fields=(Count(),), category=category, status=status,
 749                ).fetchone()[0]
 750
 751            elif status == JobStatus.IN_PROGRESS.name:
 752                old_cutoff = datetime.utcnow() - OLD_THRESHOLD
 753                old_count = db_conn.query(
 754                    "SELECT count(*) "
 755                    "FROM jobs, processes "
 756                    "WHERE jobs.process_id=processes.id "
 757                    "AND jobs.category=:category "
 758                    "AND jobs.status='IN_PROGRESS' "
 759                    "AND processes.execute_start<:cutoff",
 760                    category=category,
 761                    cutoff=old_cutoff,
 762                ).fetchone()[0]
 763                count = '{count} ({old_count} jobs are probably orphaned)'.format(
 764                    count=count, old_count=old_count
 765                )
 766
 767            t.append(('{status} jobs'.format(status=status), count))
 768
 769        early, late = find_jobs(
 770            fields=(Min('GEN_TIME'), Max('GEN_TIME')), category=category
 771        ).fetchone()
 772        t.append(('Earliest gen_time', early))
 773        t.append(('Latest gen_time', late))
 774        t.write()
 775
 776
 777def cleanup(category):
 778    """Convert TIMEOUT, RETRY and old IN_PROGRESS jobs to PENDING (null status)."""
 779    retry = update_jobs(category=category, status=JobStatus.RETRY, set_status=None)
 780    if retry > 0:
 781        logging.info('Changed {cc} jobs from RETRY to PENDING'.format(cc=retry))
 782
 783    timeout = update_jobs(category=category, status=JobStatus.TIMEOUT, set_status=None)
 784    if timeout > 0:
 785        logging.info('Changed {cc} jobs from TIMEOUT to PENDING'.format(cc=timeout))
 786
 787    total_in_progress = find_jobs(
 788        fields=(Count(),), category=category, status=JobStatus.IN_PROGRESS
 789    ).fetchone()[0]
 790    in_progress = 0
 791    for (jobid,) in db_conn.query(
 792        "SELECT jobs.id "
 793        "FROM jobs, processes "
 794        "WHERE category=:category "
 795        "AND jobs.process_id=processes.id "
 796        "AND jobs.status='IN_PROGRESS' "
 797        "AND processes.execute_start<:cutoff",
 798        category=category,
 799        cutoff=datetime.utcnow() - OLD_THRESHOLD,
 800    ):
 801        # logging.debug('Not changing job {i}'.format(i=jobid))
 802        update_jobs(job_id=jobid, set_status=None, commit=False)
 803        in_progress += 1
 804
 805    if (total_in_progress - in_progress) > 0:
 806        logging.info(
 807            '{cc} jobs are IN_PROGRESS and started recently (within {threshold})'.format(
 808                cc=total_in_progress - in_progress,
 809                threshold=show_timedelta_relaxed(OLD_THRESHOLD),
 810            )
 811        )
 812
 813    if in_progress > 0:
 814        logging.info(
 815            'Changed {cc} jobs from IN_PROGRESS to PENDING'.format(cc=in_progress)
 816        )
 817
 818    if retry == 0 and timeout == 0 and in_progress == 0:
 819        logging.info('No jobs to be cleaned up')
 820
 821    failed = find_jobs(
 822        fields=(Count(),), category=category, status=JobStatus.FAILED
 823    ).fetchone()[0]
 824    if failed > 0:
 825        logging.info('{cc} FAILED jobs were not modified'.format(cc=failed))
 826
 827    db_conn.commit()
 828
 829
 830def purge(category, min_time):
 831    """Delete all jobs in `category` older than `min_time`."""
 832
 833    count = delete_jobs(category=category, gen_time_lt=min_time)
 834    logging.info(
 835        'Deleted {cc} jobs older than {cutoff}'.format(cc=count, cutoff=min_time)
 836    )
 837
 838
 839def show_next_job(category, batch=False, target=sys.stdout):
 840    """Display the next job [batch] that the worker would run."""
 841    target.write('Next job to be executed in category {cat}:\n'.format(cat=category))
 842    job = worker.select_next_job(category)
 843    if job is None:
 844        target.write('No upcoming jobs\n')
 845        return
 846
 847    target.write('{job}\n'.format(job=job))
 848
 849    if batch:
 850        draw_jobs_table(
 851            'Batch',
 852            worker.find_similar_jobs(job, count=worker.DEFAULT_BATCH_SIZE),
 853            target,
 854        )
 855
 856
 857def reschedule(
 858    job_ids,
 859    category=None,
 860    status=None,
 861    activity=None,
 862    table=None,
 863    sid=None,
 864    start_time=None,
 865    stop_time=None,
 866    filename=None,
 867    orbit=None,
 868):
 869    """Reset job statuses to null."""
 870    if name_of_thing(activity).endswith('_STATS') and table is None:
 871        print('Warning: STATS job selected but no table filter')
 872
 873    # if is_listlike(activity):
 874    # if len(activity) == 1:
 875    # activity = activity[0]
 876    # else:
 877    # raise ValueError('Use only one activity for reschedule')
 878    # if activity is not None:
 879    # activity = Activity(activity)
 880
 881    if len(job_ids) > 0:
 882        cc = update_jobs(job_id=job_ids, set_status=JobStatus.PENDING, commit=False)
 883
 884    else:
 885        cc = update_jobs(
 886            category=category,
 887            activity=activity,
 888            status=status,
 889            table=table,
 890            sid=sid,
 891            sensing_start_ge=start_time,
 892            sensing_start_lt=stop_time,
 893            filename=filename,
 894            orbit=orbit,
 895            set_status=JobStatus.PENDING,
 896            commit=False,
 897        )
 898
 899    print('This command will update {cc} jobs'.format(cc=cc))
 900    if keyboard_confirm():
 901        print('Committing')
 902        db_conn.commit()
 903
 904    else:
 905        print('Not committing')
 906
 907
 908def expand_activities(activities):
 909    """Expand a wildcard such as --activity *_REPORT to a list of Activity objects."""
 910    result = None
 911    if activities is None:
 912        return []
 913
 914    result = set()
 915    for name in activities:
 916        if '%' in name:
 917            # raise ValueError('Use "*" for wildcards')
 918            # allow SQL-style wildcards
 919            name = name.replace('%', '*')
 920
 921        if '-' in name:
 922            print('Found \'-\' character in activity name, did you mean \'_\' instead?')
 923
 924        if '*' in name or '?' in name or '[' in name:
 925            matches = 0
 926            for a in Activity.all():
 927                if a.enabled and fnmatch.fnmatch(a.name, name.upper()):
 928                    result.add(a)
 929                    matches += 1
 930
 931            if matches == 0:
 932                raise ValueError('No matches found for {name}'.format(name=name))
 933
 934        else:
 935            try:
 936                activity = Activity(name)
 937            except Activity.UnknownActivity:
 938                raise ValueError('Unknown activity {act}'.format(act=name))
 939
 940            result.add(activity)
 941
 942    return list(result)
 943
 944
 945def show_derived_jobs(working_dir, target=sys.stdout):
 946    """Compute and show but not run derived jobs."""
 947    if not working_dir.is_dir():
 948        raise ValueError('Input value must be a working directory containing '
 949                         'work order and result XML files')
 950
 951    result_file = Result(working_dir.joinpath(alg_settings.RESULT_FILENAME))
 952    wo_file = WorkOrder(working_dir.joinpath(alg_settings.WORKORDER_FILENAME))
 953    wo_jobs = {}
 954    for wo_job in wo_file.read_jobs():
 955        print('wo job', wo_job)
 956        wo_jobs[wo_job.job_id] = wo_job
 957
 958    for job in result_file.read_jobs():
 959        print('result job', job)
 960        job.filename = wo_jobs.get(job.job_id).filename
 961        # print(res)
 962        # assert wo.job_id == res.job_id
 963        # wo.status = res.status
 964        # wo.activity = res.activity
 965        # wo.aux_outputs = res.aux_outputs
 966        # wo.tables = res.tables
 967        target.write('Examining job {j} for derived:\n'.format(j=job))
 968        # target.write('Examining job {id} {act}:'.format(id=wo.job_id, act=wo.activity))
 969        # count = 0
 970        for derived in worker.find_derived_jobs(job):
 971            # print('     derived {job}'.format(job=der))
 972            # count += 1
 973            target.write('      {d}\n'.format(d=derived))
 974
 975        # if count == 0:
 976        # print('     no derived jobs')
 977
 978
 979def main():
 980    """Command line entry point."""
 981    parser = ArgumentParser()
 982    category = parser.add_argument_group('Category')
 983    category.add_argument(
 984        '--category',
 985        default=settings.DEFAULT_CATEGORY,
 986        help='Job category. All other categories will be ignored.',
 987    )
 988
 989    actions = parser.add_argument_group('Actions', 'Actions to perform')
 990    actions.add_argument(
 991        '--list',
 992        '-l',
 993        const=None,
 994        nargs='?',
 995        metavar='STATUS',
 996        help='Display current jobs, optionally filtering by STATUS',
 997    )
 998    actions.add_argument('--show', '--info', type=int, help='Display a single job')
 999    actions.add_argument(
1000        '--show-derived',
1001        type=ArgumentParser.input_dirname,
1002        metavar='RESULT',
1003        help=(
1004            'Compute and display derived jobs from work order and result xml file '
1005            'within working directory'
1006        ),
1007    ),
1008    # actions.add_argument('--delete', '--delete-job',
1009    # type=int,
1010    # nargs='*',
1011    # help='Delete one or more jobs')
1012    actions.add_argument(
1013        '--summary',
1014        action='store_true',
1015        help='Show a summary view of a single category',
1016    )
1017    actions.add_argument(
1018        '--all-summary',
1019        action='store_true',
1020        help='Show a global summary over all categories',
1021    )
1022    # actions.add_argument('--global-summary', '--all-summary',
1023    # action='store_true',
1024    # help='Show general status for all categories. No other flags can be '
1025    # 'used.')
1026    actions.add_argument(
1027        '--cleanup',
1028        action='store_true',
1029        help='Convert RETRY, TIMEOUT and IN_PROGRESS jobs more than {dur} old to '
1030        'PENDING'.format(dur=OLD_THRESHOLD),
1031    )
1032    actions.add_argument(
1033        '--db', metavar='CONNECTION', help='Use database connection CONNECTION'
1034    )
1035    actions.add_argument('--project-id',
1036                         type=int,
1037                         metavar='ID',
1038                         help='Force PROJECT for multi-project JOBS table function')
1039    actions.add_argument(
1040        '--purge',
1041        metavar='AGE',
1042        help='Delete all jobs in category where generation time is older than AGE',
1043        type=ArgumentParser.datetime,
1044    )
1045
1046    filter_group = parser.add_argument_group('Filter', 'Filtering results')
1047    filter_group.add_argument('--execute-start',
1048                              type=ArgumentParser.start_time,
1049                              help='Filter for executed jobs')
1050    filter_group.add_argument('--execute-stop',
1051                              type=ArgumentParser.stop_time,
1052                              help='Filter for executed jobs')
1053
1054    add = parser.add_argument_group('Add', 'Adding jobs')
1055    add.add_argument(
1056        '--add',
1057        # action='store_true',
1058        const=True,
1059        nargs='?',
1060        help='Create new job(s)',
1061    )
1062
1063    add.add_argument(
1064        '--hourly',
1065        action='store_true',
1066        help='If a time range is specified add one job per day',
1067    )
1068    add.add_argument(
1069        '--orbital',
1070        action='store_true',
1071        help='If a time range is specified add one job per orbit',
1072    )
1073    add.add_argument(
1074        '--daily',
1075        action='store_true',
1076        help='If a time range is specified add one job per day',
1077    )
1078    add.add_argument(
1079        '--tod',
1080        # type=timedelta,
1081        help='For daily only specify time of day, if not midnight',
1082    )
1083    add.add_argument(
1084        '--weekly',
1085        action='store_true',
1086        help='If a time range is specified add one job per week at Monday 00:00',
1087    )
1088    add.add_argument(
1089        '--monthly',
1090        action='store_true',
1091        help='If a time range is specified add one job per month on the 1st, 00:00',
1092    )
1093    add.add_argument(
1094        '--daynight', action='store_true', help='Insert jobs for day/night regions'
1095    )
1096    add.add_argument(
1097        '--no-prompt', action='store_true', help='Do not prompt before creating jobs'
1098    )
1099    add.add_argument(
1100        '--earliest-execution-time',
1101        help='Set the earliest execution time for the jobs',
1102    )
1103
1104    run = parser.add_argument_group('Run', 'Options for executing jobs')
1105    run.add_argument(
1106        '--run',
1107        type=int,
1108        metavar='JOBID',
1109        nargs='*',
1110        help='Process JOBID. Derived jobs will be added to the jobs table '
1111        'but not processed',
1112    )
1113    # run.add_argument('--run-all',
1114    # action='store_true',
1115    # help='Process all jobs of the current category, including any '
1116    # 'derived jobs')
1117    # run.add_argument('--batch-size',
1118    # type=int,
1119    # default=DEFAULT_BATCH_SIZE,
1120    # help='Number of jobs to batch together for --run-all option')
1121    run.add_argument(
1122        '--outdir',
1123        '-o',
1124        type=ArgumentParser.output_dir,
1125        help='Working directory, if processing is done',
1126    )
1127    run.add_argument(
1128        '--worker-name',
1129        '--worker',
1130        default='manual',
1131        help="Worker name to be placed into the 'worker' field "
1132        "of the processes table",
1133    )
1134    run.add_argument(
1135        '--replace',
1136        action='store_true',
1137        help='Tell the ingester process to delete existing data before insertion',
1138    )
1139    run.add_argument(
1140        '--reschedule',
1141        type=int,
1142        nargs='*',
1143        # const=True,
1144        metavar='JOBID',
1145        help='Clear the status of JOBID(s) so they will be re-run by the worker',
1146    )
1147    run.add_argument(
1148        '--next', action='store_true', help='Just show the next job to be performed'
1149    )
1150    run.add_argument(
1151        '--batch',
1152        action='store_true',
1153        help=(
1154            'If --next-job is selected, show the entire batch instead of a single '
1155            'job'
1156        ),
1157    )
1158    run.add_argument(
1159        '--reingest',
1160        help='Force some algorithms to delete data before ingesting',
1161        action='store_true',
1162    )
1163
1164    properties = parser.add_argument_group(
1165        'Properties',
1166        'Job properties when adding or showing jobs')
1167    properties.add_argument('--sid', '-s',
1168                            type=ArgumentParser.sid,
1169                            # default=SID.SIDLESS,
1170                            help='Specify SID when adding a job')
1171    properties.add_argument('--filename',
1172                            # type=ArgumentParser.input_filename,
1173                            nargs='*',
1174                            help='Specify one or more filenames when adding a job, or as a search '
1175                            'string')
1176    properties.add_argument('--activity',
1177                            nargs='+',
1178                            help='Specify activity name when adding a job')
1179    properties.add_argument('--orbit',
1180                            type=int,
1181                            help='Orbit number if adding a job for a per-orbit activity')
1182    properties.add_argument('--start-time', '--start',
1183                            type=ArgumentParser.start_time,
1184                            metavar='START',
1185                            help=('Start time if adding a time-based job. When filtering START is '
1186                                  'applied to job sensing_start if set otherwise gen_time'))
1187    properties.add_argument('--stop-time', '--stop',
1188                            metavar='STOP',
1189                            type=ArgumentParser.stop_time,
1190                            help='Stop time if adding a time-based job if needed')
1191    properties.add_argument('--table', '--tablename',
1192                            # type=ArgumentParser.table,
1193                            help='destination tablename if adding an orbital stats job')
1194    properties.add_argument('--status',
1195                            metavar='STATUS',
1196                            default=None,
1197                            help='Only show jobs with status of STATUS '
1198                            '(use "none" for pending jobs')
1199    properties.add_argument('--missing',
1200                            action='store_true',
1201                            help='When listing jobs show only those where the job file is missing')
1202
1203    ctrl_display = parser.add_argument_group(
1204        'Display',
1205        'Control output display')
1206    ctrl_display.add_argument('--job',
1207                         action='store_true',
1208                         help='When displaying a single job show the JOBS table entries')
1209    ctrl_display.add_argument('--process',
1210                         action='store_true',
1211                         help='When displaying a single job show the PROCESSES table entries'
1212                         'table entries')
1213    args = parser.parse_args()
1214
1215    if args.db:
1216        settings.set_db_name(args.db)
1217
1218    if args.project_id:
1219        settings.DATABASE_PROJECT_ID = args.project_id
1220
1221    if args.table is not None and args.table[0] != '%':
1222        # throw error if invalid
1223        TableInfo(args.table)
1224
1225    if args.replace:
1226        os.environ['CHART_REPLACE'] = '1'
1227
1228    if args.reingest:
1229        from chart.common.env import set_env
1230
1231        set_env('{ENV_PREFIX}REINGEST', True)
1232
1233    # TBD change to proper data type
1234    if args.tod is not None:
1235        args.tod = xml_to_timedelta(args.tod)
1236
1237    if args.all_summary:
1238        # no category needed
1239        show_global_summary()
1240        parser.exit()
1241
1242    # should come before category sanity checks
1243    if args.show_derived:
1244        show_derived_jobs(args.show_derived)
1245        parser.exit()
1246
1247    if (
1248        int(args.show is not None)
1249        + int(args.summary)
1250        + int(args.list is not None)
1251        + int(args.add is not None)
1252        + int(args.reschedule is not None)
1253        > 1
1254    ):
1255        parser.error('Multiple actions specified')
1256
1257    if args.status is None:
1258        status = None
1259
1260    else:
1261        status = JobStatus[args.status.upper()]
1262
1263    if args.category:
1264        args.category = args.category.upper()
1265
1266    if args.reschedule is not None:
1267        reschedule(
1268            job_ids=args.reschedule,
1269            category=args.category,
1270            status=status,
1271            table=args.table,
1272            activity=expand_activities(args.activity),
1273            sid=args.sid,
1274            start_time=args.start_time,
1275            stop_time=args.stop_time,
1276            filename=args.filename[0] if args.filename is not None else None,
1277            orbit=args.orbit,
1278        )
1279        parser.exit()
1280
1281    if args.next:
1282        show_next_job(args.category, args.batch)
1283        parser.exit()
1284
1285    if args.summary:
1286        show_summary(
1287            category=args.category,
1288            status=status,
1289            sid=args.sid,
1290            start_time=args.start_time,
1291            stop_time=args.stop_time,
1292        )
1293
1294        parser.exit()
1295
1296    if args.cleanup:
1297        if args.category is None:
1298            parser.error('Cleanup option requires a category')
1299
1300        cleanup(args.category)
1301        parser.exit()
1302
1303    if args.purge:
1304        if args.category is None:
1305            parser.error('Purge option requires a category')
1306
1307        if args.category == settings.DEFAULT_CATEGORY:
1308            parser.error(
1309                'Cannot purge category {cat}'.format(cat=settings.DEFAULT_CATEGORY)
1310            )
1311
1312        purge(args.category, args.purge)
1313        parser.exit()
1314
1315    if args.outdir and str(args.outdir).isdigit():
1316        parser.error(
1317            'Note the -o option is used to specify output directory name. This '
1318            'cannot be a number since the option could be confused with '
1319            'the --orbit option'
1320        )
1321
1322    if args.list:
1323        if args.filename is not None and len(args.filename) > 1:
1324            parser.error('Can only search for a maximum of 1 filename')
1325
1326        activity = args.activity
1327
1328        if args.list is not ANY:
1329            # the user said --list <x>
1330
1331            if args.list.lower() == 'pending':
1332                status = None
1333
1334            else:
1335                try:
1336                    # see if it was --list <status>
1337                    status = JobStatus[args.list.upper()]
1338                except KeyError:
1339                    # see if it was --list <activity>
1340                    Activity(args.list)
1341                    # we don't allow --list <activity wildcard>
1342                    activity = [args.list]
1343
1344        if isinstance(status, str) and status.lower() == 'none':
1345            status = None
1346
1347        # Show jobs based on execute time from processes table
1348        if args.execute_start or args.execute_stop:
1349            list_jobs_by_process(activity=expand_activities(activity),
1350                                 execute_start=args.execute_start,
1351                                 execute_stop=args.execute_stop)
1352            parser.exit()
1353
1354        # Show jobs based on jobs table
1355        list_jobs(category=args.category,
1356                  status=status,
1357                  activities=expand_activities(activity),
1358                  sid=args.sid,
1359                  start_time=args.start_time,
1360                  stop_time=args.stop_time,
1361                  table=args.table,
1362                  filename=args.filename[0] if args.filename is not None else None,
1363                  orbit=args.orbit,
1364                  only_missing_files=args.missing)
1365        parser.exit()
1366
1367    if args.show:
1368        show_single_job(jobid=args.show, show_job=args.job, show_process=args.process)
1369        parser.exit()
1370
1371    if args.run and args.outdir is None:
1372        args.outdir = make_working_directory('jobcontrol')
1373
1374    if args.add is not None:
1375        if args.category is None:
1376            parser.error('Category must be specified')
1377
1378        if args.add is not True:
1379            # user wrote --add xxx
1380            activity = [args.add]
1381
1382        else:
1383            activity = args.activity
1384            # User wrote --add --activity x
1385
1386        if args.outdir is not None:
1387            parser.error('Cannot specify output directory if no jobs are being run')
1388
1389        jobs = make_jobs(
1390            category=args.category,
1391            activities=expand_activities(activity),
1392            filenames=args.filename,
1393            sid=args.sid,
1394            orbit=args.orbit,
1395            start_time=args.start_time,
1396            stop_time=args.stop_time,
1397            table=args.table,
1398            orbital=args.orbital,
1399            daynight=args.daynight,
1400            hourly=args.hourly,
1401            daily=args.daily,
1402            weekly=args.weekly,
1403            monthly=args.monthly,
1404            tod=args.tod,
1405            status=status,
1406            earliest_execution_time=args.earliest_execution_time,
1407        )
1408        logging.info('Will attempt to create {cc} jobs'.format(cc=len(jobs)))
1409
1410        if args.no_prompt or keyboard_confirm('Continue?'):
1411            dups = 0
1412            for j in jobs:
1413                notdup, job_id = add_or_sim_job(j)  # , return_id=True)
1414                if not notdup:
1415                    dups += 1
1416                    logging.info('Duplicate of job {jobid}'.format(jobid=job_id))
1417
1418                else:
1419                    logging.info('Created job {jobid}'.format(jobid=job_id))
1420
1421            logging.info(
1422                'Created {total} new jobs of which {dups} were duplicates'.format(
1423                    total=len(jobs), dups=dups
1424                )
1425            )
1426            db_conn.commit()
1427
1428        parser.exit()
1429
1430    if args.run:
1431        if args.outdir is None:
1432            parser.error('Specify a working directory with --outdir')
1433
1434        jobs = []
1435        for jobid in args.run:
1436            row = find_jobs(fields=job_retrieve_fields(), job_id=jobid).fetchone()
1437            if row is None:
1438                parser.error('Job {id} not found'.format(id=jobid))
1439
1440            jobs.append(Job(row=row))
1441
1442        worker.process_jobs(args.worker_name, jobs, args.outdir)
1443        parser.exit()
1444
1445    parser.error(
1446        'No action specified (try using --summary, --show, --add, --work, --reschedule '
1447        'or --cleanup)'
1448    )
1449
# Run the command line entry point only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()