1#!/usr/bin/env python3
  2
  3"""Ingester module for the Executable Schedule file type (SCHED_HIST).
  4
The data is ingested into two separate tables: SCHEDULE_ACTIVITIES (activities) and
SCHEDULE_TML_ACTIONS (timeline actions).

This is a sample XML file which contains a schedule header and body with metadata and data.
The data element has entries that could be a Task, an Activity or an Event, as shown below:
  9
 10<Schedule_History domain="SGA1">
 11  <List_of_Activities count="1">
 12    <Activity>
 13      <Internal_Id>71fc3c25-f620-4b20-a37e-77d061fc9c1d</Internal_Id>
 14      <Name>AuthenticationMode_SGA1</Name>
 15      <Proc_Id>BUC_P1</Proc_Id>
 16      <Type>GR_ACT</Type>
 17      <Class>OAS_PE</Class>
 18      <Scheduled_Time>UTC=2022-07-12T08:40:16.279</Scheduled_Time>
 19      <Execution_Time>UTC=2022-07-12T08:40:16.279</Execution_Time>
 20      <Trigger_Time>UTC=2022-07-12T08:40:16.279</Trigger_Time>
 21      <Absolute_Orbit>9</Absolute_Orbit>
 22      <PSO>53.439</PSO>
 23      <Final_Status>Expired</Final_Status>
 24      <List_of_Parameters count="1"/>
 25      <List_of_Actions count="8">
 26        <Action>
 27          <Timestamp>UTC=2022-07-12T08:26:27.861</Timestamp>
 28          <Message>Activity loaded from file SGA1_SCH_MPS__BUC_SCHEX______G20220712000000Z_S20220712070000Z_E20220713000000Z.xml</Message>
 29          <Status>Not Released</Status>
 30        </Action>
 31      ...
 32      </List_of_Actions>
 33    </Activity>
 34  </List_of_Activities>
 35  <List_of_Timeline_Actions count="3">
 36    <TML_Action>
 37      <Timestamp>UTC=2022-07-12T08:26:03.115</Timestamp>
 38      <Message>SGA1 Timeline status changed to RUNNING</Message>
 39      <Status>RUNNING</Status>
 40    </TML_Action>
 41    ...
 42  </List_of_Timeline_Actions>
 43</Schedule_History>
 44"""
 45
 46import re
 47import logging
 48from datetime import datetime
 49from datetime import timedelta
 50from collections import namedtuple
 51
 52from chart.common.xml import XMLElement
 53from chart.alg import init_algorithm
 54from chart.backend.job import JobStatus
 55from chart.db import ts
 56from chart.db.model.table import TableInfo
 57from chart.db import connection
 58from chart.db.exceptions import DuplicateDataError
 59from chart.common.xml import XMLElementNotFound
 60
 61from chartepssg.alg.common import jsondumps, UnknownToken
 62from chartepssg.alg.settings import scid_to_sid
 63
 64
# Module-wide logger; no name is passed, so this is the root logger.
logger = logging.getLogger()


# Do not automatically delete pre-existing time ranges from the schedule tables if the
# filename shows that the new product covers a long time range.
REPLACE_THRESHOLD = timedelta(days=50)


# Names of the XML elements holding the two kinds of schedule item
TYPE_ACTIVITY = 'Activity'
TYPE_TML_ACTION = 'TML_Action'

