1#!/usr/bin/env python3
  2
  3"""Ingest TM and EV products."""
  4
  5import re
  6import mmap
  7import logging
  8from datetime import datetime
  9
 10from chart.alg import init_algorithm
 11from chart.alg import init_algorithm
 12from chart.products.pus.ccsds_ingest import ingest_stream
 13from chart.products.rapidfile.rapid_io import gen_rapid_from_raw
 14from chart.alg import JobStatus
 15from chart.project import SID, settings
 16from chart.products.pus.ccsds_io import gen_ccsds_from_frames
 17from chart.products.tf.tf_io import xband_tf_from_raw
 18from chart.db.model.table import TableInfo
 19from chart.products.products import insert_product
 20from chart.products.utils import Segment
 21from chart.products.products import ProductResult
 22from chartepssg.alg.settings import scid_to_sid
 23
 24
 25logger = logging.getLogger()
 26
 27# strptime datetime format decoder
 28TIME_DECODER = "%Y-%m-%dT%H:%M:%S.%f"
 29
 30# strptime datetime format decoder for historical filenames
 31TIME_DECODER_COMPAT = "%Y%m%d%H%M%S"
 32
 33# Match TC/TM/EV filenames
 34#     SGB1_VAL__PARC_TM_2022-04-04T12:30:00.000000__04-04T12:30:50.000000
 35FILENAME_MATCHER = re.compile(
 36            r'^(?P<scid>\w{4})_'
 37            r'(?P<env>\w{3})__'
 38            r'.ARC_'
 39            r'(?P<type>\w{2})_'
 40            r'(?P<start>[0-9T:.-]{26})__'
 41            r'(?P<stop>[0-9T:.-]{26})')
 42
 43
 44def fileattr(filename):
 45    """Given a `filename` return a tuple of SID, sensing start time.
 46
 47    >>> from chart.common.path import Path
 48    >>> fileattr(Path("SGA1_VAL__PARC_TM_2023-12-07T15:28:32.826654__2023-12-07T15:29:22.637405
 49    This function is used by tc_ingester too.
 50    """
 51    match = FILENAME_MATCHER.match(filename.name)
 52    if not match:
 53        logger.warning('File name {f} not recognised as a TM, TC or SAT_EV product'.format(f=filename))
 54        return None
 55
 56    groups = match.groupdict()
 57
 58    sid = scid_to_sid(groups['scid'])
 59    env = groups['env']
 60    tm_tc_ev = groups['type']
 61    start_time = datetime.strptime(groups['start'], TIME_DECODER)
 62    stop_time = datetime.strptime(groups['stop'], TIME_DECODER)
 63
 64    return (sid, start_time)
 65
 66
 67    #for matcher in TM_FILENAME_MATCHERS:
 68    #    matched = matcher[0].match(filename.name)
 69    #    if matched is not None:
 70    #        groups = match.groupdict()
 71    #        start_time = datetime.strptime(groups['start'], matcher[1])
 72    #        sid = scid_to_sid(groups['scid'])
 73    #        env = groups['env']
 74    #        return (sid, start_time)
 75
 76    logger.warning('File name {f} not recognised as a TM or SAT_EV product'.format(f=filename))
 77    return None
 78
 79
 80def ingest_tm(sid, filename, activity):
 81    """Ingest a Telemetry file into the database."""
 82
 83    attributes = fileattr(filename)
 84    start_time = None
 85    if attributes is not None:
 86        sid, start_time = attributes
 87
 88    # Make an entry into the PRODUCTS table, remembering the ID to stamp in the TC_STORE table
 89    product_id = insert_product(
 90        activity=activity,
 91        filename=filename,
 92        result=ProductResult.IN_PROGRESS,
 93        sensing_start=start_time,
 94        sid=sid,
 95        filesize=filename.size(),
 96        mtime=filename.mtime(),
 97        commit=True,
 98    )
 99
100    # Open the file
101    handle = filename.open('rb')
102
103    # Fast memory mapped access
104    buff = mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)
105
106    # Create our buffer wrapper class which keeps track of the product id and file position
107    data = Segment(buff=buff, file_pos=0, product_id=product_id)
108
109    # Make CCSDS packets
110    source = gen_rapid_from_raw(buff, sid)
111
112    # Extract and write to database
113    tables = ingest_stream(sid, source, replace_existing=True)
114
115    # Mark the PRODUCT as completed
116    insert_product(activity=activity, filename=filename, result=ProductResult.OK)
117
118    return tables
119
120
121def main():
122    """Command line entry point"""
123    wo, resultfile, _ = init_algorithm()
124
125    for job in wo.read_jobs():
126        try:
127            tables = ingest_tm(
128                job.sid, job.filename, job.activity
129            )
130            resultfile.add_job(job, JobStatus.COMPLETED, tables)
131        except FileNotFoundError as e:
132            logger.error('Error processing {f}: {e}'.format(f=job.filename, e=e))
133            resultfile.add_job(job, JobStatus.FAILED)
134
135
136if __name__ == "__main__":
137    main()