1#!/usr/bin/env python3
  2
  3"""Test AUX data file ingestion decoding as part of CHART-EPSSG decoding and ingestion function.
  4
These tests verify the following Fac. req IDs and Service Def. Reqs (which
include reqs that are verified by the existence of these auto tests):
  7    REQ_IFC_2010: MONA-A&R-410
  8    REQ_ING_050: MONA-A&R-130
  9    REQ_ING_3010: MONA-A&R-410
 10    REQ_TST_010: MONA-SYS-940
 11    REQ_TST_020: MONA-SYS-940
 12
 13"""
 14
 15import sys
 16from datetime import datetime
 17from lxml import etree
 18import pytest
 19
 20# import pytest_pgsql
 21
 22from chartepssg import project  # must come first
 23from chart.common.path import Path
 24from chart.common.log import init_log
 25from chart.common.traits import is_listlike
 26from chart.common.test_utils import show
 27from chart.common.test_utils import compare
 28from chart.project import settings
 29from chart.project import SID
 30from chart.common.xml import XMLElement
 31
 32from chartepssg.alg import satmassrep_ingester
 33from chartepssg.alg import manopredic_ingester
 34from chartepssg.alg import manohist_ingester
 35from chartepssg.alg import manoscalef_ingester
 36from chartepssg.alg import orbitpred_ingester
 37from chartepssg.alg import geolocation_ingester
 38from chartepssg.alg import wimpyfile_ingester
 39from chartepssg.alg import execschedule_ingester
 40
 41project.init_epssg()
 42
 43PROJECT_ROOT_DIR = settings.PROJECT_HOME_DIR.parent
 44AUX_PRODUCT_DIR = PROJECT_ROOT_DIR.joinpath("tests", "application", "products", "aux")
 45
 46SATMASSREP_FILE = (
 47    "S6A_xx_SATMASSREP_20230621T160000_20230621T160000_20230621T160000_MOC__OPE.txt"
 48)
 49MANOPREDIC_FILE = (
 50    "S6A_xx_MANOPREDIC_20191211T120358_20191211T120358_20230621T160000_MOC__OPE.txt"
 51)
 52MANOHIST_FILE = (
 53    "S6A_xx_MANOHIST___20160222T092935_20191211T121100_20230621T160000_MOC__OPE.txt"
 54)
 55MANOSCALEF_FILE = (
 56    "S6A_xx_MANOSCALEF_20160222T092950_20191211T120356_20230621T160000_MOC__OPE.txt"
 57)
 58OEMORBPRED_FILE = (
 59    "S6A_xx_OEMORBPRED_20201201T000000_20201211T000000_20201201T000003_MOC__TCE.xml"
 60)
 61ORBITREST_FILE = "S6A_xx_ORBITREST__20201128T000000_20201208T000000_20201201T000000_MOC__TCE.EOF"  # Data used for geo-location
 62WIMPY_FILE = (
 63    "S6A_xx_WIMPYFILE__20230621T163000_20230628T163000_20230621T160000_MOC__OPE.txt"
 64)
 65SCHEDFULL_FILE = (
 66    "S6A_xx_SCHEDFULL__20200206T000000_20200215T215831_20200205T134109_MOC__OPE.EOF"
 67)
 68
 69SCID_TO_SID = settings.SCID_TO_SID_MAP
 70
 71
 72def print_out(msg, obj):
 73    out = msg.format(obj)
 74    out += "\n"
 75    sys.stdout.write(out)
 76
 77
 78def test_SATMASSREP_decoder():
 79    """Decoder test for SATMASSREP aux files.
 80    Compare product file decoding against expected tuple values"""
 81
 82    # From visual inspection we have the followwing value
 83    # 2023-06-21T16:00:00 CALCULATION DATE
 84    # 1.544 XS CO-ORDINATE (METER)
 85    # -0.007 YS CO-ORDINATE (METER)
 86    # 0.037 ZS CO-ORDINATE (METER)
 87    # 1349.000 TOTAL S/C MASS (KG)
 88    # 229.400 FUEL MASS (KG)
 89    # 785.4 INERTIA XX (M2.KG)
 90    # 2261.2 INERTIA YY (M2.KG)
 91    # 2577.2 INERTIA ZZ (M2.KG)
 92    # -2.5 INERTIA XY (M2.KG)
 93    # -2.5 INERTIA YZ (M2.KG)
 94    # 156.5 INERTIA ZX (M2.KG)
 95    expected_result = satmassrep_ingester.Report(
 96        sid=SID(SCID_TO_SID.get("S6A")),
 97        calc_date=datetime(
 98            2023, 6, 21, 16, 0, 0
 99        ),  # calc_date=datetime.strptime('2023-06-21T16:00:00', satmassrep_ingester.CALC_DATE_FORMAT),
100        xs=1.544,
101        ys=-0.007,
102        zs=0.037,
103        total_mass=1349.00,
104        fuel_mass=229.400,
105        i_xx=785.4,
106        i_yy=2261.2,
107        i_zz=2577.2,
108        i_xy=-2.5,
109        i_yz=-2.5,
110        i_zx=156.5,
111    )
112
113    # decode file
114    actual_result = satmassrep_ingester.parse_report(
115        AUX_PRODUCT_DIR.joinpath(SATMASSREP_FILE)
116    )
117
118    assert actual_result == expected_result
119    print_out("SATMASSREP decoder test passed for result: {}", expected_result)
120
121
def test_MANOPREDIC_decoder():
    """Decoder test for MANOPREDIC aux files.

    Decodes the sample MANOPREDIC product with
    `manopredic_ingester.parse_manoeuvres` and compares the first decoded
    entry against an expected `Manoeuvre` built by hand.
    """
    # The sample product holds a single entry (read by visual inspection):
    # First burn     2019/12/11-12:03:58.533    2.091 m/s OOP  OCM   17.402 deg
    reference = manopredic_ingester.Manoeuvre(
        sid=SID(SCID_TO_SID.get("S6A")),
        # i.e. '2019/12/11-12:03:58.533' in the product file
        exec_time=datetime(2019, 12, 11, 12, 3, 58, 533000),
        burn_id="First burn",
        delta_val=2.091,
        delta_unit="m/s",
        mode_1="OOP",
        mode_2="OCM",
        pso_val=17.402,
        pso_unit="deg",
    )

    # Decode the sample product; the parser's output is materialised into a list.
    product_path = AUX_PRODUCT_DIR.joinpath(MANOPREDIC_FILE)
    decoded = list(manopredic_ingester.parse_manoeuvres(product_path))

    assert decoded[0] == reference
    print_out("MANOPREDIC decoder test passed for result: {}", reference)
