1#!/usr/bin/env python3
  2
  3"""Ingester module for the Manoeuvre History file type.
This is an ASCII file containing acceleration data records,
as shown in the sample below:
  6
  72016/02/22-09:29:35.000    -0.32319361D-10-0.56919703D-06 0.00000000D+00    1
  82016/02/22-09:30:06.623    -0.32319361D-10-0.56919703D-06 0.00000000D+00    0
  92016/02/22-12:10:00.000    -0.33487538D-09-0.56910436D-06 0.00000000D+00    1
 102016/02/22-12:10:31.629    -0.33487538D-09-0.56910436D-06 0.00000000D+00    0
 11
 12"""
 13
 14import re
 15import logging
 16from datetime import datetime
 17from datetime import timedelta
 18from collections import namedtuple
 19
 20from chart.project import SID
 21from chart.alg import init_algorithm
 22from chart.db.model.table import TableInfo
 23from chart.db import ts
 24from chart.alg import JobStatus
 25from chart.db.exceptions import DuplicateDataError
 26from chartepssg.alg.settings import scid_to_sid
 27
# Module-level logger (root logger; NOTE(review): getLogger(__name__) is the
# usual convention — confirm the project deliberately uses the root logger)
logger = logging.getLogger()

# If the `replace` ingestion option is set, refuse to delete existing data if the range of
# the new file exceeds REPLACE_THRESHOLD
# (otherwise a file with corrupt metadata might cause us to delete years of data)
REPLACE_THRESHOLD = timedelta(hours=25)

# Time decoder: strptime format for the YYYYMMDDhhmmss timestamps embedded
# in the product filename (used by fileattr())
TIME_DECODER = '%Y%m%d%H%M%S'

# store each manoeuvre data line: sensing epoch, the three acceleration
# components and the trailing integer flag from one record line
Manoeuvre = namedtuple('Manoeuvre', 'epoch '
                                    'acc_comp_1 '
                                    'acc_comp_2 '
                                    'acc_comp_3 '
                                    'flag')
 44
 45# filename example:
 46# SGA1_FDP_FDS__OPE_MANO_HIST__GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.txt
 47FILENAME_MATCHER = re.compile(
 48    r'^(?P<scid>[a-zA-Z0-9]+)_'
 49    r'[\w]+_MANO_HIST__'
 50    r'G[0-9]+Z_'
 51    r'S(?P<start>[0-9]+)Z_'
 52    r'E(?P<stop>[0-9]+)Z'
 53    r'.txt$'
 54)
 55
 56
 57def fileattr(filename):
 58    """Given a `filename` return a tuple of SID, sensing start time.
 59
 60    They look like:
 61
 62    SGA1_FDP_FDS__OPE_MANO_HIST__GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.txt
 63    """
 64
 65    match = FILENAME_MATCHER.match(filename.name)
 66    if not match:
 67        raise ValueError('File name {f} not recognised as Manoeuvre History product'.format(
 68            f=filename))
 69
 70    groups = match.groupdict()
 71
 72    return (scid_to_sid(groups['scid']),
 73            datetime.strptime(groups['start'], TIME_DECODER),
 74            datetime.strptime(groups['stop'], TIME_DECODER))
 75
 76
 77def to_float(token):
 78    """ Converts a literal string `token` containing FORTRAN double
 79        to float
 80    """
 81    token = token.strip()
 82    token = token.replace('D', 'E')
 83    return float(token)
 84
 85
# Fixed-width layout of each record line (character counts)
EPOCH_FORMAT = '%Y/%m/%d-%H:%M:%S.%f'  # e.g. 2016/02/22-09:29:35.000
EPOCH_SIZE = 23  # width of the epoch field at the start of the line
ACC_COMP_SIZE = 15  # width of each acceleration component field
FLAG_SIZE = 4  # width of the flag field (unused; parser reads line[-1] instead)
 90
 91
 92def parse_manoeuvres(filename):
 93    """Yield a list of manoeuvres accelerations from `filename`.
 94    """
 95
 96    with filename.open() as f:
 97
 98        first = False
 99        for line in f:
