1#!/usr/bin/env python3
2
3"""Ingester module for the predicted orbit file type (PRED_ORB).
4
5This is an XML file which contains a number of , as shown below:
6
7 <OSV>
8 <TAI>TAI=2020-11-28T00:00:37.000000</TAI> # ignored
9 <UTC>UTC=2020-11-28T00:00:00.000000</UTC>
10 <UT1>UT1=2020-11-27T23:59:59.733395</UT1> # ignored
11 <Absolute_Orbit>+00012</Absolute_Orbit> # ignored
12 <X unit="m">+5640123.962</X>
13 <Y unit="m">-5250505.374</Y>
14 <Z unit="m">-0428687.745</Z>
15 <VX unit="m/s">+1875.066297</VX>
16 <VY unit="m/s">+1479.418697</VY>
17 <VZ unit="m/s">+6557.450945</VZ>
18 <Quality>0000000000000</Quality> # ingored
19 </OSV>
20
21"""
22
23import re
24import logging
25from datetime import datetime
26from datetime import timedelta
27from collections import namedtuple
28
29from chart import settings
30from chart.project import SID
31from chart.common.xml import XMLElement
32from chart.alg import init_algorithm
33from chart.backend.job import JobStatus
34from chart.db import ts
35from chart.db.model.table import TableInfo
36from chartepssg.alg.settings import scid_to_sid
37
38logger = logging.getLogger()
39
40
41# don't automatically delete pre-existing time ranges in the predict orbit table if the filename
42# shows a new product appears to over a long time range
43REPLACE_THRESHOLD = timedelta(days=50)
44
45NS = 'http://eop-cfi.esa.int/CFI'
46
47# Find the OSVs
48ELEM_DATA_BLOCK = XMLElement.qname(NS, 'Data_Block')
49ELEM_LIST_OF_OSVS = XMLElement.qname(NS, 'List_of_OSVs')
50ELEM_OSV = XMLElement.qname(NS, 'OSV')
51
52# OSV attributes
53ELEM_UTC = XMLElement.qname(NS, 'UTC')
54# ELEM_ABS_ORB = XMLElement.qname(NS, 'Absolute_Orbit')
55ELEM_X = XMLElement.qname(NS, 'X')
56ELEM_Y = XMLElement.qname(NS, 'Y')
57ELEM_Z = XMLElement.qname(NS, 'Z')
58ELEM_VX = XMLElement.qname(NS, 'VX')
59ELEM_VY = XMLElement.qname(NS, 'VY')
60ELEM_VZ = XMLElement.qname(NS, 'VZ')
61# ELEM_QUALITY = XMLElement.qname(NS, 'Quality')
62
63# filename:
64# SGA1_FDP_FDS__OPE_PRED_ORB___G20210415132921Z_S20210415000000Z_E20210416120000Z.EOF
65FILENAME_MATCHER = re.compile(
66 r"^(?P<scid>\w{4})_.*_PRED_ORB_.*Z_"
67 r"S(?P<start>[0-9]+)Z_"
68 r"E(?P<stop>[0-9]+)Z\.EOF$"
69)
70
71
72TIME_DECODER = '%Y%m%d%H%M%S'
73
74
75def fileattr(filename):
76 """Given a `filename` return a tuple of SID, sensing start and end time.
77
78 They look like:
79 SGA1_FDP_FDS__OPE_PRED_ORB___G20210415132921Z_S20210415000000Z_E20210416120000Z.EOF
80 """
81 match = FILENAME_MATCHER.match(filename.name)
82 if not match:
83 raise ValueError('File name {f} not recognised as a Predict Orbit product'.format(
84 f=filename.name))
85
86 groups = match.groupdict()
87
88 return (
89 scid_to_sid(groups["scid"]),
90 datetime.strptime(groups['start'], TIME_DECODER),
91 datetime.strptime(groups['stop'], TIME_DECODER))
92
93
94PredictedOrbit = namedtuple('PredictedOrbit', ('sid '
95 'sensing_time '
96 'X '
97 'Y '
98 'Z '
99 'VX '
100 'VY '
101 'VZ '))
102
103
104def parse_predicted_orbit(filename):
105 """Yield a list of predict orbit OSV objects from `filename`.
106
107 """
108 sid = fileattr(filename)[0]
109 root_elem = XMLElement(filename=filename)
110 data_elem = root_elem.find(ELEM_DATA_BLOCK)
111 list_elem = data_elem.find(ELEM_LIST_OF_OSVS)
112 for osv_elem in list_elem.findall(ELEM_OSV):
113 res = PredictedOrbit(
114 sid=sid,
115 sensing_time=osv_elem.parse_datetime(ELEM_UTC),
116 X=osv_elem.parse_float(ELEM_X),
117 Y=osv_elem.parse_float(ELEM_Y),
118 Z=osv_elem.parse_float(ELEM_Z),
119 VX=osv_elem.parse_float(ELEM_VX),
120 VY=osv_elem.parse_float(ELEM_VY),
121 VZ=osv_elem.parse_float(ELEM_VZ)
122 )
123
124 yield res
125
126
127def ingest_predicted_orbit(filename,
128 table=TableInfo('PREDICTED_ORBIT'),
129 replace=True,
130 force_replace=False):
131 """Insert predict orbit OSV objects from `source` into `table`."""
132 sid, start_time, stop_time = fileattr(filename)
133
134 # delete existing OSVs
135 if replace:
136 duration = stop_time - start_time
137 if duration > REPLACE_THRESHOLD and not force_replace:
138 raise ValueError('Refusing to delete replace {d}'.format(d=duration))
139
140 del_cnt = ts.delete(sid=sid,
141 table=table,
142 sensing_start=start_time,
143 sensing_stop=stop_time,
144 inclusive=True, # ORBITREST files do include data
145 # up to and including the filename stop time
146 commit=True) # commit needed because we use different
147 # cursors for insert and delete
148
149 logger.info('Deleted {d} OSVs from {t} for {s} from {strt} to {stop}'.format(
150 d=del_cnt, t=table.name, s=sid, strt=start_time, stop=stop_time))
151
152 # before inserting new ones
153 ins_cur = ts.insert(table=table,
154 fields=['SENSING_TIME'] +\
155 SID.insert_fields +\
156 ['PRODUCT',
157 'X',
158 'Y',
159 'Z',
160 'VX',
161 'VY',
162 'VZ'])
163
164 cc = 0
165 first = None
166 last = None
167 for osv in parse_predicted_orbit(filename):
168 if first is None:
169 first = osv.sensing_time
170
171 last = osv.sensing_time
172 ins_cur.execute(None,
173 [osv.sensing_time] +\
174 osv.sid.bind_insert() +\
175 [1,
176 osv.X,
177 osv.Y,
178 osv.Z,
179 osv.VX,
180 osv.VY,
181 osv.VZ
182 ])
183
184 cc += 1
185
186 ins_cur.commit()
187
188 logger.info('Ingested {cc} OSVs from {start} to {stop}'.format(
189 cc=cc, start=first, stop=last))
190
191
192def dispatch(wo, resultfile, _):
193 """We are being run by the dispatcher tool."""
194 for job in wo.read_jobs():
195 ingest_predicted_orbit(job.filename)
196 resultfile.add_job(job, JobStatus.COMPLETED)
197
198 logger.info('Predict Orbit file ingestion complete')
199
200
201def main():
202 """Command line entry point."""
203 try:
204 # run from inside dispatcher
205 dispatch(*init_algorithm())
206 return
207
208 except init_algorithm.NotDispatcher:
209 # run as standalone tool
210 raise ValueError('This tool must be run from the dispatcher')
211
212
213if __name__ == '__main__':
214 main()