149
150
def test_MANOHIST_decoder():
    """Decoder test for MANOHIST aux files.

    Decodes the sample MANOHIST product with
    `manohist_ingester.parse_manoeuvres`, then checks the number of decoded
    entries and compares the first and last entries against expected
    `Manoeuvre` values built by hand.
    """
    sid = SCID_TO_SID.get("S6A")

    # First entry in the sample product (read by visual inspection):
    # 2016/02/22-09:29:35.000    -0.32319361D-10-0.56919703D-06 0.00000000D+00    1
    reference_first = manohist_ingester.Manoeuvre(
        sid=SID(sid),
        epoch=datetime(2016, 2, 22, 9, 29, 35),
        acc_comp_1=-0.32319361e-10,
        acc_comp_2=-0.56919703e-06,
        acc_comp_3=0.0,
        flag=1,
    )

    # Last entry in the sample product:
    # 2019/12/11-12:11:00.810    -0.49240007D-08 0.13296058D-07 0.25735859D-05    0
    reference_last = manohist_ingester.Manoeuvre(
        sid=SID(sid),
        epoch=datetime(2019, 12, 11, 12, 11, 0, 810000),
        acc_comp_1=-0.49240007e-08,
        acc_comp_2=0.13296058e-07,
        acc_comp_3=0.25735859e-05,
        flag=0,
    )

    # Decode the sample product.
    expected_count = 88
    product_path = AUX_PRODUCT_DIR.joinpath(MANOHIST_FILE)
    decoded = list(manohist_ingester.parse_manoeuvres(product_path))

    assert expected_count == len(decoded)
    assert decoded[0] == reference_first
    print_out("MANOHIST decoder test passed for first result: {}", reference_first)
    # The length was asserted above, so [-1] is the entry at expected_count - 1.
    assert decoded[-1] == reference_last
    print_out("MANOHIST decoder test passed for last result: {}", reference_last)
