#!/usr/bin/env python3

"""Ingester module for the MOC Archive Data Report file type.
This is an ASCII file containing a sequence of report records, each one
formatted as in the sample below:

ID: TKMAtaskManager::VMCOOAS_22254
Category: Software
Domain: 0
Event ID: TKMAtaskManager::VMCOOAS_22254
Event Type: Software
Generation Time: 2023-11-27T15:41:26.417273
Source: E2VALOASSV03
Application: TKMAtaskManager
Severity: Error
Type: Software
Workstation: E2VALOASSV03
Message: IPCtcpChan::sendMsg: In writev errno=104
"""

import logging
import re
import zipfile
from collections import namedtuple
from datetime import datetime
from pathlib import Path

from chart.project import SID
from chart.alg import init_algorithm, JobStatus
from chart.db import ts
from chart.db.exceptions import DuplicateDataError
from chart.db.model.table import TableInfo
from chartepssg.alg.settings import scid_to_sid


logger = logging.getLogger()

# Event classname
EVENT_CLASS = 'MOC-ARCHIVE-EVENTS'

# Timestamp format used in product file names
TIME_DECODER = '%Y%m%d%H%M%S'
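# e.g. datetime.strptime('20231130090006', TIME_DECODER) -> datetime(2023, 11, 30, 9, 0, 6)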

# Number of lines in a single MOC Archive Data Report record
NUM_LINES_RECORD = 13

# Sentinel value returned once the end of the file is reached
END_OF_FILE = -1

# Holds one parsed MOC Archive Report record
Report = namedtuple('Report', 'msg_id '
                              'category '
                              'domain '
                              'event_id '
                              'event_type '
                              'gen_time '
                              'source '
                              'application '
                              'severity '
                              'msg_type '
                              'workstation '
                              'message')
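# Field order follows the twelve labelled lines of a record; the 13th line is
# either the blank record separator or a continuation of the Message field.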

# filename example:
# SGxx_LOG_MOC__VAL_MON_HIST___G20231130135424Z_S20231130090006Z_E20231130160005Z.zip
FILENAME_MATCHER_ZIP = re.compile(
    r'^(?P<scid>[a-zA-Z0-9]+)_'
    r'[\w]+_MON_HIST___'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    r'\.zip$'
)
FILENAME_MATCHER_TXT = re.compile(
    r'^(?P<scid>[a-zA-Z0-9]+)_'
    r'[\w]+_MON_HIST___'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    r'\.txt$'
)
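# For the example filename above, FILENAME_MATCHER_ZIP captures
# scid='SGxx', start='20231130090006' and stop='20231130160005'.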


def fileattr(filename):
    """Given a `filename` return a tuple of (SID, sensing start time,
    sensing stop time).

    File names look like:
    SGxx_LOG_MOC__eee_MON_HIST___GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.zip

    where: eee = OPE | VAL | TST
    """
    if filename.suffix == '.zip':
        match = FILENAME_MATCHER_ZIP.match(filename.name)
    else:
        match = FILENAME_MATCHER_TXT.match(filename.name)

    if not match:
        raise ValueError('File name {f} not recognised as MOC Archive Report product'.format(
            f=filename))

    groups = match.groupdict()

    return (scid_to_sid(groups['scid']),
            datetime.strptime(groups['start'], TIME_DECODER),
            datetime.strptime(groups['stop'], TIME_DECODER))
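# For the example zip filename above this returns the SID mapped from 'SGxx'
# (via scid_to_sid) plus datetime(2023, 11, 30, 9, 0, 6) and
# datetime(2023, 11, 30, 16, 0, 5).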


DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
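# e.g. '2023-11-27T15:41:26.417273', as in the Generation Time field of the sample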


def first_token(line):
    """Return the value part of a `Label: value` line from the report file."""
    if not isinstance(line, str):
        line = line.decode('utf-8')

    line = line.strip().replace('\r', '').replace('\n', '')
    # split on the first ': ' only so that values which themselves contain
    # ': ' (such as the Message field) are kept intact
    tokens = line.split(': ', 1)
    return tokens[1]
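# Example: first_token('Severity: Error') -> 'Error'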


def parse_report(report, input_file):
    """Parse one MOC Archive Report record from the `report` lines,
    reading any extra message continuation lines from `input_file`.
    """

    # get event data parameters
    msg_id = first_token(report[0])
    category = first_token(report[1])
    domain = int(first_token(report[2]))
    event_id = first_token(report[3])
    event_type = first_token(report[4])
    # generation timestamp
    gen_time = datetime.strptime(first_token(report[5]), DATE_FORMAT)
    source = first_token(report[6])
    application = first_token(report[7])
    severity = first_token(report[8])
    msg_type = first_token(report[9])
    workstation = first_token(report[10])
    message = first_token(report[11])
    end_of_rec = report[12]

    while len(end_of_rec) > 1:
        # the message extends over several lines: append them until the
        # blank separator line that closes the record
        message = message + end_of_rec
        end_of_rec = next(input_file)

    return Report(msg_id=msg_id, category=category, domain=domain,
                  event_id=event_id, event_type=event_type, gen_time=gen_time,
                  source=source, application=application, severity=severity,
                  msg_type=msg_type, workstation=workstation, message=message)
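# For the sample record in the module docstring, parse_report yields e.g.
# Report(msg_id='TKMAtaskManager::VMCOOAS_22254', category='Software', domain=0,
#        severity='Error', message='IPCtcpChan::sendMsg: In writev errno=104', ...)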


def ingest_moc_events(filename,
                      table=TableInfo('MOC_ARCHIVE_EVENTS')):
    """Insert MOC Archive Report events from `filename` into `table`."""
    sid, start_time, stop_time = fileattr(filename)

    # set up the insert cursor
    ins_cur = ts.insert(table=table,
                        fields=['SENSING_TIME'] +
                        SID.insert_fields +
                        ['MSG_ID',
                         'CATEGORY',
                         'DOMAIN',
                         'EVENT_ID',
                         'EVENT_TYPE',
                         'GEN_TIME',
                         'SOURCE',
                         'APPLICATION',
                         'SEVERITY',
                         'MSG_TYPE',
                         'WORKSTATION',
                         'MESSAGE'])

    if filename.suffix == '.zip':
        # the zip is expected to wrap a single .txt report file: extract it
        # next to the zip and carry on with the extracted plain text file
        with zipfile.ZipFile(str(filename)) as zip_file:
            for member in zip_file.infolist():
                if member.filename.endswith('.txt'):
                    filename = Path(zip_file.extract(member, path=str(filename.parent)))

    cc = 0  # number of records ingested
    first = start_time
    last = stop_time

    if filename.suffix == '.txt':
        with open(filename) as input_file:
            while True:
                try:
                    # read the file record by record
                    report = parse_report(
                        [next(input_file) for _ in range(NUM_LINES_RECORD)],
                        input_file)
                    first = min(first, report.gen_time)
                    last = max(last, report.gen_time)

                    # bind values are shared by the first insert attempt and
                    # the retry after removing a duplicate entry
                    bind_values = ([report.gen_time] +
                                   sid.bind_insert() +
                                   [report.msg_id,
                                    report.category,
                                    report.domain,
                                    report.event_id,
                                    report.event_type,
                                    report.gen_time,
                                    report.source,
                                    report.application,
                                    report.severity,
                                    report.msg_type,
                                    report.workstation,
                                    report.message])

                    try:
                        ins_cur.execute(None, bind_values)

                    except DuplicateDataError:
                        # a duplicate key is the most likely cause of the failure;
                        # it might be slightly faster to cache UIDs in code and
                        # remove duplicates within each file first
                        ins_cur.rollback()
                        logger.info('Duplicate entry found on {time}: removing old entry'.format(
                            time=report.gen_time))
                        ts.delete(sid=sid,
                                  table=table,
                                  sensing_time=report.gen_time,
                                  inclusive=True)
                        ins_cur.execute(None, bind_values)

                    cc += 1

                except StopIteration:
                    # end of file reached
                    logger.info('Ingested {n} MOC Archive Reports as Events from {start} to {end}'.format(
                        n=cc, start=first, end=last))
                    ins_cur.commit()
                    return END_OF_FILE


def main():
    """Command line entry point"""
    wo, resultfile, _ = init_algorithm()

    for job in wo.read_jobs():
        ingest_moc_events(job.filename)
        job.status = JobStatus.COMPLETED
        resultfile.add_job(job)

    logger.info('MOC Archive Events file ingestion complete')


if __name__ == '__main__':
    main()