1#!/usr/bin/env python3
2
3"""Test AUX data file ingestion decoding as part of CHART-EPSSG decoding and ingestion function.
4
These tests verify the following Fac. Req IDs and Service Def. Reqs (which
include reqs that are verified by the existence of these auto tests):
7 REQ_IFC_2010: MONA-A&R-410
8 REQ_ING_050: MONA-A&R-130
9 REQ_ING_3010: MONA-A&R-410
10 REQ_TST_010: MONA-SYS-940
11 REQ_TST_020: MONA-SYS-940
12
13"""
14
15import sys
16from datetime import datetime
17from lxml import etree
18import pytest
19
20# import pytest_pgsql
21
22from chartepssg import project # must come first
23from chart.common.path import Path
24from chart.common.log import init_log
25from chart.common.traits import is_listlike
26from chart.common.test_utils import show
27from chart.common.test_utils import compare
28from chart.project import settings
29from chart.project import SID
30from chart.common.xml import XMLElement
31
32from chartepssg.alg import satmassrep_ingester
33from chartepssg.alg import manopredic_ingester
34from chartepssg.alg import manohist_ingester
35from chartepssg.alg import manoscalef_ingester
36from chartepssg.alg import orbitpred_ingester
37from chartepssg.alg import geolocation_ingester
38from chartepssg.alg import wimpyfile_ingester
39from chartepssg.alg import execschedule_ingester
40
# Initialise the EPSSG project before any project settings are read below.
project.init_epssg()

# Directory holding the aux product files used as test inputs.
PROJECT_ROOT_DIR = settings.PROJECT_HOME_DIR.parent
AUX_PRODUCT_DIR = PROJECT_ROOT_DIR.joinpath("tests", "application", "products", "aux")

# Names of the sample product files, one per aux product type under test.
SATMASSREP_FILE = "S6A_xx_SATMASSREP_20230621T160000_20230621T160000_20230621T160000_MOC__OPE.txt"
MANOPREDIC_FILE = "S6A_xx_MANOPREDIC_20191211T120358_20191211T120358_20230621T160000_MOC__OPE.txt"
MANOHIST_FILE = "S6A_xx_MANOHIST___20160222T092935_20191211T121100_20230621T160000_MOC__OPE.txt"
MANOSCALEF_FILE = "S6A_xx_MANOSCALEF_20160222T092950_20191211T120356_20230621T160000_MOC__OPE.txt"
OEMORBPRED_FILE = "S6A_xx_OEMORBPRED_20201201T000000_20201211T000000_20201201T000003_MOC__TCE.xml"
# Data used for geo-location
ORBITREST_FILE = (
    "S6A_xx_ORBITREST__20201128T000000_20201208T000000_20201201T000000_MOC__TCE.EOF"
)
WIMPY_FILE = "S6A_xx_WIMPYFILE__20230621T163000_20230628T163000_20230621T160000_MOC__OPE.txt"
SCHEDFULL_FILE = "S6A_xx_SCHEDFULL__20200206T000000_20200215T215831_20200205T134109_MOC__OPE.EOF"

# Lookup from spacecraft id string (e.g. "S6A") to the value passed to SID().
SCID_TO_SID = settings.SCID_TO_SID_MAP
70
71
def print_out(msg, obj):
    """Write `msg` formatted with `obj` to standard output, newline-terminated.

    Args:
        msg: Format string; `obj` is passed to `msg.format` as its only
            positional argument.
        obj: Value substituted into `msg`.
    """
    sys.stdout.write(msg.format(obj) + "\n")
76
77
def test_SATMASSREP_decoder():
    """Decode the sample SATMASSREP aux file and compare with the expected report."""

    # Values read from the test file by visual inspection:
    #   2023-06-21T16:00:00   CALCULATION DATE
    #   1.544                 XS CO-ORDINATE (METER)
    #   -0.007                YS CO-ORDINATE (METER)
    #   0.037                 ZS CO-ORDINATE (METER)
    #   1349.000              TOTAL S/C MASS (KG)
    #   229.400               FUEL MASS (KG)
    #   785.4                 INERTIA XX (M2.KG)
    #   2261.2                INERTIA YY (M2.KG)
    #   2577.2                INERTIA ZZ (M2.KG)
    #   -2.5                  INERTIA XY (M2.KG)
    #   -2.5                  INERTIA YZ (M2.KG)
    #   156.5                 INERTIA ZX (M2.KG)
    expected = satmassrep_ingester.Report(
        sid=SID(SCID_TO_SID.get("S6A")),
        # File value: '2023-06-21T16:00:00'
        calc_date=datetime(2023, 6, 21, 16, 0, 0),
        xs=1.544,
        ys=-0.007,
        zs=0.037,
        total_mass=1349.0,
        fuel_mass=229.4,
        i_xx=785.4,
        i_yy=2261.2,
        i_zz=2577.2,
        i_xy=-2.5,
        i_yz=-2.5,
        i_zx=156.5,
    )

    # Decode the product file.
    product_path = AUX_PRODUCT_DIR.joinpath(SATMASSREP_FILE)
    decoded = satmassrep_ingester.parse_report(product_path)

    assert decoded == expected
    print_out("SATMASSREP decoder test passed for result: {}", expected)
