1#!/usr/bin/env python3
  2
"""Ingester module for the Manoeuvre Prediction file type.
This is an ASCII file that usually contains a single manoeuvre execution record,
as shown in the sample below:

First burn     2019/12/11-12:03:58.533    2.091 m/s OOP  OCM   17.402 deg

"""
 10
 11import re
 12import logging
 13from datetime import datetime
 14from datetime import timedelta
 15from collections import namedtuple
 16
 17from chart.project import SID
 18from chart.alg import init_algorithm
 19from chart.db.model.table import TableInfo
 20from chart.db import ts
 21from chart.alg import JobStatus
 22from chart.db.exceptions import DuplicateDataError
 23from chartepssg.alg.settings import scid_to_sid
 24
 25logger = logging.getLogger()
 26
 27# If the `replace` ingestion option is set, refuse to delete existing data if the range of
 28# the new file exceeds REPLACE_THRESHOLD
 29# (otherwise a file with corrupt metadata might cause us to delete years of data)
 30REPLACE_THRESHOLD = timedelta(hours=25)
 31
 32# Time decoder
 33TIME_DECODER = '%Y%m%d%H%M%S'
 34
 35# store each manoeuvre data line
 36Manoeuvre = namedtuple('Manoeuvre', 'exec_time '
 37                                    'file_gen_time '
 38                                    'optim_time '
 39                                    'mode '
 40                                    'size '
 41                                    'pso ')
 42
 43# filename example:
 44# SGA1_FDP_FDS__OPE_MANO_PRED__G20220615123845Z_S20220622150000Z_E20220622150000Z.txt
 45FILENAME_MATCHER = re.compile(
 46    r'^(?P<scid>[a-zA-Z0-9]+)_'
 47    r'[\w]+_MANO_PRED__'
 48    r'G[0-9]+Z_'
 49    r'S(?P<start>[0-9]+)Z_'
 50    r'E(?P<stop>[0-9]+)Z'
 51    r'.txt$'
 52)
 53
 54def fileattr(filename):
 55    """Given a `filename` return a tuple of SID, sensing start time.
 56
 57    They look like:
 58    SGA1_FDP_FDS__OPE_MANO_PRED__G20220615123845Z_S20220622150000Z_E20220622150000Z.txt
 59    """
 60
 61    match = FILENAME_MATCHER.match(filename.name)
 62    if not match:
 63        raise ValueError('File name {f} not recognised as Manoeuvre Prediction product'.format(
 64            f=filename))
 65
 66    groups = match.groupdict()
 67
 68    return (scid_to_sid(groups['scid']),
 69            datetime.strptime(groups['start'], TIME_DECODER),
 70            datetime.strptime(groups['stop'], TIME_DECODER))
 71
 72
 73EPOCH_FORMAT = '%Y/%m/%d-%H:%M:%S.%f'
 74
 75EPOCH_LINE_MATCHER = re.compile(
 76    r'^(?P<epoch>[\S]+).*$'
 77)
 78
 79MAN_LINE_MATCHER = re.compile(
 80    r'^(?P<epoch>[\S]+)[\s]+'
 81     r'(?P<mode>.+)[\s]+'
 82     r'(?P<size>[-+]?(?:\d*\.\d+|\d+))[\s]+'
 83     r'(?P<pso>[-+]?(?:\d*\.\d+|\d+))$'
 84)
 85
 86def parse_manoeuvres(filename):
 87    """Yield a list of manoeuvre prediction(s) from `filename`.
 88    """
 89    file_gen_time = None
 90    optim_time = None
 91
 92    i = 0
 93    with open(filename, 'r') as f:
 94        for line in f:
 95            i = i + 1
 96
 97            # File generation time
 98            if i == 2:
 99                match = EPOCH_LINE_MATCHER.match(line)