# Tables used to ingest the schedule
TABLE_ACTIVITIES = TableInfo('SCHEDULE_ACTIVITIES')
TABLE_TML_ACTIONS = TableInfo('SCHEDULE_TML_ACTIONS')
 80
 81
 82# filename:
 83# SGxx_LOG_OAS__BUC_SCH_HIST___G20220712085643Z_S20220712084016Z_E20220712085216Z.xml
 84
 85FILENAME_MATCHER = re.compile(
 86        r'(?P<scid>[a-zA-Z0-9]+)_'
 87        r'[\w]+_SCH_HIST___'
 88        r'G[0-9]+Z_'
 89        r'S(?P<start>[0-9]+)Z_'
 90        r'E(?P<stop>[0-9]+)Z'
 91        r'.xml$')
 92
 93
 94TIME_DECODER = '%Y%m%d%H%M%S'
 95
 96
 97def fileattr(filename):
 98    """Given a `filename` return a tuple of SID, sensing start and end time.
 99
100    They look like:
101    SGxx_LOG_OAS__BUC_SCH_HIST___G20220712085643Z_S20220712084016Z_E20220712085216Z.xml
102    """
103    match = FILENAME_MATCHER.match(filename.name)
104    if not match:
105        raise ValueError('File name {f} not recognised as an Executable Schedule product'.format(
106            f=filename.name))
107
108    groups = match.groupdict()
109
110    return (scid_to_sid(groups['scid']),
111            datetime.strptime(groups['start'], TIME_DECODER),
112            datetime.strptime(groups['stop'], TIME_DECODER))
113
114
# Record describing one schedule activity.
# The field names are given as a list rather than as implicitly concatenated strings: the
# string form had lost the separator after 'schedule_time', which fused it with 'exec_time'
# into a single bogus 'schedule_timeexec_time' field.
ACTIVITY = namedtuple('ACTIVITY', [
    'id',
    'name',
    'proc_id',
    'ssid',
    'schedule_time',
    'exec_time',
    'trig_time',
    'abs_orbit',
    'pso',
    'final_status',
    'actions',
    'params',
])

# Record describing one timeline action (the last field was previously misspelt 'staus').
TML_ACTION = namedtuple('TML_ACTION', [
    'timestamp',
    'message',
    'status',
])
135
136
def tag(name):
    """Build the `XMLElement.qname` form of tag `name`, as used for element lookups."""
    qualified = XMLElement.qname(None, name)
    return qualified
140
141
def parse_schedule(filename):
    """Read the schedule history XML file `filename`.

    Returns a tuple of (sid, activities, tml_actions): the SID derived from the root element's
    'domain' attribute, one dict per 'Activity' element and one dict per 'TML_Action' element.
    """
    document = XMLElement(filename=filename)
    sid = scid_to_sid(document.elem.get('domain'))

    # Activities
    activities_parent = document.find(tag('List_of_Activities'))
    activities = [parse_activity(node)
                  for node in activities_parent.findall(tag('Activity'))]

    # Timeline (TML) actions
    tml_parent = document.find(tag('List_of_Timeline_Actions'))
    tml_actions = [parse_action(node)
                   for node in tml_parent.findall(tag('TML_Action'))]

    return sid, activities, tml_actions
167
168
def parse_activity(elem):
    """Parse one 'Activity' element of the schedule 'List_of_Activities'.

    Returns a dict that always holds the same keys, inserted in the same order:
    Scheduled_Time, Internal_Id, Name, Proc_Id, SSID, Type, Class, Execution_Time,
    Trigger_Time, Absolute_Orbit, PSO, Final_Status, Parameters, Actions.

    The fixed key set matters because `ingest_schedule` maps the values positionally onto the
    table columns; a missing 'Actions' key used to produce a row that was one value short.
    """
    activity = dict()

    activity['Scheduled_Time'] = elem.parse_datetime(tag('Scheduled_Time'))
    activity['Internal_Id'] = elem.parse_str(tag('Internal_Id'))
    activity['Name'] = elem.parse_str(tag('Name'))
    # the extra None argument is presumably the value used when the element is absent
    # NOTE(review): confirm against the XMLElement.parse_* helpers
    activity['Proc_Id'] = elem.parse_str(tag('Proc_Id'), None)
    activity['SSID'] = elem.parse_str(tag('SSID'), None)
    activity['Type'] = elem.parse_str(tag('Type'))
    activity['Class'] = elem.parse_str(tag('Class'))
    activity['Execution_Time'] = elem.parse_datetime(tag('Execution_Time'), None)
    activity['Trigger_Time'] = elem.parse_datetime(tag('Trigger_Time'), None)
    activity['Absolute_Orbit'] = elem.parse_int(tag('Absolute_Orbit'))
    activity['PSO'] = elem.parse_float(tag('PSO'))
    activity['Final_Status'] = elem.parse_str(tag('Final_Status'))

    # Parameters: parse_list() gives either None or a non-empty list, and both were already
    # serialised by the previous (always true) condition, so the key is simply always set.
    params = parse_list(elem, 'List_of_Parameters', ('Name', 'Type', 'Value'))
    activity['Parameters'] = jsondumps(params)

    # Actions: a missing or empty list is tolerated
    actions = []
    list_actions_elem = elem.find(tag('List_of_Actions'))
    if list_actions_elem is not None:
        actions = [parse_action(action_elem)
                   for action_elem in list_actions_elem.findall(tag('Action'))]

    # "no actions" is stored the same way as "no parameters" (serialised None) so that the
    # key is always present
    activity['Actions'] = jsondumps(actions if actions else None)

    return activity
