#!/usr/bin/env python3

"""Ingester module for the MOC Archive Data Report file type.
This is an ASCII file containing one or more report records, each formatted as in the sample below:

ID: TKMAtaskManager::VMCOOAS_22254
Category: Software
Domain: 0
Event ID: TKMAtaskManager::VMCOOAS_22254
Event Type: Software
Generation Time: 2023-11-27T15:41:26.417273
Source: E2VALOASSV03
Application: TKMAtaskManager
Severity: Error
Type: Software
Workstation: E2VALOASSV03
Message: IPCtcpChan::sendMsg: In writev errno=104
"""

import logging
import os
import re
import zipfile
from collections import namedtuple
from datetime import datetime

from chart.alg import init_algorithm
from chart.alg import JobStatus
from chart.events.event import Event
from chart.db import ts
from chart.common.util import nvl_min
from chart.common.util import nvl_max
from chartepssg.alg.settings import scid_to_sid


logger = logging.getLogger()

# Event Classname
EVENT_CLASS = 'MOC-ARCHIVE-EVENTS'

# Time decoder for the timestamps embedded in product filenames
TIME_DECODER = '%Y%m%d%H%M%S'
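# e.g. '20231130090006' parses to datetime(2023, 11, 30, 9, 0, 6)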

# Number of lines in a single MOC Archive Data Report record
NUM_LINES_RECORD = 13
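# (the twelve labelled field lines shown in the module docstring plus one
# trailing separator line consumed as the end-of-record marker)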

# End of File reached
END_OF_FILE = -1

# store the fields of a single report record
Report = namedtuple('Report', 'msg_id '
                    'category '
                    'domain '
                    'event_id '
                    'event_type '
                    'gen_time '
                    'source '
                    'application '
                    'severity '
                    'msg_type '
                    'workstation '
                    'message')
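# Field order mirrors the order of the labelled lines in each record.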

# filename example:
# SGxx_LOG_MOC__VAL_MON_HIST___G20231130135424Z_S20231130090006Z_E20231130160005Z.zip
FILENAME_MATCHER_ZIP = re.compile(
    r'^(?P<scid>[a-zA-Z0-9]+)_'
    r'[\w]+_MON_HIST___'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    r'\.zip$'
)
FILENAME_MATCHER_TXT = re.compile(
    r'^(?P<scid>[a-zA-Z0-9]+)_'
    r'[\w]+_MON_HIST___'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    r'\.txt$'
)
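# For the example filename above, the patterns capture scid='SGxx',
# start='20231130090006' and stop='20231130160005'.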


def fileattr(filename):
    """Given a `filename` return a tuple of SID, sensing start time and sensing stop time.

    Filenames look like:
    SGxx_LOG_MOC__eee_MON_HIST___GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.zip

    where: eee = OPE | VAL | TST
    """
    if filename.suffix == '.zip':
        match = FILENAME_MATCHER_ZIP.match(filename.name)
    else:
        match = FILENAME_MATCHER_TXT.match(filename.name)

    if not match:
        raise ValueError('File name {f} not recognised as a MOC Archive Report product'.format(
            f=filename))

    groups = match.groupdict()

    return (
        scid_to_sid(groups['scid']),
        datetime.strptime(groups['start'], TIME_DECODER),
        datetime.strptime(groups['stop'], TIME_DECODER))


DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
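# DATE_FORMAT matches the 'Generation Time' field, e.g. 2023-11-27T15:41:26.417273
# in the sample record in the module docstring.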


def first_token(line):
    """Return the value following the first ': ' separator in `line`."""
    if not isinstance(line, str):
        line = line.decode('utf-8')

    line = line.strip().replace('\r', '').replace('\n', '')
    # split only on the first separator so values containing ': ' stay intact
    tokens = line.split(': ', 1)
    return tokens[1]
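# Example: first_token('Severity: Error') returns 'Error', and
# first_token('Message: IPCtcpChan::sendMsg: In writev errno=104') returns
# 'IPCtcpChan::sendMsg: In writev errno=104'.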


def parse_report(report, input_file):
    """Return a Report built from the lines in `report`.

    Any extra message continuation lines are read from `input_file`.
    """
    # get event data parameters
    msg_id = first_token(report[0])
    category = first_token(report[1])
    domain = int(first_token(report[2]))
    event_id = first_token(report[3])
    event_type = first_token(report[4])
    # generation timestamp
    gen_time = datetime.strptime(first_token(report[5]), DATE_FORMAT)
    source = first_token(report[6])
    application = first_token(report[7])
    severity = first_token(report[8])
    msg_type = first_token(report[9])
    workstation = first_token(report[10])
    message = first_token(report[11])
    end_of_rec = report[12]

    while len(end_of_rec) > 1:
        # message extends over several lines
        if not isinstance(end_of_rec, str):
            end_of_rec = end_of_rec.decode('utf-8')

        message = message + end_of_rec
        end_of_rec = next(input_file)

    res = Report(msg_id=msg_id, category=category, domain=domain,
                 event_id=event_id, event_type=event_type, gen_time=gen_time,
                 source=source, application=application, severity=severity,
                 msg_type=msg_type, workstation=workstation, message=message)

    # logger.info('Parsed item: {item}'.format(item=res))

    return res


def process_event(input_file, sid, eventraiser, recs):
    """Extract a report record and raise an Event from it.

    Return the updated record count, or END_OF_FILE once the input is exhausted.
    """
    try:
        # read file, record by record
        report = parse_report([next(input_file) for _ in range(NUM_LINES_RECORD)], input_file)

    except StopIteration:
        # end of file reached
        return END_OF_FILE

    # Create Event...
    eventraiser.add(Event(event_classname=EVENT_CLASS,
                          sid=sid,
                          start_time=report.gen_time,
                          instance_properties={
                              'MSG_ID': report.msg_id,
                              'CATEGORY': report.category,
                              'DOMAIN': report.domain,
                              'EVENT_ID': report.event_id,
                              'EVENT_TYPE': report.event_type,
                              'GEN_TIME': report.gen_time,
                              'SOURCE': report.source,
                              'APPLICATION': report.application,
                              'SEVERITY': report.severity,
                              'MSG_TYPE': report.msg_type,
                              'WORKSTATION': report.workstation,
                              'MESSAGE': report.message,
                          }))
    return recs + 1


def raise_events(sid, sensing_start, sensing_stop, filename, eventraiser):
    """Insert MOC Archive Report data from `filename` as `EVENT_CLASS` events."""
    recs = 0

    if filename.suffix == '.zip':
        # Open zip file
        with zipfile.ZipFile(str(filename)) as zip_file:
            # Go through the zip contents and get only the .txt file within the zip
            for member in zip_file.infolist():
                if member.filename.endswith('.txt'):
                    # Read contents of the txt file
                    with zip_file.open(member) as input_file:
                        while True:
                            result = process_event(input_file, sid, eventraiser, recs)
                            if result == END_OF_FILE:
                                break

                            recs = result

    elif filename.suffix == '.txt':
        with open(filename) as input_file:
            while True:
                result = process_event(input_file, sid, eventraiser, recs)
                if result == END_OF_FILE:
                    break

                recs = result

    logger.info('Ingested {n} MOC Archive Reports as Events from {start} to {end}'.format(
        n=recs, start=sensing_start, end=sensing_stop))


def dispatch(wo, resultfile, eventraiser):
    """We are being run by the dispatcher tool."""
    logger.info('Starting {ec} Events'.format(ec=EVENT_CLASS))
    for job in wo.read_jobs():
        sid, start_time, stop_time = fileattr(job['filename'])

        eventraiser.set_start_time(nvl_min(eventraiser.get_start_time(), start_time))
        eventraiser.set_stop_time(nvl_max(eventraiser.get_stop_time(), stop_time))

        logger.info('Begin events from {start} to {stop}'.format(start=start_time,
                                                                 stop=stop_time))

        raise_events(sid,
                     start_time,
                     stop_time,
                     job['filename'],
                     eventraiser)

        resultfile.add_job(job, JobStatus.COMPLETED)

    eventraiser.log_report()
    logger.info('All done')


def main():
    """Command line entry point.

    This program can either be called via `dispatcher` or standalone.
    """
    dispatch(*init_algorithm())


if __name__ == '__main__':
    main()