193
194
def test_MANOSCALEF_decoder():
    """Decoder test for MANOSCALEF aux files.

    Decodes the sample MANOSCALEF product with
    `manoscalef_ingester.parse_manoeuvres`, then checks the number of decoded
    entries and compares the first and last entries against expected
    `Manoeuvre` values built by hand.
    """
    sid = SCID_TO_SID.get("S6A")

    # First entry in the sample product (read by visual inspection):
    # 2016/02/22-09:29:35.000 0.18947368D+03 0.90949050D+00 0.00000000D+00   0.0000000   0.0000000   0.0000000  1
    reference_first = manoscalef_ingester.Manoeuvre(
        sid=SID(sid),
        start_time=datetime(2016, 2, 22, 9, 29, 35),
        factor1=0.18947368e03,
        factor2=0.90949050e00,
        factor3=0.0,
        sigma1=0.0,
        sigma2=0.0,
        sigma3=0.0,
        validity=1,
    )

    # Last entry in the sample product:
    # 2019/12/11-11:56:51.888-0.33002972D+08 0.98092502D+00 0.99928044D+00   0.0000000   0.0000000   0.0000000  1
    reference_last = manoscalef_ingester.Manoeuvre(
        sid=SID(sid),
        start_time=datetime(2019, 12, 11, 11, 56, 51, 888000),
        factor1=-0.33002972e08,
        factor2=0.98092502e00,
        factor3=0.99928044e00,
        sigma1=0.0,
        sigma2=0.0,
        sigma3=0.0,
        validity=1,
    )

    # Decode the sample product.
    expected_count = 44
    product_path = AUX_PRODUCT_DIR.joinpath(MANOSCALEF_FILE)
    decoded = list(manoscalef_ingester.parse_manoeuvres(product_path))

    assert expected_count == len(decoded)
    assert decoded[0] == reference_first
    print_out("MANOSCALEF decoder test passed for first result: {}", reference_first)
    # The length was asserted above, so [-1] is the entry at expected_count - 1.
    assert decoded[-1] == reference_last
    print_out("MANOSCALEF decoder test passed for last result: {}", reference_last)
