1#!/usr/bin/env python3
  2
  3"""Ingester module for High level control History data, as shown below.
  4
  5ExeProcs:
  6<H>MSG3:Instrument::P4871_7139¬EX=Automatic¬2¬1525219320@853
  7<H>MSG3:Instrument::P4871_7139¬ST=1525219320@854¬3¬1525219320@854
  8<H>MSG3:Instrument::P4871_7139¬ID=SN_SV500¬4¬1525219320@854
  9<H>MSG3:Instrument::P4871_7139¬V=43-180¬5¬1525219320@854
 10<H>MSG3:Instrument::P4871_7139¬SC=7103.220418¬6¬1525219320@854
 11<H>MSG3:Instrument::P4871_7139¬T=Seviri Observ File¬7¬1525219320@854
 12<H>MSG3:Instrument::P4871_7139¬S=2¬8¬1525219320@854
 13<H>MSG3:Instrument::P4871_7139¬S=6¬9¬1525219320@854
 14<H>MSG3:Instrument::P4871_7139¬S=14¬68¬1525219325@720
 15<H>MSG3:Instrument::P4871_7139¬E=1525219325@720¬69¬1525219325@720
 16...
 17
APEX history consists of two types of files:
 19- HLCH files (what this ingester handles)
 20- Procedure Archive (PA) files
 21
The ingesters for these two types parse the input files, or rather interpret them,
to extract ENTITY data and execution LOG data.
In HLCH files, procedure initialisation info, end info and interaction info
form procedure ENTITY table entries. Other info (mostly states) goes into the history LOG table.
 26
 27"""
 28
 29import re
 30from collections import namedtuple
 31from datetime import datetime
 32from datetime import timedelta
 33from operator import itemgetter
 34import logging
 35
 36from chart import settings
 37from chart.alg import JobStatus, init_algorithm
 38from chart.db import ts
 39from chart.db import connection
 40from chart.project import SID
 41from chart.db.exceptions import DuplicateDataError
 42
 43import chartepssg.alg.apex_common as common
 44from chartepssg.alg.apex_common import ArchiveCursor, EmptyArchive, expand_proc_property
 45from chartepssg.alg.common import jsondumps
 46from chartepssg.alg.settings import scid_to_sid
 47
 48# logger util
 49logger = logging.getLogger()
 50# connection instance
 51db_conn = connection.db_connect()
 52
 53# If the `replace` ingestion option is set, refuse to delete existing data if the range of
 54# the new file exceeds REPLACE_THRESHOLD
 55# (otherwise a file with corrupt metadata might cause us to delete years of data)
 56REPLACE_THRESHOLD = timedelta(days=2)
 57
 58
 59# Filename pattern matcher:
 60# MOF_OPE_HL-HIST_MTI1 _20180518152359_20180516123000_20180517122959.txt
 61FILENAME_MATCHER = re.compile(
 62    r'(?P<scid>[a-zA-Z0-9]+)_'
 63    r'[\w]+_PROC_HIST__'
 64    r'G[0-9]+Z_'
 65    r'S(?P<start>[0-9]+)Z_'
 66    r'E(?P<stop>[0-9]+)Z'
 67    r'.txt$')
 68
 69# Filename time decoder
 70TIME_DECODER = '%Y%m%d%H%M%S'
 71
 72
 73def fileattr(filename):
 74    """Given a `filename` return a tuple of SID, sensing start and end time.
 75
 76    They look like:
 77    MOF_OPE_HL-HIST_MTI1 _20180518152359_20180516123000_20180517122959.txt
 78
 79    """
 80    match = FILENAME_MATCHER.match(filename.name)
 81    if not match:
 82        raise ValueError('File name {f} not recognised as a High Level History product'.format(
 83            f=filename.name))
 84
 85    groups = match.groupdict()
 86
 87    sid = None
 88    try:
 89        sid = scid_to_sid(groups['scid'])
 90    except Exception as ex:
 91        logger.warning(ex)
 92
 93    return (sid,
 94            datetime.strptime(groups['start'], TIME_DECODER),
 95            datetime.strptime(groups['stop'], TIME_DECODER))
 96
 97
 98# Storage for commands
 99HLCH_EXEC = namedtuple('HLCH', 'timestamp sid domain instance attribute value counter')