120
121
def test_MANOPREDIC_decoder():
    """Decode the sample MANOPREDIC aux file and compare its first manoeuvre."""

    # The first (and only) entry in the test file, read by visual inspection:
    #   First burn 2019/12/11-12:03:58.533 2.091 m/s OOP OCM 17.402 deg
    expected = manopredic_ingester.Manoeuvre(
        sid=SID(SCID_TO_SID.get("S6A")),
        # File value: '2019/12/11-12:03:58.533'
        exec_time=datetime(2019, 12, 11, 12, 3, 58, 533000),
        burn_id="First burn",
        delta_val=2.091,
        delta_unit="m/s",
        mode_1="OOP",
        mode_2="OCM",
        pso_val=17.402,
        pso_unit="deg",
    )

    # Decode the product file; the parser output is materialised into a list.
    product_path = AUX_PRODUCT_DIR.joinpath(MANOPREDIC_FILE)
    decoded = list(manopredic_ingester.parse_manoeuvres(product_path))

    assert decoded[0] == expected
    print_out("MANOPREDIC decoder test passed for result: {}", expected)
149
150
def test_MANOHIST_decoder():
    """Decode the sample MANOHIST aux file; check entry count, first and last entry."""

    sid = SID(SCID_TO_SID.get("S6A"))

    # First entry in the test file, read by visual inspection:
    #   2016/02/22-09:29:35.000 -0.32319361D-10-0.56919703D-06 0.00000000D+00 1
    expected_first = manohist_ingester.Manoeuvre(
        sid=sid,
        epoch=datetime(2016, 2, 22, 9, 29, 35),
        acc_comp_1=-0.32319361e-10,
        acc_comp_2=-0.56919703e-06,
        acc_comp_3=0.0,
        flag=1,
    )

    # Last entry in the test file:
    #   2019/12/11-12:11:00.810 -0.49240007D-08 0.13296058D-07 0.25735859D-05 0
    expected_last = manohist_ingester.Manoeuvre(
        sid=sid,
        epoch=datetime(2019, 12, 11, 12, 11, 0, 810000),
        acc_comp_1=-0.49240007e-08,
        acc_comp_2=0.13296058e-07,
        acc_comp_3=0.25735859e-05,
        flag=0,
    )

    # Decode the product file.
    expected_count = 88
    product_path = AUX_PRODUCT_DIR.joinpath(MANOHIST_FILE)
    decoded = list(manohist_ingester.parse_manoeuvres(product_path))

    assert len(decoded) == expected_count
    assert decoded[0] == expected_first
    print_out("MANOHIST decoder test passed for first result: {}", expected_first)
    # The length was asserted above, so [-1] is the entry at expected_count - 1.
    assert decoded[-1] == expected_last
    print_out("MANOHIST decoder test passed for last result: {}", expected_last)
193
194
def test_MANOSCALEF_decoder():
    """Decode the sample MANOSCALEF aux file; check entry count, first and last entry."""

    sid = SID(SCID_TO_SID.get("S6A"))

    # First entry in the test file, read by visual inspection:
    #   2016/02/22-09:29:35.000 0.18947368D+03 0.90949050D+00 0.00000000D+00 0.0000000 0.0000000 0.0000000 1
    expected_first = manoscalef_ingester.Manoeuvre(
        sid=sid,
        start_time=datetime(2016, 2, 22, 9, 29, 35),
        factor1=0.18947368e03,
        factor2=0.90949050e00,
        factor3=0.0,
        sigma1=0.0,
        sigma2=0.0,
        sigma3=0.0,
        validity=1,
    )

    # Last entry in the test file:
    #   2019/12/11-11:56:51.888-0.33002972D+08 0.98092502D+00 0.99928044D+00 0.0000000 0.0000000 0.0000000 1
    expected_last = manoscalef_ingester.Manoeuvre(
        sid=sid,
        start_time=datetime(2019, 12, 11, 11, 56, 51, 888000),
        factor1=-0.33002972e08,
        factor2=0.98092502e00,
        factor3=0.99928044e00,
        sigma1=0.0,
        sigma2=0.0,
        sigma3=0.0,
        validity=1,
    )

    # Decode the product file.
    expected_count = 44
    product_path = AUX_PRODUCT_DIR.joinpath(MANOSCALEF_FILE)
    decoded = list(manoscalef_ingester.parse_manoeuvres(product_path))

    assert len(decoded) == expected_count
    assert decoded[0] == expected_first
    print_out("MANOSCALEF decoder test passed for first result: {}", expected_first)
    # The length was asserted above, so [-1] is the entry at expected_count - 1.
    assert decoded[-1] == expected_last
    print_out("MANOSCALEF decoder test passed for last result: {}", expected_last)
