1#!/usr/bin/env python3
2
3"""Decode a representative set of TC packets."""
4
5import sys
6import logging
7
8from chartepssg import project # must come first
9from chart.common.path import Path
10from chart.common.log import init_log
11from chart.project import settings
12from chart.products.rapidfile.rapid_io import gen_rapid_from_raw
13from chart.products.pus.ccsds_ingest import read_tc_packet
14from chart.common.test_utils import show
15from chart.common.test_utils import compare
16from chart.common.traits import is_listlike
17from chart.common import errors
18
19project.init_epssg()
20
21target = sys.stdout
22
23logger = logging.getLogger()
24
25PROJECT_ROOT_DIR = settings.PROJECT_HOME_DIR.parent
26TC_PRODUCT_DIR = PROJECT_ROOT_DIR.joinpath("tests", "application", "products", "tc")
27
28
29def test_tc_CSC00028():
30 """A single TC RAPIDFILE packet."""
31 logger.info("Parsing CSC00028")
32 # buff = mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)
33 source = gen_rapid_from_raw(
34 TC_PRODUCT_DIR.joinpath("CSC00028.rpd").open("rb").read()
35 )
36 ccsds_container = next(iter(source))
37 packet_def = ccsds_container.packet_def
38 ccsds = ccsds_container.ccsds
39 rows = list(read_tc_packet(packet_def, ccsds_container))
40 # assert len(rows) == 1
41 act = rows[
42 1
43 ] # for each table to be written to, we get a tuple of TableInfo, values
44
45 # spot check on a few random header values
46 assert ccsds_container.rapid_header.rapid_hdr_data_size == 633
47 assert ccsds_container.tdev_header.tdev_context_id == 5
48 assert ccsds_container.tdev_tc_header.request_details_fixed_size == 328
49 assert ccsds_container.tdev_tc_var_header.overall_verification_status == 38655
50 assert ccsds_container.tdev_tc_var_header.command_name == "CSC00028"
51
52 # full validation of payload
53 req = {
54 "CSP00058": 18,
55 "CSP00059": 1,
56 "CSP00080": 1290028853.174118, # I think this should be [129...] but we're leaving it
57 # as a scalar because there's only 1 item in the array
58 "CSP00062": ["0x00001e0cc001000719b0010000030e30"],
59 }
60
61 show("Comparing actual", act)
62 show("to req", req)
63 comp = compare(req, act)
64 if comp is not None:
65 target.write("\n".join(comp) + "\n")
66 assert False
67
68
69# def test_P88C0307_ack():
70# """Ack for P88C0307."""
71# logger.info('Parsing P88C0307_ack')
72# source = gen_rapid_from_raw(TC_PRODUCT_DIR.joinpath('P88C0307_Ack.rpd').open('rb'))
73# ccsds_container = next(iter(source))
74# packet_def = ccsds_container.packet_def
75# ccsds = ccsds_container.ccsds
76# assert ccsds is None
77# rows = list(read_tc_packet(packet_def, ccsds_container))
78# assert len(rows) == 1
79# act = rows[0] # for each table to be written to, we get a tuple of TableInfo, values
80
81# spot check on a few random header values
82# assert ccsds_container.tdev_header.tdev_filing_key == 200