1#!/usr/bin/env python3
2
3"""Ingester module for the Executable Schedule file type (SCHED_HIST).
4
The data is ingested into 2 separate tables: Activities and Timeline (TML) Actions.

A sample XML file is shown below. The root element carries the spacecraft id in its
`domain` attribute and contains a list of activities (each with its own list of actions)
followed by a list of timeline actions:
9
10<Schedule_History domain="SGA1">
11 <List_of_Activities count="1">
12 <Activity>
13 <Internal_Id>71fc3c25-f620-4b20-a37e-77d061fc9c1d</Internal_Id>
14 <Name>AuthenticationMode_SGA1</Name>
15 <Proc_Id>BUC_P1</Proc_Id>
16 <Type>GR_ACT</Type>
17 <Class>OAS_PE</Class>
18 <Scheduled_Time>UTC=2022-07-12T08:40:16.279</Scheduled_Time>
19 <Execution_Time>UTC=2022-07-12T08:40:16.279</Execution_Time>
20 <Trigger_Time>UTC=2022-07-12T08:40:16.279</Trigger_Time>
21 <Absolute_Orbit>9</Absolute_Orbit>
22 <PSO>53.439</PSO>
23 <Final_Status>Expired</Final_Status>
24 <List_of_Parameters count="1"/>
25 <List_of_Actions count="8">
26 <Action>
27 <Timestamp>UTC=2022-07-12T08:26:27.861</Timestamp>
28 <Message>Activity loaded from file SGA1_SCH_MPS__BUC_SCHEX______G20220712000000Z_S20220712070000Z_E20220713000000Z.xml</Message>
29 <Status>Not Released</Status>
30 </Action>
31 ...
32 </List_of_Actions>
33 </Activity>
34 </List_of_Activities>
35 <List_of_Timeline_Actions count="3">
36 <TML_Action>
37 <Timestamp>UTC=2022-07-12T08:26:03.115</Timestamp>
38 <Message>SGA1 Timeline status changed to RUNNING</Message>
39 <Status>RUNNING</Status>
40 </TML_Action>
41 ...
42 </List_of_Timeline_Actions>
43</Schedule_History>
44"""
45
46import re
47import logging
48from datetime import datetime
49from datetime import timedelta
50from collections import namedtuple
51
52from chart.common.xml import XMLElement
53from chart.alg import init_algorithm
54from chart.backend.job import JobStatus
55from chart.db import ts
56from chart.db.model.table import TableInfo
57from chart.db import connection
58from chart.db.exceptions import DuplicateDataError
59from chart.common.xml import XMLElementNotFound
60
61from chartepssg.alg.common import jsondumps, UnknownToken
62from chartepssg.alg.settings import scid_to_sid
63
64
# Log through the root logger (getLogger() called without a name).
logger = logging.getLogger()


# Safety limit for replacement: when the sensing range encoded in a filename is
# longer than this, ingest_schedule() refuses to delete the rows already stored
# in that range unless it is called with force_replace=True.
REPLACE_THRESHOLD = timedelta(days=50)
71
72
# Element names of the two kinds of record held in a schedule history file.
TYPE_ACTIVITY, TYPE_TML_ACTION = 'Activity', 'TML_Action'

# Destination tables: activities go to the first, timeline actions to the second.
TABLE_ACTIVITIES = TableInfo('SCHEDULE_ACTIVITIES')
TABLE_TML_ACTIONS = TableInfo('SCHEDULE_TML_ACTIONS')
80
81
# Schedule history filenames look like:
#   SGxx_LOG_OAS__BUC_SCH_HIST___G20220712085643Z_S20220712084016Z_E20220712085216Z.xml
# The leading spacecraft id and the S/E (start/stop) timestamps are captured;
# the G-prefixed timestamp is matched but not captured.
FILENAME_MATCHER = re.compile(
    r'(?P<scid>[a-zA-Z0-9]+)_'
    r'\w+_SCH_HIST___'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    # escaped dot: only a literal ".xml" extension is accepted
    r'\.xml$')


# strptime() format of the start/stop timestamps captured by FILENAME_MATCHER
TIME_DECODER = '%Y%m%d%H%M%S'
95
96
def fileattr(filename):
    """Decode the spacecraft and sensing range encoded in a schedule history filename.

    Args:
        filename: path-like object; only its ``.name`` (basename) is examined. Names look like
            SGxx_LOG_OAS__BUC_SCH_HIST___G20220712085643Z_S20220712084016Z_E20220712085216Z.xml

    Returns:
        Tuple ``(sid, sensing_start, sensing_stop)``: the SID produced by ``scid_to_sid``
        from the leading spacecraft id, and the S/E timestamps as naive datetimes.

    Raises:
        ValueError: if the name does not match FILENAME_MATCHER.
    """
    basename = filename.name
    found = FILENAME_MATCHER.match(basename)
    if found is None:
        raise ValueError('File name {f} not recognised as an Executable Schedule product'.format(
            f=basename))

    sid = scid_to_sid(found.group('scid'))
    sensing_start, sensing_stop = (datetime.strptime(found.group(key), TIME_DECODER)
                                   for key in ('start', 'stop'))
    return sid, sensing_start, sensing_stop
