#!/usr/bin/env python3

"""Ingester module for the Manoeuvre Prediction file type.

This is an ASCII file that usually contains a single manoeuvre execution record,
as shown in the sample below:

First burn 2019/12/11-12:03:58.533 2.091 m/s OOP OCM 17.402 deg

"""

import re
import logging
from datetime import datetime, timedelta
from collections import namedtuple

from chart.project import SID
from chart.alg import init_algorithm
from chart.db.model.table import TableInfo
from chart.db import ts
from chart.alg import JobStatus
from chart.db.exceptions import DuplicateDataError
from chartepssg.alg.settings import scid_to_sid

logger = logging.getLogger()

# If the `replace` ingestion option is set, refuse to delete existing data if the range of
# the new file exceeds REPLACE_THRESHOLD
# (otherwise a file with corrupt metadata might cause us to delete years of data)
REPLACE_THRESHOLD = timedelta(hours=25)

# Decoder for the timestamps embedded in product filenames
TIME_DECODER = '%Y%m%d%H%M%S'
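# For example:
#   datetime.strptime('20220622150000', TIME_DECODER)
#   -> datetime(2022, 6, 22, 15, 0)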

# store each manoeuvre data line
Manoeuvre = namedtuple('Manoeuvre', 'exec_time '
                                    'file_gen_time '
                                    'optim_time '
                                    'mode '
                                    'size '
                                    'pso')

# filename example:
# SGA1_FDP_FDS__OPE_MANO_PRED__G20220615123845Z_S20220622150000Z_E20220622150000Z.txt
FILENAME_MATCHER = re.compile(
    r'^(?P<scid>[a-zA-Z0-9]+)_'
    r'[\w]+_MANO_PRED__'
    r'G[0-9]+Z_'
    r'S(?P<start>[0-9]+)Z_'
    r'E(?P<stop>[0-9]+)Z'
    r'\.txt$'
)


def fileattr(filename):
    """Given a `filename` return a tuple of SID, sensing start time and sensing stop time.

    They look like:
    SGA1_FDP_FDS__OPE_MANO_PRED__G20220615123845Z_S20220622150000Z_E20220622150000Z.txt
    """

    match = FILENAME_MATCHER.match(filename.name)
    if not match:
        raise ValueError('File name {f} not recognised as a Manoeuvre Prediction product'.format(
            f=filename))

    groups = match.groupdict()

    return (scid_to_sid(groups['scid']),
            datetime.strptime(groups['start'], TIME_DECODER),
            datetime.strptime(groups['stop'], TIME_DECODER))
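# Usage sketch (assuming `filename` is a pathlib.Path; the first element of the
# result depends on the scid_to_sid mapping):
#   fileattr(Path('SGA1_FDP_FDS__OPE_MANO_PRED__G20220615123845Z_'
#                 'S20220622150000Z_E20220622150000Z.txt'))
#   -> (scid_to_sid('SGA1'), datetime(2022, 6, 22, 15, 0), datetime(2022, 6, 22, 15, 0))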


EPOCH_FORMAT = '%Y/%m/%d-%H:%M:%S.%f'
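# For example:
#   datetime.strptime('2019/12/11-12:03:58.533', EPOCH_FORMAT)
#   -> datetime(2019, 12, 11, 12, 3, 58, 533000)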

EPOCH_LINE_MATCHER = re.compile(
    r'^(?P<epoch>[\S]+).*$'
)

MAN_LINE_MATCHER = re.compile(
    r'^(?P<epoch>[\S]+)[\s]+'
    r'(?P<mode>.+)[\s]+'
    r'(?P<size>[-+]?(?:\d*\.\d+|\d+))[\s]+'
    r'(?P<pso>[-+]?(?:\d*\.\d+|\d+))$'
)


def parse_manoeuvres(filename):
    """Yield Manoeuvre entries parsed from `filename`.

    Line 2 of the product holds the file generation time, line 3 the manoeuvre
    optimisation time and lines 5 onwards the manoeuvre details; lines 1 and 4
    are not parsed.
    """
    file_gen_time = None
    optim_time = None

    with open(filename, 'r') as f:
        for i, line in enumerate(f, start=1):
            # File generation time
            if i == 2:
                match = EPOCH_LINE_MATCHER.match(line)
                if not match:
                    raise ValueError('Line {} does not match the file generation time format: {!r}'.format(i, line))

                groups = match.groupdict()
                file_gen_time = datetime.strptime(groups['epoch'], EPOCH_FORMAT)
                continue

            # Manoeuvre optimisation time
            if i == 3:
                match = EPOCH_LINE_MATCHER.match(line)
                if not match:
                    raise ValueError('Line {} does not match the manoeuvre optimisation time format: {!r}'.format(i, line))

                groups = match.groupdict()
                optim_time = datetime.strptime(groups['epoch'], EPOCH_FORMAT)
                continue

            # Manoeuvre details (one manoeuvre per line from line 5 onwards)
            if i > 4:
                match = MAN_LINE_MATCHER.match(line)
                if not match:
                    raise ValueError('Line {} does not match the manoeuvre details format: {!r}'.format(i, line))

                groups = match.groupdict()
                exec_time = datetime.strptime(groups['epoch'], EPOCH_FORMAT)
                mode = groups['mode'].strip()
                size = float(groups['size'])
                pso = float(groups['pso'])

                res = Manoeuvre(exec_time=exec_time,
                                file_gen_time=file_gen_time,
                                optim_time=optim_time,
                                mode=mode,
                                size=size,
                                pso=pso)

                logger.info('Parsed item: {item}'.format(item=res))
                yield res


def ingest_manoeuvres(filename, sid, table):
    """Insert Manoeuvre Prediction entries parsed from `filename` into `table`."""

    # set up the insert cursor
    ins_cur = ts.insert(table=table,
                        fields=['SENSING_TIME'] +
                        SID.insert_fields +
                        ['PRODUCT',
                         'FILE_GEN_TIME',
                         'OPTIM_TIME',
                         'MODE',
                         'SIZE',
                         'PSO'])

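    # Each parsed Manoeuvre becomes one row keyed on its execution time;
    # the PRODUCT column is bound to the constant 1 in the inserts below.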
    cc = 0
    first = None
    last = None
    for manvr in parse_manoeuvres(filename):
        if first is None:
            first = manvr.exec_time

        last = manvr.exec_time
        try:
            ins_cur.execute(None,
                            [manvr.exec_time] +
                            sid.bind_insert() +
                            [1,
                             manvr.file_gen_time,
                             manvr.optim_time,
                             manvr.mode,
                             manvr.size,
                             manvr.pso])
        except DuplicateDataError:
            ins_cur.rollback()
            # This is a guess, but a duplicate row is the most likely reason for the failure.
            # It might be slightly faster to cache UIDs in code and remove duplicates within
            # each file first.
            logger.info('Duplicate entry found at {time}: removing old entry'.format(time=manvr.exec_time))
            ts.delete(sid=sid,
                      table=table,
                      sensing_start=manvr.exec_time,
                      sensing_stop=manvr.exec_time,
                      inclusive=True)

            ins_cur.execute(None,
                            [manvr.exec_time] +
                            sid.bind_insert() +
                            [1,
                             manvr.file_gen_time,
                             manvr.optim_time,
                             manvr.mode,
                             manvr.size,
                             manvr.pso])

        # Committing per row probably hurts performance, but when the postgres driver
        # hits a duplicate data error it puts the connection into an invalid state and
        # we have to roll back, removing all uncommitted entries
        ins_cur.commit()
        cc += 1

    ins_cur.commit()

    logger.info('Ingested {cc} entries from {start} to {stop}'.format(
        cc=cc, start=first, stop=last))



def main():
    """Command line entry point."""
    wo, resultfile, _ = init_algorithm()

    table = TableInfo('MANOEUVRE_PREDICTION')
    for job in wo.read_jobs():
        sid, start, stop = fileattr(job.filename)

        ingest_manoeuvres(job.filename, sid, table)
        job.status = JobStatus.COMPLETED

        stop = stop + timedelta(microseconds=1)
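        # Assumption: the extra microsecond keeps the reported coverage non-empty
        # when the filename start and stop timestamps are identical (as in the
        # sample filename above); the original intent is not documented.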

        tables = [{'table': table,
                   'sensing_start': start,
                   'sensing_stop': stop,
                   'sid': sid}]
        resultfile.add_job(job, JobStatus.COMPLETED, tables=tables)

    logger.info('Manoeuvre Prediction file ingestion complete')


if __name__ == '__main__':
    main()