1#!/usr/bin/env python3
2
3"""Ingester module for High level control History data, as shown below.
4
5ExeProcs:
6<H>MSG3:Instrument::P4871_7139¬EX=Automatic¬2¬1525219320@853
7<H>MSG3:Instrument::P4871_7139¬ST=1525219320@854¬3¬1525219320@854
8<H>MSG3:Instrument::P4871_7139¬ID=SN_SV500¬4¬1525219320@854
9<H>MSG3:Instrument::P4871_7139¬V=43-180¬5¬1525219320@854
10<H>MSG3:Instrument::P4871_7139¬SC=7103.220418¬6¬1525219320@854
11<H>MSG3:Instrument::P4871_7139¬T=Seviri Observ File¬7¬1525219320@854
12<H>MSG3:Instrument::P4871_7139¬S=2¬8¬1525219320@854
13<H>MSG3:Instrument::P4871_7139¬S=6¬9¬1525219320@854
14<H>MSG3:Instrument::P4871_7139¬S=14¬68¬1525219325@720
15<H>MSG3:Instrument::P4871_7139¬E=1525219325@720¬69¬1525219325@720
16...
17
APEX history consists of 2 types of files:
- HLCH files (handled by this ingester)
- Procedure Archive (PA) files

The ingesters for these 2 types parse the input files, or
rather interpret them, to extract ENTITY data and execution LOG data.
In HLCH files, procedure initialisation info, end info and interaction info
form procedure ENTITY table entries. Every decoded line (mostly state changes)
is also recorded in the history LOG table.
26
27"""
28
29import re
30from collections import namedtuple
31from datetime import datetime
32from datetime import timedelta
33from operator import itemgetter
34import logging
35
36from chart import settings
37from chart.alg import JobStatus, init_algorithm
38from chart.db import ts
39from chart.db import connection
40from chart.project import SID
41from chart.db.exceptions import DuplicateDataError
42
43import chartepssg.alg.apex_common as common
44from chartepssg.alg.apex_common import ArchiveCursor, EmptyArchive, expand_proc_property
45from chartepssg.alg.common import jsondumps
46from chartepssg.alg.settings import scid_to_sid
47
# Module logger (propagates to the root logger's handlers)
logger = logging.getLogger(__name__)
# Shared database connection, opened when this module is imported
db_conn = connection.db_connect()

# If the `replace` ingestion option is set, refuse to delete existing data when the
# time range covered by the new file exceeds REPLACE_THRESHOLD
# (otherwise a file with corrupt metadata might cause us to delete years of data)
REPLACE_THRESHOLD = timedelta(days=2)
57
58
# Filename pattern matcher. Expected shape:
#   <scid>_<...>_PROC_HIST__G<digits>Z_S<start>Z_E<stop>Z.txt
# `scid` is passed to scid_to_sid; `start` and `stop` are in TIME_DECODER format.
FILENAME_MATCHER = re.compile(
    r'(?P<scid>[a-zA-Z0-9]+)_'
    r'\w+_PROC_HIST__'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    # escaped: only a literal '.txt' extension is accepted
    r'\.txt$')

# strptime format of the `start` / `stop` filename groups
TIME_DECODER = '%Y%m%d%H%M%S'
71
72
def fileattr(filename):
    """Return ``(sid, sensing_start, sensing_stop)`` decoded from `filename`.

    The base name must match FILENAME_MATCHER, i.e. look like
    ``<scid>_<...>_PROC_HIST__G<digits>Z_S<start>Z_E<stop>Z.txt``
    with start and stop encoded in TIME_DECODER format.

    Args:
        filename: path-like object; only its ``.name`` attribute is examined.

    Returns:
        tuple: (sid, start, stop). `sid` is None when the SCID cannot be mapped
        to a SID (a warning is logged); start and stop are naive datetimes.

    Raises:
        ValueError: if the name does not match FILENAME_MATCHER or a timestamp
            cannot be parsed.
    """
    name = filename.name
    match = FILENAME_MATCHER.match(name)
    if not match:
        raise ValueError('File name {f} not recognised as a High Level History product'.format(
            f=name))

    scid = match.group('scid')
    try:
        sid = scid_to_sid(scid)
    except Exception as ex:
        # Best effort: an unmapped SCID must not prevent decoding the times
        logger.warning('Cannot map SCID {scid} of {f} to a SID: {ex}'.format(
            scid=scid, f=name, ex=ex))
        sid = None

    return (sid,
            datetime.strptime(match.group('start'), TIME_DECODER),
            datetime.strptime(match.group('stop'), TIME_DECODER))
