#!/usr/bin/env python3

"""Ingester module for the Geometric Events file type (GEOEV).

The file contains one EVRQ element per entry, each holding a header for an item
that may be an Event or a Request, as in the sample fragment below. Only Events
are currently ingested; an entry with any other EVRQ_Type makes ingestion fail
with a ValueError.
7
8 <EVRQ>
9 <EVRQ_Header>
10 <EVRQ_Time>UTC=2020-02-06T00:36:10.920</EVRQ_Time>
11 <EVRQ_Type>Request</EVRQ_Type>
12 <EVRQ_Description>Configure TXA before Core Pass</EVRQ_Description>
13 </EVRQ_Header>
14 <RQ>
15 <RQ_Name>PXBSTXON</RQ_Name>
16 <RQ_Description>Configure TXA before Core Pass</RQ_Description>
17 <RQ_Source>PDGS</RQ_Source>
18 <RQ_Destination>MPS</RQ_Destination>
19 <RQ_Type>Orbital Angle tagged Sequence</RQ_Type>
20 <RQ_Absolute_orbit>38</RQ_Absolute_orbit>
21 <RQ_Deg_from_ANX>46.728</RQ_Deg_from_ANX>
22 <List_of_RQ_Parameters count="0"/>
23 <List_of_RQ_Attributes count="2">
24 <RQ_Attribute>
25 <RQ_Attribute_Name>RQ_ID</RQ_Attribute_Name>
26 <RQ_Attribute_Value>3</RQ_Attribute_Value>
27 </RQ_Attribute>
28 <RQ_Attribute>
29 <RQ_Attribute_Name>OPS_TYPE</RQ_Attribute_Name>
30 <RQ_Attribute_Value>MRC</RQ_Attribute_Value>
31 </RQ_Attribute>
32 </List_of_RQ_Attributes>
33 </RQ>
34 </EVRQ>
35 <EVRQ>
36 <EVRQ_Header>
37 <EVRQ_Time>UTC=2020-02-06T00:37:30.486</EVRQ_Time>
38 <EVRQ_Type>Event</EVRQ_Type>
39 <EVRQ_Description>Acquisition of Signal time for Horizon
40 Mask for Fairbanks</EVRQ_Description>
41 </EVRQ_Header>
42 <EV>
43 <EV_Name>FBK_AOS-HM</EV_Name>
44 <EV_Absolute_orbit>38</EV_Absolute_orbit>
45 <EV_Deg_from_ANX>50.974</EV_Deg_from_ANX>
46 <List_of_EV_Parameters count="0"/>
47 </EV>
48 </EVRQ>
49"""
50
51import re
52import logging
53from datetime import datetime
54from datetime import timedelta
55from collections import namedtuple
56from json import dumps as jsondumps
57
58from chart.common.xml import XMLElement
59from chart.alg import init_algorithm
60from chart.backend.job import JobStatus
61from chart.db import ts
62from chart.db.model.table import TableInfo
63from chart.db import connection
64from chart.db.exceptions import DuplicateDataError
65from chartepssg.alg.settings import scid_to_sid
66
logger = logging.getLogger(__name__)

# Don't automatically delete pre-existing time ranges in the GEO_EVENTS table when the
# events of a new product span longer than this (see ingest_geoevents `force_replace`)
REPLACE_THRESHOLD = timedelta(days=50)

# XML namespace of the element names below; None means names are used unqualified.
# The CFI namespace is kept for reference:
# NS = 'http://eop-cfi.esa.int/CFI'
NS = None

# Containers used to find the EVRQs
ELEM_DATA_BLOCK = XMLElement.qname(NS, 'Data_Block')
ELEM_LIST_OF_EVRQS = XMLElement.qname(NS, 'List_of_EVRQs')
ELEM_EVRQ = XMLElement.qname(NS, 'EVRQ')
ELEM_EVRQH = XMLElement.qname(NS, 'EVRQ_Header')
# ELEM_RQ = XMLElement.qname(NS, 'RQ')  # Requests are not ingested yet
ELEM_EV = XMLElement.qname(NS, 'EV')

# EVRQ header
ELEM_EVRQH_TIME = XMLElement.qname(NS, 'EVRQ_Time')
ELEM_EVRQH_TYPE = XMLElement.qname(NS, 'EVRQ_Type')
ELEM_EVRQH_DESCRIPTION = XMLElement.qname(NS, 'EVRQ_Description')

# Event
ELEM_EV_NAME = XMLElement.qname(NS, 'EV_Name')
ELEM_EV_ABS_ORB = XMLElement.qname(NS, 'EV_Absolute_orbit')
ELEM_EV_TIME_ANX = XMLElement.qname(NS, 'EV_Time_from_ANX')
ELEM_EV_DEG_ANX = XMLElement.qname(NS, 'EV_Deg_from_ANX')
ELEM_EV_LIST_PARMS = XMLElement.qname(NS, 'List_of_EV_Parameters')