100                if not match:
101                    raise ValueError('Line {} at {} does not match value File generation time'.format(line, i))
102
103                groups = match.groupdict()
104                file_gen_time = datetime.strptime(groups['epoch'], EPOCH_FORMAT)
105                continue
106
107               # File generation time
108            if i == 3:
109                match = EPOCH_LINE_MATCHER.match(line)
110                if not match:
111                    raise ValueError('Line {} at {} does not match value Manoeuvre optimisation time'.format(line, i))
112
113                groups = match.groupdict()
114                optim_time = datetime.strptime(groups['epoch'], EPOCH_FORMAT)
115                continue
116
117            if i > 4:
118                match = MAN_LINE_MATCHER.match(line)
119                if not match:
120                    raise ValueError('Line {} at {} does not match Manoeuvre details'.format(line, i))
121
122                groups = match.groupdict()
123                exec_time = datetime.strptime(groups['epoch'], EPOCH_FORMAT)
124                mode = groups['mode'].strip()
125                size = float(groups['size'])
126                pso = float(groups['pso'])
127
128                res = Manoeuvre(exec_time=exec_time,
129                    file_gen_time=file_gen_time,
130                    optim_time=optim_time,
131                    mode=mode,
132                     size=size,
133                    pso=pso)
134
135                logger.info('Parsed item: {item}'.format(item=res))
136                yield res
137
138
def ingest_manoeuvres(filename, sid, table):
    """Insert Manoeuvre Prediction objects parsed from `filename` into `table`.

    On a duplicate-key failure the existing row at the same sensing time is
    deleted and the insert retried.
    """

    # setup insert cursor
    ins_cur = ts.insert(table=table,
                        fields=['SENSING_TIME'] +\
                        SID.insert_fields +\
                        ['PRODUCT',
                            'FILE_GEN_TIME',
                            'OPTIM_TIME',
                            'MODE',
                            'SIZE',
                            'PSO'])

    cc = 0
    first = None
    last = None
    for manvr in parse_manoeuvres(filename):
        if first is None:
            first = manvr.exec_time

        last = manvr.exec_time

        # Build the bind row once; it is shared by the initial insert and the
        # retry after a duplicate, so the two code paths cannot drift apart
        row = ([manvr.exec_time] +
               sid.bind_insert() +
               [1,
                manvr.file_gen_time,
                manvr.optim_time,
                manvr.mode,
                manvr.size,
                manvr.pso])

        try:
            ins_cur.execute(None, row)
        except DuplicateDataError:
            ins_cur.rollback()
            # this is a guess but it's the most likely reason for a fail
            # it might be slightly faster to cache UIDs in code and remove duplicates within
            # each file first
            logger.info('Duplicate entry found on {time}: removing old entry'.format(time=manvr.exec_time))
            ts.delete(sid=sid,
                    table=table,
                    sensing_start=manvr.exec_time,
                    sensing_stop=manvr.exec_time,
                    inclusive=True)

            ins_cur.execute(None, row)

        # This probably kills performance but when the postgres driver hits a
        # duplicate data error it puts the connection into an invalid state and we have to rollback,
        # removing all non-committed entries
        ins_cur.commit()
        cc += 1

    ins_cur.commit()

    logger.info('Ingested {cc} entries from {start} to {stop}'.format(
        cc=cc, start=first, stop=last))
203
204
def main():
    """Command line entry point"""
    wo, resultfile, _ = init_algorithm()

    table = TableInfo('MANOEUVRE_PREDICTION')
    for job in wo.read_jobs():
        sid, start, stop = fileattr(job.filename)

        ingest_manoeuvres(job.filename, sid, table)
        job.status = JobStatus.COMPLETED

        # make the reported range end-exclusive of the final entry
        stop = stop + timedelta(microseconds=1)

        ingested = {'table': table,
                    'sensing_start': start,
                    'sensing_stop': stop,
                    'sid': sid}
        resultfile.add_job(job, JobStatus.COMPLETED, tables=[ingested])

    logger.info('Manoeuvre Prediction file ingestion complete')


if __name__ == '__main__':
    main()