#!/usr/bin/env python3

"""Create a working directory and work order file, run an activity with the specified
configuration, and validate and optionally ingest the results.
"""

import os
import re
import imp  # deprecated in favour of importlib, but used here for reload() and load_source()
import time
import logging
import subprocess
from copy import deepcopy
from collections import defaultdict
from typing import Dict, Optional

import chart.db.ts
import chart.alg.settings
from chart.backend.result import Result
from chart.backend.workorder import WorkOrder
from chart.backend.job import Job, JobStatus
from chart.backend.processes import ProcessStatus
from chart.common.util import ensure_dir_exists
from chart.common.env import set_env
from chart.events.ingest import ingest_events
from chart.events.exceptions import InvalidEvent
from chart.project import settings
from chart.reports.ingest import ingest_report

logger = logging.getLogger()

# If a subprocess cannot be started due to a file lock, retry
# this many times ...
SUBPROCESS_RETRIES = 7

# ... and wait this many seconds between retries
SUBPROCESS_WAIT_SECONDS = 15

# If we see a disk full or memory allocation error, wait SYS_ERR_PAUSE seconds
# before doing anything else
SYS_ERR_PAUSE = 900


class DispatchResult:
    """Result of dispatching one work order: the overall process status, plus an
    optional analysis of the worker log file mapping log level names to message
    counts, e.g. {'INFO': 42, 'ERROR': 1}.
    """

    def __init__(self,
                 process_status: ProcessStatus,
                 log_file_analysis: Optional[Dict[str, int]] = None):
        self.process_status = process_status
        self.log_file_analysis = log_file_analysis


def dispatch(jobs,
             work_dir,
             ingest=False,
             use_subprocess=False,
             fake_writes=False,
             sendmails=False):
    """Execute a list of jobs and return the status of each.

    This function modifies the current working directory.

    Sample work order for an activity which expects a multiple-job format work
    order file::

        <work-order>
          <activity>PDU_INGESTER</activity>
          <job>
            <id>123</id>
            <filename>HKTM_xxx_...</filename>
            <dirname>/leo/GS/tmp/store...</dirname>
          </job>
        </work-order>

    Unit tests should not be run with use_subprocess=False: when multiple tests
    are run together via nosetests, test_gome_sum fails. However, when run with
    use_subprocess=True some tests hang.

    Args:
        `jobs` (list of Job): Work to be done. Dictionaries are also accepted
            and converted to Job objects.
        `work_dir` (Path): Directory to use as work directory. Will be created if needed.
        `ingest` (bool): If set, any events or reports generated will be ingested.
        `use_subprocess` (bool): Spawn a separate OS subprocess for the actual work.
        `fake_writes` (bool): Activate fake writes (to CSV files) for the job.
            Note once enabled, fake writes stay on even after this function returns.
        `sendmails` (bool): Allow event ingestion to send notification emails.

    Returns:
        DispatchResult.
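
    Example (an illustrative sketch only; the activity name, paths and job
    fields are placeholders echoing the sample work order above, and
    `scratch_dir` stands in for a project Path object)::

        result = dispatch(jobs=[{'id': 123,
                                 'activity': 'PDU_INGESTER',
                                 'filename': 'HKTM_xxx_...',
                                 'dirname': '/leo/GS/tmp/store...'}],
                          work_dir=scratch_dir,
                          use_subprocess=True)
        if result.process_status is ProcessStatus.COMPLETED:
            logger.info('all jobs completed')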
    """
    if len(jobs) == 0:
        return DispatchResult(ProcessStatus.COMPLETED, None)

    # Allow the client to pass in dictionaries instead of Job objects.
    # This is a hack to keep the unit tests working, as they all use
    # dictionaries for jobs.
    if isinstance(jobs[0], dict):
        new_jobs = []
        for job in jobs:
            if 'id' in job:
                job['job_id'] = job['id']
                del job['id']

            new_jobs.append(Job(**job))

        jobs = new_jobs

    activity = jobs[0].activity  # all jobs must have the same activity
    category = jobs[0].category  # all jobs must have the same category

    if fake_writes:
        # reload the time series modules so they pick up the changed setting
        old_fake_writes = settings.FAKE_WRITES
        settings.FAKE_WRITES = True
        imp.reload(chart.db.ts.dispatch)
        imp.reload(chart.db.ts)

    # make sure work_dir is absolute even if we later change the working directory
    work_dir = work_dir.absolute()

    wo_path = work_dir.child(chart.alg.settings.WORKORDER_FILENAME)
    wo = WorkOrder(filename=wo_path,
                   mode='w',
                   activity=activity,
                   category=category)

    for job in jobs:
        wo.add_job(job)

    # transfer the instability element over
    instability = activity.elem.find('instability')
    if instability is not None:
        # deepcopy is required, otherwise we would move the
        # <instability> element, causing subsequent calls
        # to fail
        wo.elem.append(deepcopy(instability.elem))

    ensure_dir_exists(work_dir)

    try:
        wo.write()
    except OSError as e:
        # In Python 3 IOError is an alias of OSError, so one handler covers
        # both. Probably disk full (errno 28) or out of memory (errno 12);
        # pause and ask for a retry.
        logger.error(str(e))
        time.sleep(SYS_ERR_PAUSE)
        return DispatchResult(ProcessStatus.RETRY, None)

    logger.info('Work order created, running {exe}'.format(
            exe=activity.abs_executable))

    # actually call the subprocess, or run in-process
    res = dispatch_work_order(wo_path, use_subprocess)
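
    # ProcessStatus and JobStatus are assumed to share member names for the
    # failure modes (e.g. FAILED, RETRY), which allows the name-based mapping
    # below when the process did not complete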
    if res.process_status is not ProcessStatus.COMPLETED:
        for job in jobs:
            job.status = JobStatus[res.process_status.name]

        return res

    def fail(message):
        """If anything goes wrong with the subprocess, especially if the result file
        has any problems, we flag everything (the individual jobs and the process
        itself) as FAILED.
        """
        logger.error(message + '. Setting the status of all jobs and the process to FAILED')
        for job in jobs:
            job.status = JobStatus.FAILED

        return DispatchResult(ProcessStatus.FAILED, None)

    # read the result.xml file and set the job statuses
    try:
        result_file = Result(filename=work_dir.child(chart.alg.settings.RESULT_FILENAME),
                             mode='r')
    except IOError:
        return fail('Cannot read result file')

    total_jobs = 0
    completed_jobs = 0
    # zip() stops at the shorter sequence; the length check below catches
    # result files which are missing jobs
    for job_res, job in zip(result_file.read_jobs(), jobs):
        if job_res.status is None:
            return fail('Cannot read job status')

        if job_res.job_id != job.job_id:
            return fail('Mismatch in job ID values, expecting {exp} found {act}'.format(
                    exp=job.job_id,
                    act=job_res.job_id))

        job.status = job_res.status
        if job_res.status is JobStatus.COMPLETED:
            completed_jobs += 1

        job.tables = job_res.tables

        total_jobs += 1

    if total_jobs != len(jobs):
        return fail('Not all jobs were listed in the results file. Assuming the process failed')

    if completed_jobs == total_jobs:
        logger.info('{completed} out of {total} jobs completed'.format(
                completed=completed_jobs,
                total=total_jobs))

    else:
        logger.warning('{completed} out of {total} jobs completed'.format(
                completed=completed_jobs,
                total=total_jobs))

    # for NRT processing, only ingest the events.xml file
    if ingest and work_dir.child(chart.alg.settings.EVENTS_FILENAME).exists():
        try:
            ingest_events(input_file=work_dir.child(chart.alg.settings.EVENTS_FILENAME),
                          sendmails=sendmails)
        except InvalidEvent as e:
            logger.error('Found bad event {e}'.format(e=e))
            return DispatchResult(ProcessStatus.FAILED, None)

    # For NRT processing of report algorithms only, zip up the working
    # directory, copy it to the archive directory and make an entry in
    # the reports table.
    if ingest and activity.classname == 'report':
        ingest_report(work_dir)

    # the worker process will pause for 15 minutes if there is a database problem
    for job in jobs:
        if job.status is JobStatus.RETRY:
            return DispatchResult(ProcessStatus.RETRY, None)

    # restore fake writes in case we are being run via nosetests and there is another
    # test coming up which doesn't want them
    if fake_writes:
        settings.FAKE_WRITES = old_fake_writes
        imp.reload(chart.db.ts)

    return res


def dispatch_subprocess(work_dir, activity) -> DispatchResult:
    """Run the algorithm as a subprocess."""
    logger.debug('Dispatching using subprocess')
    env = os.environ.copy()
    # AIX magic: without this our subprocess is not allocated enough memory
    env['LDR_CNTRL'] = 'MAXDATA=0x80000000'

    log_path = work_dir.child(chart.alg.settings.LOG_FILENAME)
    if log_path.exists():
        log_path.unlink()

    # tell the child process to log to its working directory
    env['{ENV_PREFIX}LOG_FILE'.format(
            ENV_PREFIX=settings.ENV_PREFIX)] = str(chart.alg.settings.LOG_FILENAME)

    # This is needed for the daily digest, to tell it where the rotating log
    # files are when run from the reporter. The log module gives priority to
    # the single log file if both single and rotating logs are configured.
    if settings.ROTATING_LOG_FILE is not None:
        env['{ENV_PREFIX}ROTATING_LOG_FILE'.format(ENV_PREFIX=settings.ENV_PREFIX)] = \
            settings.ROTATING_LOG_FILE

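    # 'w+' so we can rewind and read back anything the child writes
    # (stderr is copied into our own log below)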
    out = open(str(work_dir.child(chart.alg.settings.STDOUT_FILENAME)), 'w+')
    err = open(str(work_dir.child(chart.alg.settings.STDERR_FILENAME)), 'w+')
    child = None
    # if the algorithm executable file is locked, keep retrying
    for _ in range(SUBPROCESS_RETRIES):
        try:
            child = subprocess.Popen((str(activity.abs_executable),),
                                     cwd=str(work_dir),
                                     env=env,
                                     stdout=out.fileno(),
                                     stderr=err.fileno())
        except OSError as e:
            # this can happen if e.g. the file is momentarily unavailable
            logger.info('{err} - retrying after {wait} seconds'.format(
                    err=e, wait=SUBPROCESS_WAIT_SECONDS))
            time.sleep(SUBPROCESS_WAIT_SECONDS)

        else:
            break

    if child is None:
        logger.error('Could not run {exe}'.format(exe=activity.abs_executable))
        return DispatchResult(ProcessStatus.FAILED)

    # wait until either the child process exits or we reach the timeout,
    # allowing for per-activity timeouts
    start_time = time.time()
    activity_timeout = activity.timeout
    if activity_timeout is None:
        timeout_time = start_time + settings.DISPATCHER_TIMEOUT.total_seconds()

    else:
        timeout_time = start_time + min(settings.DISPATCHER_TIMEOUT,
                                        activity_timeout).total_seconds()

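    # poll the child once a second until it exits or the deadline passes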
    while time.time() < timeout_time and child.returncode is None:
        if child.poll() is None:
            time.sleep(1)

    # log an error in the worker log file if the return code is non-zero
    if child.returncode != 0:
        if child.returncode is None:
            logger.error('Timeout after {limit}'.format(limit=settings.DISPATCHER_TIMEOUT))
            child.kill()
            return DispatchResult(ProcessStatus.TIMEOUT)

        logger.error('Algorithm {exe} exited with code {exitcode}'.format(
                exe=activity.abs_executable, exitcode=child.returncode))

    # copy to the worker log file any text written to stderr,
    # deleting stderr.txt if it is empty
    if err.tell() > 0:
        err.seek(0)
        for line in err:
            line = line.strip()
            if len(line) > 0:
                logger.error('STDERR: ' + line)

        err.close()

    else:
        err.close()
        os.unlink(err.name)

    # delete stdout.txt if it is empty
    if out.tell() > 0:
        out.close()

    else:
        out.close()
        os.unlink(out.name)

    # Scan the log file and print the per-level message counts to our own log,
    # so that the daily digest's log file analysis of worker.log will detect them.
    if work_dir.child(chart.alg.settings.LOG_FILENAME).exists():
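        # Count messages per log level. The pattern assumes lines of the
        # (illustrative) form: '2020-01-01 12:34:56 worker INFO message text'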
        logger_finder = re.compile(r'[0-9-]+ [0-9:]+ [^ ]+ ([A-Z]+)')
        msg_map = defaultdict(int)
        for line in open(str(work_dir.child(chart.alg.settings.LOG_FILENAME)), 'r'):
            match = logger_finder.match(line)
            if match is not None:
                msg_map[match.group(1)] += 1

        logger.info('Subprocess complete, log file analysis shows {counts}'.format(
                counts=', '.join(k + ':' + str(v) for k, v in msg_map.items())))

        if msg_map['ERROR'] > 0:
            logger.error('{cc} errors were reported'.format(cc=msg_map['ERROR']))

        if msg_map['CRITICAL'] > 0:
            logger.critical('Critical errors were reported')

    else:
        logger.info('No log file found')
        msg_map = None

    return DispatchResult(ProcessStatus.COMPLETED, msg_map)


def dispatch_inline(activity) -> DispatchResult:
    """Run the algorithm by importing it into this process."""
    logger.debug('Dispatching using inline import')
    os.environ['CHART_DISPATCHER'] = '1'
    logger.info('Importing {exe}'.format(exe=activity.abs_executable))
    # run the algorithm as if it were the main script
    imp.load_source('__main__', str(activity.abs_executable))
    logger.info('Post import csv sync')
    from chart.db import ts
    if hasattr(ts, 'csv_files'):
        # close any CSV files the algorithm left open
        for c in ts.csv_files:
            c.handle.close()

    return DispatchResult(ProcessStatus.COMPLETED, None)


def dispatch_work_order(wo_path, use_subprocess=False) -> DispatchResult:
    """Given a prepared work order `wo_path`, execute it either in a subprocess
    or inline in this process.
    """
    wo = WorkOrder(wo_path)
    activity = wo.activity
    work_dir = wo_path.parent.absolute()
    os.chdir(str(work_dir))

    # tell the subprocess it is being run by the dispatcher
    set_env('CHART_DISPATCHER', '1')

    # make sure the subprocess inherits the same database as us
    set_env('{ENV_PREFIX}DB', settings.DATABASES['default']['DB_NAME'])

    # Delete any existing core file. We report if a new one is created.
    if work_dir.child('core').exists():
        work_dir.child('core').unlink()

    # a subprocess is also required whenever central (single or rotating)
    # log files are configured
    if (use_subprocess or
            settings.SINGLE_LOG_FILE is not None or
            settings.ROTATING_LOG_FILE is not None or
            settings.DISPATCHER_SUBPROCESS):
        res = dispatch_subprocess(work_dir, activity)

    else:
        res = dispatch_inline(activity)

    # any core files should be noted
    if work_dir.child('core').exists():
        logger.error('Algorithm produced a core dump')

    return res