1#!/usr/bin/env python3
2
3"""Ingester module for the Satellite Mass Report file type.
4This is an ASCII file which contains a single report data as shown in this sample below:
5
6 2023-06-21T16:00:00 CALCULATION DATE
7 1.544 XS CO-ORDINATE (METER)
8 -0.007 YS CO-ORDINATE (METER)
9 0.037 ZS CO-ORDINATE (METER)
10 1349.000 TOTAL S/C MASS (KG)
11 229.400 FUEL MASS (KG)
12 785.4 INERTIA XX (M2.KG)
13 2261.2 INERTIA YY (M2.KG)
14 2577.2 INERTIA ZZ (M2.KG)
15 -2.5 INERTIA XY (M2.KG)
16 -2.5 INERTIA YZ (M2.KG)
17 156.5 INERTIA ZX (M2.KG)
18"""
19
20import re
21import logging
22from datetime import datetime
23from collections import namedtuple
24
25from chart import settings
26from chart.project import SID
27from chart.alg import init_algorithm
28from chart.db.model.table import TableInfo
29from chart.db import ts
30from chart.alg import JobStatus
31from chart.db.exceptions import DuplicateDataError
32from chartepssg.alg.settings import scid_to_sid
33
34
35logger = logging.getLogger()
36
37
38# Time decoder
39TIME_DECODER = '%Y%m%d%H%M%S'
40
41# store each manoeuvre data line
42Report = namedtuple('Report', 'calc_date '
43 'xs '
44 'ys '
45 'zs '
46 'total_mass '
47 'fuel_mass '
48 'i_xx '
49 'i_yy '
50 'i_zz '
51 'i_xy '
52 'i_yz '
53 'i_zx ')
54
55# filename example:
56# SGA1_FDP_FDS__OPE_MASS_REPO__GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.txt
57FILENAME_MATCHER = re.compile(
58 r'^(?P<scid>[a-zA-Z0-9]+)_'
59 r'[\w]+_MASS_REPO__'
60 r'G[0-9]+Z_'
61 r'S(?P<start>[0-9]+)Z_'
62 r'E(?P<stop>[0-9]+)Z'
63 r'.txt$'
64)
65
66
67def fileattr(filename):
68 """Given a `filename` return a tuple of SID, sensing start time.
69
70 They look like:
71 SGA1_FDP_FDS__OPE_MASS_REPO__GYYYYMMDDhhmmssZ_SYYYYMMDDhhmmssZ_EYYYYMMDDhhmmssZ.txt
72 """
73
74 match = FILENAME_MATCHER.match(filename.name)
75 if not match:
76 raise ValueError('File name {f} not recognised as Satellite Mass Report product'.format(
77 f=filename))
78
79 groups = match.groupdict()
80
81 return (
82 scid_to_sid(groups["scid"]),
83 datetime.strptime(groups['start'], TIME_DECODER),
84 datetime.strptime(groups['stop'], TIME_DECODER))
85
86
87CALC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
88
89
90def first_token(line):
91 """Extracts the first string token of a `line` within a text file
92 """
93 line = line.strip().replace('\r', '').replace('\n', '')
94 tokens = line.split()
95 return tokens[0]
96
97
98def parse_report(filename):
99 """Return a Satellite Mass Report data from `filename`.
100 """
101
102 with open(filename, 'r') as f:
103
104 lines = f.readlines()
105
106 # calculation date
107 calc_date = datetime.strptime(first_token(lines[1]), CALC_DATE_FORMAT)
108
109 # XS
110 xs = float(first_token(lines[2]))
111
112 # YS
113 ys = float(first_token(lines[3]))
114
115 # ZS
116 zs = float(first_token(lines[4]))
117
118 # total mass
119 total_mass = float(first_token(lines[5]))
120
121 # fuel mass
122 fuel_mass = float(first_token(lines[6]))
123
124 # inertia xx
125 i_xx = float(first_token(lines[7]))
126
127 # inertia yy
128 i_yy = float(first_token(lines[8]))
129
130 # inertia zz
131 i_zz = float(first_token(lines[9]))
132
133 # inertia xy
134 i_xy = float(first_token(lines[10]))
135
136 # inertia yz
137 i_yz = float(first_token(lines[11]))
138
139 # inertia zx
140 i_zx = float(first_token(lines[12]))
141
142 res = Report(calc_date=calc_date,
143 xs=xs, ys=ys, zs=zs,
144 total_mass=total_mass, fuel_mass=fuel_mass,
145 i_xx=i_xx, i_yy=i_yy, i_zz=i_zz,
146 i_xy=i_xy, i_yz=i_yz, i_zx=i_zx)
147
148 logger.info('Parsed item: {item}'.format(item=res))
149 return res
150
151
152def ingest_report(filename, table=TableInfo('SAT_MASS_REPORT')):
153 """Insert Satellite Mass Report data from `source` into `table`."""
154
155 sid, start_time, stop_time = fileattr(filename)
156
157 # setup insert cursor
158 ins_cur = ts.insert(table=table,
159 fields=['SENSING_TIME'] +\
160 SID.insert_fields +\
161 ['PRODUCT',
162 'XS',
163 'YS',
164 'ZS',
165 'TOTAL_MASS',
166 'FUEL_MASS',
167 'INERTIA_XX',
168 'INERTIA_YY',
169 'INERTIA_ZZ',
170 'INERTIA_XY',
171 'INERTIA_YZ',
172 'INERTIA_ZX'])
173
174 report = parse_report(filename)
175
176 try:
177 ins_cur.execute(None,
178 [report.calc_date] +\
179 sid.bind_insert() +\
180 [1,
181 report.xs,
182 report.ys,
183 report.zs,
184 report.total_mass,
185 report.fuel_mass,
186 report.i_xx,
187 report.i_yy,
188 report.i_zz,
189 report.i_xy,
190 report.i_yz,
191 report.i_zx])
192
193 except DuplicateDataError:
194 ins_cur.rollback()
195 # If the entry key already exists, delete it and then insert it
196 logger.info('Duplicate entry found on {time}: removing old entry'.format(time=report.calc_date))
197 ts.delete(sid=sid,
198 table=table,
199 sensing_start=report.calc_date,
200 sensing_stop=report.calc_date,
201 inclusive=True)
202
203 ins_cur.execute(None,
204 [report.calc_date] +\
205 sid.bind_insert() +\
206 [1,
207 report.xs,
208 report.ys,
209 report.zs,
210 report.total_mass,
211 report.fuel_mass,
212 report.i_xx,
213 report.i_yy,
214 report.i_zz,
215 report.i_xy,
216 report.i_yz,
217 report.i_zx])
218
219 ins_cur.commit()
220
221 logger.info('Ingested Satellite Mass Report from {start}'.format(start=start_time))
222
223
224def main():
225 """Command line entry point"""
226 wo, resultfile, _ = init_algorithm()
227
228 for job in wo.read_jobs():
229 ingest_report(job.filename)
230 job.status = JobStatus.COMPLETED
231 resultfile.add_job(job)
232
233if __name__ == '__main__':
234 main()