1#!/usr/bin/env python3
  2
  3"""Ingester module for the predicted orbit file type (PRED_ORB).
  4
  5This is an XML file which contains a number of , as shown below:
  6
  7    <OSV>
  8        <TAI>TAI=2020-11-28T00:00:37.000000</TAI> # ignored
  9        <UTC>UTC=2020-11-28T00:00:00.000000</UTC>
 10        <UT1>UT1=2020-11-27T23:59:59.733395</UT1> # ignored
 11        <Absolute_Orbit>+00012</Absolute_Orbit> # ignored
 12        <X unit="m">+5640123.962</X>
 13        <Y unit="m">-5250505.374</Y>
 14        <Z unit="m">-0428687.745</Z>
 15        <VX unit="m/s">+1875.066297</VX>
 16        <VY unit="m/s">+1479.418697</VY>
 17        <VZ unit="m/s">+6557.450945</VZ>
 18        <Quality>0000000000000</Quality> # ingored
 19    </OSV>
 20
 21"""
 22
 23import re
 24import logging
 25from datetime import datetime
 26from datetime import timedelta
 27from collections import namedtuple
 28
 29from chart import settings
 30from chart.project import SID
 31from chart.common.xml import XMLElement
 32from chart.alg import init_algorithm
 33from chart.backend.job import JobStatus
 34from chart.db import ts
 35from chart.db.model.table import TableInfo
 36from chartepssg.alg.settings import scid_to_sid
 37
 38logger = logging.getLogger()
 39
 40
 41# don't automatically delete pre-existing time ranges in the predict orbit table if the filename
 42# shows a new product appears to over a long time range
 43REPLACE_THRESHOLD = timedelta(days=50)
 44
 45NS = 'http://eop-cfi.esa.int/CFI'
 46
 47# Find the OSVs
 48ELEM_DATA_BLOCK = XMLElement.qname(NS, 'Data_Block')
 49ELEM_LIST_OF_OSVS = XMLElement.qname(NS, 'List_of_OSVs')
 50ELEM_OSV = XMLElement.qname(NS, 'OSV')
 51
 52# OSV attributes
 53ELEM_UTC = XMLElement.qname(NS, 'UTC')
 54# ELEM_ABS_ORB = XMLElement.qname(NS, 'Absolute_Orbit')
 55ELEM_X = XMLElement.qname(NS, 'X')
 56ELEM_Y = XMLElement.qname(NS, 'Y')
 57ELEM_Z = XMLElement.qname(NS, 'Z')
 58ELEM_VX = XMLElement.qname(NS, 'VX')
 59ELEM_VY = XMLElement.qname(NS, 'VY')
 60ELEM_VZ = XMLElement.qname(NS, 'VZ')
 61# ELEM_QUALITY = XMLElement.qname(NS, 'Quality')
 62
 63# filename:
 64# SGA1_FDP_FDS__OPE_PRED_ORB___G20210415132921Z_S20210415000000Z_E20210416120000Z.EOF
 65FILENAME_MATCHER = re.compile(
 66    r"^(?P<scid>\w{4})_.*_PRED_ORB_.*Z_"
 67    r"S(?P<start>[0-9]+)Z_"
 68    r"E(?P<stop>[0-9]+)Z\.EOF$"
 69)
 70
 71
 72TIME_DECODER = '%Y%m%d%H%M%S'
 73
 74
 75def fileattr(filename):
 76    """Given a `filename` return a tuple of SID, sensing start and end time.
 77
 78    They look like:
 79    SGA1_FDP_FDS__OPE_PRED_ORB___G20210415132921Z_S20210415000000Z_E20210416120000Z.EOF
 80    """
 81    match = FILENAME_MATCHER.match(filename.name)
 82    if not match:
 83        raise ValueError('File name {f} not recognised as a Predict Orbit  product'.format(
 84            f=filename.name))
 85
 86    groups = match.groupdict()
 87
 88    return (
 89            scid_to_sid(groups["scid"]),
 90            datetime.strptime(groups['start'], TIME_DECODER),
 91            datetime.strptime(groups['stop'], TIME_DECODER))
 92
 93
 94PredictedOrbit = namedtuple('PredictedOrbit', ('sid '
 95                                   'sensing_time '
 96                                   'X '
 97                                   'Y '
 98                                   'Z '
 99                                   'VX '
100                                   'VY '
101                                   'VZ '))
102
103
104def parse_predicted_orbit(filename):
105    """Yield a list of predict orbit OSV objects from `filename`.
106
107    """
108    sid = fileattr(filename)[0]
109    root_elem = XMLElement(filename=filename)
110    data_elem = root_elem.find(ELEM_DATA_BLOCK)
111    list_elem = data_elem.find(ELEM_LIST_OF_OSVS)
112    for osv_elem in list_elem.findall(ELEM_OSV):
113        res = PredictedOrbit(
114            sid=sid,
115            sensing_time=osv_elem.parse_datetime(ELEM_UTC),
116            X=osv_elem.parse_float(ELEM_X),
117            Y=osv_elem.parse_float(ELEM_Y),
118            Z=osv_elem.parse_float(ELEM_Z),
119            VX=osv_elem.parse_float(ELEM_VX),
120            VY=osv_elem.parse_float(ELEM_VY),
121            VZ=osv_elem.parse_float(ELEM_VZ)
122            )
123
124        yield res
125
126
127def ingest_predicted_orbit(filename,
128                     table=TableInfo('PREDICTED_ORBIT'),
129                     replace=True,
130                     force_replace=False):
131    """Insert predict orbit OSV objects from `source` into `table`."""
132    sid, start_time, stop_time = fileattr(filename)
133
134    # delete existing OSVs
135    if replace:
136        duration = stop_time - start_time
137        if duration > REPLACE_THRESHOLD and not force_replace:
138            raise ValueError('Refusing to delete replace {d}'.format(d=duration))
139
140        del_cnt = ts.delete(sid=sid,
141                            table=table,
142                            sensing_start=start_time,
143                            sensing_stop=stop_time,
144                            inclusive=True,  # ORBITREST files do include data
145                            # up to and including the filename stop time
146                            commit=True)  # commit needed because we use different
147        # cursors for insert and delete
148
149        logger.info('Deleted {d} OSVs from {t} for {s} from {strt} to {stop}'.format(
150            d=del_cnt, t=table.name, s=sid, strt=start_time, stop=stop_time))
151
152    # before inserting new ones
153    ins_cur = ts.insert(table=table,
154                        fields=['SENSING_TIME'] +\
155                        SID.insert_fields +\
156                        ['PRODUCT',
157                         'X',
158                         'Y',
159                         'Z',
160                         'VX',
161                         'VY',
162                         'VZ'])
163
164    cc = 0
165    first = None
166    last = None
167    for osv in parse_predicted_orbit(filename):
168        if first is None:
169            first = osv.sensing_time
170
171        last = osv.sensing_time
172        ins_cur.execute(None,
173                        [osv.sensing_time] +\
174                        osv.sid.bind_insert() +\
175                        [1,
176                         osv.X,
177                         osv.Y,
178                         osv.Z,
179                             osv.VX,
180                         osv.VY,
181                         osv.VZ
182                         ])
183
184        cc += 1
185
186    ins_cur.commit()
187
188    logger.info('Ingested {cc} OSVs from {start} to {stop}'.format(
189        cc=cc, start=first, stop=last))
190
191
192def dispatch(wo, resultfile, _):
193    """We are being run by the dispatcher tool."""
194    for job in wo.read_jobs():
195        ingest_predicted_orbit(job.filename)
196        resultfile.add_job(job, JobStatus.COMPLETED)
197
198    logger.info('Predict Orbit file ingestion complete')
199
200
201def main():
202    """Command line entry point."""
203    try:
204        # run from inside dispatcher
205        dispatch(*init_algorithm())
206        return
207
208    except init_algorithm.NotDispatcher:
209        # run as standalone tool
210        raise ValueError('This tool must be run from the dispatcher')
211
212
213if __name__ == '__main__':
214    main()