113
114
# Record layout for a parsed activity.
# NOTE(review): not referenced by the visible code in this module; parse_activity()
# returns a plain dict instead. Field names are given as a list so a missing
# separator can no longer silently merge two names (previously 'schedule_time'
# lacked its trailing space and fused with 'exec_time').
ACTIVITY = namedtuple('ACTIVITY', [
    'id',
    'name',
    'proc_id',
    'ssid',
    'schedule_time',
    'exec_time',
    'trig_time',
    'abs_orbit',
    'pso',
    'final_status',
    'actions',
    'params',
])
129
# Record layout for a parsed timeline action.
# NOTE(review): not referenced by the visible code in this module; parse_action()
# returns a plain dict instead. The last field was previously misspelt 'staus'.
TML_ACTION = namedtuple('TML_ACTION', ['timestamp', 'message', 'status'])
135
136
def tag(name):
    """Return the XMLElement qualified tag for the element called `name`.

    The first argument to XMLElement.qname is None (presumably: no namespace);
    the result is what find/findall/parse_* are called with in this module.
    """
    namespace = None
    return XMLElement.qname(namespace, name)
140
141
def parse_schedule(filename):
    """Parse a schedule history file into its SID, activities and timeline actions.

    Args:
        filename: file to read; passed straight to XMLElement.

    Returns:
        Tuple ``(sid, activities, tml_actions)``:
        ``sid`` is ``scid_to_sid`` of the root element's ``domain`` attribute,
        ``activities`` is a list of parse_activity() dicts and
        ``tml_actions`` is a list of parse_action() dicts.
        A missing ``List_of_Activities`` or ``List_of_Timeline_Actions``
        element yields an empty list instead of an AttributeError.
    """
    root_elem = XMLElement(filename=filename)
    sid = scid_to_sid(root_elem.elem.get('domain'))

    activities = [parse_activity(activity_elem)
                  for activity_elem in _list_children(root_elem, 'List_of_Activities', 'Activity')]

    tml_actions = [parse_action(action_elem)
                   for action_elem in _list_children(root_elem,
                                                     'List_of_Timeline_Actions',
                                                     'TML_Action')]

    return sid, activities, tml_actions


def _list_children(parent, list_tag, item_tag):
    """Return the `item_tag` children of `parent`'s `list_tag` element, or [] if it is absent."""
    # XMLElement.find is treated as returning None when the element is missing,
    # as parse_list() in this module also assumes.
    list_elem = parent.find(tag(list_tag))
    if list_elem is None:
        return []

    return list_elem.findall(tag(item_tag))
167
168
def parse_activity(elem):
    """Parse one ``Activity`` element of the schedule ``List_of_Activities``.

    Args:
        elem: the ``Activity`` XMLElement.

    Returns:
        dict with the keys Scheduled_Time, Internal_Id, Name, Proc_Id, SSID, Type,
        Class, Execution_Time, Trigger_Time, Absolute_Orbit, PSO, Final_Status,
        Parameters and Actions, inserted in that order, which matches the
        SCHEDULE_ACTIVITIES column order used by ingest_schedule().
        Proc_Id, SSID, Execution_Time and Trigger_Time are parsed with a default
        of None. Parameters and Actions are ALWAYS set: to jsondumps() of the
        parsed list, or jsondumps(None) when the activity has none, so every
        activity yields the same set of columns.
    """
    activity = {}

    activity['Scheduled_Time'] = elem.parse_datetime(tag('Scheduled_Time'))
    activity['Internal_Id'] = elem.parse_str(tag('Internal_Id'))
    activity['Name'] = elem.parse_str(tag('Name'))
    activity['Proc_Id'] = elem.parse_str(tag('Proc_Id'), None)
    activity['SSID'] = elem.parse_str(tag('SSID'), None)
    activity['Type'] = elem.parse_str(tag('Type'))
    activity['Class'] = elem.parse_str(tag('Class'))
    activity['Execution_Time'] = elem.parse_datetime(tag('Execution_Time'), None)
    activity['Trigger_Time'] = elem.parse_datetime(tag('Trigger_Time'), None)
    activity['Absolute_Orbit'] = elem.parse_int(tag('Absolute_Orbit'))
    activity['PSO'] = elem.parse_float(tag('PSO'))
    activity['Final_Status'] = elem.parse_str(tag('Final_Status'))

    # parse_list() returns None (never an empty list) when there are no parameters.
    params = parse_list(elem, 'List_of_Parameters', ('Name', 'Type', 'Value'))
    activity['Parameters'] = jsondumps(params)

    # A missing List_of_Actions element is treated like an empty one.
    list_actions_elem = elem.find(tag('List_of_Actions'))
    actions = []
    if list_actions_elem is not None:
        actions = [parse_action(action_elem)
                   for action_elem in list_actions_elem.findall(tag('Action'))]

    # Always set the key (it used to be omitted when empty, leaving the activity
    # one value short); empty is stored the same way as empty Parameters.
    activity['Actions'] = jsondumps(actions or None)

    return activity