247
248
249# @pytest.mark.skip
def test_OEMORBPRED_decoder():
    """Decoder test for OEMORBPRED aux files.

    Decodes the sample OEMORBPRED product with
    `orbitpred_ingester.parse_orbitpred`, then checks the number of decoded
    entries and compares the first and last entries against `OrbitPred`
    values built from hand-copied `<stateVector>` XML snippets.
    """

    def expected_from(state_vector_xml):
        """Build the expected `OrbitPred` from one `<stateVector>` snippet."""
        node = XMLElement(elem=etree.fromstring(state_vector_xml))
        return orbitpred_ingester.OrbitPred(
            sid=SID(SCID_TO_SID.get("S6A")),
            epoch=node.parse_datetime(orbitpred_ingester.ELEM_EPOCH),
            X=node.parse_float(orbitpred_ingester.ELEM_X),
            Y=node.parse_float(orbitpred_ingester.ELEM_Y),
            Z=node.parse_float(orbitpred_ingester.ELEM_Z),
            X_DOT=node.parse_float(orbitpred_ingester.ELEM_X_DOT),
            Y_DOT=node.parse_float(orbitpred_ingester.ELEM_Y_DOT),
            Z_DOT=node.parse_float(orbitpred_ingester.ELEM_Z_DOT),
        )

    # First state vector in the sample product (read by visual inspection).
    first_xml = (
        "<stateVector>"
        r"     <EPOCH>2020-12-01T00:00:00.000Z</EPOCH>"
        r"     <X>-6.780290602793E+03</X>"
        r"     <Y>-6.797782401015E+02</Y>"
        r"     <Z> 3.621533494135E+03</Z>"
        r"      <X_DOT>-2.654711849949E+00</X_DOT>"
        r"     <Y_DOT>-3.582661194453E+00</Y_DOT>"
        r"     <Z_DOT>-5.637075170929E+00</Z_DOT>"
        r"</stateVector>"
    )
    reference_first = expected_from(first_xml)

    # Last state vector in the sample product.
    last_xml = (
        "<stateVector>"
        r"    <EPOCH>2020-12-11T00:00:00.000Z</EPOCH>"
        r"    <X>-7.708596908701E+03</X>"
        r"    <Y> 2.943034417082E+02</Y>"
        r"    <Z> 2.368717141878E+02</Z>"
        r"    <X_DOT>-3.136954824396E-01</X_DOT>"
        r"    <Y_DOT>-2.910077728944E+00</Y_DOT>"
        r"    <Z_DOT>-6.565386930871E+00</Z_DOT>"
        r"</stateVector>"
    )
    reference_last = expected_from(last_xml)

    # Decode the sample product.
    expected_count = 14401
    product_path = AUX_PRODUCT_DIR.joinpath(OEMORBPRED_FILE)
    decoded = list(orbitpred_ingester.parse_orbitpred(product_path))

    assert expected_count == len(decoded)
    assert decoded[0] == reference_first
    print_out("OEMORBPRED decoder test passed for first result: {}", reference_first)
    # The length was asserted above, so [-1] is the entry at expected_count - 1.
    assert decoded[-1] == reference_last
    print_out("OEMORBPRED decoder test passed for last result: {}", reference_last)
320
321
322# @pytest.mark.skip
def test_ORBITREST_decoder():
    """Decoder test for ORBITREST aux files, a.k.a. Geo-Location data.

    Decodes the sample ORBITREST product with
    `geolocation_ingester.parse_geolocation`, then checks the number of decoded
    entries and compares the first and last entries against `GeoLocation`
    values built from hand-copied `<OSV>` XML snippets.
    """

    def expected_from(osv_xml):
        """Build the expected `GeoLocation` from one `<OSV>` snippet."""
        node = XMLElement(elem=etree.fromstring(osv_xml))
        return geolocation_ingester.GeoLocation(
            sid=SID(SCID_TO_SID.get("S6A")),
            sensing_time=node.parse_datetime(geolocation_ingester.ELEM_UTC),
            X=node.parse_float(geolocation_ingester.ELEM_X),
            Y=node.parse_float(geolocation_ingester.ELEM_Y),
            Z=node.parse_float(geolocation_ingester.ELEM_Z),
            VX=node.parse_float(geolocation_ingester.ELEM_VX),
            VY=node.parse_float(geolocation_ingester.ELEM_VY),
            VZ=node.parse_float(geolocation_ingester.ELEM_VZ),
        )

    # First OSV in the sample product (read by visual inspection). The
    # namespace is added to be consistent with the XML source files and ingester.
    first_xml = (
        '<OSV xmlns="http://eop-cfi.esa.int/CFI">'
        r"    <TAI>TAI=2020-11-28T00:00:37.000000</TAI>"
        r"    <UTC>UTC=2020-11-28T00:00:00.000000</UTC>"
        r"    <UT1>UT1=2020-11-27T23:59:59.733395</UT1>"
        r"    <Absolute_Orbit>+00012</Absolute_Orbit>"
        r'    <X unit="m">+5640123.962</X>'
        r'    <Y unit="m">-5250505.374</Y>'
        r'    <Z unit="m">-0428687.745</Z>'
        r'    <VX unit="m/s">+1875.066297</VX>'
        r'    <VY unit="m/s">+1479.418697</VY>'
        r'    <VZ unit="m/s">+6557.450945</VZ>'
        r"    <Quality>0000000000000</Quality>"
        r"</OSV>"
    )
    reference_first = expected_from(first_xml)

    # Last OSV in the sample product, with the same namespace added.
    last_xml = (
        '<OSV xmlns="http://eop-cfi.esa.int/CFI">'
        r"    <TAI>TAI=2020-12-08T00:00:37.000000</TAI>"
        r"    <UTC>UTC=2020-12-08T00:00:00.000000</UTC>"
        r"     <UT1>UT1=2020-12-07T23:59:59.729687</UT1>"
        r"    <Absolute_Orbit>+00141</Absolute_Orbit>"
        r'    <X unit="m">+3418036.177</X>'
        r'    <Y unit="m">-6217588.551</Y>'
        r'    <Z unit="m">+3034870.831</Z>'
        r'    <VX unit="m/s">+1105.387922</VX>'
        r'    <VY unit="m/s">+3504.261900</VY>'
        r'    <VZ unit="m/s">+5928.448157</VZ>'
        r"    <Quality>0000000000000</Quality>"
        r"</OSV>"
    )
    reference_last = expected_from(last_xml)

    # Decode the sample product.
    expected_count = 28801
    product_path = AUX_PRODUCT_DIR.joinpath(ORBITREST_FILE)
    decoded = list(geolocation_ingester.parse_geolocation(product_path))

    assert expected_count == len(decoded)
    assert decoded[0] == reference_first
    print_out("ORBITREST decoder test passed for first result: {}", reference_first)
    # The length was asserted above, so [-1] is the entry at expected_count - 1.
    assert decoded[-1] == reference_last
    print_out("ORBITREST decoder test passed for last result: {}", reference_last)