# Request elements - unused while only Events are ingested; kept for reference:
# ELEM_RQ_NAME = XMLElement.qname(NS, 'RQ_Name')
# ELEM_RQ_SOURCE = XMLElement.qname(NS, 'RQ_Source')
# ELEM_RQ_DESTINATION = XMLElement.qname(NS, 'RQ_Destination')
# ELEM_RQ_TYPE = XMLElement.qname(NS, 'RQ_Type')
# ELEM_RQ_ABS_ORB = XMLElement.qname(NS, 'RQ_Absolute_orbit')
# ELEM_RQ_DEG = XMLElement.qname(NS, 'RQ_Deg_from_ANX')
# ELEM_RQ_EXECTIME = XMLElement.qname(NS, 'RQ_Execution_Time')

# Event parameters (children of ELEM_EV_LIST_PARMS)
ELEM_EV_PARAM = XMLElement.qname(NS, 'EV_Parameter')
ELEM_EV_PARAM_NAME = XMLElement.qname(NS, 'EV_Parameter_Name')
ELEM_EV_PARAM_DESC = XMLElement.qname(NS, 'EV_Parameter_Description')
# ELEM_EV_PARAM_REP = XMLElement.qname(NS, 'EV_Parameter_Representation')
ELEM_EV_PARAM_TYPE = XMLElement.qname(NS, 'EV_Parameter_Type')
ELEM_EV_PARAM_UNIT = XMLElement.qname(NS, 'EV_Parameter_Unit')
ELEM_EV_PARAM_VAL = XMLElement.qname(NS, 'EV_Parameter_Value')

# Request attributes - unused while only Events are ingested; kept for reference:
# ELEM_RQ_LIST_ATTRIBS = XMLElement.qname(NS, 'List_of_RQ_Attributes')
# ELEM_RQ_ATTRIB = XMLElement.qname(NS, 'RQ_Attribute')
# ELEM_RQ_ATTRIB_NAME = XMLElement.qname(NS, 'RQ_Attribute_Name')
# ELEM_RQ_ATTRIB_VAL = XMLElement.qname(NS, 'RQ_Attribute_Value')

# EVRQ_Type values
# REQUEST_TYPE = 'Request'
EVENT_TYPE = 'Event'
130
# Product filenames look like:
# SGA1_FDP_FDS__OPE_GEOEV______G20220628063345Z_S20220628000000Z_E20220812000000Z.xml
# capturing the spacecraft id and the validity start (S) / stop (E) times.

FILENAME_MATCHER = re.compile(
    r'(?P<scid>[a-zA-Z0-9]+)_'
    r'[\w]+_GEOEV______'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    r'\.xml$')  # literal '.' - an unescaped dot would accept any character


# strptime format of the `start` / `stop` groups above
TIME_DECODER = '%Y%m%d%H%M%S'
144
145
def fileattr(filename):
    """Given a `filename` return a tuple of SID, sensing start and end time.

    `filename` may be an object exposing a ``.name`` attribute (such as a
    `pathlib.Path`) or a plain string path; only the final path component is
    matched. Names look like:
    SGA1_FDP_FDS__OPE_GEOEV______G20220628063345Z_S20220628000000Z_E20220812000000Z.xml

    Returns:
        (sid, start, stop): `sid` is produced by `scid_to_sid` from the spacecraft
        id prefix; `start` and `stop` are naive datetimes decoded from the S and E
        fields using TIME_DECODER.

    Raises:
        ValueError: if the name does not match FILENAME_MATCHER.
    """
    import os

    # Path-like objects carry the basename in `.name`; plain strings need splitting
    basename = filename.name if hasattr(filename, 'name') else os.path.basename(filename)

    match = FILENAME_MATCHER.match(basename)
    if not match:
        raise ValueError('File name {f} not recognised as a Geometric Events product'.format(
            f=basename))

    groups = match.groupdict()

    return (scid_to_sid(groups['scid']),
            datetime.strptime(groups['start'], TIME_DECODER),
            datetime.strptime(groups['stop'], TIME_DECODER))