202
203
204
def parse_list(elem, parent_tag, child_tags):
    """Collect the children of `elem`'s `parent_tag` element as a list of dicts.

    Used for an Activity's ``List_of_Parameters``. Each child of the
    `parent_tag` element becomes one dict mapping every name in `child_tags`
    that the child actually contains to its text (via parse_str); names the
    child lacks are simply omitted.

    Returns:
        The list of dicts, or None if `parent_tag` is missing or has no children.
    """
    container = elem.find(tag(parent_tag))
    if container is None:
        return None

    items = [
        {name: child.parse_str(tag(name))
         for name in child_tags
         if child.find(tag(name)) is not None}
        for child in container.findall()
    ]
    return items or None
219
220
def parse_action(elem):
    """Extract the Timestamp, Message and Status children of an action element.

    Used for both ``Action`` (inside an activity) and ``TML_Action`` elements.
    Returns a plain dict holding only the children that are present, in the
    order Timestamp (via parse_datetime), Message and Status (via parse_str).
    """
    readers = (
        ('Timestamp', elem.parse_datetime),
        ('Message', elem.parse_str),
        ('Status', elem.parse_str),
    )

    details = {}
    for name, read in readers:
        child_tag = tag(name)
        if elem.find(child_tag) is not None:
            details[name] = read(child_tag)

    return details
233
234
# parse_activity() keys, in SCHEDULE_ACTIVITIES column order (SENSING_TIME onwards).
_ACTIVITY_KEYS = ('Scheduled_Time', 'Internal_Id', 'Name', 'Proc_Id', 'SSID', 'Type',
                  'Class', 'Execution_Time', 'Trigger_Time', 'Absolute_Orbit', 'PSO',
                  'Final_Status', 'Parameters', 'Actions')

# parse_action() keys, in SCHEDULE_TML_ACTIONS column order (SENSING_TIME onwards).
_TML_ACTION_KEYS = ('Timestamp', 'Message', 'Status')


def _delete_existing(table, sid, start, stop, label):
    """Delete (and commit) the `sid` rows of `table` between `start` and `stop`.

    Calls ts.delete() with inclusive=False and logs how many rows went, using
    `label` to describe them in the log message.
    """
    del_cnt = ts.delete(table=table,
                        sid=sid,
                        sensing_start=start,
                        sensing_stop=stop,
                        inclusive=False,
                        commit=True)

    logger.info('Deleted {d} {label} from {t} from {start} to {stop}'.format(
        d=del_cnt, label=label, t=table.name, start=start, stop=stop))


def _rows(sid, items, keys):
    """Build insert rows: the SID bind value followed by each item's values for `keys`.

    Values are picked by key rather than taken from dict order, and missing keys
    become None, so a sparse item can never shift later values into the wrong column.
    """
    if not items:
        return []

    sid_num = sid.bind_insert()[0]
    return [[sid_num] + [item.get(key) for key in keys] for item in items]