399
400
def test_SCHEDFULL_decoder():
    """Decoder test for SCHEDFULL aux files, a.k.a. Executable schedule.

    Decodes the sample SCHEDFULL product with
    `execschedule_ingester.parse_schedule`, then checks the number of decoded
    entries and compares the first Request entry and the first Event entry
    against expected `RQEV` values built by hand.
    """
    # From visual inspection we have the following first Request item and first
    # Event item values in the test file.
    #
    # First Request entry:
    #
    #   <EVRQ>
    #      <EVRQ_Header>
    #         <EVRQ_Time>UTC=2020-02-05T23:59:59.930</EVRQ_Time>
    #         <EVRQ_Type>Request</EVRQ_Type>
    #         <EVRQ_Description>Start Schedule Generation Request</EVRQ_Description>
    #      </EVRQ_Header>
    #      <RQ>
    #         <RQ_Name>SCHED_ST</RQ_Name>
    #         <RQ_Description>Start Schedule Generation Request</RQ_Description>
    #         <RQ_Source>PDGS</RQ_Source>
    #         <RQ_Destination>MPS</RQ_Destination>
    #         <RQ_Type>Ground Manual Schedule</RQ_Type>
    #         <List_of_RQ_Parameters count="1">
    #            <RQ_Parameter>
    #               <RQ_Parameter_Name>TIME_1</RQ_Parameter_Name>
    #               <RQ_Parameter_Description>0</RQ_Parameter_Description>
    #               <RQ_Parameter_Representation>Raw</RQ_Parameter_Representation>
    #               <RQ_Parameter_Value>2020.037.00.00.00.000</RQ_Parameter_Value>
    #            </RQ_Parameter>
    #         </List_of_RQ_Parameters>
    #         <List_of_RQ_Attributes count="0"/>
    #      </RQ>
    #   </EVRQ>
    expected_result_rq = execschedule_ingester.RQEV(
        sid=SID(SCID_TO_SID.get("S6A")),
        time=datetime(2020, 2, 5, 23, 59, 59, 930000),
        name="SCHED_ST",
        item_type="Request",
        description="Start Schedule Generation Request",
        data={
            "RQ_Name": "SCHED_ST",
            "RQ_Type": "Ground Manual Schedule",
            "RQ_Source": "PDGS",
            "RQ_Destination": "MPS",
            "List_of_RQ_Parameters": [
                {
                    "RQ_Parameter_Name": "TIME_1",
                    "RQ_Parameter_Value": "2020.037.00.00.00.000",
                    "RQ_Parameter_Description": "0",
                    "RQ_Parameter_Representation": "Raw",
                }
            ],
        },
    )

    # First Event entry:
    #
    #   <EVRQ>
    #      <EVRQ_Header>
    #         <EVRQ_Time>UTC=2020-02-06T00:37:30.486</EVRQ_Time>
    #         <EVRQ_Type>Event</EVRQ_Type>
    #         <EVRQ_Description>Acquisition of Signal time for Horizon Mask for Fairbanks</EVRQ_Description>
    #      </EVRQ_Header>
    #      <EV>
    #         <EV_Name>FBK_AOS-HM</EV_Name>
    #         <EV_Absolute_orbit>38</EV_Absolute_orbit>
    #         <EV_Deg_from_ANX>50.974</EV_Deg_from_ANX>
    #         <List_of_EV_Parameters count="0"/>
    #      </EV>
    #   </EVRQ>
    expected_result_ev = execschedule_ingester.RQEV(
        sid=SID(SCID_TO_SID.get("S6A")),
        time=datetime(2020, 2, 6, 0, 37, 30, 486000),
        name="FBK_AOS-HM",
        item_type="Event",
        description="Acquisition of Signal time for Horizon Mask for Fairbanks",
        data={
            "EV_Name": "FBK_AOS-HM",
            "EV_Absolute_orbit": 38,
            "EV_Deg_from_ANX": 50.974,
        },
    )

    # Decode the sample product.
    ENTRY_COUNT = 1304
    # Position of the first Event entry in the decoded sequence of the sample file.
    FIRST_EVENT_INDEX = 3
    actual_results = list(
        execschedule_ingester.parse_schedule(AUX_PRODUCT_DIR.joinpath(SCHEDFULL_FILE))
    )

    assert ENTRY_COUNT == len(actual_results)
    assert actual_results[0] == expected_result_rq
    print_out("SCHEDFULL decoder test passed for first result: {}", expected_result_rq)

    # This checks the first Event entry, not the last entry of the file; the
    # message previously said "last result".
    assert actual_results[FIRST_EVENT_INDEX] == expected_result_ev
    print_out(
        "SCHEDFULL decoder test passed for first event result: {}", expected_result_ev
    )
