1#!/usr/bin/env python3
  2
"""Ingester module for the Geometric Events file type (GEOEV).

The input is an XML file which contains, for each entry, a header for an item that
may be an Event or a Request, as in the sample below. Only Events are ingested; an
entry of any other type causes the ingestion to fail with a ValueError.
  7
  8          <EVRQ>
  9            <EVRQ_Header>
 10               <EVRQ_Time>UTC=2020-02-06T00:36:10.920</EVRQ_Time>
 11               <EVRQ_Type>Request</EVRQ_Type>
 12               <EVRQ_Description>Configure TXA before Core Pass</EVRQ_Description>
 13            </EVRQ_Header>
 14            <RQ>
 15               <RQ_Name>PXBSTXON</RQ_Name>
 16               <RQ_Description>Configure TXA before Core Pass</RQ_Description>
 17               <RQ_Source>PDGS</RQ_Source>
 18               <RQ_Destination>MPS</RQ_Destination>
 19               <RQ_Type>Orbital Angle tagged Sequence</RQ_Type>
 20               <RQ_Absolute_orbit>38</RQ_Absolute_orbit>
 21               <RQ_Deg_from_ANX>46.728</RQ_Deg_from_ANX>
 22               <List_of_RQ_Parameters count="0"/>
 23               <List_of_RQ_Attributes count="2">
 24                  <RQ_Attribute>
 25                     <RQ_Attribute_Name>RQ_ID</RQ_Attribute_Name>
 26                     <RQ_Attribute_Value>3</RQ_Attribute_Value>
 27                  </RQ_Attribute>
 28                  <RQ_Attribute>
 29                     <RQ_Attribute_Name>OPS_TYPE</RQ_Attribute_Name>
 30                     <RQ_Attribute_Value>MRC</RQ_Attribute_Value>
 31                  </RQ_Attribute>
 32               </List_of_RQ_Attributes>
 33            </RQ>
 34         </EVRQ>
 35         <EVRQ>
 36            <EVRQ_Header>
 37               <EVRQ_Time>UTC=2020-02-06T00:37:30.486</EVRQ_Time>
 38               <EVRQ_Type>Event</EVRQ_Type>
 39               <EVRQ_Description>Acquisition of Signal time for Horizon
 40                    Mask for Fairbanks</EVRQ_Description>
 41            </EVRQ_Header>
 42            <EV>
 43               <EV_Name>FBK_AOS-HM</EV_Name>
 44               <EV_Absolute_orbit>38</EV_Absolute_orbit>
 45               <EV_Deg_from_ANX>50.974</EV_Deg_from_ANX>
 46               <List_of_EV_Parameters count="0"/>
 47            </EV>
 48         </EVRQ>
 49"""
 50
 51import re
 52import logging
 53from datetime import datetime
 54from datetime import timedelta
 55from collections import namedtuple
 56from json import dumps as jsondumps
 57
 58from chart.common.xml import XMLElement
 59from chart.alg import init_algorithm
 60from chart.backend.job import JobStatus
 61from chart.db import ts
 62from chart.db.model.table import TableInfo
 63from chart.db import connection
 64from chart.db.exceptions import DuplicateDataError
 65from chartepssg.alg.settings import scid_to_sid
 66
 67logger = logging.getLogger()
 68
 69# don't automatically delete pre-existing time ranges in the EXEC_SCHEDULE table if the filename
 70# shows a new product appears to over a long time range
 71REPLACE_THRESHOLD = timedelta(days=50)
 72
 73#NS = 'http://eop-cfi.esa.int/CFI'
 74NS = None
 75
 76# Find the EVRQs
 77ELEM_DATA_BLOCK = XMLElement.qname(NS, 'Data_Block')
 78ELEM_LIST_OF_EVRQS = XMLElement.qname(NS, 'List_of_EVRQs')
 79ELEM_EVRQ = XMLElement.qname(NS, 'EVRQ')
 80ELEM_EVRQH = XMLElement.qname(NS, 'EVRQ_Header')
 81#ELEM_RQ = XMLElement.qname(NS, 'RQ')
 82ELEM_EV = XMLElement.qname(NS, 'EV')
 83
 84#  header
 85ELEM_EVRQH_TIME = XMLElement.qname(NS, 'EVRQ_Time')
 86ELEM_EVRQH_TYPE = XMLElement.qname(NS, 'EVRQ_Type')
 87ELEM_EVRQH_DESCRIPTION = XMLElement.qname(NS, 'EVRQ_Description')
 88
 89
 90# Event
 91ELEM_EV_NAME = XMLElement.qname(NS, 'EV_Name')
 92ELEM_EV_ABS_ORB = XMLElement.qname(NS, 'EV_Absolute_orbit')
 93ELEM_EV_TIME_ANX = XMLElement.qname(NS, 'EV_Time_from_ANX')
 94ELEM_EV_DEG_ANX = XMLElement.qname(NS, 'EV_Deg_from_ANX')
 95ELEM_EV_LIST_PARMS = XMLElement.qname(NS, 'List_of_EV_Parameters')
 96
 97# Request
 98"""
 99ELEM_RQ_NAME = XMLElement.qname(NS, 'RQ_Name')
100ELEM_RQ_SOURCE = XMLElement.qname(NS, 'RQ_Source')
101ELEM_RQ_DESTINATION = XMLElement.qname(NS, 'RQ_Destination')
102ELEM_RQ_TYPE = XMLElement.qname(NS, 'RQ_Type')
103ELEM_RQ_ABS_ORB = XMLElement.qname(NS, 'RQ_Absolute_orbit')
104ELEM_RQ_DEG = XMLElement.qname(NS, 'RQ_Deg_from_ANX')
105ELEM_RQ_EXECTIME = XMLElement.qname(NS, 'RQ_Execution_Time')
106"""
107
108# Event params
109ELEM_EV_LIST_PARMS = XMLElement.qname(NS, 'List_of_EV_Parameters')
110ELEM_EV_PARAM = XMLElement.qname(NS, 'EV_Parameter')
111ELEM_EV_PARAM_NAME = XMLElement.qname(NS, 'EV_Parameter_Name')
112ELEM_EV_PARAM_DESC = XMLElement.qname(NS, 'EV_Parameter_Description')
113#ELEM_RQ_PARAM_REP = XMLElement.qname(NS, 'EV_Parameter_Representation')
114ELEM_EV_PARAM_TYPE = XMLElement.qname(NS, 'EV_Parameter_Type')
115ELEM_EV_PARAM_UNIT = XMLElement.qname(NS, 'EV_Parameter_Unit')
116ELEM_EV_PARAM_VAL = XMLElement.qname(NS, 'EV_Parameter_Value')
117
118"""
119ELEM_RQ_LIST_ATTRIBS = XMLElement.qname(NS, 'List_of_RQ_Attributes')
120ELEM_RQ_ATTRIB = XMLElement.qname(NS, 'RQ_Attribute')
121ELEM_RQ_ATTRIB_NAME = XMLElement.qname(NS, 'RQ_Attribute_Name')
122ELEM_RQ_ATTRIB_VAL = XMLElement.qname(NS, 'RQ_Attribute_Value')
123"""
124
125
126
# EVRQ_Type value of the only item type this ingester accepts
EVENT_TYPE = 'Event'

