1#!/usr/bin/env python3
2
3"""Compute synthetic parameters."""
4
5from typing import List
6from typing import Tuple
7from typing import Iterable
8from typing import Callable
9from typing import Dict
10from typing import Union
11from datetime import datetime
12from datetime import timedelta
13from collections import defaultdict
14import json
15import logging
16import runpy
17
18from chart.project import SID
19from chart.project import settings
20from chart.db.model.field import FieldInfo as ParamInfo
21from chart.alg import init_algorithm
22from chart.backend.job import JobStatus
23from chart.alg.pus_synthetics_triggers import reverse_triggers
24from chart.alg.pus_state import build_state
25from chart.db import ts
26from chart.db.model.table import TableInfo
27from chart.backend.result import ResultTable
28from chart.db import ts
29
30
31INPUT_TABLENAME = 'TM'
32OUTPUT_TABLENAME = 'SYNTHETICS'
33
34logger = logging.getLogger()
35
36# Helper class - it's not really needed and all this stuff can be computed
37# on the fly, or stored in function attributes, or using classes, but this class seems to make
38# some bits of this module a bit simpler
39class SynthFunctionInfo:
40 def __init__(self,
41 output:str,
42 inputs:Iterable[str],
43 function:Callable):
44 self.output = output
45 self.inputs = inputs
46 self.function = function
47
48 def __str__(self):
49 return '{fn}({ins})'.format(
50 out=self.output,
51 ins=', '.join(self.inputs),
52 fn=self.function.__name__)
53
54 def __hash__(self):
55 # we get stored in `fail_states` below
56 return hash(self.output)
57
58
59def cal(sid:SID, param:str, raw:Union[int, float]) -> float:
60 """Quick hack function to calibrate a value."""
61 # Next release we upgrade the `state` structure to use
62 # a custom class that implements `.raw` and `.eng` properties
63 # to the synthetics functions can access them more naturally
64 # and using a syntax closer to the original code
65
66 # This is also where conditional calibrations need to be handled
67 return cal.tm.fields[param].cal[SID(sid.cal_name())].calibrate_value(raw)
68
69# quick optimisation to avoid calling TableInfo constructor too much
70cal.tm = TableInfo('TM')
71
72
73def filter_params_by_storage(params:Iterable[str],
74 storage:TableInfo) -> Iterable[ParamInfo]:
75 """Yield only the parameters stored in table `storage`, discarding others."""
76 for param in params:
77 if param in storage.params:
78 yield param
79
80
81def call_function(sid:SID,
82 function_info:SynthFunctionInfo,
83 state:Dict[str, object]):# -> Dict[str, object]:
84 """Call a synthetic function with a machine state, return updated values.
85
86 If any required input synthetic values are missing, then add them
87 using recursive calls."""
88 args = [sid]
89 for inp in function_info.inputs:
90 if inp in state:
91 args.append(state[inp])
92
93 else:
94 # require value is missing so we try to fill it
95 if inp not in reverse_triggers:
96 # this is a bit dangerous and could hide real problems
97 # it's also necessary for the case of computing synthetics
98 # near the start of mission where some raw values are genuinely missing
99 logger.warning('Not calling {fn} due to missing input {inp}'.format(
100 fn=function_info.function.__name__, inp=inp))
101 return
102
103 recursive_call = reverse_triggers[inp]
104 new_value = call_function(sid, recursive_call, state)
105 if new_value is None:
106 return None
107
108 args.append(new_value)
109
110 res = function_info.function(*args)
111 state[function_info.output] = res
112 return res
113
114
115def compute_synthetics(
116 sid,
117 start_time,
118 stop_time) -> Iterable[Dict]:
119 # experiment - see if we can just walk back 20 minutes and start processing from there
120 # in order to capture initial values
121 # We probably need to switch to either stats or a new satellite state table
122 # in order to scan back much further
123 # input_time_buffer = timedelta(minutes=20)
124 # For the results
125 source_table = TableInfo(INPUT_TABLENAME) # this is too slow but fast_load
126 # prevents the list of parameter names from being loaded
127 output_table = TableInfo(OUTPUT_TABLENAME)
128
129 # Keep track of the current state of all telemetry parameters and synthetics
130 # import chartmtg.alg.synthetic_functions
131
132 triggers = build_triggers()
133 state = build_state(
134 source_table,
135 sid,
136 start_time,
137 filter_params_by_storage(triggers.keys(), source_table))[0] # take the values only
138
139 # Current batch of synthetics to be inserted into a single output row
140 new_params = {} # synthetic name : value
141
142 # Declare here so the nested function flush() has visibility
143 sensing_time = None
144
145 # delete
146 logger.info('Deleting {sid} from {strt} to {stop} for {tbl}'.format(
147 sid=sid, strt=start_time, stop=stop_time, tbl=output_table.storage_table))
148 ts.delete(sid=sid,
149 sensing_start=start_time,
150 sensing_stop=stop_time,
151 table=output_table.storage_table,
152 commit=True)
153
154 ins_cursor = ts.insert(
155 table=output_table.storage_table,
156 fields=sid.insert_fields + ['SENSING_TIME', 'PARAMS'])
157
158 # Take a single 'row' of new synthetic parameters and write to database
159 def flush():
160 if len(new_params) > 0:
161 ins_cursor.execute(None, sid.bind_insert() + [sensing_time, json.dumps(new_params)])
162 new_params.clear()
163
164 # We walk through every packet in the time range, updating state as we go
165 # and for each parameter which is a synthetics input, call the relevant functions.
166 # Every time sensing time changes and we have new synthetics values flush them to
167 # database as a single row with one timestamp
168 old_sensing_time = None
169 total_packets = 0
170 used_packets = 0
171 output_rows = 0
172
173 # keep track of which parameter's functions are in fail state and not available
174 # as inputs until the state is cleared
175 fail_states:set[str] = set()
176
177 for sensing_time, payload in ts.select(sid=sid,
178 sensing_start=start_time,
179 sensing_stop=stop_time,
180 table=source_table.storage_table,
181 fields=('SENSING_TIME', source_table.storage_column)):
182 total_packets += 1
183 # Make a list of any functions triggered by this row
184 trigger_fns = set()
185
186 # debug only
187 changed = {}
188
189 # Find for all trigger parameters in the packet
190 # This is a slow but simple way to find parameters which are also triggers
191 # set.intersect() would do it much faster
192 for param_name in payload.keys():
193 if param_name in triggers:
194 # Optional optimisation - if a parameter is received but it's the same
195 # value as last time, we could ignore it.
196 # This means the plots will look a bit different because users won't
197 # see a row of dots confirming the values were re-computed
198 # if state[param_name] == payload[param_name]:
199 # continue
200
201 # We found a parameter in this packet which is a synthetics trigger
202 # Record the (potentially updated) value
203 state[param_name] = payload[param_name]
204 changed[param_name] = payload[param_name]
205
206 # Queue all triggered functions for execution
207 # (we don't actually run them until all the other parameters in the packet
208 # have been considered)
209 for fn in triggers[param_name]:
210 trigger_fns.add(fn)
211
212 # logging.debug('setting {p} in state from {ov} to {nv}'.format(
213 # p=param_name, ov=state[param_name], nv=payload[param_name]))
214
215 if len(changed) == 0:
216 continue
217
218 used_packets += 1
219
220 # logging.debug('Packet at {tim} updated raw values {changed} initial triggers {trig}'.format(
221 # tim=sensing_time,
222 # changed=', '.join('{k}:{v}'.format(k=k, v=v) for k, v in changed.items()),
223 # trig=', '.join(t.function.__name__ for t in trigger_fns)))
224
225 # Just an optimisation
226 # if len(trigger_fns) == 0:
227 # continue
228
229 # Now we call the triggered functions.
230 # This is where we should be sorting them based on a tree of dependencies
231 while len(trigger_fns) > 0:
232 fn = trigger_fns.pop()
233
234 # logging.debug('Calling {fn} needs {inp}'.format(
235 # fn=fn.function.__name__, inp=fn.inputs))
236
237 try:
238 new_value = call_function(sid, fn, state)
239 except ValueError as e:
240 if fn.output not in fail_states:
241 logging.warning('Parameter {p} has entered fail state at {when}: ({e})'.format(
242 when=sensing_time, p=fn.output, e=e))
243 fail_states.add(fn.output)
244
245 continue
246
247 if fn.output in fail_states:
248 logging.info('Parameter {p} has left fail state at {when}'.format(
249 p=fn.output, when=sensing_time))
250
251 new_params[fn.output] = new_value
252
253 for re_fn in triggers[fn.output]:
254 # logging.debug('retrigger {r}'.format(r=re_fn))
255 trigger_fns.add(re_fn)
256
257 # Record the updated satellite status
258 # state[fn.output].update(updates)
259
260 # Buffer new parameters to write all synthetics with the same timestamp
261 # to a database row
262 # new_params[fn.output].update(updates)
263
264 # We could have multiple TM packets with the same timestamp, but the output
265 # synthetics table should only have one row per timestamp. We accumulate new
266 # synthetics values until sensing time changes
267 if sensing_time != old_sensing_time and len(new_params) > 0:
268 flush()
269 output_rows += 1
270
271 old_sensing_time = sensing_time
272
273 if len(new_params) > 0:
274 flush()
275 output_rows += 1
276
277 logger.info('Read {tot} packets used {used} wrote {out} synthetics rows'.format(
278 tot=total_packets, used=used_packets, out=output_rows))
279
280 return [ResultTable(table=output_table, sensing_start=start_time,sensing_stop=stop_time,
281 sid=sid)]
282
283
284def build_triggers():
285 """Create triggers (input:output) as a reverse of reverse_triggers (output:input)."""
286 triggers = defaultdict(list)
287 for output, sfi in reverse_triggers.items():
288 for inp in sfi.inputs:
289 triggers[inp].append(sfi)
290
291 return triggers
292
293# def is_valid(field:ParamInfo, state) -> bool:
294 # """Handle normal and conditional validity."""
295 # return True
296
297def synthetic(output:str, inputs:Iterable[str]) -> callable:
298 """Decorator to mark a function as one that computes synthetics.
299
300 `inputs` lists the input telemetry and synthetic parameters.
301 Values are supplied raw and the function must call `calibrate` if needed.
302 `output` names the single output value
303
304 A function entry is added to the reversed_triggers data structure.
305 """
306 def decorator(user_function):
307 reverse_triggers[output] = SynthFunctionInfo(output=output,
308 inputs=inputs,
309 function=user_function)
310 return user_function
311
312 return decorator
313
314
315def dispatch(wo, resultfile, _): # (unused var) pylint: disable=W0613
316 """We have been called by the dispatcher tool."""
317 for job in wo.read_jobs():
318 # Initialise the global cross-module reverse_triggers structure
319 # with our project specific synthetics functions
320 # functions_file = 'chartmtg/alg/synthetic_functions.py'
321 functions_file = settings.PROJECT_ROOT_DIR.joinpath(job.activity.configuration.functions_file)
322 if functions_file.is_file():
323 runpy.run_path(str(functions_file), init_globals=globals())
324
325 else:
326 raise ValueError('{fn} not found'.format(fn=functions_file))
327
328 tables = compute_synthetics(job.sid, job.sensing_start, job.sensing_stop)
329 resultfile.add_job(job, JobStatus.COMPLETED, tables)
330
331
332def main():
333 """Command line entry point."""
334 try:
335 # run from inside dispatcher
336 dispatch(*init_algorithm())
337 return
338
339 except init_algorithm.NotDispatcher:
340 # run as standalone tool
341 pass
342
343 from chart.common.args import ArgumentParser
344 parser = ArgumentParser()
345 parser.add_argument('--db', '--conn', '-d',
346 metavar='CONNECTION',
347 help='Use database connection CONNECTION')
348 parser.add_argument('--sid', '-s',
349 type=ArgumentParser.sid,
350 help='Source ID to query for')
351 parser.add_argument('--start', '--start-time',
352 type=ArgumentParser.start_time,
353 help='Start of time range')
354 parser.add_argument('--stop', '--stop-time',
355 type=ArgumentParser.stop_time,
356 help='Stop of time range')
357 parser.add_argument('--show-triggers',
358 help='Show network of parameters triggering other parameters',
359 action='store_true')
360 parser.add_argument('--show-state',
361 help=('Show satellite state featuring all source parameters at given '
362 'start time'),
363 action='store_true')
364 args = parser.parse_args()
365 # needs updating to use activity.configuration
366 import chartmtg.alg.synthetic_functions
367 from chart.common.prettyprint import Table
368 t = Table(headings=('Trigger', 'Output', 'Inputs', 'Function'))
369 if args.show_triggers:
370 triggers = build_triggers()
371 for trigger, sfis in triggers.items():
372 for sfi in sfis:
373 t.append((trigger, sfi.output, ', '.join(sfi.inputs), sfi.function.__name__ + '()'))
374
375 t.write()
376
377 parser.exit()
378
379 if args.db:
380 settings.set_db_name(args.db)
381
382 if args.show_state:
383 triggers = build_triggers()
384 source_params = triggers.keys()
385 state = build_state(args.sid, args.start, source_params)[0]
386 print('state', state)
387 parser.exit()
388
389 compute_synthetics(args.sid, args.start, args.stop)
390
391if __name__ == '__main__':
392 main()