499
500
def test_WIMPYFILE_decoder():
    """Decoder test for WIMPYFILE aux files.

    Calls `wimpyfile_ingester.fileattr` on the sample WIMPYFILE product and
    asserts that the returned file metadata equals the expected
    `(sid, datetime, datetime)` tuple.
    """
    # Expected file metadata. The two datetimes match the '20230621T163000' and
    # '20230628T163000' fields of the sample file name.
    # NOTE(review): assumes fileattr returns exactly this 3-tuple — confirm
    # against the ingester if this assertion fails.
    expected_result = (
        SID(SCID_TO_SID.get("S6A")),
        datetime(2023, 6, 21, 16, 30),
        datetime(2023, 6, 28, 16, 30),
    )

    # Decode the file metadata.
    actual_result = wimpyfile_ingester.fileattr(AUX_PRODUCT_DIR.joinpath(WIMPY_FILE))

    # The comparison was previously a bare expression whose result was
    # discarded, so this test could never fail.
    assert actual_result == expected_result
    print_out("WIMPYFILE decoder test passed for result: {}", expected_result)
521
522
if __name__ == "__main__":
    # Run every decoder test once when the module is executed as a script.
    # test_MANOPREDIC_decoder was previously called twice; the duplicate is removed.
    test_SATMASSREP_decoder()
    test_MANOPREDIC_decoder()
    test_MANOHIST_decoder()
    test_MANOSCALEF_decoder()
    test_OEMORBPRED_decoder()
    test_ORBITREST_decoder()
    test_WIMPYFILE_decoder()
    test_SCHEDFULL_decoder()