96
97
# Record type for one decoded HLCH line (or a synthetic entry derived from one).
# NOTE: the namedtuple's own type name is 'HLCH', which is what repr() shows.
HLCH_EXEC = namedtuple(
    'HLCH',
    ['timestamp', 'sid', 'domain', 'instance', 'attribute', 'value', 'counter'],
)
100
101
# Decoder for a single HLCH data line. The negation sign U+00AC ('\xac' in the
# patterns) separates the fields:
#   <H>{domain}::{instance}\xac{attribute}={value}\xac{counter}\xac{apex_time}
# NOTE(review): the module docstring examples carry an extra "MSG3:" prefix
# before the domain, which this pattern rejects (the optional satellite group
# is disabled) - confirm which layout current files use.
DECODER = re.compile(
    r'<H>(?P<domain>[\w]+)::(?P<instance>[^\xac]+)\xac'
    r'(?P<attribute>[^=]+)=(?P<value>[^\xac]*)\xac'
    r'(?P<counter>[0-9]+)\xac(?P<apex_time>[0-9]+@[0-9]+)'
)
113
114
class HLCH_Ingester:
    """Ingester for HLCH (High Level Control History) files.

    Handles parsing of the file and insertion into the database of procedure
    instances and LOG events. State built up by parse():

    * ``proc_instances``: instance name -> property dict (written as the
      ATTRIBUTES JSON of the APEX instances table);
    * ``proc_execution``: HLCH_EXEC records, one per decoded line plus one
      synthetic 'Initialising' STATE record per start-time token (written to
      the APEX log table);
    * ``sids``: every SID encountered.
    """

    def __init__(self, filename):
        """Load `filename` into memory and prepare an empty parser state.

        Args:
            filename: path of the HLCH file; its ``.name`` attribute is used in
                log messages, so a path-like object is expected.
        """
        self.sids = set()
        self.filename = filename

        # Read the whole file up front so the handle is closed straight away
        with open(filename, 'r', encoding=common.UTF8) as hlch_file:
            lines = hlch_file.readlines()

        self.file_iterator = ArchiveCursor(lines, self.decode)
        self.proc_instances = {}
        self.proc_execution = []

        # Predicate -> handler. For each entry, every handler whose predicate
        # accepts the raw attribute code runs, in this order.
        self.func_map = {
            self.is_proc_start_time: self.on_proc_start_time,
            self.is_proc_init_attrib: self.on_proc_init_attrib,
            self.is_proc_interaction: self.on_proc_interaction,
            self.is_proc_end: self.on_proc_end,
            self.is_proc_exec_mode: self.on_proc_exec_mode,
            self.is_proc_suspended: self.on_proc_suspended,
            self.is_property: self.on_property,
            self.is_state: self.on_state,
        }

    def parse(self):
        """Parse the whole file, filling ``proc_instances`` and ``proc_execution``.

        Returns:
            tuple: (number of procedure instances, number of execution records).

        Raises:
            EmptyArchive: if the file yields no entries.
        """
        try:
            while True:
                # next() must only be called here: handlers read
                # file_iterator.current() to see the raw entry being processed
                entry = self.file_iterator.next()
                hlch_exec = self.translate(entry)
                attr = entry['attribute']

                self.sids.add(hlch_exec.sid)

                db, proc_props = self.get_proc_props(hlch_exec)

                for is_func, on_func in self.func_map.items():
                    if is_func(attr):
                        on_func(hlch_exec, proc_props)

                self.update_proc_props(db, hlch_exec, proc_props)

        except StopIteration:
            entries = self.file_iterator.index - 1
            if entries == 0:
                raise EmptyArchive

            logger.info('Finished parsing HLCH file: {name} with {entries} entries'
                        .format(name=self.filename.name, entries=entries))

        return len(self.proc_instances), len(self.proc_execution)

    def decode(self, line):
        """Decode one HLCH `line` into a dict of DECODER groups, or None if it does not match."""
        match = DECODER.match(line)
        return match.groupdict() if match else None

    def translate(self, entry):
        """Convert a decoded `entry` dict into an HLCH_EXEC record.

        The record is also appended to ``proc_execution``.

        * attribute: mapped through ``common.MAP_H_ATTR``; unknown codes are
          kept as-is;
        * value: APEX time converted to UTC for 'ST'/'E', mapped through
          ``common.MAP_H_STATES`` for 'S'/'I' (an unknown code raises
          KeyError), otherwise kept as the raw string;
        * sid: derived from the domain via scid_to_sid.
        """
        attr_code = entry['attribute']
        val_code = entry['value']
        counter = int(entry['counter'])

        sid = scid_to_sid(entry['domain'])
        timestamp = common.apextime_to_utc(entry['apex_time'])

        attribute = (common.MAP_H_ATTR[attr_code] if attr_code in common.MAP_H_ATTR
                     else attr_code)

        if attr_code in ('ST', 'E'):
            value = common.apextime_to_utc(val_code)
        elif attr_code in ('S', 'I'):
            value = common.MAP_H_STATES[val_code]
        else:
            value = val_code

        hlch_exec = HLCH_EXEC(sid=sid,
                              timestamp=timestamp,
                              domain=entry['domain'],
                              instance=entry['instance'],
                              attribute=attribute,
                              value=value,
                              counter=counter)
        self.proc_execution.append(hlch_exec)
        return hlch_exec

    def get_proc_props(self, hlch_exec):
        """Return ``(db, proc_props)`` for the instance of `hlch_exec`.

        The instance is looked up in the in-memory cache, then in the database
        (see fetch_proc_props). When it is found in neither, a fresh property
        dict seeded from `hlch_exec` is created. The dict is (re)registered in
        ``proc_instances``. `db` is True only if it was loaded from the database.
        """
        db, proc_props = self.fetch_proc_props(hlch_exec.sid.bind_insert()[0],
                                               hlch_exec.instance)

        if proc_props is None:
            proc_props = {
                'EARLIEST_START_TIME': hlch_exec.timestamp,
                'ARGUMENTS': {},
                'VARIABLES': {},
                # provisional end times, until an end token is seen
                'END_TIME': hlch_exec.timestamp,
                'LATEST_END_TIME': hlch_exec.timestamp,
                'SID': hlch_exec.sid,
                'DOMAIN': hlch_exec.domain,
                'INSTANCE': hlch_exec.instance,
            }

        # update/add the proc instance
        self.proc_instances[hlch_exec.instance] = proc_props
        return db, proc_props

    def update_proc_props(self, db, hlch_exec, proc_props):
        """Cache `proc_props`; if they came from the database (`db`), also write
        them back as the instance's ATTRIBUTES JSON."""
        instance = hlch_exec.instance
        sid_num = hlch_exec.sid.bind_insert()[0]

        if db:
            common.update_instance(sid_num, instance, 'PROCEDURE', 'ATTRIBUTES',
                                   jsondumps(proc_props))

        self.proc_instances[instance] = proc_props

    def is_proc_start_time(self, attr):
        """Return True for the procedure start-time token ('ST')."""
        return attr == 'ST'

    def is_proc_init_attrib(self, attr):
        """Return True for the procedure initialisation tokens ('T', 'ID', 'V', 'SC')."""
        return attr in ('T', 'ID', 'V', 'SC')

    def is_proc_exec_mode(self, attr):
        """Return True for an execution-mode change token ('EX')."""
        return attr == 'EX'

    def is_state(self, attr):
        """Return True for an execution-state change token ('S')."""
        return attr == 'S'

    def is_proc_interaction(self, attr):
        """Return True for a procedure interaction token ('I')."""
        return attr == 'I'

    def is_property(self, attr):
        """Return True for tokens stored as plain procedure properties.

        That is every code except 'S', 'I', 'EX', 'ST', 'T', 'ID', 'V', 'SC'
        and 'E'.
        NOTE(review): 'SP' is not excluded, so a suspension token is stored as
        a property in addition to being handled by on_proc_suspended - confirm
        this is intended.
        """
        return attr not in ('S', 'I', 'EX', 'ST', 'T', 'ID', 'V', 'SC', 'E')

    def is_proc_end(self, attr):
        """Return True for the end-of-execution token ('E')."""
        return attr == 'E'

    def is_proc_suspended(self, attr):
        """Return True for a procedure suspension token ('SP')."""
        return attr == 'SP'

    def check_system_closed(self):
        """Return True if the current raw entry is the 'System Closed' state (code '17')."""
        entry = self.file_iterator.current()
        return self.is_state(entry['attribute']) and entry['value'] == '17'

    def on_proc_start_time(self, hlch_exec, proc_props):
        """Handle a start-time ('ST') token.

        A start time means a (new) run of the instance, so any copy of it
        already in the database is deleted first. START_TIME becomes the
        reported start time; EARLIEST_START_TIME becomes the earliest of that,
        the entry timestamp and any datetime already stored. A synthetic
        'Initialising' STATE record stamped with EARLIEST_START_TIME is then
        appended to ``proc_execution``.
        """
        sid_num = hlch_exec.sid.bind_insert()[0]
        common.delete_proc_instance(sid_num, hlch_exec.instance)

        proc_props['START_TIME'] = hlch_exec.value
        start_time = min(hlch_exec.value, hlch_exec.timestamp)
        # Only a real datetime is trusted here (presumably the stored value can
        # be absent or serialised for properties read back from the database)
        earliest = proc_props.get('EARLIEST_START_TIME')
        if isinstance(earliest, datetime):
            start_time = min(start_time, earliest)
        proc_props['EARLIEST_START_TIME'] = start_time

        if proc_props.get('LATEST_END_TIME') is None:
            proc_props['LATEST_END_TIME'] = hlch_exec.timestamp

        self.proc_execution.append(hlch_exec._replace(
            timestamp=proc_props['EARLIEST_START_TIME'],
            attribute='STATE',
            value='Initialising'))

    def on_proc_init_attrib(self, hlch_exec, proc_props):
        """Handle one of the initialisation tokens ('T', 'ID', 'V', 'SC').

        The value is stored under the upper-cased decoded attribute name. For
        a source ('SC') token PARENT is set as well: to the IDENTIFIER of the
        source instance when its properties can be found, else to the raw
        source value.
        """
        proc_props[hlch_exec.attribute.upper()] = hlch_exec.value

        if self.file_iterator.current()['attribute'] != 'SC':
            return

        proc_props['PARENT'] = hlch_exec.value
        _, parent_props = self.fetch_proc_props(hlch_exec.sid.bind_insert()[0],
                                                hlch_exec.value)
        if parent_props is not None:
            proc_props['PARENT'] = parent_props.get('IDENTIFIER', 'Not found')

    def on_proc_end(self, hlch_exec, proc_props):
        """Handle a procedure end ('E') token.

        END_TIME is set to the reported end time and LATEST_END_TIME to the
        later of that and the entry timestamp. Without properties, the
        END_TIME column of an existing database row is updated instead.
        """
        sid_num = hlch_exec.sid.bind_insert()[0]
        instance = hlch_exec.instance
        end_time = max(hlch_exec.value, hlch_exec.timestamp)

        if proc_props is not None:
            proc_props['END_TIME'] = hlch_exec.value
            proc_props['LATEST_END_TIME'] = end_time
            return

        proc_props = common.get_proc_attributes(sid_num, instance)
        if proc_props is not None:
            proc_props['END_TIME'] = hlch_exec.value
            common.update_instance(sid_num, instance, 'PROCEDURE', 'END_TIME', end_time)

    def on_system_closed(self, hlch_exec):
        """Use the 'System Closed' entry time as the instance's latest end time.

        Cached instances get LATEST_END_TIME; otherwise the END_TIME column of
        an existing database row is updated.
        """
        sid_num = hlch_exec.sid.bind_insert()[0]
        instance = hlch_exec.instance
        end_time = hlch_exec.timestamp

        if instance in self.proc_instances:
            self.proc_instances[instance]['LATEST_END_TIME'] = end_time
        elif common.get_proc_attributes(sid_num, instance) is not None:
            common.update_instance(sid_num, instance, 'PROCEDURE', 'END_TIME', end_time)

    def fetch_proc_props(self, sid_num, instance):
        """Return ``(db, proc_props)`` for `instance`, or ``(False, None)`` if unknown.

        The in-memory cache is consulted first (``db`` False), then the
        database via common.get_proc_attributes (``db`` True when found).
        """
        if instance in self.proc_instances:
            return False, self.proc_instances[instance]

        proc_props = common.get_proc_attributes(sid_num, instance)
        return proc_props is not None, proc_props

    def on_proc_interaction(self, hlch_exec, proc_props):
        """Record a procedure interaction under INTERACTIONS."""
        expand_proc_property(proc_props, 'INTERACTIONS',
                             hlch_exec.value, hlch_exec.timestamp)

    def on_proc_exec_mode(self, hlch_exec, proc_props):
        """Record an execution-mode change under EXECUTION_MODE."""
        expand_proc_property(proc_props, 'EXECUTION_MODE',
                             hlch_exec.value, hlch_exec.timestamp)

    def on_proc_suspended(self, hlch_exec, proc_props):
        """Record a procedure suspension under SUSPENSIONS."""
        expand_proc_property(proc_props, 'SUSPENSIONS',
                             hlch_exec.value, hlch_exec.timestamp)

    def on_state(self, hlch_exec, proc_props):
        """Handle a state change: store it as STATE (except 'Expired') and
        handle the 'System Closed' state."""
        if hlch_exec.value != 'Expired':
            proc_props['STATE'] = hlch_exec.value

        if self.check_system_closed():
            self.on_system_closed(hlch_exec)

    def on_property(self, hlch_exec, proc_props):
        """Store a generic property under its upper-cased decoded attribute name."""
        proc_props[hlch_exec.attribute.upper()] = hlch_exec.value

    def delete_existing(self, table, earliest, latest, replace, force_replace):
        """Delete PROCEDURE-level rows of `table` sensed within [earliest, latest].

        Nothing happens unless `replace` is set.

        Raises:
            ValueError: if the range exceeds REPLACE_THRESHOLD and
                `force_replace` is not set (guards against a corrupt file
                wiping out a large amount of data).
        """
        if not replace:
            return

        duration = latest - earliest
        if duration > REPLACE_THRESHOLD and not force_replace:
            raise ValueError(
                'Refusing to replace {d} of data (limit is {limit} unless forced)'.format(
                    d=duration, limit=REPLACE_THRESHOLD))

        logger.debug('Deleting {table} from {start} to {stop}'.format(
            table=table.name, start=earliest, stop=latest))
        # NOTE(review): not restricted by SID - rows of every SID in the range
        # are removed; confirm this is intended.
        sql = ('DELETE FROM {table} WHERE LEVEL = :level '
               'AND SENSING_TIME >= :start AND SENSING_TIME <= :stop').format(
                   table=table.name)
        db_conn.execute(sql, level='PROCEDURE', start=earliest, stop=latest)
        db_conn.commit()

    def insert_proc_instances(self):
        """Insert the cached procedure instances into the APEX instances table.

        Each row is inserted and committed on its own so that a duplicate
        (DuplicateDataError) only skips that row, with a warning.

        Returns:
            int: number of rows actually inserted.
        """
        table = common.TBL_APEX_INSTANCES
        fields = ['SID_NUM', 'LEVEL', 'SENSING_TIME', 'END_TIME',
                  'INSTANCE', 'DOMAIN', 'IDENTIFIER', 'PROCEDURE',
                  'SOURCE', 'PARENT', 'ATTRIBUTES']
        sql = 'INSERT INTO {tablename} ({fs}) VALUES %s '.format(
            tablename=table.name, fs=','.join(fields))

        cc = 0
        for instance, cached_props in self.proc_instances.items():
            # Work on a copy so the cache is left intact
            props = dict(cached_props)

            # These go to dedicated columns rather than into the JSON
            props.pop('INSTANCE', None)
            sid = props.pop('SID', None)
            if sid is None:
                # Properties loaded from the database carry no SID (it is
                # stripped here before the JSON is stored, and get_proc_props
                # only adds it to fresh dicts): derive it from the domain,
                # as translate() does.
                sid = scid_to_sid(props['DOMAIN'])
            sid_num = sid.bind_insert()[0]
            start = props.pop('EARLIEST_START_TIME',
                              props.get('START_TIME', props.get('END_TIME')))
            end = props.pop('LATEST_END_TIME', props.get('END_TIME'))
            domain = props['DOMAIN']

            identifier = props.get('IDENTIFIER', 'Not found')
            source = props.get('SOURCE', 'Not found')
            parent = props.get('PARENT', 'Not found')

            row = (sid_num,
                   'PROCEDURE',
                   start,
                   end,
                   instance,
                   domain,
                   identifier,
                   identifier,  # PROCEDURE column holds the identifier too
                   source,
                   parent,
                   jsondumps(props))
            try:
                # Row-by-row insert/commit is slow, but it is the only way to
                # detect and skip duplicates of rows carrying JSON
                db_conn.executemany(sql, [row])
                db_conn.commit()
            except DuplicateDataError as ex:
                logger.warning('APEX Instance duplicate found {ex}'.format(ex=ex))
            else:
                cc += 1

        logger.info('Ingested {cc} generated HLCH Instance events'.format(cc=cc))
        return cc

    def insert_proc_executions(self):
        """Insert the execution records (``proc_execution``) into the APEX log table.

        Records whose instance has no known properties, and duplicates, are
        skipped with a warning.

        Returns:
            int: number of rows actually inserted.
        """
        table = common.TBL_APEX_LOG
        ins_cur = ts.insert(table=table,
                            fields=['SENSING_TIME'] + SID.insert_fields +
                            ['DOMAIN', 'INSTANCE', 'LEVEL',
                             'IDENTIFIER', 'SOURCE', 'PROCEDURE',
                             'PARENT', 'ATTRIBUTE', 'VALUE', 'COUNTER'])

        cc = 0
        for hlch in self.proc_execution:
            sid_num = hlch.sid.bind_insert()[0]
            _, proc_props = self.fetch_proc_props(sid_num, hlch.instance)
            if proc_props is None:
                logger.warning('No previous recorded details for '
                               'procedure instance {inst}'.format(inst=hlch.instance))
                continue

            identifier = proc_props.get('IDENTIFIER', 'Not found')
            source = proc_props.get('SOURCE', 'Not found')
            parent = proc_props.get('PARENT', 'Not found')

            try:
                # the identifier fills both the IDENTIFIER and PROCEDURE columns
                ins_cur.execute(None,
                                [hlch.timestamp] + hlch.sid.bind_insert() +
                                [hlch.domain, hlch.instance, 'PROCEDURE',
                                 identifier, source, identifier,
                                 parent, hlch.attribute, hlch.value, hlch.counter])
            except DuplicateDataError as ex:
                logger.warning('APEX Log duplicate found {ex}'.format(ex=ex))
            else:
                cc += 1

        logger.info('Ingested {cc} HLCH generated Log events'.format(cc=cc))
        return cc

    def ingest(self, replace=True, force_replace=False):
        """Parse the HLCH file and write its contents to the database.

        If `replace` is set, existing PROCEDURE-level rows of the APEX log and
        instance tables within the parsed time range are deleted first. As a
        precaution this fails with ValueError when that range exceeds
        REPLACE_THRESHOLD, unless `force_replace` is set.

        Returns:
            (start, stop) of the ingested records, or (None, None) if the file
            has no entries.

        Raises:
            Any error from parsing or database access, after logging it.
        """
        try:
            self.parse()

            # the replace period spans all parsed records
            start, stop = self.get_interval()
            logger.info('Ingesting HLCH events from {start} until {stop}'.format(
                start=start, stop=stop))

            self.delete_existing(common.TBL_APEX_LOG, start, stop, replace, force_replace)
            self.delete_existing(common.TBL_APEX_INSTANCES, start, stop, replace, force_replace)

            self.insert_proc_executions()
            self.insert_proc_instances()

            return start, stop

        except EmptyArchive:
            logger.warning('HLCH file: {name} has no entries.'
                           .format(name=self.filename.name))
            return None, None

        except Exception as ex:
            logger.error('Error while ingesting HLCH file: {name} at line {line}: {ex}.'
                         .format(name=self.filename, line=self.file_iterator.index, ex=ex))
            raise

    def get_interval(self):
        """Return ``(start, stop)``: the earliest and latest timestamps in ``proc_execution``."""
        timestamps = [hlch.timestamp for hlch in self.proc_execution]
        return min(timestamps), max(timestamps)
613
614
def main():
    """Command line entry point: ingest every HLCH file listed in the work order.

    Each job is marked COMPLETED and reported to the result file without a
    table list: APEX log and instance events are not generated by this
    ingester but on the fly, as with TM, TC and EV.
    """
    wo, resultfile, _ = init_algorithm()

    for job in wo.read_jobs():
        ingester = HLCH_Ingester(job.filename)
        ingester.ingest()

        job.status = JobStatus.COMPLETED
        resultfile.add_job(job, JobStatus.COMPLETED)


if __name__ == '__main__':
    main()