def ingest_schedule(filename,
                    replace=True,
                    force_replace=False):
    """Insert schedule history from `filename` into the activities and TML actions tables.

    Args:
        filename: file to ingest; its name must match FILENAME_MATCHER.
        replace: if True, first delete the rows already stored for the SID and
            sensing range encoded in the filename, from both tables.
        force_replace: allow that deletion even when the range is longer than
            REPLACE_THRESHOLD.

    Returns:
        ``(sid, start, stop)``: the SID from the file's ``domain`` attribute and
        the filename's sensing range, widened to include every parsed activity
        Scheduled_Time and timeline action Timestamp.

    Raises:
        ValueError: if the filename is not recognised, or if replacing was
            requested over a range longer than REPLACE_THRESHOLD without
            force_replace.
    """
    name_sid, start, stop = fileattr(filename)

    if replace:
        duration = stop - start
        if duration > REPLACE_THRESHOLD and not force_replace:
            raise ValueError('Refusing to delete replace {d}'.format(d=duration))

    # Parse BEFORE deleting: a malformed file must not leave the tables with
    # their old rows removed (deletes are committed) and nothing re-inserted.
    sid, activities, tml_actions = parse_schedule(filename)

    if replace:
        # Deletion uses the SID decoded from the filename; inserts below use the
        # SID from the file's domain attribute.
        _delete_existing(TABLE_ACTIVITIES, name_sid, start, stop, 'Schedule Activities')
        _delete_existing(TABLE_TML_ACTIONS, name_sid, start, stop, 'Schedule TML Actions')

    # Widen the range to the earliest/latest record so that no event is missed
    # when the CHART event ingestion job is triggered. Records without a time
    # are ignored here, and a file with no records keeps the filename range.
    epochs = [epoch for epoch in
              [activity.get('Scheduled_Time') for activity in activities] +
              [action.get('Timestamp') for action in tml_actions]
              if epoch is not None]
    if epochs:
        start = min(min(epochs), start)
        stop = max(max(epochs), stop)

    # Activities
    activity_fields = ['SID_NUM', 'SENSING_TIME', 'INTERNAL_ID',
                       'NAME', 'PROC_ID', 'SSID', 'TYPE', 'CLASS',
                       'EXECUTION_TIME', 'TRIGGER_TIME', 'ABS_ORBIT', 'PSO',
                       'FINAL_STATUS', 'PARAMETERS', 'ACTIONS']
    cc = insert_values(TABLE_ACTIVITIES, activity_fields,
                       _rows(sid, activities, _ACTIVITY_KEYS))
    logger.info('Ingested {cc} Schedule Activities from {start} to {stop}'.format(
        cc=cc, start=start, stop=stop))

    # TML Actions
    tml_action_fields = ['SID_NUM', 'SENSING_TIME', 'MESSAGE', 'STATUS']
    cc = insert_values(TABLE_TML_ACTIONS, tml_action_fields,
                       _rows(sid, tml_actions, _TML_ACTION_KEYS))
    logger.info('Ingested {cc} Schedule TML Actions from {start} to {stop}'.format(
        cc=cc, start=start, stop=stop))

    return sid, start, stop
298
299
def insert_values(table, fields, rows):
    """Insert `rows` into database `table` one at a time, replacing duplicates.

    Args:
        table: table descriptor; only its ``name`` attribute is used.
        fields: column names, in the same order as the values of each row.
        rows: sequence of rows. ``row[0]`` must be the SID_NUM value and
            ``row[1]`` the SENSING_TIME value: when an insert raises
            DuplicateDataError, the existing entry matching those two is
            deleted and the insert is retried.

    Returns:
        Number of rows inserted.
    """
    db_conn = connection.db_connect()

    sql = 'INSERT INTO {table} ({fs}) VALUES %s '.format(
        table=table.name,
        fs=','.join(fields),)

    # Loop-invariant, so built once rather than on every duplicate.
    sql_del = ('DELETE FROM {table} WHERE SID_NUM = :sid_num '
               'AND SENSING_TIME = :timestamp').format(table=table.name)

    cc = 0
    for item in rows:
        try:
            # Inserting and committing row by row is not efficient, but it is the
            # only way found to detect and replace duplicates with data that has JSON.
            db_conn.executemany(sql, [item])
            db_conn.commit()

        except DuplicateDataError as ex:
            # Remove the existing entry, then insert the new one in its place.
            logger.warning('Duplicate(s) found {ex}. Deleting existing item'.format(ex=ex))

            # NOTE(review): on some backends the failed INSERT leaves the transaction
            # aborted until rolled back; confirm db_conn handles that before this DELETE.
            db_conn.execute(sql_del, sid_num=item[0], timestamp=item[1])
            db_conn.commit()

            # try again
            db_conn.executemany(sql, [item])
            db_conn.commit()

        cc += 1

    # Final commit (each row has already been committed above).
    db_conn.commit()
    return cc
338
339
def main():
    """Command line entry point: ingest every job's file and report the affected tables."""
    wo, resultfile, _ = init_algorithm()

    for job in wo.read_jobs():
        sid, start, stop = ingest_schedule(job.filename)

        # Both tables are reported with the (possibly widened) range returned by
        # ingest_schedule().
        affected = [dict(table=table, sensing_start=start, sensing_stop=stop, sid=sid)
                    for table in (TABLE_ACTIVITIES, TABLE_TML_ACTIONS)]

        resultfile.add_job(job, JobStatus.COMPLETED, tables=affected)
362
363
if __name__ == "__main__":
    # Run only when executed as a script, not when imported.
    main()