1#!/usr/bin/env python3
2
3"""Ingest TM and EV products."""
4
5import re
6import mmap
7import logging
8from datetime import datetime
9
10from chart.alg import init_algorithm
11from chart.alg import init_algorithm
12from chart.products.pus.ccsds_ingest import ingest_stream
13from chart.products.rapidfile.rapid_io import gen_rapid_from_raw
14from chart.alg import JobStatus
15from chart.project import SID, settings
16from chart.products.pus.ccsds_io import gen_ccsds_from_frames
17from chart.products.tf.tf_io import xband_tf_from_raw
18from chart.db.model.table import TableInfo
19from chart.products.products import insert_product
20from chart.products.utils import Segment
21from chart.products.products import ProductResult
22from chartepssg.alg.settings import scid_to_sid
23
24
# Module-wide logger. Calling getLogger() without a name returns the root logger.
logger = logging.getLogger()

# strptime format for the start/stop timestamps embedded in product filenames,
# e.g. 2022-04-04T12:30:00.000000 (always 26 characters, microsecond resolution)
TIME_DECODER = "%Y-%m-%dT%H:%M:%S.%f"

# strptime format for the compact timestamps of historical filenames,
# e.g. 20220404123000. Not referenced by the code visible in this section.
TIME_DECODER_COMPAT = "%Y%m%d%H%M%S"

# Match TC/TM/EV filenames, for example:
# SGB1_VAL__PARC_TM_2022-04-04T12:30:00.000000__2022-04-04T12:30:50.000000
# The pattern is anchored at the start of the name only, so trailing text
# (such as a file extension) after the stop time is accepted.
FILENAME_MATCHER = re.compile(
    # scid: 4 word characters, followed by a single underscore
    r'^(?P<scid>\w{4})_'
    # env: 3 word characters, followed by a double underscore
    r'(?P<env>\w{3})__'
    # any single character followed by the literal "ARC_" (e.g. "PARC_")
    r'.ARC_'
    # type: 2 word characters (e.g. "TM")
    r'(?P<type>\w{2})_'
    # start: 26 characters drawn from digits, "T", ":", "." and "-"
    r'(?P<start>[0-9T:.-]{26})__'
    # stop: same 26 character form as start
    r'(?P<stop>[0-9T:.-]{26})')
42
43
def fileattr(filename):
    """Given a `filename` return a tuple of SID, sensing start time.

    The `name` attribute of `filename` is matched against FILENAME_MATCHER. The
    `scid` group is converted with scid_to_sid() and the `start` group is decoded
    with TIME_DECODER.

    Args:
        filename: Path-like object exposing a `name` attribute.

    Returns:
        Tuple `(sid, start_time)` where `start_time` is a naive datetime, or None
        if the name does not match the expected pattern or its start/stop
        timestamps cannot be decoded. A warning is logged in both cases.

    >>> from chart.common.path import Path
    >>> name = "SGA1_VAL__PARC_TM_2023-12-07T15:28:32.826654__2023-12-07T15:29:22.637405"
    >>> sid, start = fileattr(Path(name))  # doctest: +SKIP
    >>> start  # doctest: +SKIP
    datetime.datetime(2023, 12, 7, 15, 28, 32, 826654)

    This function is used by tc_ingester too.
    """
    match = FILENAME_MATCHER.match(filename.name)
    if not match:
        logger.warning('File name {f} not recognised as a TM, TC or SAT_EV product'.format(f=filename))
        return None

    groups = match.groupdict()

    # The pattern only checks the character set and length of the timestamps, so
    # strptime can still reject them (e.g. month 13). Treat that as an
    # unrecognised file name instead of letting ValueError escape to the caller.
    try:
        start_time = datetime.strptime(groups['start'], TIME_DECODER)
        # The stop time is not returned but is still decoded so that a name with
        # a malformed stop time is rejected as well
        datetime.strptime(groups['stop'], TIME_DECODER)
    except ValueError:
        logger.warning('File name {f} contains an invalid start or stop time'.format(f=filename))
        return None

    sid = scid_to_sid(groups['scid'])

    return (sid, start_time)
78
79
def ingest_tm(sid, filename, activity):
    """Ingest a Telemetry file into the database.

    Args:
        sid: Source ID to ingest against. Replaced by the SID decoded from the
            file name when fileattr() recognises it.
        filename: Path-like object of the product file; must provide `name`,
            `open()`, `size()` and `mtime()`.
        activity: Activity recorded against the PRODUCTS entry.

    Returns:
        The value returned by ingest_stream() for this file.
    """
    # Prefer the SID and sensing start encoded in the file name; fall back to the
    # caller's SID and no sensing start if the name is not recognised
    attributes = fileattr(filename)
    start_time = None
    if attributes is not None:
        sid, start_time = attributes

    # Make an entry into the PRODUCTS table marking this file as in progress
    insert_product(
        activity=activity,
        filename=filename,
        result=ProductResult.IN_PROGRESS,
        sensing_start=start_time,
        sid=sid,
        filesize=filename.size(),
        mtime=filename.mtime(),
        commit=True,
    )

    # Open the file and map it into memory for fast read-only access. Both the
    # file handle and the mapping are released when the blocks exit, including
    # when ingestion raises.
    with filename.open('rb') as handle:
        with mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ) as buff:
            # Make packets from the raw buffer
            source = gen_rapid_from_raw(buff, sid)

            # Extract and write to database
            tables = ingest_stream(sid, source, replace_existing=True)

    # Mark the PRODUCT as completed
    insert_product(activity=activity, filename=filename, result=ProductResult.OK)

    return tables
119
120
def main():
    """Command line entry point.

    Obtains the work order and result file from init_algorithm(), ingests the
    file of every job with ingest_tm() and records each job as COMPLETED
    (together with the value ingest_tm() returned) or, if a FileNotFoundError is
    raised, as FAILED.
    """
    work_order, results, _ = init_algorithm()

    for job in work_order.read_jobs():
        try:
            ingested = ingest_tm(job.sid, job.filename, job.activity)
            results.add_job(job, JobStatus.COMPLETED, ingested)
        except FileNotFoundError as exc:
            # A missing input file fails this job only; later jobs still run
            message = 'Error processing {f}: {e}'.format(f=job.filename, e=exc)
            logger.error(message)
            results.add_job(job, JobStatus.FAILED)
134
135
# Run the ingester only when this file is executed as a script, so that other
# modules can import it (e.g. for fileattr) without starting an ingestion
if __name__ == "__main__":
    main()