202
203
204
def parse_list(elem, parent_tag, child_tags):
    """Collect the children of the `parent_tag` list element found under `elem`.

    Every child is turned into a dict mapping each name of `child_tags` that the child
    actually contains to its string value.

    Returns the list of dicts, or None if `parent_tag` is absent or has no children.
    """
    container = elem.find(tag(parent_tag))
    if container is None:
        return None

    entries = [
        {name: child.parse_str(tag(name))
         for name in child_tags
         if child.find(tag(name)) is not None}
        for child in container.findall()
    ]

    return entries or None
219
220
def parse_action(elem):
    """Parse an action element of the schedule into a dict.

    The dict holds 'Timestamp' (datetime), 'Message' and 'Status' (strings), in that order,
    but only for the child elements which are actually present.
    """
    readers = (
        ('Timestamp', elem.parse_datetime),
        ('Message', elem.parse_str),
        ('Status', elem.parse_str),
    )

    details = {}
    for name, read in readers:
        if elem.find(tag(name)) is None:
            continue
        details[name] = read(tag(name))

    return details
233
234
def ingest_schedule(filename,
                    replace=True,
                    force_replace=False):
    """Ingest the schedule history file `filename` into the activity and TML action tables.

    Args:
        filename: product to ingest; its name must be understood by `fileattr`.
        replace: if True, rows already stored for the SID and sensing time range given by
            the filename are deleted from both tables before the new rows are inserted.
        force_replace: allow that deletion even if the filename covers more than
            REPLACE_THRESHOLD.

    Returns:
        Tuple of (sid, start, stop), where `sid` is the one read from the file contents and
        `start`/`stop` span the filename time range and every ingested timestamp.

    Raises:
        ValueError: the filename is not recognised, or `replace` is set without
            `force_replace` and the filename covers more than REPLACE_THRESHOLD.
    """
    file_sid, start, stop = fileattr(filename)

    if replace:
        duration = stop - start
        if duration > REPLACE_THRESHOLD and not force_replace:
            raise ValueError('Refusing to delete replace {d}'.format(d=duration))

    # Parse the whole file before touching the database, so that an unreadable product
    # cannot leave the tables with the old rows deleted and nothing to replace them
    sid, activities, tml_actions = parse_schedule(filename)

    # every row needs a sensing time; a missing one raises KeyError here, before any deletion
    epochs = ([activity['Scheduled_Time'] for activity in activities]
              + [action['Timestamp'] for action in tml_actions])

    # delete existing items, using the SID and time range taken from the filename
    if replace:
        for table, what in ((TABLE_ACTIVITIES, 'Activities'),
                            (TABLE_TML_ACTIONS, 'TML Actions')):
            del_cnt = ts.delete(table=table,
                                sid=file_sid,
                                sensing_start=start,
                                sensing_stop=stop,
                                inclusive=False,
                                commit=True)

            logger.info('Deleted {d} Schedule {what} from {t} from {start} to {stop}'.format(
                d=del_cnt, what=what, t=table.name, start=start, stop=stop))

    # get the earliest start and latest stop so no event is missed when the CHART event
    # ingestion job is triggered; a file without any item keeps the filename time range
    start = min([start] + epochs)
    stop = max([stop] + epochs)

    sid_num = sid.bind_insert()[0]

    # Each table column is paired with the key of the parsed item it is filled from, so the
    # rows no longer depend on the order or number of keys in the parsed dicts.
    # Keys that are absent from an item are inserted as None.
    activity_columns = (
        ('SENSING_TIME', 'Scheduled_Time'),
        ('INTERNAL_ID', 'Internal_Id'),
        ('NAME', 'Name'),
        ('PROC_ID', 'Proc_Id'),
        ('SSID', 'SSID'),
        ('TYPE', 'Type'),
        ('CLASS', 'Class'),
        ('EXECUTION_TIME', 'Execution_Time'),
        ('TRIGGER_TIME', 'Trigger_Time'),
        ('ABS_ORBIT', 'Absolute_Orbit'),
        ('PSO', 'PSO'),
        ('FINAL_STATUS', 'Final_Status'),
        ('PARAMETERS', 'Parameters'),
        ('ACTIONS', 'Actions'),
    )
    tml_action_columns = (
        ('SENSING_TIME', 'Timestamp'),
        ('MESSAGE', 'Message'),
        ('STATUS', 'Status'),
    )

    # Activities
    activity_fields = ['SID_NUM'] + [column for column, _ in activity_columns]
    rows = [[sid_num] + [item.get(key) for _, key in activity_columns]
            for item in activities]
    cc = insert_values(TABLE_ACTIVITIES, activity_fields, rows)
    logger.info('Ingested {cc} Schedule Activities from {start} to {stop}'.format(
        cc=cc, start=start, stop=stop))

    # TML Actions
    tml_action_fields = ['SID_NUM'] + [column for column, _ in tml_action_columns]
    rows = [[sid_num] + [item.get(key) for _, key in tml_action_columns]
            for item in tml_actions]
    cc = insert_values(TABLE_TML_ACTIONS, tml_action_fields, rows)
    logger.info('Ingested {cc} Schedule TML Actions from {start} to {stop}'.format(
        cc=cc, start=start, stop=stop))

    return sid, start, stop