# Expected product filename, e.g.:
# SGA1_FDP_FDS__OPE_GEOEV______G20220628063345Z_S20220628000000Z_E20220812000000Z.xml
FILENAME_MATCHER = re.compile(
    r'(?P<scid>[a-zA-Z0-9]+)_'
    r'\w+_GEOEV______'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    # Escaped: an unescaped '.' would accept any character before 'xml'
    r'\.xml$')

# strptime() format of the S... (start) and E... (stop) timestamps in the filename
TIME_DECODER = '%Y%m%d%H%M%S'
144
145
def fileattr(filename):
    """Decode the spacecraft and validity window encoded in a GEOEV product filename.

    Example of an accepted name:
    SGA1_FDP_FDS__OPE_GEOEV______G20220628063345Z_S20220628000000Z_E20220812000000Z.xml

    Args:
        filename: path-like object; only its `.name` (final component) is inspected.

    Returns:
        Tuple (sid, start, stop): the SID derived from the leading scid field via
        scid_to_sid(), and the S... / E... timestamps decoded with TIME_DECODER.

    Raises:
        ValueError: if the name does not match FILENAME_MATCHER.
    """
    basename = filename.name
    parts = FILENAME_MATCHER.match(basename)
    if parts is None:
        raise ValueError('File name {f} not recognised as a Geo location product'.format(
            f=basename))

    sid = scid_to_sid(parts.group('scid'))
    start, stop = (datetime.strptime(parts.group(key), TIME_DECODER)
                   for key in ('start', 'stop'))
    return (sid, start, stop)