100
101            # jump first line! it's just header info contain SCID and start time
102            # which is already encoded in the filename
103            if not first:
104                first = True
105                continue
106
107            # if not first carry on
108            line = line.strip().replace('\r', '').replace('\n', '')
109
110            # EPOCH
111            start = 0
112            end = EPOCH_SIZE
113            epoch = datetime.strptime(line[start:end], EPOCH_FORMAT)
114
115            # ACC1
116            start = EPOCH_SIZE + 3
117            end = start + ACC_COMP_SIZE + 1
118            acc1 = to_float(line[start: end])
119
120            # ACC2
121            start = end
122            end = start + ACC_COMP_SIZE
123            acc2 = to_float(line[start: end])
124
125            # ACC3
126            start = end
127            end = start + ACC_COMP_SIZE
128            acc3 = to_float(line[start: end])
129
130            # Flag
131            flag = int(line[-1])
132
133            res = Manoeuvre(epoch=epoch,
134                    acc_comp_1=acc1, acc_comp_2=acc2, acc_comp_3=acc3,
135                    flag=flag)
136
137            yield res
138
139
def ingest_manoeuvres(filename,
                     table=TableInfo('MANOEUVRE_HISTORY')):
    """Insert Manoeuvre History entries parsed from `filename` into `table`.

    On a duplicate-key error the existing row at that sensing time is
    deleted and the insert is retried. Logs a summary of the ingested
    time range when done.
    """
    sid, start_time, stop_time = fileattr(filename)

    # NOTE(review): the former `replace` option (bulk delete of the file's
    # whole time range, guarded by REPLACE_THRESHOLD) is currently disabled.

    # setup insert cursor
    ins_cur = ts.insert(table=table,
                        fields=['SENSING_TIME'] +
                        SID.insert_fields +
                        ['PRODUCT',
                         'ACC_COMP_1',
                         'ACC_COMP_2',
                         'ACC_COMP_3',
                         'REC_FLAG'])

    def insert_row(manvr):
        """Bind and execute a single insert for `manvr` (shared by the
        normal path and the duplicate-retry path)."""
        ins_cur.execute(None,
                        [manvr.epoch] +
                        sid.bind_insert() +
                        [1,  # PRODUCT
                         manvr.acc_comp_1,
                         manvr.acc_comp_2,
                         manvr.acc_comp_3,
                         manvr.flag])

    cc = 0
    first = None
    last = None
    for manvr in parse_manoeuvres(filename):
        if first is None:
            first = manvr.epoch

        last = manvr.epoch
        try:
            insert_row(manvr)
        except DuplicateDataError:
            # The postgres driver leaves the connection in an invalid state
            # after a duplicate error, so roll back before doing anything else
            ins_cur.rollback()
            # A duplicate key is the most likely reason for the failure:
            # remove the old entry at this sensing time and retry the insert
            ts.delete(sid=sid,
                      table=table,
                      sensing_start=manvr.epoch,
                      sensing_stop=manvr.epoch,
                      inclusive=True)
            insert_row(manvr)

        # Per-row commit probably hurts performance, but the rollback needed
        # after a duplicate error would otherwise discard uncommitted inserts
        ins_cur.commit()

        cc += 1

    ins_cur.commit()

    logger.info('Ingested {cc} entries from {start} to {stop}'.format(
        cc=cc, start=first, stop=last))
221
222
def main():
    """Command line entry point: ingest every job in the work order."""
    work_order, resultfile, _ = init_algorithm()

    for job in work_order.read_jobs():
        ingest_manoeuvres(job.filename)
        job.status = JobStatus.COMPLETED
        resultfile.add_job(job)

    logger.info('Manoeuvre History file ingestion complete')
233
# Script entry point
if __name__ == '__main__':
    main()