162
163
# One parsed Event entry, as built by parse_geoevents:
#   time          - EVRQ header time
#   name          - EV_Name
#   description   - EVRQ header description
#   abs_orbit     - EV_Absolute_orbit
#   time_from_anx - EV_Time_from_ANX
#   deg_from_anx  - EV_Deg_from_ANX
#   params        - list of parameter dicts from parse_params (or None)
EVENT = namedtuple(
    'EVENT',
    [
        'time',
        'name',
        'description',
        'abs_orbit',
        'time_from_anx',
        'deg_from_anx',
        'params',
    ],
)
172
173
def parse_geoevents(filename):
    """Yield an EVENT for each entry of the Geometric Events file `filename`.

    Optional Event fields (absolute orbit, time from ANX, degrees from ANX) are
    None when the corresponding element is absent, and `params` is None when the
    event has no parameters.

    Raises:
        ValueError: if the file has no List_of_EVRQs element, or if an entry's
            EVRQ_Type is anything other than 'Event' (Requests are not supported).
    """
    root_elem = XMLElement(filename=filename)
    list_event_elem = root_elem.find(ELEM_LIST_OF_EVRQS)
    if list_event_elem is None:
        raise ValueError('Geometric Events file {f} contains no {e} element'.format(
            f=filename, e=ELEM_LIST_OF_EVRQS))

    for evrq_elem in list_event_elem.findall(ELEM_EVRQ):
        # header: common to every EVRQ entry
        header_elem = evrq_elem.find(ELEM_EVRQH)
        time = header_elem.parse_datetime(ELEM_EVRQH_TIME)
        item_type = header_elem.parse_str(ELEM_EVRQH_TYPE)
        description = header_elem.parse_str(ELEM_EVRQH_DESCRIPTION)

        # Only Events are handled; Requests (and anything else) are rejected
        if item_type != EVENT_TYPE:
            raise ValueError(
                'Geometric Events file contains an unknown event type "{}"'.format(item_type))

        ev_elem = evrq_elem.find(ELEM_EV)
        name = ev_elem.parse_str(ELEM_EV_NAME)

        # Optional fields are reset for every entry, so a missing element yields None
        # instead of an UnboundLocalError (first entry) or the previous entry's value.
        orbit = None
        time_anx = None
        deg_anx = None
        if ev_elem.find(ELEM_EV_ABS_ORB) is not None:
            orbit = int(ev_elem.parse_int(ELEM_EV_ABS_ORB))
        if ev_elem.find(ELEM_EV_TIME_ANX) is not None:
            time_anx = float(ev_elem.parse_float(ELEM_EV_TIME_ANX))
        if ev_elem.find(ELEM_EV_DEG_ANX) is not None:
            deg_anx = float(ev_elem.parse_float(ELEM_EV_DEG_ANX))

        params = parse_params(ev_elem.find(ELEM_EV_LIST_PARMS))

        yield EVENT(
            time=time,
            name=name,
            description=description,
            abs_orbit=orbit,
            time_from_anx=time_anx,
            deg_from_anx=deg_anx,
            # an empty parameter list is reported as None
            params=params or None,
        )
222
223
def parse_params(params_elem):
    """Extract the Event parameter list from a List_of_EV_Parameters element.

    Args:
        params_elem: the List_of_EV_Parameters element, or None if it is absent.

    Returns:
        A list with one dict per EV_Parameter child that contains at least one of
        the known sub-elements. Dict keys are 'EV_Parameter_Name',
        'EV_Parameter_Description', 'EV_Parameter_Type', 'EV_Parameter_Unit' and
        'EV_Parameter_Value' (in that order), mapped to the text returned by
        `parse_str`; absent sub-elements are omitted. Returns an empty list when
        `params_elem` is None.
    """
    if params_elem is None:
        return []

    # (element to look for, key to store its text under), in output order
    fields = (
        (ELEM_EV_PARAM_NAME, 'EV_Parameter_Name'),
        (ELEM_EV_PARAM_DESC, 'EV_Parameter_Description'),
        (ELEM_EV_PARAM_TYPE, 'EV_Parameter_Type'),
        (ELEM_EV_PARAM_UNIT, 'EV_Parameter_Unit'),
        (ELEM_EV_PARAM_VAL, 'EV_Parameter_Value'),
    )

    params = []
    for param_elem in params_elem.findall(ELEM_EV_PARAM):
        param = {key: param_elem.parse_str(tag)
                 for tag, key in fields
                 if param_elem.find(tag) is not None}
        # skip parameters with none of the known sub-elements
        if param:
            params.append(param)

    return params
