1#!/usr/bin/env python3
2
3"""Ingester module for the MOC Archive Data Report file type.
This is an ASCII file which contains one or more report records, each formatted as shown in the sample below:
5
6ID: TKMAtaskManager::VMCOOAS_22254
7Category: Software
8Domain: 0
9Event ID: TKMAtaskManager::VMCOOAS_22254
10Event Type: Software
11Generation Time: 2023-11-27T15:41:26.417273
12Source: E2VALOASSV03
13Application: TKMAtaskManager
14Severity: Error
15Type: Software
16Workstation: E2VALOASSV03
17Message: IPCtcpChan::sendMsg: In writev errno=104
18"""
19
import io
import logging
import os
import re
import zipfile
from builtins import Exception
from collections import namedtuple
from datetime import datetime

from chart.alg import init_algorithm
from chart.alg import JobStatus
from chart.db import ts
from chart.db.exceptions import DuplicateDataError
from chart.db.model.table import TableInfo
from chart.events.event import Event
from chart.project import SID

from chartepssg.alg.settings import scid_to_sid
36
37
38logger = logging.getLogger()
39
40# Event Classname
41EVENT_CLASS = 'MOC-ARCHIVE-EVENTS'
42
43# Time decoder
44TIME_DECODER = '%Y%m%d%H%M%S'
45
46# Number of lines in a single MOC Archive Data Report record
47NUM_LINES_RECORD = 13
48
49# End of File reached
50END_OF_FILE = -1
51
52# store each manoeuvre data line
53Report = namedtuple('Report', 'msg_id '
54 'category '
55 'domain '
56 'event_id '
57 'event_type '
58 'gen_time '
59 'source '
60 'application '
61 'severity '
62 'msg_type '
63 'workstation '
64 'message')
65
66# filename example:
67# SGxx_LOG_MOC__VAL_MON_HIST___G20231130135424Z_S20231130090006Z_E20231130160005Z.zip
68FILENAME_MATCHER_ZIP = re.compile(
69 r'^(?P<scid>[a-zA-Z0-9]+)_'
70 r'[\w]+_MON_HIST___'
71 r'G[0-9]+Z_'
72 r'S(?P<start>[0-9]+)Z_'
73 r'E(?P<stop>[0-9]+)Z'
74 r'.zip$'
75)
76FILENAME_MATCHER_TXT = re.compile(
77 r'^(?P<scid>[a-zA-Z0-9]+)_'
78 r'[\w]+_MON_HIST___'
79 r'G[0-9]+Z_'
80 r'S(?P<start>[0-9]+)Z_'
81 r'E(?P<stop>[0-9]+)Z'
82 r'.txt$'
83)
84
85
86def fileattr(filename):
87 """Given a `filename` return a tuple of SID, sensing start time.
88
89 They look like:
90 SGxx_LOG_MOC__eee_MON_HIST___GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.zip
91
92 where: eee = OPE | VAL | TST """
93
94 if filename.suffix == '.zip':
95 match = FILENAME_MATCHER_ZIP.match(filename.name)
96 else:
97 match = FILENAME_MATCHER_TXT.match(filename.name)
98
99 if not match:
100 raise ValueError('File name {f} not recognised as MOC Archive Report product'.format(
101 f=filename))
102
103 groups = match.groupdict()
104
105 return (
106 scid_to_sid(groups["scid"]),
107 datetime.strptime(groups['start'], TIME_DECODER),
108 datetime.strptime(groups['stop'], TIME_DECODER))
109
110
111DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
112
113
114def first_token(line):
115 """Extracts the first string token of a `line` within a text file
116 """
117
118 if not isinstance(line, str):
119 line = line.decode('utf-8')
120
121 line = line.strip().replace('\r', '').replace('\n', '')
122 tokens = line.split(': ')
123 return tokens[1]
124
125
126def parse_report(report, input_file):
127 """Return a Satellite Mass Report data from `filename`.
128 """
129
130 # get event data parameters
131 msg_id = first_token(report[0])
132 category = first_token(report[1])
133 domain = int(first_token(report[2]))
134 event_id = first_token(report[3])
135 event_type = first_token(report[4])
136 # generation timestamp
137 gen_time = datetime.strptime(first_token(report[5]), DATE_FORMAT)
138 source = first_token(report[6])
139 application = first_token(report[7])
140 severity = first_token(report[8])
141 msg_type = first_token(report[9])
142 workstation = first_token(report[10])
143 message = first_token(report[11])
144 end_of_rec = report[12]
145
146 while len(end_of_rec) > 1:
147 # message extends over several lines
148 message = message + end_of_rec
149 end_of_rec = next(input_file)
150
151 res = Report(msg_id = msg_id, category = category, domain = domain,
152 event_id = event_id, event_type = event_type, gen_time = gen_time,
153 source = source, application = application, severity = severity,
154 msg_type = msg_type, workstation = workstation, message = message)
155
156 # logger.info('Parsed item: {item}'.format(item=res))
157
158 return res
159
160
161def ingest_moc_events(filename,
162 table=TableInfo('MOC_ARCHIVE_EVENTS')):
163 """Insert Manoeuvre History objects from `source` into `table`."""
164 sid, start_time, stop_time = fileattr(filename)
165
166 # setup insert cursor
167 ins_cur = ts.insert(table=table,
168 fields=['SENSING_TIME'] +\
169 SID.insert_fields +\
170 ['MSG_ID',
171 'CATEGORY',
172 'DOMAIN',
173 'EVENT_ID',
174 'EVENT_TYPE',
175 'GEN_TIME',
176 'SOURCE',
177 'APPLICATION',
178 'SEVERITY',
179 'MSG_TYPE',
180 'WORKSTATION',
181 'MESSAGE'])
182
183 if filename.suffix == '.zip':
184 # Open zip file
185 with zipfile.ZipFile(str(filename)) as zip_file:
186 # Go through zip contents and get only the DBL file within the zip
187 for member in zip_file.infolist():
188 if member.filename.endswith('.txt'):
189 # Read contents of txt file
190 filename = member.filename
191
192 cc = 0
193 first = start_time
194 last = stop_time
195 report = 0
196
197 if filename.suffix == '.txt':
198 with open(filename) as input_file:
199 while report != END_OF_FILE:
200 try:
201 # read file, record by record
202 report = parse_report([next(input_file) for _ in range(NUM_LINES_RECORD)], input_file)
203 first = min(first, report.gen_time)
204 last = max(last, report.gen_time)
205
206 try:
207 ins_cur.execute(None,
208 [report.gen_time] +\
209 sid.bind_insert() +\
210 [report.msg_id,
211 report.category,
212 report.domain,
213 report.event_id,
214 report.event_type,
215 report.gen_time,
216 report.source,
217 report.application,
218 report.severity,
219 report.msg_type,
220 report.workstation,
221 report.message])
222
223 except DuplicateDataError:
224 ins_cur.rollback()
225 # this is a guess but it's the most likely reason for a fail
226 # it might be slightly faster to cache UIDs in code and remove duplicates within
227 # each file first
228 logger.info('Duplicate entry found on {time}: removing old entry'.format(time=report.gen_time))
229 ts.delete(sid=sid,
230 table=table,
231 sensing_time=report.gen_time,
232 inclusive=True)
233
234 ins_cur.execute(None,
235 [report.gen_time] +\
236 sid.bind_insert() +\
237 [report.msg_id,
238 report.category,
239 report.domain,
240 report.event_id,
241 report.event_type,
242 report.gen_time,
243 report.source,
244 report.application,
245 report.severity,
246 report.msg_type,
247 report.workstation,
248 report.message])
249
250 cc += 1
251
252 except StopIteration:
253 # end of file reached
254 logger.info('Ingested {n} MOC Archive Reports as Events from {start} to {end}'.format(
255 n=cc, start=first, end=last))
256 ins_cur.commit()
257 return END_OF_FILE
258
259
260def main():
261 """Command line entry point"""
262 wo, resultfile, _ = init_algorithm()
263
264 for job in wo.read_jobs():
265 ingest_moc_events(job.filename)
266 job.status = JobStatus.COMPLETED
267 resultfile.add_job(job)
268
269 logger.info('MOC Archive Events file ingestion complete')
270
271if __name__ == '__main__':
272 main()