100
101
# Regex for one HLCH data line. Fields are separated by U+00AC (NOT SIGN, '¬'), written
# as \xac so the source stays ASCII. Shape:
#   <H>DOMAIN::INSTANCE¬ATTRIBUTE=VALUE¬COUNTER¬APEXTIME
# NOTE(review): DOMAIN only accepts word characters, so a prefixed form such as
# 'MSG3:Instrument::' (the module docstring sample) does not match.
DECODER = re.compile(
    r'<H>'
    r'(?P<domain>[\w]+)::'              # later translated to a SID via scid_to_sid
    r'(?P<instance>[^\xac]+)\xac'       # procedure instance id
    r'(?P<attribute>[^=]+)='            # attribute code, e.g. ST, S, E
    r'(?P<value>[^\xac]*)\xac'          # raw value (may be empty)
    r'(?P<counter>[0-9]+)\xac'          # entry counter
    r'(?P<apex_time>[0-9]+@[0-9]+)'     # presumably seconds@milliseconds; TODO confirm
)
113
114
class HLCH_Ingester:
    """Ingester for High Level Control History (HLCH) files.

    Parses an HLCH file entry by entry. Each decoded entry becomes an HLCH_EXEC record
    in ``self.proc_execution`` (later written to ``common.TBL_APEX_LOG``) and updates the
    property dict of its procedure instance in ``self.proc_instances`` (later written to
    ``common.TBL_APEX_INSTANCES``).
    """

    def __init__(self, filename):
        """Read `filename` and prepare the parser state.

        Args:
            filename: the HLCH file to ingest. Its ``.name`` attribute is used in log
                messages, so a ``pathlib.Path``-like object is expected.
        """
        self.sids = set()
        self.filename = filename
        # Read all lines and close the file straight away; the cursor only needs the list.
        with open(filename, 'r', encoding=common.UTF8) as hlch_file:
            lines = hlch_file.readlines()
        self.file_iterator = ArchiveCursor(lines, self.decode)
        # instance id -> procedure property dict (one APEX instances row each)
        self.proc_instances = {}
        # every decoded or synthesised HLCH_EXEC record (one APEX log row each)
        self.proc_execution = []

        # Attribute-code predicate -> handler. parse() runs every handler whose
        # predicate matches, in this insertion order.
        self.func_map = {
            self.is_proc_start_time: self.on_proc_start_time,
            self.is_proc_init_attrib: self.on_proc_init_attrib,
            self.is_proc_interaction: self.on_proc_interaction,
            self.is_proc_end: self.on_proc_end,
            self.is_proc_exec_mode: self.on_proc_exec_mode,
            self.is_proc_suspended: self.on_proc_suspended,
            self.is_property: self.on_property,
            self.is_state: self.on_state,
        }

    def parse(self):
        """Parse the whole file, dispatching every entry to its matching handlers.

        Returns:
            ``(number of procedure instances, number of execution records)``.

        Raises:
            EmptyArchive: if the file produced no entries.
        """
        try:
            while True:
                # next() must be called here only: handlers inspect
                # self.file_iterator.current() and expect it to be this entry.
                entry = self.file_iterator.next()
                hlch_exec = self.translate(entry)
                # Predicates work on the raw attribute code (e.g. 'ST', 'S', 'E').
                attr = entry['attribute']

                self.sids.add(hlch_exec.sid)

                from_db, proc_props = self.get_proc_props(hlch_exec)

                for is_func, on_func in self.func_map.items():
                    if is_func(attr):
                        on_func(hlch_exec, proc_props)

                self.update_proc_props(from_db, hlch_exec, proc_props)

        except StopIteration:
            entries = self.file_iterator.index - 1
            if entries == 0:
                raise EmptyArchive

            logger.info('Finished parsing HLCH file: {name} with {entries} entries'
                        .format(name=self.filename.name, entries=entries))

            # return the number of detected items
            return len(self.proc_instances), len(self.proc_execution)

    def decode(self, line):
        """Decode one raw HLCH line with DECODER.

        Returns:
            dict of the regex's named groups, or None if the line does not match.
        """
        match = DECODER.match(line)
        if match:
            return match.groupdict()
        return None

    def translate(self, entry):
        """Convert a decoded entry dict into an HLCH_EXEC record and collect it.

        The attribute code is mapped through ``common.MAP_H_ATTR`` (unknown codes pass
        through). Values of 'ST'/'E' are converted with ``common.apextime_to_utc``;
        values of 'S'/'I' are mapped through ``common.MAP_H_STATES`` (a KeyError
        propagates for an unknown code). The record is appended to
        ``self.proc_execution`` and returned.
        """
        domain = entry['domain']
        instance = entry['instance']
        attr_code = entry['attribute']
        val_code = entry['value']
        counter = int(entry['counter'])
        apex_time = entry['apex_time']

        sid = scid_to_sid(domain)
        timestamp = common.apextime_to_utc(apex_time)

        # decode attribute
        if attr_code in common.MAP_H_ATTR:
            attribute = common.MAP_H_ATTR[attr_code]
        else:
            attribute = attr_code

        # decode value
        if attr_code in ['ST', 'E']:
            value = common.apextime_to_utc(val_code)
        elif attr_code in ['S', 'I']:
            value = common.MAP_H_STATES[val_code]
        else:
            value = val_code

        hlch_exec = HLCH_EXEC(sid=sid,
                              timestamp=timestamp,
                              domain=domain,
                              instance=instance,
                              attribute=attribute,
                              value=value,
                              counter=counter)
        self.proc_execution.append(hlch_exec)

        return hlch_exec

    def get_proc_props(self, hlch_exec):
        """Return ``(db, proc_props)`` for the procedure instance of `hlch_exec`.

        The props come from memory, else from the DB (``db`` is then True), else a fresh
        dict is created with EARLIEST_START_TIME, END_TIME and LATEST_END_TIME set to the
        entry timestamp. SID, DOMAIN and INSTANCE are refreshed and the dict is registered
        in ``self.proc_instances``.
        """
        db, proc_props = self.fetch_proc_props(hlch_exec.sid.bind_insert()[0], hlch_exec.instance)

        if proc_props is None:
            proc_props = {}
            proc_props['EARLIEST_START_TIME'] = hlch_exec.timestamp
            proc_props['ARGUMENTS'] = {}
            proc_props['VARIABLES'] = {}

            # set the end time as the current timestamp for now
            proc_props['END_TIME'] = hlch_exec.timestamp
            proc_props['LATEST_END_TIME'] = hlch_exec.timestamp

        proc_props['SID'] = hlch_exec.sid
        proc_props['DOMAIN'] = hlch_exec.domain
        proc_props['INSTANCE'] = hlch_exec.instance

        # update/add the proc instance
        self.proc_instances[hlch_exec.instance] = proc_props

        return db, proc_props

    def update_proc_props(self, db, hlch_exec, proc_props):
        """Store `proc_props`; also write them to the DB ATTRIBUTES column if `db` is set."""
        instance = hlch_exec.instance
        sid_num = hlch_exec.sid.bind_insert()[0]

        if db:
            common.update_instance(sid_num, instance, 'PROCEDURE', 'ATTRIBUTES', jsondumps(proc_props))

        self.proc_instances[instance] = proc_props

    def is_proc_start_time(self, attr):
        """Return True for the procedure start-time code ('ST')."""
        return attr in ['ST']

    def is_proc_init_attrib(self, attr):
        """Return True for procedure initialisation attribute codes."""
        return attr in ['T', 'ID', 'V', 'SC']

    def is_proc_exec_mode(self, attr):
        """Return True for an execution-mode change code ('EX')."""
        return attr in ['EX']

    def is_state(self, attr):
        """Return True for an execution-state change code ('S')."""
        return attr in ['S']

    def is_proc_interaction(self, attr):
        """Return True for a procedure interaction code ('I')."""
        return attr in ['I']

    def is_property(self, attr):
        """Return True for any code without a dedicated handler (a generic property).

        NOTE(review): 'SP' is not excluded, so a suspension entry is also stored as a
        plain property by on_property (in addition to on_proc_suspended) - confirm
        this is intended.
        """
        return attr not in ['S', 'I', 'EX', 'ST', 'T', 'ID', 'V', 'SC', 'E']

    def is_proc_end(self, attr):
        """Return True for the procedure end code ('E')."""
        return attr in ['E']

    def is_proc_suspended(self, attr):
        """Return True for a procedure suspension code ('SP')."""
        return attr in ['SP']

    def check_system_closed(self):
        """Return True if the current entry is a state change to 'System Closed'."""
        entry = self.file_iterator.current()
        if self.is_state(entry['attribute']):
            # Raw STATE value 17: 'System Closed'
            return entry['value'] in ['17']

        return False

    def on_proc_start_time(self, hlch_exec, proc_props):
        """Handle a start-time entry: (re)start the instance and log 'Initialising'."""
        # A start time means a new run of this instance: drop any copy already in the DB.
        instance = hlch_exec.instance
        sid_num = hlch_exec.sid.bind_insert()[0]
        common.delete_proc_instance(sid_num, instance)

        proc_props['START_TIME'] = hlch_exec.value
        # Earliest start: the smallest of the reported start, the entry timestamp and
        # any earlier EARLIEST_START_TIME that is already a datetime.
        start_time = min(hlch_exec.value, hlch_exec.timestamp)
        earliest = proc_props.get('EARLIEST_START_TIME', '')
        if isinstance(earliest, datetime):
            start_time = min(start_time, earliest)

        proc_props['EARLIEST_START_TIME'] = start_time

        if proc_props.get('LATEST_END_TIME') is None:
            proc_props['LATEST_END_TIME'] = hlch_exec.timestamp

        self.proc_execution.append(HLCH_EXEC(sid=hlch_exec.sid,
                                             timestamp=proc_props['EARLIEST_START_TIME'],
                                             domain=hlch_exec.domain,
                                             instance=hlch_exec.instance,
                                             attribute='STATE',
                                             value='Initialising',
                                             counter=hlch_exec.counter))

    def on_proc_init_attrib(self, hlch_exec, proc_props):
        """Handle one of the initialisation attribute entries (T, ID, V, SC)."""
        proc_props[hlch_exec.attribute.upper()] = hlch_exec.value
        entry = self.file_iterator.current()
        # For the source ('SC') entry, also resolve the parent procedure.
        if entry['attribute'] in ['SC']:
            # by default, the parent is the source instance id
            proc_props['PARENT'] = hlch_exec.value
            # prefer the source instance's IDENTIFIER if we know that instance
            _, parent_props = self.fetch_proc_props(hlch_exec.sid.bind_insert()[0], hlch_exec.value)
            if parent_props is not None:
                proc_props['PARENT'] = parent_props.get('IDENTIFIER', 'Not found')

    def on_proc_end(self, hlch_exec, proc_props):
        """Handle an end entry: record END_TIME and LATEST_END_TIME.

        The branch for ``proc_props is None`` is not reached from parse(), which always
        passes a dict; it updates the END_TIME column of an existing DB row instead.
        """
        sid_num = hlch_exec.sid.bind_insert()[0]
        instance = hlch_exec.instance
        end_time = max(hlch_exec.value, hlch_exec.timestamp)

        if proc_props is not None:
            proc_props['END_TIME'] = hlch_exec.value
            proc_props['LATEST_END_TIME'] = end_time
        else:
            proc_props = common.get_proc_attributes(sid_num, instance)
            if proc_props is not None:
                proc_props['END_TIME'] = hlch_exec.value
                # update proc instance with end_time in DB
                common.update_instance(sid_num, instance, 'PROCEDURE', 'END_TIME',
                                       end_time)

    def on_system_closed(self, hlch_exec):
        """Close the instance at the entry timestamp when 'System Closed' is reached.

        For an in-memory instance LATEST_END_TIME is set; otherwise the END_TIME column
        of an existing DB row is updated.
        """
        sid_num = hlch_exec.sid.bind_insert()[0]
        instance = hlch_exec.instance
        end_time = hlch_exec.timestamp

        if instance in self.proc_instances:
            self.proc_instances[instance]['LATEST_END_TIME'] = end_time
        else:
            proc_props = common.get_proc_attributes(sid_num, instance)
            if proc_props is not None:
                # update proc instance with end_time in DB
                common.update_instance(sid_num, instance, 'PROCEDURE', 'END_TIME',
                                       end_time)

    def fetch_proc_props(self, sid_num, instance):
        """Return ``(db, proc_props)`` for `instance`, looking in memory first, then the DB.

        ``db`` is True only when the props were loaded from the DB; ``proc_props`` is
        None when the instance is unknown in both places.
        """
        if instance in self.proc_instances:
            return False, self.proc_instances[instance]

        proc_props = common.get_proc_attributes(sid_num, instance)
        return proc_props is not None, proc_props

    def on_proc_interaction(self, hlch_exec, proc_props):
        """Handle a procedure interaction entry."""
        expand_proc_property(proc_props, 'INTERACTIONS',
                             hlch_exec.value, hlch_exec.timestamp)

    def on_proc_exec_mode(self, hlch_exec, proc_props):
        """Handle a procedure execution-mode change."""
        expand_proc_property(proc_props, 'EXECUTION_MODE',
                             hlch_exec.value, hlch_exec.timestamp)

    def on_proc_suspended(self, hlch_exec, proc_props):
        """Handle a procedure suspension entry."""
        expand_proc_property(proc_props, 'SUSPENSIONS',
                             hlch_exec.value, hlch_exec.timestamp)

    def on_state(self, hlch_exec, proc_props):
        """Handle a state change; 'Expired' is not stored as the current STATE."""
        if hlch_exec.value != 'Expired':
            proc_props['STATE'] = hlch_exec.value

        if self.check_system_closed():
            self.on_system_closed(hlch_exec)

    def on_property(self, hlch_exec, proc_props):
        """Store a generic property under its upper-cased attribute name."""
        proc_props[hlch_exec.attribute.upper()] = hlch_exec.value

    def delete_existing(self, table, earliest, latest, replace, force_replace):
        """Delete PROCEDURE rows of `table` with SENSING_TIME in [earliest, latest].

        Does nothing unless `replace` is set.

        Raises:
            ValueError: if the span exceeds REPLACE_THRESHOLD and `force_replace` is not set.
        """
        if not replace:
            return

        duration = latest - earliest
        if duration > REPLACE_THRESHOLD and not force_replace:
            raise ValueError(
                'Refusing to replace {d} of data: longer than REPLACE_THRESHOLD ({t}); '
                'set force_replace to override'.format(d=duration, t=REPLACE_THRESHOLD))

        # NOTE(review): rows are not filtered by SID, so PROCEDURE data of every
        # spacecraft in the interval is deleted - confirm this is intended.
        logger.debug('Deleting {table} from {start} to {stop}'.format(
            table=table.name, start=earliest, stop=latest))
        sql = ('DELETE FROM {table} WHERE LEVEL = :level '
               'AND SENSING_TIME >= :start AND SENSING_TIME <= :stop').format(
            table=table.name)
        db_conn.execute(sql, level='PROCEDURE', start=earliest, stop=latest)
        db_conn.commit()

    def insert_proc_instances(self):
        """Insert one APEX instances row per procedure instance found in the file.

        Each property dict is copied before redundant keys are removed, so
        ``self.proc_instances`` is left intact.

        Returns:
            Number of rows actually inserted (duplicates are logged and skipped).
        """
        table = common.TBL_APEX_INSTANCES
        fields = ['SID_NUM', 'LEVEL', 'SENSING_TIME', 'END_TIME',
                  'INSTANCE', 'DOMAIN', 'IDENTIFIER', 'PROCEDURE',
                  'SOURCE', 'PARENT', 'ATTRIBUTES']

        sql = ('INSERT INTO {tablename} ({fs}) VALUES  %s ').format(
            tablename=table.name,
            fs=','.join(fields))

        cc = 0
        for instance, props in self.proc_instances.items():
            # Work on a copy: keys stored in dedicated columns (or redundant ones) are
            # removed so they stay out of the JSON ATTRIBUTES blob.
            hlch = dict(props)
            hlch.pop('INSTANCE', None)
            sid = hlch.pop('SID', None)
            sid_num = sid.bind_insert()[0]
            start = hlch.pop('EARLIEST_START_TIME',
                             hlch.get('START_TIME', hlch.get('END_TIME')))
            end = hlch.pop('LATEST_END_TIME', hlch.get('END_TIME', None))
            domain = hlch['DOMAIN']

            identifier = hlch.get('IDENTIFIER', 'Not found')
            source = hlch.get('SOURCE', 'Not found')
            parent = hlch.get('PARENT', 'Not found')

            row = (sid_num,
                   'PROCEDURE',
                   start,
                   end,
                   instance,
                   domain,
                   identifier,
                   identifier,  # the PROCEDURE column also holds the identifier
                   source,
                   parent,
                   jsondumps(hlch))
            try:
                # One row per statement: not efficient, but the only way to detect
                # duplicates individually with data that has JSON.
                db_conn.executemany(sql, [row])
                db_conn.commit()
            except DuplicateDataError as ex:
                logger.warning('APEX Instance duplicate found {ex}'.format(ex=ex))
                continue

            cc += 1

        logger.info('Ingested {cc} generated HLCH Instance events'.format(cc=cc))
        return cc

    def insert_proc_executions(self):
        """Insert every collected execution record into the APEX log table.

        Records whose instance has no known properties are skipped with a warning;
        duplicates are logged and skipped.

        Returns:
            Number of rows actually inserted.
        """
        cc = 0

        table = common.TBL_APEX_LOG
        ins_cur = ts.insert(table=table,
                            fields=['SENSING_TIME'] + SID.insert_fields +
                                   ['DOMAIN', 'INSTANCE', 'LEVEL',
                                    'IDENTIFIER', 'SOURCE', 'PROCEDURE',
                                    'PARENT', 'ATTRIBUTE', 'VALUE', 'COUNTER'])

        for hlch in self.proc_execution:
            sid_bind = hlch.sid.bind_insert()
            _, proc_props = self.fetch_proc_props(sid_bind[0], hlch.instance)
            if proc_props is None:
                logger.warning('No previous recorded details for '
                               'procedure instance {inst}'.format(inst=hlch.instance))
                continue

            identifier = proc_props.get('IDENTIFIER', 'Not found')
            source = proc_props.get('SOURCE', 'Not found')
            parent = proc_props.get('PARENT', 'Not found')

            try:
                ins_cur.execute(None,
                                [hlch.timestamp] + sid_bind +
                                [hlch.domain, hlch.instance, 'PROCEDURE',
                                 identifier, source, identifier,
                                 parent, hlch.attribute, hlch.value, hlch.counter])
            except DuplicateDataError as ex:
                logger.warning('APEX Log duplicate found {ex}'.format(ex=ex))
                continue

            cc += 1

        logger.info('Ingested {cc} HLCH generated Log events'.format(cc=cc))
        return cc

    def ingest(self, replace=True, force_replace=False):
        """Parse the HLCH file and write its contents to the database.

        If `replace` is set, existing PROCEDURE rows in the parsed time range are deleted
        from both APEX tables first. As a precaution this raises ValueError when that range
        is longer than REPLACE_THRESHOLD, unless `force_replace` is set.

        Returns:
            ``(start, stop)`` of the ingested records, or ``(None, None)`` for an empty file.
        """
        try:
            self.parse()

            # the min and max timestamps define the replace period
            start, stop = self.get_interval()
            logger.info('Ingesting HLCH events from {start} until {stop}'.format(
                start=start, stop=stop))

            self.delete_existing(common.TBL_APEX_LOG, start, stop, replace, force_replace)
            self.delete_existing(common.TBL_APEX_INSTANCES, start, stop, replace, force_replace)

            self.insert_proc_executions()
            self.insert_proc_instances()

            return start, stop
        except EmptyArchive:
            logger.warning('HLCH file: {name} has no entries.'
                           .format(name=self.filename.name))
            return None, None
        except Exception as ex:
            logger.error('Error while ingesting HLCH file: {name} at line {line}: {ex}.'
                         .format(name=self.filename, line=self.file_iterator.index, ex=ex))
            raise

    def get_interval(self):
        """Return ``(earliest, latest)`` timestamp over all collected execution records."""
        timestamps = [record.timestamp for record in self.proc_execution]
        return min(timestamps), max(timestamps)
613
614
def main():
    """Command line entry point: ingest every HLCH file listed in the work order.

    Each job is reported as COMPLETED once its file has been ingested (a file with no
    entries is still COMPLETED). Exceptions raised by ``HLCH_Ingester.ingest`` are not
    caught here and stop processing of the remaining jobs.
    """
    wo, resultfile, _ = init_algorithm()

    for job in wo.read_jobs():
        ingester = HLCH_Ingester(job.filename)
        ingester.ingest()

        job.status = JobStatus.COMPLETED

        # PKR Do not generate APEX Log and Instance Events, rather generate on fly as
        # with TM, TC and EV. (Event generation would register common.TBL_APEX_LOG for
        # each SID in ingester.sids over the ingested interval padded by
        # common.APEX_EV_ING_MARGIN, via resultfile.add_job(..., tables=...).)
        resultfile.add_job(job, JobStatus.COMPLETED)


if __name__ == '__main__':
    main()