1#!/usr/bin/env python3
 2
 3"""Decode a representative set of TC packets."""
 4
 5import sys
 6import logging
 7
 8from chartepssg import project  # must come first
 9from chart.common.path import Path
10from chart.common.log import init_log
11from chart.project import settings
12from chart.products.rapidfile.rapid_io import gen_rapid_from_raw
13from chart.products.pus.ccsds_ingest import read_tc_packet
14from chart.common.test_utils import show
15from chart.common.test_utils import compare
16from chart.common.traits import is_listlike
17from chart.common import errors
18
19project.init_epssg()
20
21target = sys.stdout
22
23logger = logging.getLogger()
24
25PROJECT_ROOT_DIR = settings.PROJECT_HOME_DIR.parent
26TC_PRODUCT_DIR = PROJECT_ROOT_DIR.joinpath("tests", "application", "products", "tc")
27
28
29def test_tc_CSC00028():
30    """A single TC RAPIDFILE packet."""
31    logger.info("Parsing CSC00028")
32    # buff = mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)
33    source = gen_rapid_from_raw(
34        TC_PRODUCT_DIR.joinpath("CSC00028.rpd").open("rb").read()
35    )
36    ccsds_container = next(iter(source))
37    packet_def = ccsds_container.packet_def
38    ccsds = ccsds_container.ccsds
39    rows = list(read_tc_packet(packet_def, ccsds_container))
40    # assert len(rows) == 1
41    act = rows[
42        1
43    ]  # for each table to be written to, we get a tuple of TableInfo, values
44
45    # spot check on a few random header values
46    assert ccsds_container.rapid_header.rapid_hdr_data_size == 633
47    assert ccsds_container.tdev_header.tdev_context_id == 5
48    assert ccsds_container.tdev_tc_header.request_details_fixed_size == 328
49    assert ccsds_container.tdev_tc_var_header.overall_verification_status == 38655
50    assert ccsds_container.tdev_tc_var_header.command_name == "CSC00028"
51
52    # full validation of payload
53    req = {
54        "CSP00058": 18,
55        "CSP00059": 1,
56        "CSP00080": 1290028853.174118,  # I think this should be [129...] but we're leaving it
57        # as a scalar because there's only 1 item in the array
58        "CSP00062": ["0x00001e0cc001000719b0010000030e30"],
59    }
60
61    show("Comparing actual", act)
62    show("to req", req)
63    comp = compare(req, act)
64    if comp is not None:
65        target.write("\n".join(comp) + "\n")
66        assert False
67
68
69# def test_P88C0307_ack():
70# """Ack for P88C0307."""
71# logger.info('Parsing P88C0307_ack')
72# source = gen_rapid_from_raw(TC_PRODUCT_DIR.joinpath('P88C0307_Ack.rpd').open('rb'))
73# ccsds_container = next(iter(source))
74# packet_def = ccsds_container.packet_def
75# ccsds = ccsds_container.ccsds
76# assert ccsds is None
77# rows = list(read_tc_packet(packet_def, ccsds_container))
78# assert len(rows) == 1
79# act = rows[0]  # for each table to be written to, we get a tuple of TableInfo, values
80
81# spot check on a few random header values
82# assert ccsds_container.tdev_header.tdev_filing_key == 200