298
299
def insert_values(table, fields, rows):
    """Insert `rows` into database `table` one at a time, replacing duplicates.

    Args:
        table: target table; only its `name` attribute is used.
        fields: column names, in the same order as the values of each row.
        rows: iterable of value lists. Elements 0 and 1 of a row are used as SID_NUM and
            SENSING_TIME when an existing duplicate has to be deleted.

    Returns:
        The number of rows processed.
    """
    db_conn = connection.db_connect()

    cc = 0

    sql = 'INSERT INTO {table} ({fs}) VALUES  %s '.format(
        table=table.name,
        fs=','.join(fields),)

    # insert the rows one by one, committing after each
    for item in rows:
        try:
            # this is not efficient, but the only way to detect and remove duplicates
            # with data that has JSON
            db_conn.executemany(sql, [item])
            db_conn.commit()

        except DuplicateDataError as ex:
            # Remove existing entry before inserting it
            logger.warning('Duplicate(s) found {ex}. Deleting existing item'.format(ex=ex))

            # delete whatever is stored for the same SID and sensing time
            # NOTE(review): this matches on SID_NUM and SENSING_TIME only - confirm that it
            # corresponds to the unique key of both tables
            sql_del = ('DELETE FROM {table} WHERE SID_NUM = :sid_num '
                    'AND SENSING_TIME = :timestamp' ).format(table=table.name)
            db_conn.execute(sql_del, sid_num=item[0], timestamp=item[1])
            db_conn.commit()

            # try again
            db_conn.executemany(sql, [item])
            db_conn.commit()

        cc += 1

    # commit ingestion
    db_conn.commit()
    return cc
338
339
def main():
    """Command line entry point.

    Ingests the file of every job returned by `wo.read_jobs()` and records the job as
    completed in the result file, together with the time range written to each table.
    """
    wo, resultfile, _ = init_algorithm()

    for job in wo.read_jobs():
        # ingest; the returned range also covers the timestamps found inside the file
        sid, start, stop = ingest_schedule(job.filename)

        tables = [
            {'table': table,
             'sensing_start': start,
             'sensing_stop': stop,
             'sid': sid}
            for table in (TABLE_ACTIVITIES, TABLE_TML_ACTIONS)
        ]

        resultfile.add_job(job, JobStatus.COMPLETED, tables=tables)
362
363
# Run the ingester only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()