#!/usr/bin/env python3

"""Ingester module for the Manoeuvre History file type.
This is an ASCII file containing acceleration records,
as shown in the sample below:

2016/02/22-09:29:35.000 -0.32319361D-10-0.56919703D-06 0.00000000D+00 1
2016/02/22-09:30:06.623 -0.32319361D-10-0.56919703D-06 0.00000000D+00 0
2016/02/22-12:10:00.000 -0.33487538D-09-0.56910436D-06 0.00000000D+00 1
2016/02/22-12:10:31.629 -0.33487538D-09-0.56910436D-06 0.00000000D+00 0

"""

import re
import logging
from datetime import datetime
from datetime import timedelta
from collections import namedtuple

from chart.project import SID
from chart.alg import init_algorithm
from chart.db.model.table import TableInfo
from chart.db import ts
from chart.alg import JobStatus
from chart.db.exceptions import DuplicateDataError
from chartepssg.alg.settings import scid_to_sid

logger = logging.getLogger()

# If the `replace` ingestion option is set, refuse to delete existing data if the range of
# the new file exceeds REPLACE_THRESHOLD
# (otherwise a file with corrupt metadata might cause us to delete years of data)
REPLACE_THRESHOLD = timedelta(hours=25)

# strptime format for the timestamps embedded in product filenames
TIME_DECODER = '%Y%m%d%H%M%S'

# holds one decoded manoeuvre record
Manoeuvre = namedtuple('Manoeuvre', 'epoch '
                                    'acc_comp_1 '
                                    'acc_comp_2 '
                                    'acc_comp_3 '
                                    'flag')

# filename example:
# SGA1_FDP_FDS__OPE_MANO_HIST__GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.txt
FILENAME_MATCHER = re.compile(
    r'^(?P<scid>[a-zA-Z0-9]+)_'
    r'[\w]+_MANO_HIST__'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    r'\.txt$'
)
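
# Illustrative match (the timestamps here are hypothetical): for
# 'SGA1_FDP_FDS__OPE_MANO_HIST__G20160222140000Z_S20160222092935Z_E20160222121031Z.txt'
# the captured groups are scid='SGA1', start='20160222092935', stop='20160222121031'.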


def fileattr(filename):
    """Given a `filename` return a tuple of SID, sensing start time and stop time.

    Filenames look like:

    SGA1_FDP_FDS__OPE_MANO_HIST__GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.txt
    """

    match = FILENAME_MATCHER.match(filename.name)
    if not match:
        raise ValueError('File name {f} not recognised as Manoeuvre History product'.format(
            f=filename))

    groups = match.groupdict()

    return (scid_to_sid(groups['scid']),
            datetime.strptime(groups['start'], TIME_DECODER),
            datetime.strptime(groups['stop'], TIME_DECODER))
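
# Illustrative use, assuming `filename` is a pathlib.Path (anything with a
# `.name` attribute works), using the hypothetical filename above:
#
#   sid, start, stop = fileattr(filename)
#   # -> (scid_to_sid('SGA1'),
#   #     datetime(2016, 2, 22, 9, 29, 35),
#   #     datetime(2016, 2, 22, 12, 10, 31))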


def to_float(token):
    """Convert a string `token` holding a FORTRAN-style double
    (with a 'D' exponent marker) to a Python float.
    """
    token = token.strip()
    token = token.replace('D', 'E')
    return float(token)
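
# e.g. to_float('-0.32319361D-10') -> -3.2319361e-11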


EPOCH_FORMAT = '%Y/%m/%d-%H:%M:%S.%f'
EPOCH_SIZE = 23      # width of the epoch field, e.g. '2016/02/22-09:29:35.000'
ACC_COMP_SIZE = 15   # width of one acceleration component, e.g. '-0.32319361D-10'
FLAG_SIZE = 4        # width of the record flag field (the parser reads the last character)
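
# Fixed-width layout assumed by parse_manoeuvres() below, expressed as slices of
# each record line (derived from the offsets used in the code, not from a format
# specification):
#   line[0:23]   epoch
#   line[26:42]  acceleration component 1
#   line[42:57]  acceleration component 2
#   line[57:72]  acceleration component 3
#   line[-1]     record flag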


def parse_manoeuvres(filename):
    """Yield a Manoeuvre for each record line in `filename`."""

    with filename.open() as f:

        # skip the first line: it's just header info containing the SCID and
        # start time, which are already encoded in the filename
        next(f, None)

        for line in f:
            line = line.strip()
            if not line:
                continue

            # EPOCH
            start = 0
            end = EPOCH_SIZE
            epoch = datetime.strptime(line[start:end], EPOCH_FORMAT)

            # ACC1
            start = EPOCH_SIZE + 3
            end = start + ACC_COMP_SIZE + 1
            acc1 = to_float(line[start:end])

            # ACC2
            start = end
            end = start + ACC_COMP_SIZE
            acc2 = to_float(line[start:end])

            # ACC3
            start = end
            end = start + ACC_COMP_SIZE
            acc3 = to_float(line[start:end])

            # Flag (last character of the record)
            flag = int(line[-1])

            yield Manoeuvre(epoch=epoch,
                            acc_comp_1=acc1,
                            acc_comp_2=acc2,
                            acc_comp_3=acc3,
                            flag=flag)
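
# Illustrative use (assuming `filename` is a pathlib.Path pointing at a
# Manoeuvre History product):
#
#   for manvr in parse_manoeuvres(filename):
#       print(manvr.epoch, manvr.acc_comp_1, manvr.flag)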


def ingest_manoeuvres(filename,
                      table=TableInfo('MANOEUVRE_HISTORY')):
    """Insert Manoeuvre History records from `filename` into `table`."""
    sid, start_time, stop_time = fileattr(filename)

    # delete existing manoeuvres
    # if replace:
    #     duration = stop_time - start_time
    #     if duration > REPLACE_THRESHOLD and not force_replace:
    #         raise ValueError('Refusing to delete and replace {d} of data'.format(d=duration))
    #
    #     del_cnt = ts.delete(sid=sid,
    #                         table=table,
    #                         sensing_start=start_time,
    #                         sensing_stop=stop_time,
    #                         inclusive=False,
    #                         commit=True)  # commit needed because we use different
    #                                       # cursors for insert and delete
    #
    #     logger.info('Deleted {d} manoeuvres from {t} for {s} from {strt} to {stop}'.format(
    #         d=del_cnt, t=table.name, s=sid, strt=start_time, stop=stop_time))

    # setup insert cursor
    ins_cur = ts.insert(table=table,
                        fields=['SENSING_TIME'] +
                        SID.insert_fields +
                        ['PRODUCT',
                         'ACC_COMP_1',
                         'ACC_COMP_2',
                         'ACC_COMP_3',
                         'REC_FLAG'])

    cc = 0
    first = None
    last = None
    for manvr in parse_manoeuvres(filename):
        if first is None:
            first = manvr.epoch

        last = manvr.epoch
        try:
            ins_cur.execute(None,
                            [manvr.epoch] +
                            sid.bind_insert() +
                            [1,  # PRODUCT
                             manvr.acc_comp_1,
                             manvr.acc_comp_2,
                             manvr.acc_comp_3,
                             manvr.flag])
        except DuplicateDataError:
            ins_cur.rollback()
            # this is a guess, but a duplicate row is the most likely reason for the
            # failure; it might be slightly faster to cache UIDs in code and remove
            # duplicates within each file first
            # logger.info('Duplicate entry found on {time}: removing old entry'.format(
            #     time=manvr.epoch))
            ts.delete(sid=sid,
                      table=table,
                      sensing_start=manvr.epoch,
                      sensing_stop=manvr.epoch,
                      inclusive=True)

            ins_cur.execute(None,
                            [manvr.epoch] +
                            sid.bind_insert() +
                            [1,  # PRODUCT
                             manvr.acc_comp_1,
                             manvr.acc_comp_2,
                             manvr.acc_comp_3,
                             manvr.flag])

        # This probably kills performance, but when the postgres driver hits a
        # duplicate data error it puts the connection into an invalid state and we
        # have to roll back, removing all non-committed entries; committing after
        # every row limits what a rollback can discard
        ins_cur.commit()

        cc += 1

    ins_cur.commit()

    logger.info('Ingested {cc} entries from {start} to {stop}'.format(
        cc=cc, start=first, stop=last))


def main():
    """Command line entry point."""
    wo, resultfile, _ = init_algorithm()

    for job in wo.read_jobs():
        ingest_manoeuvres(job.filename)
        job.status = JobStatus.COMPLETED
        resultfile.add_job(job)

    logger.info('Manoeuvre History file ingestion complete')


if __name__ == '__main__':
    main()