1#!/usr/bin/env python3
2
3"""Ingester module for the Orbit Prediction state vectors file type (OEMORBPRED).
4
5This is an XML file which contains a number of , as shown below:
6
7 <stateVector>
8 <EPOCH>2020-12-01T00:00:00.000Z</EPOCH>
9 <X>-6.780290602793E+03</X>
10 <Y>-6.797782401015E+02</Y>
11 <Z> 3.621533494135E+03</Z>
12 <X_DOT>-2.654711849949E+00</X_DOT>
13 <Y_DOT>-3.582661194453E+00</Y_DOT>
14 <Z_DOT>-5.637075170929E+00</Z_DOT>
15 </stateVector>
16
17
18"""
19
20import re
21import logging
22from datetime import datetime
23from datetime import timedelta
24from collections import namedtuple
25
26from chart import settings
27from chart.project import SID
28from chart.common.xml import XMLElement
29from chart.alg import init_algorithm
30from chart.backend.job import JobStatus
31from chart.db import ts
32from chart.db.model.table import TableInfo
33from chart.db.exceptions import DuplicateDataError
34
35from chartepssg.alg.settings import scid_to_sid
36
37
38logger = logging.getLogger()
39
40
41# don't automatically delete pre-existing time ranges in the ORBIT_PREDICTION table if the filename
42# shows a new product appears to over a long time range
43REPLACE_THRESHOLD = timedelta(days=50)
44
45# no namespace for this xml file type
46NS = None
47
48# Find the SVs
49ELEM_BODY = XMLElement.qname(NS, 'body')
50ELEM_SEGMENT = XMLElement.qname(NS, 'segment')
51ELEM_DATA = XMLElement.qname(NS, 'data')
52ELEM_SV = XMLElement.qname(NS, 'stateVector')
53
54# SV attributes
55ELEM_EPOCH = XMLElement.qname(NS, 'EPOCH')
56ELEM_X = XMLElement.qname(NS, 'X')
57ELEM_Y = XMLElement.qname(NS, 'Y')
58ELEM_Z = XMLElement.qname(NS, 'Z')
59ELEM_X_DOT = XMLElement.qname(NS, 'X_DOT')
60ELEM_Y_DOT = XMLElement.qname(NS, 'Y_DOT')
61ELEM_Z_DOT = XMLElement.qname(NS, 'Z_DOT')
62
63
64# filename:
65# SGA1_FDP_FDS__OPE_OEM________G20210421100627Z_S20210421000000Z_E20210428000000Z.xml
66
67FILENAME_MATCHER = re.compile(
68 r"^(?P<scid>\w{4})_.*_OEM_.*Z_"
69 r"S(?P<start>[0-9]+)Z_"
70 r"E(?P<stop>[0-9]+)Z\.xml$"
71)
72
73TIME_DECODER = '%Y%m%d%H%M%S'
74
75
76def fileattr(filename):
77 """Given a `filename` return a tuple of SID, sensing start and end time.
78
79 They look like:
80 SGA1_FDP_FDS__OPE_OEM________G20210421100627Z_S20210421000000Z_E20210428000000Z.xml
81 """
82 match = FILENAME_MATCHER.match(filename.name)
83 if not match:
84 raise ValueError('File name {f} not recognised as a Orbit Prediction product'.format(
85 f=filename.name))
86
87 groups = match.groupdict()
88
89 return (
90 scid_to_sid(groups["scid"]),
91 datetime.strptime(groups['start'], TIME_DECODER),
92 datetime.strptime(groups['stop'], TIME_DECODER))
93
94
95OrbitPred = namedtuple('OrbitPred', ('sid '
96 'epoch '
97 'X '
98 'Y '
99 'Z '
100 'X_DOT '
101 'Y_DOT '
102 'Z_DOT '))
103
104
105def parse_orbitpred(filename):
106 """Yield a list of Orbit Prediction Stae Vector objects from `filename`.
107
108 """
109 sid = fileattr(filename)[0]
110 root_elem = XMLElement(filename=filename)
111 body_elem = root_elem.find(ELEM_BODY)
112
113 segment_elem = body_elem.find(ELEM_SEGMENT)
114 list_elem = segment_elem.find(ELEM_DATA)
115 for sv_elem in list_elem.findall(ELEM_SV):
116 res = OrbitPred(
117 sid=sid,
118 epoch=sv_elem.parse_datetime(ELEM_EPOCH),
119 X=sv_elem.parse_float(ELEM_X),
120 Y=sv_elem.parse_float(ELEM_Y),
121 Z=sv_elem.parse_float(ELEM_Z),
122 X_DOT=sv_elem.parse_float(ELEM_X_DOT),
123 Y_DOT=sv_elem.parse_float(ELEM_Y_DOT),
124 Z_DOT=sv_elem.parse_float(ELEM_Z_DOT)
125 )
126
127 yield res
128
129
130def ingest_orbitpred(filename,
131 table=TableInfo('ORBIT4ANT'),
132 replace=True,
133 force_replace=False):
134 """Insert Orbit prediction State Vector objects from `source` into `table`."""
135 sid, start_time, stop_time = fileattr(filename)
136
137 # delete existing SVs
138 if replace:
139 duration = stop_time - start_time
140 if duration > REPLACE_THRESHOLD and not force_replace:
141 raise ValueError('Refusing to delete replace {d}'.format(d=duration))
142
143 del_cnt = ts.delete(sid=sid,
144 table=table,
145 sensing_start=start_time,
146 sensing_stop=stop_time,
147 inclusive=True,
148 commit=True) # commit needed because we use different
149 # cursors for insert and delete
150
151 logger.info('Deleted {d} SVs from {t} for {s} from {strt} to {stop}'.format(
152 d=del_cnt, t=table.name, s=sid, strt=start_time, stop=stop_time))
153
154 # before inserting new ones
155 ins_cur = ts.insert(table=table,
156 fields=['SENSING_TIME'] +\
157 SID.insert_fields +\
158 ['PRODUCT',
159 'X',
160 'Y',
161 'Z',
162 'X_DOT',
163 'Y_DOT',
164 'Z_DOT'])
165
166 cc = 0
167 first = None
168 last = None
169 for osv in parse_orbitpred(filename):
170 if first is None:
171 first = osv.epoch
172
173 last = osv.epoch
174 try:
175 ins_cur.execute(None,
176 [osv.epoch] +\
177 osv.sid.bind_insert() +\
178 [1,
179 osv.X,
180 osv.Y,
181 osv.Z,
182 osv.X_DOT,
183 osv.Y_DOT,
184 osv.Z_DOT
185 ])
186 cc += 1
187
188 except DuplicateDataError:
189 ins_cur.rollback()
190 logger.warn('Something went wrong Ingesting SV from {start} '.format(start=osv.epoch))
191
192 ins_cur.commit()
193
194 logger.info('Ingested {cc} SVs from {start} to {stop}'.format(
195 cc=cc, start=first, stop=last))
196
197
198def dispatch(wo, resultfile, _):
199 """We are being run by the dispatcher tool."""
200 for job in wo.read_jobs():
201 ingest_orbitpred(job.filename)
202 resultfile.add_job(job, JobStatus.COMPLETED)
203
204 logger.info('Orbit Prediction file ingestion complete')
205
206
207def main():
208 """Command line entry point."""
209 try:
210 # run from inside dispatcher
211 dispatch(*init_algorithm())
212 return
213
214 except init_algorithm.NotDispatcher:
215 # run as standalone tool
216 raise ValueError('This tool must be run from the dispatcher')
217
218
219if __name__ == '__main__':
220 main()