255
256
257
def ingest_geoevents(filename,
                     table,
                     replace=True,
                     force_replace=False):
    """Insert the Geometric Events parsed from `filename` into `table`.

    Args:
        filename: GEOEV product path; the SID is decoded from its name by `fileattr`.
        table: destination table, passed to `ts.delete` and `insert_values`.
        replace: if True, first delete the rows already stored for this SID between
            the earliest and latest event times in the file (inclusive).
        force_replace: perform that deletion even when the events span more than
            REPLACE_THRESHOLD.

    Raises:
        ValueError: if `replace` is set, the events span more than REPLACE_THRESHOLD
            and `force_replace` is not set (nothing is deleted or inserted).
    """
    sid = fileattr(filename)[0]

    events = list(parse_geoevents(filename))
    if not events:
        # Nothing to delete or insert; this also avoids inverted time bounds
        logger.warning('No geoevents items found in {f}; nothing ingested'.format(f=filename))
        return

    first = min(event.time for event in events)
    last = max(event.time for event in events)

    # The SID binding is the same for every row, so compute it once
    sid_num = sid.bind_insert()[0]

    # Value order must match the `fields` list in insert_values
    rows = [(event.time,
             sid_num,
             1,  # PRODUCT column: always 1 for this ingester
             event.name,
             event.description,
             event.abs_orbit,
             event.time_from_anx,
             event.deg_from_anx,
             # NOTE(review): events without parameters are stored as the JSON text
             # 'null' rather than SQL NULL - confirm this is intended
             jsondumps(event.params))
            for event in events]

    # delete existing items
    if replace:
        duration = last - first
        if duration > REPLACE_THRESHOLD and not force_replace:
            raise ValueError(
                'Refusing to delete and replace {d} of existing events '
                '(longer than {t}); use force_replace to override'.format(
                    d=duration, t=REPLACE_THRESHOLD))

        # commit needed because we use different cursors for insert and delete
        del_cnt = ts.delete(sid=sid,
                            table=table,
                            sensing_start=first,
                            sensing_stop=last,
                            inclusive=True,
                            commit=True)

        logger.info('Deleted {d} geoevents items from {t} for {s} from {start} to {stop}'.format(
            d=del_cnt, t=table.name, s=sid, start=first, stop=last))

    insert_values(sid, table, rows)

    logger.info('Ingested {cc} geoevents items from {start} to {stop}'.format(
        cc=len(rows), start=first, stop=last))
308
309
def insert_values(sid, table, rows):
    """Insert extracted `rows` for `sid` into database `table`.

    Each row is a tuple whose values follow the column order of `fields` below,
    with SENSING_TIME first. If the insert raises DuplicateDataError, rows already
    stored for `sid` between the earliest and latest sensing time in `rows`
    (inclusive) are deleted and the insert is retried once. An empty `rows` is a
    no-op.
    """
    if not rows:
        return

    db_conn = connection.db_connect()

    fields = ['SENSING_TIME', 'SID_NUM', 'PRODUCT',
              'NAME', 'DESCRIPTION', 'ABSOLUTE_ORBIT_NUMBER',
              'TIME_FROM_ANX', 'DEG_FROM_ANX',
              'PARAMETERS']

    # NOTE(review): a single "%s" for the whole VALUES list suggests the connection's
    # executemany expands it for all rows - confirm against the connection wrapper
    sql = 'INSERT INTO {tablename} ({fs}) VALUES %s'.format(
        tablename=table.name,
        fs=','.join(fields))

    try:
        db_conn.executemany(sql, rows)

    except DuplicateDataError:
        # NOTE(review): presumably ends the transaction left by the failed insert;
        # confirm whether a rollback is what is intended here
        db_conn.commit()

        # Rows are not guaranteed to be in time order, so wipe their true extent
        # rather than the range between the first and last rows
        start = min(row[0] for row in rows)
        stop = max(row[0] for row in rows)

        # Remove existing entries before inserting again
        logger.debug('Duplicate(s) found wiping {sid} from {strt} to {stop}'.format(
            sid=sid, strt=start, stop=stop))
        ts.delete(sid=sid,
                  table=table,
                  sensing_start=start,
                  sensing_stop=stop,
                  inclusive=True,
                  commit=False)

        logger.debug('Second insertion attempt')
        db_conn.executemany(sql, rows)

    # commit ingestion
    db_conn.commit()
346
347
def dispatch(wo, resultfile, _):
    """Entry point used by the dispatcher tool.

    For every job in the work order `wo`, ingest its file into the GEO_EVENTS
    table, then record the job as completed in `resultfile` together with the
    SID and the validity range decoded from the filename. The third argument is
    not used.
    """
    geo_events = TableInfo('GEO_EVENTS')

    for job in wo.read_jobs():
        filename = job.filename
        sid, sensing_start, sensing_stop = fileattr(filename)
        ingest_geoevents(filename, geo_events)

        resultfile.add_job(
            job,
            JobStatus.COMPLETED,
            tables=[dict(table=geo_events,
                         sensing_start=sensing_start,
                         sensing_stop=sensing_stop,
                         sid=sid)],
        )

    logger.info('Geometric Events file ingestion complete')
365
366
def main():
    """Command line entry point; this tool only runs under the dispatcher."""
    try:
        dispatcher_args = init_algorithm()
        dispatch(*dispatcher_args)
    except init_algorithm.NotDispatcher:
        # Standalone (non-dispatcher) execution is not supported
        raise ValueError('This tool must be run from the dispatcher')


if __name__ == '__main__':
    main()