162
163
# One parsed Event entry from a GEOEV file, as yielded by parse_geoevents()
EVENT = namedtuple('EVENT', [
    'time',           # EVRQ_Time from the entry header
    'name',           # EV_Name
    'description',    # EVRQ_Description from the entry header
    'abs_orbit',      # EV_Absolute_orbit
    'time_from_anx',  # EV_Time_from_ANX
    'deg_from_anx',   # EV_Deg_from_ANX
    'params',         # list of EV_Parameter dicts, or None when there are none
])
172
173
def parse_geoevents(filename):
    """Yield an EVENT for each EVRQ entry in the GEOEV file `filename`.

    For each entry, the header supplies the time, type and description. The EV body
    supplies the name, the optional EV_Absolute_orbit / EV_Time_from_ANX /
    EV_Deg_from_ANX values, and the optional parameter list.

    Optional numeric fields that are absent from an entry are yielded as None.
    `params` is None when the entry has no parameters.

    Raises:
        ValueError: if the file has no List_of_EVRQs element, or an entry's EVRQ_Type
            is not 'Event' (Requests are not supported).
    """
    root_elem = XMLElement(filename=filename)
    list_event_elem = root_elem.find(ELEM_LIST_OF_EVRQS)
    if list_event_elem is None:
        raise ValueError('Geometric Events file {f} has no List_of_EVRQs element'.format(
            f=filename))

    for event_elem in list_event_elem.findall(ELEM_EVRQ):
        # header
        header_elem = event_elem.find(ELEM_EVRQH)
        time = header_elem.parse_datetime(ELEM_EVRQH_TIME)
        item_type = header_elem.parse_str(ELEM_EVRQH_TYPE)
        description = header_elem.parse_str(ELEM_EVRQH_DESCRIPTION)

        if item_type != EVENT_TYPE:
            raise ValueError(
                'Geometric Events file contains an unknown event type "{}"'.format(item_type))

        ev_elem = event_elem.find(ELEM_EV)
        name = ev_elem.parse_str(ELEM_EV_NAME)

        # These elements are optional (e.g. the sample EV has no EV_Time_from_ANX).
        # Reset them for every entry so a missing one is None rather than an unbound
        # local (first entry) or a stale value carried over from the previous entry.
        orbit = None
        if ev_elem.find(ELEM_EV_ABS_ORB) is not None:
            orbit = int(ev_elem.parse_int(ELEM_EV_ABS_ORB))

        time_anx = None
        if ev_elem.find(ELEM_EV_TIME_ANX) is not None:
            time_anx = float(ev_elem.parse_float(ELEM_EV_TIME_ANX))

        deg_anx = None
        if ev_elem.find(ELEM_EV_DEG_ANX) is not None:
            deg_anx = float(ev_elem.parse_float(ELEM_EV_DEG_ANX))

        # parse_params() accepts None when List_of_EV_Parameters is missing
        params = parse_params(ev_elem.find(ELEM_EV_LIST_PARMS))

        yield EVENT(
            time=time,
            description=description,
            name=name,
            abs_orbit=orbit,
            time_from_anx=time_anx,
            deg_from_anx=deg_anx,
            params=params or None,
        )
222
223
def parse_params(params_elem):
    """Extract the Event parameters from a List_of_EV_Parameters element.

    Args:
        params_elem: the List_of_EV_Parameters element, or None if the event has none.

    Returns:
        A list with one dict per EV_Parameter child. Each dict holds only the
        sub-elements actually present, read with parse_str() and keyed
        'EV_Parameter_Name', 'EV_Parameter_Description', 'EV_Parameter_Type',
        'EV_Parameter_Unit' and 'EV_Parameter_Value'. Parameters with none of those
        sub-elements are dropped. Returns an empty list when `params_elem` is None.
    """
    if params_elem is None:
        return []

    # (element to look for, output key); this order fixes the key order of each dict
    fields = (
        (ELEM_EV_PARAM_NAME, 'EV_Parameter_Name'),
        (ELEM_EV_PARAM_DESC, 'EV_Parameter_Description'),
        (ELEM_EV_PARAM_TYPE, 'EV_Parameter_Type'),
        (ELEM_EV_PARAM_UNIT, 'EV_Parameter_Unit'),
        (ELEM_EV_PARAM_VAL, 'EV_Parameter_Value'),
    )

    result = []
    for param_elem in params_elem.findall(ELEM_EV_PARAM):
        entry = {key: param_elem.parse_str(tag)
                 for tag, key in fields
                 if param_elem.find(tag) is not None}
        if entry:
            result.append(entry)
    return result
