#!/usr/bin/env python3

"""Create a working directory and work order file, run an activity with the specified
configuration, and validate and optionally ingest the results.
"""

import os
import re
from copy import copy, deepcopy
import time
import logging
import itertools
import subprocess
from collections import defaultdict
from typing import Dict, Optional

import chart.db.ts
from chart.backend.result import Result
from chart.backend.workorder import WorkOrder
from chart.common.util import ensure_dir_exists
from chart.common.env import set_env
from chart.backend.job import Job
from chart.events.ingest import ingest_events
from chart.project import settings
from chart.reports.ingest import ingest_report
import chart.alg.settings
from chart.backend.processes import ProcessStatus
from chart.backend.job import JobStatus
from chart.events.exceptions import InvalidEvent
import imp

logger = logging.getLogger()

# If a subprocess cannot be started due to a file lock, retry
# this many times ...
SUBPROCESS_RETRIES = 7

# ... and wait for this many seconds between retries
SUBPROCESS_WAIT_SECONDS = 15

# If we see a disk full or memory allocation error wait for SYS_ERR_PAUSE seconds
# before doing anything else.
SYS_ERR_PAUSE = 900

# class LogLevel(Enum):
    # DEBUG = 'DEBUG'
    # INFO = 'INFO'
    # WARN = 'WARN'
    # ERROR = 'ERROR'
    # CRITICAL = 'CRITICAL'
    # UNRECOGNISED = None

class DispatchResult:
    """Result of a dispatch: the overall process status plus, when the worker log
    file was scanned, a mapping of log level name to message count."""

    def __init__(self, process_status: ProcessStatus,
                 log_file_analysis: Optional[Dict[str, int]] = None):
        self.process_status = process_status
        self.log_file_analysis = log_file_analysis


def dispatch(jobs,
             work_dir,
             ingest=False,
             use_subprocess=False,
             fake_writes=False,
             sendmails=False):
    """Execute a list of jobs and return the status of each.

    This function modifies the current working directory.

    Sample work order for an activity which expects a multiple-job format work order file::

        <work-order>
          <activity>PDU_INGESTER</activity>
          <job>
            <id>123</id>
            <filename>HKTM_xxx_...</filename>
            <dirname>/leo/GS/tmp/store...</dirname>
          </job>
        </work-order>

    Unit tests should not be run with use_subprocess=False because then, when multiple
    tests are run together via nosetests, test_gome_sum fails. However, when run with
    use_subprocess=True some tests hang.

    Args:
        `jobs` (list of Job): Work to be done.
        `work_dir` (Path): Directory to use as the work directory. Will be created if needed.
        `ingest` (bool): If set, any reports generated will be ingested.
        `use_subprocess` (bool): Spawn a separate OS subprocess for the actual work.
        `fake_writes` (bool): Activate fake writes (to CSV files) for the job.
            Note that once enabled, fake writes stay on even after this function returns.
        `sendmails` (bool): Passed through to event ingestion to control whether
            notification emails may be sent.

    Returns:
        DispatchResult.
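
    Example (an illustrative sketch only; the job fields and work directory are
    hypothetical, ``work_dir`` must be an instance of the project's Path class, and
    the dict form relies on the dict-to-Job conversion used by the unit tests)::

        jobs = [{'id': 123,
                 'activity': 'PDU_INGESTER',
                 'filename': 'HKTM_xxx_...',
                 'dirname': '/leo/GS/tmp/store...'}]
        result = dispatch(jobs,
                          work_dir=Path('/tmp/pdu_ingest_work'),
                          use_subprocess=True)
        if result.process_status is not ProcessStatus.COMPLETED:
            logger.error('PDU ingestion run did not complete cleanly')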
    """
    # logger.debug('dispatcher fake writes {fw}'.format(fw=fake_writes))
    # if use_subprocess:
    #     logger.debug('Dispatch with subprocess time out {to}'.format(
    #         to=settings.DISPATCHER_TIMEOUT)

    # else:
    #     logger.debug('Dispatch using inline call')

    if len(jobs) == 0:
        return DispatchResult(ProcessStatus.COMPLETED, None)

    # Allow client to pass in dictionaries instead of Job objects
    # this is a hack to get the unit tests working as they all use dictionaries for Jobs
    if len(jobs) > 0 and isinstance(jobs[0], dict):
        new_jobs = []
        for job in jobs:
            if 'id' in job:
                job['job_id'] = job['id']
                del job['id']

            new_jobs.append(Job(**job))

        jobs = new_jobs

    activity = jobs[0].activity  # all jobs must have same activity
    category = jobs[0].category  # all jobs must have same category

    # if activity.convention is CallingConvention.SID_HOURLY and not inner_call:
    #     return dispatch(jobs=hourly_expand(jobs),
    #                     work_dir=work_dir,
    #                     ingest=ingest,
    #                     use_subprocess=use_subprocess,
    #                     fake_writes=fake_writes,
    #                     inner_call=True)

    if fake_writes:
        old_fake_writes = settings.FAKE_WRITES
        settings.FAKE_WRITES = True
        imp.reload(chart.db.ts.dispatch)
        imp.reload(chart.db.ts)

    # make sure work_dir is absolute even if we change pwd
    work_dir = work_dir.absolute()

    wo_path = work_dir.child(chart.alg.settings.WORKORDER_FILENAME)
    wo = WorkOrder(filename=wo_path,
                   mode='w',
                   activity=activity,
                   category=category)

    for job in jobs:
        wo.add_job(job)

    # transfer instability element over
    instability = activity.elem.find('instability')
    if instability is not None:
        # deepcopy is required otherwise we will move the
        # <instability> element causing subsequent calls
        # to fail
        wo.elem.append(deepcopy(instability.elem))

    ensure_dir_exists(work_dir)

    try:
        wo.write()
    except OSError as e:
        # IOError is an alias of OSError in Python 3, so a single handler covers both
        # errno 28 (disk full) and errno 12 (out of memory); pause before asking
        # for a retry.
        logger.error(str(e))
        time.sleep(SYS_ERR_PAUSE)
        return DispatchResult(ProcessStatus.RETRY, None)

    logger.info('Work order created, running {exe}'.format(
        exe=activity.abs_executable))

    # Actually call the subprocess, or run in-process
    res = dispatch_work_order(wo_path, use_subprocess)
    if res.process_status is not ProcessStatus.COMPLETED:
        for job in jobs:
            job.status = JobStatus[res.process_status.name]

        return res

    def fail(message):
        """If anything goes wrong with the subprocess, especially if the result file
        has any problems, we flag everything (the individual jobs and the process itself)
        as FAILED.
        """

        logger.error(message + '. Setting the status of all jobs and the process to FAILED')
        for job in jobs:
            job.status = JobStatus.FAILED

        return DispatchResult(ProcessStatus.FAILED, None)

    # read the result.xml file and set the job statuses
    try:
        result_file = Result(filename=work_dir.child(chart.alg.settings.RESULT_FILENAME),
                             mode='r')
    except IOError:
        return fail('Cannot read result file')

    total_jobs = 0
    completed_jobs = 0
    for job_res, job in zip(result_file.read_jobs(), jobs):
        if job_res.status is None:
            return fail('Cannot read job status')

        if job_res.job_id != job.job_id:
            return fail('Mismatch in job ID values, expecting {exp} found {act}'.format(
                exp=job.job_id,
                act=job_res.job_id))

        job.status = job_res.status
        if job_res.status is JobStatus.COMPLETED:
            completed_jobs += 1

        job.tables = job_res.tables

        total_jobs += 1

    if total_jobs != len(jobs):
        return fail('Not all jobs were listed in results file. Assuming the process failed')

    if completed_jobs == total_jobs:
        logger.info('{completed} out of {total} jobs completed'.format(
            completed=completed_jobs,
            total=total_jobs))

    else:
        logger.warning('{completed} out of {total} jobs completed'.format(
            completed=completed_jobs,
            total=total_jobs))

    # for NRT processing only, ingest the events.xml file
    # logger.debug('cleanup: ingest {ingest} events {events}'.format(
    #     ingest=ingest, events=os.path.exists(os.path.join(work_dir,
    #     chart.alg.settings.EVENTS_FILENAME))))
    if ingest and work_dir.child(chart.alg.settings.EVENTS_FILENAME).exists():
        # if jobs[0].category == 'SCHEDULER':
        #     sendmail = True

        # else:
        #     sendmail = False

        try:
            ingest_events(input_file=work_dir.child(chart.alg.settings.EVENTS_FILENAME),
                          sendmails=sendmails)
        except InvalidEvent as e:
            logger.error('Found bad event {e}'.format(e=e))
            return DispatchResult(ProcessStatus.FAILED, None)

    # for NRT processing of report algorithms only, zip and archive the working directory
    if ingest and activity.classname == 'report':  # and category == 'SCHEDULER':
        # zip up the working directory, copy it to the archive directory
        # and make an entry in the reports table
        ingest_report(work_dir)

    first_fail = None

    # the worker process will pause for 15 minutes if there is a database problem
    for job in jobs:
        if job.status is JobStatus.RETRY:
            return DispatchResult(ProcessStatus.RETRY, None)

        elif job.status is JobStatus.FAILED:
            if first_fail is None:
                first_fail = job

    # restore fake writes in case we are being run via nosetests and there is another
    # test coming up which doesn't want them
    if fake_writes:
        settings.FAKE_WRITES = old_fake_writes
        imp.reload(chart.db.ts)

    return res


def dispatch_subprocess(work_dir, activity) -> DispatchResult:
    """Run the algorithm as a subprocess."""
    logger.debug('Dispatching using subprocess')
    # take a real copy of the environment so our additions do not leak back into
    # this process's own environment
    env = os.environ.copy()
    # AIX magic: this is needed on AIX otherwise our subprocess doesn't
    # get allocated enough memory
    env['LDR_CNTRL'] = 'MAXDATA=0x80000000'

    log_path = work_dir.child(chart.alg.settings.LOG_FILENAME)
    if log_path.exists():
        log_path.unlink()

    # tell the child process to log to its working directory
    env['{ENV_PREFIX}LOG_FILE'.format(
        ENV_PREFIX=settings.ENV_PREFIX)] = str(chart.alg.settings.LOG_FILENAME)

    # this is needed for daily digest to tell it where the rotating log files are
    # when run from reporter.
    # The log module gives priority to single log file if both single and multiple are configured
    # logger.debug('Setting env ROTATING_LOG_FILE to ' + str(settings.ROTATING_LOG_FILE))
    if settings.ROTATING_LOG_FILE is not None:
        env['{ENV_PREFIX}ROTATING_LOG_FILE'.format(ENV_PREFIX=settings.ENV_PREFIX)] = \
            settings.ROTATING_LOG_FILE

    out = open(str(work_dir.child(chart.alg.settings.STDOUT_FILENAME)), 'w+')
    err = open(str(work_dir.child(chart.alg.settings.STDERR_FILENAME)), 'w+')
    child = None
    for _ in range(SUBPROCESS_RETRIES):
        # if the algorithm executable file is locked keep retrying it
        # logger.debug('Run {exe}'.format(exe=activity.abs_executable))
        try:
            child = subprocess.Popen((str(activity.abs_executable),),
                                     cwd=str(work_dir),
                                     env=env,
                                     # stdout=subprocess.PIPE,
                                     # stderr=subprocess.PIPE)
                                     stdout=out.fileno(),
                                     stderr=err.fileno())
        except OSError as e:
            # this can happen if e.g. the file is unavailable for a moment
            logger.info('{err} - retrying after {wait} seconds'.format(
                err=e, wait=SUBPROCESS_WAIT_SECONDS))
            time.sleep(SUBPROCESS_WAIT_SECONDS)
            continue

        break

    if child is None:
        logger.error('Could not run {exe}'.format(exe=activity.abs_executable))
        return DispatchResult(ProcessStatus.FAILED)

    # wait until either the child process exits or we reach the timeout
    start_time = time.time()
    activity_timeout = activity.timeout
    # Allow for per-activity timeouts
    if activity_timeout is None:
        timeout_time = start_time + settings.DISPATCHER_TIMEOUT.total_seconds()

    else:
        # use the smaller of the global and per-activity timeouts
        timeout_time = start_time + min(settings.DISPATCHER_TIMEOUT,
                                        activity_timeout).total_seconds()
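    # For example (values are hypothetical): with DISPATCHER_TIMEOUT set to 2 hours and
    # an activity timeout of 30 minutes, the child gets 30 minutes; with no activity
    # timeout configured it gets the full 2 hours.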

    while time.time() < timeout_time and child.returncode is None:
        if child.poll() is None:
            time.sleep(1)

    # log an error in the worker log file if the return code is non-zero
    if child.returncode != 0:
        if child.returncode is None:
            logger.error('Timeout after {limit}'.format(limit=settings.DISPATCHER_TIMEOUT))
            child.kill()
            return DispatchResult(ProcessStatus.TIMEOUT)

        logger.error('Algorithm {exe} exited with code {exitcode}'.format(
            exe=activity.abs_executable, exitcode=child.returncode))

    # copy to worker log file any text written to stderr
    if err.tell() > 0:
        err.seek(0)
        for line in err:
            line = line.strip()
            if len(line) > 0:
                logger.error('STDERR: ' + line)

        err.close()

    else:
        err.close()
        os.unlink(err.name)

    # delete stdout.txt if it is empty
    if out.tell() > 0:
        out.close()

    else:
        out.close()
        os.unlink(out.name)

    # Log file scan. We just print the results to our own log so that the daily digest's
    # log file analysis of worker.log will detect them.
    if work_dir.child(chart.alg.settings.LOG_FILENAME).exists():
        logger_finder = re.compile(r'[0-9-]+ [0-9:]+ [^ ]+ ([A-Z]+)')
        # logger_finder = re.compile(r'^.{20}([A-Z]+)')
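        # The pattern assumes log lines of the form
        # '2017-01-01 12:00:00 <name> LEVEL message ...' (date, time, one token, then
        # the level in capitals); the exact layout depends on the logging configuration.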
        msg_map = defaultdict(int)
        for line in open(str(work_dir.child(chart.alg.settings.LOG_FILENAME)), 'r'):
            match = logger_finder.match(line)
            if match is not None:
                msg_map[match.group(1)] += 1

            else:
                pass

        logger.info('Subprocess complete, log file analysis shows {counts}'.format(
            counts=', '.join(k + ':' + str(v) for k, v in msg_map.items())))

        if msg_map['ERROR'] > 0:
            logger.error('{cc} errors were reported'.format(cc=msg_map['ERROR']))

        if msg_map['CRITICAL'] > 0:
            logger.critical('Critical errors were reported')

    else:
        logger.info('No log file found')
        msg_map = None

    return DispatchResult(ProcessStatus.COMPLETED, msg_map)


def dispatch_inline(activity) -> DispatchResult:
    """Run the algorithm by importing it into this process."""
    logger.debug('Dispatching using inline import')
    os.environ['CHART_DISPATCHER'] = '1'
    logger.info('Importing {exe}'.format(exe=activity.abs_executable))
    imp.load_source('__main__', str(activity.abs_executable))
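    # Loading the module under the name '__main__' also runs its
    # "if __name__ == '__main__':" block, so the algorithm executes as though it
    # had been started from the command line.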
    logger.info('Post import csv sync')
    from chart.db import ts  # (unused) pylint: disable=W0612
    if hasattr(ts, 'csv_files'):
        for c in ts.csv_files:
            # logger.debug('Closing file')
            c.handle.close()

    # logger.info('Done csv scan')
    return DispatchResult(ProcessStatus.COMPLETED, None)


def dispatch_work_order(wo_path, use_subprocess=False) -> DispatchResult:
    """Given a prepared work order `wo_path`, execute it either in a subprocess or inline."""
    wo = WorkOrder(wo_path)
    activity = wo.activity
    work_dir = wo_path.parent.absolute()
    os.chdir(str(work_dir))

    # tell the subprocess it is being run by the dispatcher
    set_env('CHART_DISPATCHER', '1')

    # make sure the subprocess inherits the same database as us
    set_env('{ENV_PREFIX}DB', settings.DATABASES['default']['DB_NAME'])

    # Delete existing core file. We report if a new one is created.
    if work_dir.child('core').exists():
        work_dir.child('core').unlink()

    if use_subprocess or \
            settings.SINGLE_LOG_FILE is not None or \
            settings.ROTATING_LOG_FILE is not None or \
            settings.DISPATCHER_SUBPROCESS:
        res = dispatch_subprocess(work_dir, activity)

    else:
        res = dispatch_inline(activity)

    # any core files should be noted
    if work_dir.child('core').exists():
        logger.error('Algorithm produced a core dump')

    return res