1#!/usr/bin/env python3
2
3"""Ingest Procedure Archive zipfiles.
4
5Documented in [MOF MON] "MTG GS MOD_MME Monitoring ICD", EUM/MTG/ICD/12/0724
6
7Sections
8 5.9 File format description
9
10 5.9.3 File name convention for holding zipfile (although supplied test data does not follow it)
11
12For this algorithm we perform a full scan of the .zip file and ingest all likely contents.
13
14From the low level files, types are:
15
16L1 (Step Event): Domain, Counter, Timestamp, Attribute, Value, Step Identifier
17L2 (Step List Event): Domain, Counter, Timestamp, State Value, Step List Identifier
18L3 (Thread Event): Domain, Counter, Timestamp, Attribute, Value, Thread Identifier
19L4 (Argument): Domain, Counter, Timestamp, Attribute, Value
20L5 (Variable Event): Domain, Counter, Timestamp, Attribute, Value
21
22Sample zipfile name: APEX_history.zip
23
24Sample internal name: APEX_history/ProcArchive 2018.05.02 00-00/P7659_7139.txt
25
26Sample line (new format, utf-8 encoding):
27
28<H>MTI1:¬S=17¬0¬1562934452@174
29<L>1¬MTI1:¬12¬1562828931@214¬S=5¬P.1
30<L>3¬MTI1:¬20¬1562828931@385¬S=0¬P
31
32
33APEX history consist of 2 types of files:
34- HLCH files
35- Procedure Archive (PA) files (what this ingester handles)
36
37The ingesters for those 2 types parse the input files, or rather interpret them
38to extract INSTANCE data and execution LOG data. In the PA files, the ingester extracts
39occurences of Steps, StepLists and Threads along with their attributes
40and put them into INSTANCES table, whereas States are stored in the execution log table.
41
42Other levels (Argument and Variables), go into execution LOG table.
43"""
44import os
45import re
46from builtins import Exception
47from collections import namedtuple
48import zipfile
49import logging
50from datetime import datetime
51from datetime import timedelta
52from operator import itemgetter
53
54from chart.alg import JobStatus, init_algorithm
55from chart.db import ts
56from chart.project import SID
57from chart.db import connection
58from chart.db.exceptions import DuplicateDataError
59#from chart.project import settings
60from chart import settings
61
62import chartepssg.alg.apex_common as common
63from chartepssg.alg.apex_common import ArchiveCursor
64from chartepssg.alg.common import jsondumps
65from chartepssg.alg.settings import scid_to_sid
66
67
68logger = logging.getLogger()
69
70
71db_conn = connection.db_connect()
72
73
74# filename:
75# MOF_OPE_PROC-ARC_MTI1 _20180518152359_20180516123000_20180517122959.zip
76FILENAME_MATCHER = re.compile(
77 r'(?P<scid>[a-zA-Z0-9]+)_'
78 r'[\w]+_PROC_HIST__'
79 r'G[0-9]+Z_'
80 r'S(?P<start>[0-9]+)Z_'
81 r'E(?P<stop>[0-9]+)Z'
82 r'.zip$')
83
84# Time decoder found in filename pattern
85TIME_DECODER = '%Y%m%d%H%M%S'
86
87
88def fileattr(filename):
89 """Given a `filename` return a tuple of SID, sensing start and end time.
90
91 They look like:
92 MOF_OPE_PROC-ARC_MTI1 _20180518152359_20180516123000_20180517122959.zip
93 """
94 match = FILENAME_MATCHER.match(filename.name)
95 if not match:
96 raise ValueError('File name {f} not recognised as a Procedure Archive product'.format(
97 f=filename.name))
98
99 groups = match.groupdict()
100
101 sid = None
102 try:
103 sid = scid_to_sid(groups['scid'])
104 except Exception as ex:
105 logger.warning(ex)
106
107 return (sid,
108 datetime.strptime(groups['start'], TIME_DECODER),
109 datetime.strptime(groups['stop'], TIME_DECODER))
110
111
112# If the `replace` ingestion option is set, refuse to delete existing data if the range of
113# the new file exceeds REPLACE_THRESHOLD
114# (otherwise a file with corrupt metadata might cause us to delete years of data)
115REPLACE_THRESHOLD = timedelta(days=2)
116
117# original latin encoding
118# NOT = '\xac'
119# new unicode encoding
120NOT = '\u00ac'
121
122L1_DECODER = re.compile(r'<L>1{inv}'
123 r'(?P<domain>[^:]+):?{inv}'
124 r'(?P<counter>[0-9]+){inv}'
125 r'(?P<apex_time>[0-9]+@[0-9]+){inv}'
126 r'(?P<attribute>[^=]+)='
127 r'(?P<value>[^{inv}]+){inv}'
128 r'(?P<step_identifier>.+)\r?$'.format(inv=NOT))
129
130L2_DECODER = re.compile(r'<L>2{inv}'
131 r'(?P<domain>[^:]+):?{inv}'
132 r'(?P<counter>[0-9]+){inv}'
133 r'(?P<apex_time>[0-9]+@[0-9]+){inv}'
134 r'(?P<state_value>[^{inv}]+){inv}'
135 r'(?P<step_list_identifier>.+)\r?$'.format(inv=NOT))
136
137L3_DECODER = re.compile(r'<L>3{inv}'
138 r'(?P<domain>[^:]+):?{inv}'
139 r'(?P<counter>[0-9]+){inv}'
140 r'(?P<apex_time>[0-9]+@[0-9]+){inv}'
141 r'(?P<attribute>[^=]+)='
142 r'(?P<value>[^{inv}]+){inv}'
143 r'(?P<thread_identifier>.+)\r?$'.format(inv=NOT))
144
145L4_DECODER = re.compile(r'<L>4{inv}'
146 r'(?P<domain>[^:]+):?{inv}'
147 r'(?P<counter>[0-9]+){inv}'
148 r'(?P<apex_time>[0-9]+@[0-9]+){inv}'
149 r'(?P<attribute>[^=]+){inv}'
150 r'(?P<value>[^{inv}]+)\r?$'.format(inv=NOT))
151
152L5_DECODER = re.compile(r'<L>5{inv}'
153 r'(?P<domain>[^:]+):?{inv}'
154 r'(?P<counter>[0-9]+){inv}'
155 r'(?P<apex_time>[0-9]+@[0-9]+){inv}'
156 r'(?P<attribute>[^=]+){inv}'
157 r'(?P<value>[^{inv}]+)\r?$'.format(inv=NOT))
158
159PAEvent = namedtuple('PAEvent',
160 'timestamp sid domain instance level counter attribute value identifier')
161
162
163class EventsFileParser:
164 """Parser class for each Instance file with the PA zip files."""
165
166 def __init__(self, parent, file_info):
167
168 self.eventsfile = parent.zipfile.open(file_info.filename, 'r')
169 lines = self.eventsfile.read().decode(common.UTF8).split('\n')
170
171 self.parent = parent
172 self.filename = file_info.filename
173 self.instance = os.path.splitext(file_info.filename)[0]
174 self.file_iterator = ArchiveCursor(lines, self.decode)
175 self.objects = {}
176 self.parameters = {}
177
178 # event handlers map
179 self.event_handlers_map = {}
180 self.event_handlers_map['STEP'] = self.on_step
181 self.event_handlers_map['STEPLIST'] = self.on_step_list
182 self.event_handlers_map['THREAD'] = self.on_thread
183 self.event_handlers_map['ARGUMENT'] = self.on_argument
184 self.event_handlers_map['VARIABLE'] = self.on_variable
185
186 self.proc_props = None
187
188
189 def get_proc_props(self, sid_num):
190 """Get procedure json properties."""
191 proc_props = dict()
192 if self.proc_props is None:
193 self.proc_props = common.get_proc_attributes(sid_num, self.instance)
194
195 if self.proc_props is None:
196 logger.warning('Failed to retrieve properties for parent Procedure '
197 'Instance {inst}: HL Instance may not be ingested yet!'.format(inst=self.instance))
198 #raise Exception(ex_msg)
199
200 if self.proc_props is not None:
201 proc_props = self.proc_props
202
203 return proc_props
204
205 def set_object_properties(self, pa_event):
206 """Set properties of an object in the execution log (Step, StepList or Thread)."""
207 id = pa_event.identifier
208 object_props = {}
209
210 # if exist, update end_time
211 if id in self.objects.keys():
212 object_props = self.objects[id]
213 object_props['END_TIME'] = pa_event.timestamp
214 else:
215 # get start time, and other event attribs
216 object_props['START_TIME'] = object_props['END_TIME'] = pa_event.timestamp
217 object_props['EVENT'] = pa_event
218
219 # get proc json properties
220 proc_props = self.get_proc_props(pa_event.sid.bind_insert()[0])
221
222 if 'SOURCE' not in proc_props.keys():
223 object_props['SOURCE'] = pa_event.instance
224 else:
225 object_props['SOURCE'] = proc_props['SOURCE']
226
227 if 'PARENT' not in proc_props.keys():
228 object_props['PARENT'] = pa_event.instance
229 else:
230 object_props['PARENT'] = proc_props['PARENT']
231
232 if 'IDENTIFIER' not in proc_props.keys():
233 object_props['PROCEDURE'] = pa_event.instance
234 else:
235 procedure = object_props['PROCEDURE'] = proc_props['IDENTIFIER']
236 try:
237 props = common.get_attributes_from_du(pa_event.level,
238 pa_event.identifier,
239 procedure)
240 if props is not None:
241 object_props['PROPERTIES'] = props
242 except Exception as ex:
243 logger.warning('Unable to retrieve {level} details for {id} '
244 'from procedure {procedure} DU.'.format(
245 level=pa_event.level,
246 id=pa_event.identifier,
247 procedure=procedure))
248
249 # if property, add it to the dict
250 if pa_event.attribute not in ['STATE']:
251 object_props[pa_event.attribute.upper()] = pa_event.value
252 elif pa_event.value != 'Expired':
253 object_props['STATE'] = pa_event.value
254
255 self.objects[id] = object_props
256
257 def set_parameter_properties(self, pa_event, param_type):
258 """Set properties for Variable or Argument."""
259 attribute = pa_event.attribute
260 sid_num = pa_event.sid.bind_insert()[0]
261 instance = pa_event.instance
262
263 param_props = {}
264
265 # get source
266 proc_props = self.get_proc_props(sid_num)
267
268 if 'SOURCE' not in proc_props.keys():
269 param_props['SOURCE'] = pa_event.instance
270 else:
271 param_props['SOURCE'] = proc_props['SOURCE']
272
273 if 'PARENT' not in proc_props.keys():
274 param_props['PARENT'] = pa_event.instance
275 else:
276 param_props['PARENT'] = proc_props['PARENT']
277
278 if 'IDENTIFIER' not in proc_props.keys():
279 param_props['PROCEDURE'] = pa_event.instance
280 else:
281 param_props['PROCEDURE'] = proc_props['IDENTIFIER']
282
283 # Get the Arguments or Variables dict
284 param_type_dict = {}
285 if param_type in proc_props.keys():
286 param_type_dict = proc_props[param_type]
287
288 # Get the specifc param name list of lists within the previous dict
289 param_name = pa_event.attribute.upper()
290 param_values = []
291 if param_name in param_type_dict.keys():
292 param_values = param_type_dict[param_name]
293
294 # append the newest value tuple
295 param_values.append([pa_event.value, pa_event.timestamp])
296 param_type_dict[param_name] = param_values
297 proc_props[param_type] = param_type_dict
298
299 common.update_instance(sid_num, instance, 'PROCEDURE', 'ATTRIBUTES', jsondumps(proc_props))
300 self.parameters[attribute] = param_props
301
302 def on_step(self, pa_event):
303 """Handle Step level line."""
304 self.set_object_properties(pa_event)
305
306 def on_step_list(self, pa_event):
307 """Handle StepList level line."""
308 self.set_object_properties(pa_event)
309
310 def on_thread(self, pa_event):
311 """Handle Thread level line."""
312 self.set_object_properties(pa_event)
313
314 def on_argument(self, pa_event):
315 """Handle Argument level line."""
316 self.set_parameter_properties(pa_event, 'ARGUMENTS')
317
318 def on_variable(self, pa_event):
319 """Handle Variable level line."""
320 self.set_parameter_properties(pa_event, 'VARIABLES')
321
322 def parse(self):
323 """Parse all PAEvents inside internal text file and classify them\
324 into instances or LOG entries."""
325 try:
326 while (True):
327 pa_event = self.file_iterator.next()
328
329 self.parent.sids.add(pa_event.sid)
330
331 handler_func = self.event_handlers_map[pa_event.level]
332 handler_func(pa_event)
333 # Add the event to the exec log anyway
334 self.add_event_log(pa_event)
335
336 # Finished parsing
337 except StopIteration:
338 logger.info('Finished parsing PA Events file: {name} with {lines} lines'
339 .format(name=self.filename, lines=self.file_iterator.index - 1))
340 self.parent.pa_objects[self.instance] = self.objects
341 self.parent.pa_parameters[self.instance] = self.parameters
342 # error parsing
343 except Exception as ex:
344 logger.warning('Issue found while parsing PA Events file - {name} at line {line}: {ex}'
345 .format(name=self.filename, line=self.file_iterator.index, ex=ex))
346
347 def decode(self, line):
348 """Decode each line in text file and return PAEvent namedtuple."""
349 pa_event = self.gen_L1(self.instance, line)
350 if pa_event is None:
351 pa_event = self.gen_L2(self.instance, line)
352 if pa_event is None:
353 pa_event = self.gen_L3(self.instance, line)
354 if pa_event is None:
355 pa_event = self.gen_L4(self.instance, line)
356 if pa_event is None:
357 pa_event = self.gen_L5(self.instance, line)
358
359 return pa_event
360
361 def add_event_log(self, pa_event):
362 """Add an execution event log to the LOG."""
363 self.parent.pa_events.append(pa_event)
364
365 def gen_L1(self, instance, line):
366 """Decode L1 level."""
367 l1_match = L1_DECODER.match(line)
368 timestamp = None
369 if l1_match is not None:
370 groups = l1_match.groupdict()
371
372 # decode
373 attr_code = groups['attribute']
374 if attr_code in common.MAP_L1_ATTR.keys():
375 attribute = common.MAP_L1_ATTR[attr_code]
376 else:
377 attribute = attr_code
378
379 val_code = groups['value']
380 if attr_code in ['S']:
381 value = common.MAP_L1_STATES[val_code]
382 else:
383 value = val_code
384
385 timestamp = common.apextime_to_utc(groups['apex_time'])
386 domain = groups['domain']
387
388 return PAEvent(level=common.MAP_LEVEL['<L>1'],
389 instance=instance,
390 sid=scid_to_sid(domain),
391 domain=domain,
392 counter=int(groups['counter']),
393 timestamp=timestamp,
394 attribute=attribute,
395 value=value,
396 identifier=groups['step_identifier'])
397
398 def gen_L2(self, instance, line):
399 """Decode L2 level."""
400 l2_match = L2_DECODER.match(line)
401 if l2_match is not None:
402 groups = l2_match.groupdict()
403 timestamp = common.apextime_to_utc(groups['apex_time'])
404
405 # decode
406 val_code = groups['state_value']
407 value = common.MAP_L2_STATES[val_code]
408 domain = groups['domain']
409
410 return PAEvent(level=common.MAP_LEVEL['<L>2'],
411 instance=instance,
412 sid=scid_to_sid(domain),
413 domain=domain,
414 counter=int(groups['counter']),
415 timestamp=timestamp,
416 attribute='STATE',
417 value=value,
418 identifier=groups['step_list_identifier'])
419
420 def gen_L3(self, instance, line):
421 """Decode L3 level."""
422 l3_match = L3_DECODER.match(line)
423 if l3_match is not None:
424 groups = l3_match.groupdict()
425 # decode
426 attr_code = groups['attribute']
427 if attr_code in common.MAP_L3_ATTR.keys():
428 attribute = common.MAP_L3_ATTR[attr_code]
429 else:
430 attribute = attr_code
431
432 val_code = groups['value']
433 if attr_code in ['S']:
434 value = common.MAP_L3_STATES[val_code]
435 else:
436 value = val_code
437
438 timestamp = common.apextime_to_utc(groups['apex_time'])
439 domain = groups['domain']
440
441 return PAEvent(level=common.MAP_LEVEL['<L>3'],
442 instance=instance,
443 sid=scid_to_sid(domain),
444 domain=domain,
445 counter=int(groups['counter']),
446 timestamp=timestamp,
447 attribute=attribute,
448 value=value,
449 identifier=groups['thread_identifier'])
450
451 def gen_L4(self, instance, line):
452 """Decode L4 level."""
453 l4_match = L4_DECODER.match(line)
454 if l4_match is not None:
455 groups = l4_match.groupdict()
456 timestamp = common.apextime_to_utc(groups['apex_time'])
457 domain = groups['domain']
458 return PAEvent(level=common.MAP_LEVEL['<L>4'],
459 instance=instance,
460 sid=scid_to_sid(domain),
461 domain=domain,
462 counter=int(groups['counter']),
463 timestamp=timestamp,
464 attribute=groups['attribute'],
465 value=groups['value'],
466 identifier=None)
467
468 def gen_L5(self, instance, line):
469 """Decode L5 level."""
470 l5_match = L5_DECODER.match(line)
471 if l5_match is not None:
472 groups = l5_match.groupdict()
473 timestamp = common.apextime_to_utc(groups['apex_time'])
474 domain = groups['domain']
475 return PAEvent(level=common.MAP_LEVEL['<L>5'],
476 instance=instance,
477 sid=scid_to_sid(domain),
478 domain=domain,
479 counter=int(groups['counter']),
480 timestamp=timestamp,
481 attribute=groups['attribute'],
482 value=groups['value'],
483 identifier=None)
484
485
486class ProcedureArchiveParser:
487 """Procedure Archive Parser Class for PA zip files."""
488
489 def __init__(self, filename):
490 # The document states we can extract metadata from the archive filename
491 # (source, environment, product name, scid, start, stop, generation times)
492 # however, the sample data uses a different filename convention for the time being,
493 # so we ignore the filename except to open the file
494 logger.info('Opening Procedure Archive {f}'.format(f=filename))
495 self.zipfile = zipfile.ZipFile(filename)
496 self.pa_events = []
497 self.pa_objects = {}
498 self.pa_parameters = {}
499 self.sids = set()
500
501 def parse(self):
502 """For simplicity, we just emit a series of generators which emit PAEvent objects."""
503 # If the client code ever needs to know the internal text file names we could emit
504 # instances of a new class instead
505 for member_info in self.zipfile.infolist():
506 if member_info.is_dir():
507 logger.debug('Skipping dir {n}'.format(n=member_info.filename))
508 continue
509
510 # logger.debug('Retrieving internal file {n}'.format(n=member_info.filename))
511 events_parser = EventsFileParser(self, member_info)
512 events_parser.parse()
513
514 obj_count = 0
515 for inst in self.pa_objects.values():
516 obj_count = obj_count + len(inst.items())
517 return obj_count, len(self.pa_events)
518
519 def get_interval(self):
520 """Get the start and stop time interval of the ingested PA file."""
521 events = self.pa_events
522 # get the min and max of timestamp to work out replace period
523 start = min(events, key=itemgetter(0)).timestamp
524 stop = max(events, key=itemgetter(0)).timestamp
525 return start, stop
526
527 def ingest(self, replace=True, force_replace=False):
528 """Ingest PA file into DB.
529
530 If `replace` is set we preemptively delete any existing rows in the file's given timerange.
531 As a precaution, the function will fail if `replace` is set and the file duration is
532 >1 hour, unless `force_replace` is set.
533 """
534 _, event_cnt = self.parse()
535
536 if event_cnt > 0:
537 start, stop = self.get_interval()
538
539
540 self.insert_events(start, stop, replace, force_replace)
541 self.insert_instances(start, stop, replace, force_replace)
542
543 return start, stop
544
545 else:
546 return None, None
547
548 def insert_events(self, start, stop, replace, force_replace):
549 """Ingest execution LOG events into APEX_LOG table."""
550 table = common.TBL_APEX_LOG
551
552 events = self.pa_events
553 if len(events) > 0 and replace:
554 duration = start - stop
555 if duration > REPLACE_THRESHOLD and not force_replace:
556 raise ValueError('Refusing to delete replace {d}'.format(d=duration))
557
558 logger.debug('replacing {t} from {start} to {stop}'.format(
559 t=table.name, start=start, stop=stop))
560
561 # connection
562 sql = ('DELETE FROM {table} WHERE LEVEL != :level '
563 'AND SENSING_TIME >=:start AND SENSING_TIME <=:stop').format(table=table.name)
564 db_conn.execute(sql, level='PROCEDURE', start=start, stop=stop)
565 db_conn.commit()
566
567 cc = 0
568
569 ins_cur = ts.insert(table=table,
570 fields=['SENSING_TIME'] + SID.insert_fields +
571 ['INSTANCE', 'DOMAIN', 'LEVEL', 'COUNTER',
572 'PROCEDURE', 'SOURCE', 'PARENT',
573 'ATTRIBUTE', 'VALUE', 'IDENTIFIER'])
574
575 for pa in events:
576 if pa.level == 'ARGUMENT' or pa.level == 'VARIABLE':
577 inst_entities = self.pa_parameters[pa.instance]
578 entity = inst_entities[pa.attribute]
579 else:
580 inst_entities = self.pa_objects[pa.instance]
581 entity = inst_entities[pa.identifier]
582
583 ins_cur.execute(
584 None,
585 [pa.timestamp] + pa.sid.bind_insert() +
586 [pa.instance, pa.domain, pa.level, pa.counter,
587 entity['PROCEDURE'], entity['SOURCE'], entity['PARENT'],
588 pa.attribute, pa.value, pa.identifier])
589 cc += 1
590
591 logger.info('Ingested {cc} PA generated Log events'.format(cc=cc))
592
593 def insert_instances(self, start, stop, replace, force_replace):
594 """Ingest instances into apex_instances table."""
595 table = common.TBL_APEX_INSTANCES
596
597 if replace:
598 duration = stop - start
599
600 if duration > REPLACE_THRESHOLD and not force_replace:
601 raise ValueError('Refusing to delete replace {d}'.format(d=duration))
602
603 # connection
604 sql = ('DELETE FROM {table} WHERE LEVEL != :level '
605 'AND SENSING_TIME >= :start AND SENSING_TIME <= :stop').format(table=table.name)
606 db_conn.execute(sql, level='PROCEDURE', start=start, stop=stop)
607 db_conn.commit()
608
609 # Fields
610 fields = ['SID_NUM', 'INSTANCE', 'DOMAIN', 'LEVEL',
611 'SENSING_TIME', 'END_TIME', 'IDENTIFIER',
612 'PROCEDURE', 'SOURCE', 'PARENT',
613 'ATTRIBUTES']
614 # Insert a series of instances from `source` into `table`.
615 rows = []
616 sid = None
617 for instance in self.pa_objects.keys():
618 inst_objects = self.pa_objects[instance]
619 for identifier in inst_objects.keys():
620 entity = inst_objects[identifier]
621 # Remove some redundant attribs from dict before converting it to JSON
622 pa_event = entity.pop('EVENT', None)
623 sid = pa_event.sid
624 start = entity.pop('START_TIME', None)
625 end = entity.pop('END_TIME', None)
626 source = entity.pop('SOURCE', None)
627 parent = entity.pop('PARENT', None)
628 procedure = entity.pop('PROCEDURE', None)
629
630 row = (sid.bind_insert()[0],
631 instance,
632 pa_event.domain,
633 pa_event.level,
634 start,
635 end,
636 identifier,
637 procedure,
638 source,
639 parent,
640 jsondumps(entity)
641 )
642 rows.append(row)
643
644 sql = "INSERT INTO {table} ({fs}) VALUES %s ".format(
645 table=table.name,
646 fs=','.join(fields), )
647
648 try:
649 db_conn.executemany(sql, rows)
650 except DuplicateDataError as ex:
651
652 db_conn.commit()
653
654 # Remove existing entry before inserting it
655 logger.debug('Duplicate(s) found {ex}. Deleting {sid} from {start} to {stop}'.
656 format(ex=ex, sid=sid, start=start, stop=stop))
657
658 # connection
659 sql = ('DELETE FROM {table} WHERE LEVEL != :level '
660 'AND SENSING_TIME >= :start AND SENSING_TIME <= :stop').format(table=table.name)
661 db_conn.execute(sql, level='PROCEDURE', start=start, stop=stop)
662 db_conn.commit()
663
664 logger.debug('Second insertion attempt ')
665 db_conn.executemany(sql, rows)
666
667 # commit ingestion
668 db_conn.commit()
669
670 cc = len(rows)
671 logger.info('Ingested {cc} PA generated Instance events'.format(cc=cc))
672
673
674def main():
675 """Command line entry point."""
676 wo, resultfile, _ = init_algorithm()
677
678 for job in wo.read_jobs():
679 ingester = ProcedureArchiveParser(job.filename)
680 start, stop = ingester.ingest(job.filename)
681 #sid, _, _ = fileattr(job.filename)
682
683 sids = ingester.sids
684
685 job.status = JobStatus.COMPLETED
686
687 #_, start, stop = fileattr(job.filename)
688 if start is not None or stop is not None:
689 start = start - common.APEX_EV_ING_MARGIN
690 stop = stop + common.APEX_EV_ING_MARGIN
691
692 tables=[{'table': common.TBL_APEX_LOG,
693 'sensing_start': start,
694 'sensing_stop': stop,
695 'sid': sid} for sid in sids
696 ]
697
698 # PKR Do not generate APEX Log and Instance Events, rather generate on fly as
699 # with TM, TC and EV
700 # resultfile.add_job(job, JobStatus.COMPLETED, tables=tables)
701 resultfile.add_job(job, JobStatus.COMPLETED)
702
703
704if __name__ == '__main__':
705 main()