247
248
249# @pytest.mark.skip
def test_OEMORBPRED_decoder():
    """Decode the sample OEMORBPRED aux file; check entry count, first and last entry.

    The expected values are built by parsing XML snippets copied from the test
    file with the same XMLElement accessors and element names the ingester
    module exposes.
    """

    sid = SID(SCID_TO_SID.get("S6A"))

    def expected_from(xml_text):
        """Build the expected OrbitPred from one <stateVector> XML snippet."""
        node = XMLElement(elem=etree.fromstring(xml_text))
        return orbitpred_ingester.OrbitPred(
            sid=sid,
            epoch=node.parse_datetime(orbitpred_ingester.ELEM_EPOCH),
            X=node.parse_float(orbitpred_ingester.ELEM_X),
            Y=node.parse_float(orbitpred_ingester.ELEM_Y),
            Z=node.parse_float(orbitpred_ingester.ELEM_Z),
            X_DOT=node.parse_float(orbitpred_ingester.ELEM_X_DOT),
            Y_DOT=node.parse_float(orbitpred_ingester.ELEM_Y_DOT),
            Z_DOT=node.parse_float(orbitpred_ingester.ELEM_Z_DOT),
        )

    # First state vector in the test file, read by visual inspection.
    first_xml = (
        "<stateVector>"
        r" <EPOCH>2020-12-01T00:00:00.000Z</EPOCH>"
        r" <X>-6.780290602793E+03</X>"
        r" <Y>-6.797782401015E+02</Y>"
        r" <Z> 3.621533494135E+03</Z>"
        r" <X_DOT>-2.654711849949E+00</X_DOT>"
        r" <Y_DOT>-3.582661194453E+00</Y_DOT>"
        r" <Z_DOT>-5.637075170929E+00</Z_DOT>"
        r"</stateVector>"
    )
    expected_first = expected_from(first_xml)

    # Last state vector in the test file.
    last_xml = (
        "<stateVector>"
        r" <EPOCH>2020-12-11T00:00:00.000Z</EPOCH>"
        r" <X>-7.708596908701E+03</X>"
        r" <Y> 2.943034417082E+02</Y>"
        r" <Z> 2.368717141878E+02</Z>"
        r" <X_DOT>-3.136954824396E-01</X_DOT>"
        r" <Y_DOT>-2.910077728944E+00</Y_DOT>"
        r" <Z_DOT>-6.565386930871E+00</Z_DOT>"
        r"</stateVector>"
    )
    expected_last = expected_from(last_xml)

    # Decode the product file.
    expected_count = 14401
    product_path = AUX_PRODUCT_DIR.joinpath(OEMORBPRED_FILE)
    decoded = list(orbitpred_ingester.parse_orbitpred(product_path))

    assert len(decoded) == expected_count
    assert decoded[0] == expected_first
    print_out("OEMORBPRED decoder test passed for first result: {}", expected_first)
    # The length was asserted above, so [-1] is the entry at expected_count - 1.
    assert decoded[-1] == expected_last
    print_out("OEMORBPRED decoder test passed for last result: {}", expected_last)