255
256
257
def ingest_geoevents(filename,
                     table,
                     replace=True,
                     force_replace=False):
    """Insert the GEO events parsed from `filename` into `table`.

    Args:
        filename: GEOEV product file. Its name must match FILENAME_MATCHER; the SID is
            taken from it via fileattr().
        table: destination table (its `.name` is used in log messages).
        replace: if True, first delete existing rows for this SID whose sensing time
            lies within [first event time, last event time], inclusive.
        force_replace: allow that delete even when the events span more than
            REPLACE_THRESHOLD.

    Raises:
        ValueError: if the file name is not recognised, an entry is not an Event, or
            the replace span exceeds REPLACE_THRESHOLD without `force_replace`.
    """
    sid = fileattr(filename)[0]
    # Same value for every row, so compute it once
    sid_num = sid.bind_insert()[0]

    rows = []
    first = datetime.max
    last = datetime.min
    for event in parse_geoevents(filename):
        first = min(first, event.time)
        last = max(last, event.time)
        rows.append((event.time,
                     sid_num,
                     1,  # PRODUCT column
                     event.name,
                     event.description,
                     event.abs_orbit,
                     event.time_from_anx,
                     event.deg_from_anx,
                     jsondumps(event.params)))

    if not rows:
        # With no events, first/last are still the datetime.max/min sentinels and the
        # replace step would issue a delete over an inverted time range
        logger.info('No geoevents items found in {f}'.format(f=filename))
        return

    # delete existing items
    if replace:
        duration = last - first
        if duration > REPLACE_THRESHOLD and not force_replace:
            raise ValueError('Refusing to delete replace {d}'.format(d=duration))

        # commit needed because we use different cursors for insert and delete
        del_cnt = ts.delete(sid=sid,
                            table=table,
                            sensing_start=first,
                            sensing_stop=last,
                            inclusive=True,
                            commit=True)

        logger.info('Deleted {d} geoevents items from {t} for {s} from {start} to {stop}'.format(
            d=del_cnt, t=table.name, s=sid, start=first, stop=last))

    insert_values(sid, table, rows)

    logger.info('Ingested {cc} geoevents items from {start} to {stop}'.format(
        cc=len(rows), start=first, stop=last))
308
309
def insert_values(sid, table, rows):
    """Insert `rows` for `sid` into database `table`, replacing duplicates on conflict.

    Each row is a tuple in the column order SENSING_TIME, SID_NUM, PRODUCT, NAME,
    DESCRIPTION, ABSOLUTE_ORBIT_NUMBER, TIME_FROM_ANX, DEG_FROM_ANX, PARAMETERS.

    If the insert raises DuplicateDataError, existing rows for `sid` between the
    earliest and latest SENSING_TIME in `rows` (inclusive) are deleted and the insert
    is retried once. An empty `rows` is a no-op and opens no connection.
    """
    if not rows:
        return

    db_conn = connection.db_connect()

    fields = ['SENSING_TIME', 'SID_NUM', 'PRODUCT',
              'NAME', 'DESCRIPTION', 'ABSOLUTE_ORBIT_NUMBER',
              'TIME_FROM_ANX', 'DEG_FROM_ANX',
              'PARAMETERS']

    sql = 'INSERT INTO {tablename} ({fs}) VALUES  %s'.format(
        tablename=table.name,
        fs=','.join(fields),)

    # Time bounds of this batch, used by the duplicate cleanup. Taken as min/max rather
    # than rows[0]/rows[-1] so out-of-order input still covers every row being inserted.
    times = [row[0] for row in rows]
    span_start = min(times)
    span_stop = max(times)

    try:
        db_conn.executemany(sql, rows)

    except DuplicateDataError:
        # NOTE(review): ends the failed transaction before cleanup; confirm commit
        # (rather than rollback) is what the connection wrapper expects here
        db_conn.commit()

        # Remove existing entries before inserting them again
        logger.debug('Duplicate(s) found wiping {sid} from {strt} to {stop}'.format(
            sid=sid, strt=span_start, stop=span_stop))
        ts.delete(sid=sid,
                  table=table,
                  sensing_start=span_start,
                  sensing_stop=span_stop,
                  inclusive=True,
                  commit=False)

        logger.debug('Second insertion attempt')
        db_conn.executemany(sql, rows)

    # commit ingestion
    db_conn.commit()
346
347
def dispatch(wo, resultfile, _):
    """Ingest every job of work order `wo` into GEO_EVENTS and record it in `resultfile`.

    Called when running under the dispatcher. Each job's file is ingested with
    ingest_geoevents() using its default replace settings. The job is then marked
    COMPLETED, with the SID and the sensing window decoded from its filename.
    """
    geo_events = TableInfo('GEO_EVENTS')

    for job in wo.read_jobs():
        sid, start, stop = fileattr(job.filename)
        ingest_geoevents(job.filename, geo_events)
        resultfile.add_job(job,
                           JobStatus.COMPLETED,
                           tables=[dict(table=geo_events,
                                        sensing_start=start,
                                        sensing_stop=stop,
                                        sid=sid)])

    logger.info('Geometric Events file ingestion complete')
365
366
def main():
    """Command line entry point; only supported when launched by the dispatcher.

    Raises:
        ValueError: if init_algorithm() reports that we were not started by the
            dispatcher (i.e. run as a standalone tool).
    """
    try:
        dispatch(*init_algorithm())
    except init_algorithm.NotDispatcher:
        raise ValueError('This tool must be run from the dispatcher')


if __name__ == '__main__':
    main()