320
321
322# @pytest.mark.skip
def test_ORBITREST_decoder():
    """Decode the sample ORBITREST (geo-location) aux file.

    Checks the entry count plus the first and last entry. The expected values
    are built by parsing XML snippets copied from the test file; a namespace is
    added to each snippet to be consistent with the XML source files and the
    ingester.
    """

    sid = SID(SCID_TO_SID.get("S6A"))

    def expected_from(xml_text):
        """Build the expected GeoLocation from one <OSV> XML snippet."""
        node = XMLElement(elem=etree.fromstring(xml_text))
        return geolocation_ingester.GeoLocation(
            sid=sid,
            sensing_time=node.parse_datetime(geolocation_ingester.ELEM_UTC),
            X=node.parse_float(geolocation_ingester.ELEM_X),
            Y=node.parse_float(geolocation_ingester.ELEM_Y),
            Z=node.parse_float(geolocation_ingester.ELEM_Z),
            VX=node.parse_float(geolocation_ingester.ELEM_VX),
            VY=node.parse_float(geolocation_ingester.ELEM_VY),
            VZ=node.parse_float(geolocation_ingester.ELEM_VZ),
        )

    # First OSV entry in the test file, read by visual inspection.
    first_xml = (
        '<OSV xmlns="http://eop-cfi.esa.int/CFI">'
        r" <TAI>TAI=2020-11-28T00:00:37.000000</TAI>"
        r" <UTC>UTC=2020-11-28T00:00:00.000000</UTC>"
        r" <UT1>UT1=2020-11-27T23:59:59.733395</UT1>"
        r" <Absolute_Orbit>+00012</Absolute_Orbit>"
        r' <X unit="m">+5640123.962</X>'
        r' <Y unit="m">-5250505.374</Y>'
        r' <Z unit="m">-0428687.745</Z>'
        r' <VX unit="m/s">+1875.066297</VX>'
        r' <VY unit="m/s">+1479.418697</VY>'
        r' <VZ unit="m/s">+6557.450945</VZ>'
        r" <Quality>0000000000000</Quality>"
        r"</OSV>"
    )
    expected_first = expected_from(first_xml)

    # Last OSV entry in the test file.
    last_xml = (
        '<OSV xmlns="http://eop-cfi.esa.int/CFI">'
        r" <TAI>TAI=2020-12-08T00:00:37.000000</TAI>"
        r" <UTC>UTC=2020-12-08T00:00:00.000000</UTC>"
        r" <UT1>UT1=2020-12-07T23:59:59.729687</UT1>"
        r" <Absolute_Orbit>+00141</Absolute_Orbit>"
        r' <X unit="m">+3418036.177</X>'
        r' <Y unit="m">-6217588.551</Y>'
        r' <Z unit="m">+3034870.831</Z>'
        r' <VX unit="m/s">+1105.387922</VX>'
        r' <VY unit="m/s">+3504.261900</VY>'
        r' <VZ unit="m/s">+5928.448157</VZ>'
        r" <Quality>0000000000000</Quality>"
        r"</OSV>"
    )
    expected_last = expected_from(last_xml)

    # Decode the product file.
    expected_count = 28801
    product_path = AUX_PRODUCT_DIR.joinpath(ORBITREST_FILE)
    decoded = list(geolocation_ingester.parse_geolocation(product_path))

    assert len(decoded) == expected_count
    assert decoded[0] == expected_first
    print_out("ORBITREST decoder test passed for first result: {}", expected_first)
    # The length was asserted above, so [-1] is the entry at expected_count - 1.
    assert decoded[-1] == expected_last
    print_out("ORBITREST decoder test passed for last result: {}", expected_last)
399
400
def test_SCHEDFULL_decoder():
    """Decode the sample SCHEDFULL (executable schedule) aux file.

    Checks the total number of decoded items, the first Request item (index 0)
    and the first Event item (index 3) against values read from the test file
    by visual inspection.
    """

    # First Request entry in the test file:
    #
    #   <EVRQ>
    #     <EVRQ_Header>
    #       <EVRQ_Time>UTC=2020-02-05T23:59:59.930</EVRQ_Time>
    #       <EVRQ_Type>Request</EVRQ_Type>
    #       <EVRQ_Description>Start Schedule Generation Request</EVRQ_Description>
    #     </EVRQ_Header>
    #     <RQ>
    #       <RQ_Name>SCHED_ST</RQ_Name>
    #       <RQ_Description>Start Schedule Generation Request</RQ_Description>
    #       <RQ_Source>PDGS</RQ_Source>
    #       <RQ_Destination>MPS</RQ_Destination>
    #       <RQ_Type>Ground Manual Schedule</RQ_Type>
    #       <List_of_RQ_Parameters count="1">
    #         <RQ_Parameter>
    #           <RQ_Parameter_Name>TIME_1</RQ_Parameter_Name>
    #           <RQ_Parameter_Description>0</RQ_Parameter_Description>
    #           <RQ_Parameter_Representation>Raw</RQ_Parameter_Representation>
    #           <RQ_Parameter_Value>2020.037.00.00.00.000</RQ_Parameter_Value>
    #         </RQ_Parameter>
    #       </List_of_RQ_Parameters>
    #       <List_of_RQ_Attributes count="0"/>
    #     </RQ>
    #   </EVRQ>
    expected_result_rq = execschedule_ingester.RQEV(
        sid=SID(SCID_TO_SID.get("S6A")),
        time=datetime(2020, 2, 5, 23, 59, 59, 930000),
        name="SCHED_ST",
        item_type="Request",
        description="Start Schedule Generation Request",
        data={
            "RQ_Name": "SCHED_ST",
            "RQ_Type": "Ground Manual Schedule",
            "RQ_Source": "PDGS",
            "RQ_Destination": "MPS",
            "List_of_RQ_Parameters": [
                {
                    "RQ_Parameter_Name": "TIME_1",
                    "RQ_Parameter_Value": "2020.037.00.00.00.000",
                    "RQ_Parameter_Description": "0",
                    "RQ_Parameter_Representation": "Raw",
                }
            ],
        },
    )

    # First Event entry in the test file:
    #
    #   <EVRQ>
    #     <EVRQ_Header>
    #       <EVRQ_Time>UTC=2020-02-06T00:37:30.486</EVRQ_Time>
    #       <EVRQ_Type>Event</EVRQ_Type>
    #       <EVRQ_Description>Acquisition of Signal time for Horizon Mask for Fairbanks</EVRQ_Description>
    #     </EVRQ_Header>
    #     <EV>
    #       <EV_Name>FBK_AOS-HM</EV_Name>
    #       <EV_Absolute_orbit>38</EV_Absolute_orbit>
    #       <EV_Deg_from_ANX>50.974</EV_Deg_from_ANX>
    #       <List_of_EV_Parameters count="0"/>
    #     </EV>
    #   </EVRQ>
    expected_result_ev = execschedule_ingester.RQEV(
        sid=SID(SCID_TO_SID.get("S6A")),
        time=datetime(2020, 2, 6, 0, 37, 30, 486000),
        name="FBK_AOS-HM",
        item_type="Event",
        description="Acquisition of Signal time for Horizon Mask for Fairbanks",
        data={
            "EV_Name": "FBK_AOS-HM",
            "EV_Absolute_orbit": 38,
            "EV_Deg_from_ANX": 50.974,
        },
    )

    # Decode the product file.
    ENTRY_COUNT = 1304
    actual_results = list(
        execschedule_ingester.parse_schedule(AUX_PRODUCT_DIR.joinpath(SCHEDFULL_FILE))
    )

    assert ENTRY_COUNT == len(actual_results)
    assert actual_results[0] == expected_result_rq
    print_out("SCHEDFULL decoder test passed for first result: {}", expected_result_rq)

    # NOTE(review): index 3 is taken to be the first Event item in the test
    # file (the items before it presumably being Requests) - confirm against
    # the product file.
    assert actual_results[3] == expected_result_ev
    # This check is on the first Event item, not the last decoded item, so the
    # message says so.
    print_out(
        "SCHEDFULL decoder test passed for first event result: {}", expected_result_ev
    )
499
500
def test_WIMPYFILE_decoder():
    """Check the metadata decoded from the sample WIMPYFILE aux file.

    The expected value is the spacecraft SID plus the two timestamps
    ('20230621T163000' and '20230628T163000') that appear in the test file's
    name.
    """

    # Expected file metadata, to be checked against the test data file.
    # NOTE(review): assumes wimpyfile_ingester.fileattr returns a value that
    # compares equal to a (sid, start, stop) tuple - confirm against the ingester.
    expected_result = (
        SID(SCID_TO_SID.get("S6A")),
        datetime(2023, 6, 21, 16, 30),  # file name field '20230621T163000'
        datetime(2023, 6, 28, 16, 30),  # file name field '20230628T163000'
    )

    # Decode the file metadata.
    actual_result = wimpyfile_ingester.fileattr(AUX_PRODUCT_DIR.joinpath(WIMPY_FILE))

    # The comparison must be asserted; as a bare expression its result was
    # discarded and the test could never fail.
    assert actual_result == expected_result
    print_out("WIMPYFILE decoder test passed for result: {}", expected_result)
521
522
if __name__ == "__main__":
    # Run every decoder test once when the module is executed as a script.
    test_SATMASSREP_decoder()
    test_MANOPREDIC_decoder()
    test_MANOHIST_decoder()
    test_MANOSCALEF_decoder()
    test_OEMORBPRED_decoder()
    test_ORBITREST_decoder()
    test_WIMPYFILE_decoder()
    test_SCHEDFULL_decoder()