#!/usr/bin/env python3

"""Compute synthetic parameters."""

from typing import Iterable
from typing import Callable
from typing import Dict
from typing import Union
from datetime import datetime
from datetime import timedelta
from collections import defaultdict
import json
import logging
import runpy

from chart.project import SID
from chart.project import settings
from chart.db.model.field import FieldInfo as ParamInfo
from chart.alg import init_algorithm
from chart.backend.job import JobStatus
from chart.alg.pus_synthetics_triggers import reverse_triggers
from chart.alg.pus_state import build_state
from chart.db import ts
from chart.db.model.table import TableInfo
from chart.backend.result import ResultTable


INPUT_TABLENAME = 'TM'
OUTPUT_TABLENAME = 'SYNTHETICS'

logger = logging.getLogger()


# Helper class. Not strictly needed - the same information could be computed on
# the fly or stored in function attributes - but it keeps some parts of this
# module a little simpler.
class SynthFunctionInfo:
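    """Describe a single synthetic function: the output parameter it computes,
    the input parameters it needs and the callable that implements it."""
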
    def __init__(self,
                 output: str,
                 inputs: Iterable[str],
                 function: Callable):
        self.output = output
        self.inputs = inputs
        self.function = function

    def __str__(self):
        return '{out} = {fn}({ins})'.format(
            out=self.output,
            ins=', '.join(self.inputs),
            fn=self.function.__name__)

    def __hash__(self):
        # instances are stored in sets (e.g. trigger_fns in compute_synthetics)
        # so hash on the output parameter name
        return hash(self.output)


def cal(sid: SID, param: str, raw: Union[int, float]) -> float:
    """Quick hack function to calibrate a value."""
    # Next release we upgrade the `state` structure to use
    # a custom class that implements `.raw` and `.eng` properties
    # so the synthetics functions can access them more naturally
    # using a syntax closer to the original code

    # This is also where conditional calibrations need to be handled
    return cal.tm.fields[param].cal[SID(sid.cal_name())].calibrate_value(raw)


# quick optimisation to avoid calling the TableInfo constructor too often
cal.tm = TableInfo('TM')


def filter_params_by_storage(params: Iterable[str],
                             storage: TableInfo) -> Iterable[str]:
    """Yield only the parameters stored in table `storage`, discarding others."""
    for param in params:
        if param in storage.params:
            yield param


def call_function(sid: SID,
                  function_info: SynthFunctionInfo,
                  state: Dict[str, object]):
    """Call a synthetic function against the machine state `state`.

    Store the computed value in `state` and return it.
    If any required input synthetic values are missing, compute them first
    using recursive calls. Return None if an input cannot be obtained."""
    args = [sid]
    for inp in function_info.inputs:
        if inp in state:
            args.append(state[inp])

        else:
            # required value is missing so we try to fill it
            if inp not in reverse_triggers:
                # this is a bit dangerous and could hide real problems
                # but it's also necessary for the case of computing synthetics
                # near the start of the mission where some raw values are
                # genuinely missing
                logger.warning('Not calling {fn} due to missing input {inp}'.format(
                    fn=function_info.function.__name__, inp=inp))
                return None

            recursive_call = reverse_triggers[inp]
            new_value = call_function(sid, recursive_call, state)
            if new_value is None:
                return None

            args.append(new_value)

    res = function_info.function(*args)
    state[function_info.output] = res
    return res


def compute_synthetics(
        sid: SID,
        start_time,
        stop_time) -> Iterable[ResultTable]:
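    """Compute synthetic parameters for `sid` between `start_time` and `stop_time`.

    Walk the source telemetry packets in time order, updating the parameter state
    and calling any triggered synthetic functions, then write the results to the
    output table with one row per sensing time. Any existing output rows in the
    range are deleted first. Return a list of ResultTable objects describing what
    was written."""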
    # experiment - see if we can just walk back 20 minutes and start processing
    # from there in order to capture initial values.
    # We probably need to switch to either stats or a new satellite state table
    # in order to scan back much further
    # input_time_buffer = timedelta(minutes=20)
    source_table = TableInfo(INPUT_TABLENAME)  # slow, but fast_load would
    # prevent the list of parameter names from being loaded
    # For the results
    output_table = TableInfo(OUTPUT_TABLENAME)

    # Keep track of the current state of all telemetry parameters and synthetics
    # import chartmtg.alg.synthetic_functions

    triggers = build_triggers()
    state = build_state(
        source_table,
        sid,
        start_time,
        filter_params_by_storage(triggers.keys(), source_table))[0]  # take the values only

    # Current batch of synthetics to be inserted into a single output row
    new_params = {}  # synthetic name : value

    # Declare here so the nested function flush() has visibility
    sensing_time = None

    # Delete any existing output rows in the processing range
    logger.info('Deleting {sid} from {strt} to {stop} for {tbl}'.format(
        sid=sid, strt=start_time, stop=stop_time, tbl=output_table.storage_table))
    ts.delete(sid=sid,
              sensing_start=start_time,
              sensing_stop=stop_time,
              table=output_table.storage_table,
              commit=True)

    ins_cursor = ts.insert(
        table=output_table.storage_table,
        fields=sid.insert_fields + ['SENSING_TIME', 'PARAMS'])

    # Take a single 'row' of new synthetic parameters and write it to the database
    def flush():
        if len(new_params) > 0:
            ins_cursor.execute(None, sid.bind_insert() + [sensing_time, json.dumps(new_params)])
            new_params.clear()

    # We walk through every packet in the time range, updating state as we go,
    # and for each parameter which is a synthetics input, call the relevant functions.
    # Every time the sensing time changes and we have new synthetics values, flush them
    # to the database as a single row with one timestamp
    old_sensing_time = None
    total_packets = 0
    used_packets = 0
    output_rows = 0

    # keep track of which parameters' functions are in a fail state and not available
    # as inputs until the state is cleared
    fail_states: set[str] = set()

    for sensing_time, payload in ts.select(sid=sid,
                                           sensing_start=start_time,
                                           sensing_stop=stop_time,
                                           table=source_table.storage_table,
                                           fields=('SENSING_TIME', source_table.storage_column)):
        total_packets += 1
        # Collect any functions triggered by this row
        trigger_fns = set()

        # Trigger parameters updated by this packet (also used for debug logging)
        changed = {}

        # Look for trigger parameters in the packet.
        # This is a slow but simple way to find parameters which are also triggers;
        # set.intersection() would do it much faster
        for param_name in payload.keys():
            if param_name in triggers:
                # Optional optimisation - if a parameter is received but it's the same
                # value as last time, we could ignore it.
                # This means the plots will look a bit different because users won't
                # see a row of dots confirming the values were re-computed
                # if state[param_name] == payload[param_name]:
                    # continue

                # We found a parameter in this packet which is a synthetics trigger.
                # Record the (potentially updated) value
                state[param_name] = payload[param_name]
                changed[param_name] = payload[param_name]

                # Queue all triggered functions for execution
                # (we don't actually run them until all the other parameters in the packet
                #  have been considered)
                for fn in triggers[param_name]:
                    trigger_fns.add(fn)

                # logging.debug('setting {p} in state from {ov} to {nv}'.format(
                    # p=param_name, ov=state[param_name], nv=payload[param_name]))

        if len(changed) == 0:
            continue

        used_packets += 1

        # logging.debug('Packet at {tim} updated raw values {changed} initial triggers {trig}'.format(
            # tim=sensing_time,
            # changed=', '.join('{k}:{v}'.format(k=k, v=v) for k, v in changed.items()),
            # trig=', '.join(t.function.__name__ for t in trigger_fns)))

        # Just an optimisation
        # if len(trigger_fns) == 0:
            # continue

        # Now we call the triggered functions.
        # This is where we should be sorting them based on a tree of dependencies
        while len(trigger_fns) > 0:
            fn = trigger_fns.pop()

            # logging.debug('Calling {fn} needs {inp}'.format(
                # fn=fn.function.__name__, inp=fn.inputs))

            try:
                new_value = call_function(sid, fn, state)
            except ValueError as e:
                if fn.output not in fail_states:
                    logger.warning('Parameter {p} has entered fail state at {when}: ({e})'.format(
                        when=sensing_time, p=fn.output, e=e))
                    fail_states.add(fn.output)

                continue

            if fn.output in fail_states:
                logger.info('Parameter {p} has left fail state at {when}'.format(
                    p=fn.output, when=sensing_time))
                fail_states.discard(fn.output)

            new_params[fn.output] = new_value

            for re_fn in triggers[fn.output]:
                # logging.debug('retrigger {r}'.format(r=re_fn))
                trigger_fns.add(re_fn)

            # Record the updated satellite status
            # state[fn.output].update(updates)

            # Buffer new parameters to write all synthetics with the same timestamp
            # to a database row
            # new_params[fn.output].update(updates)

        # We could have multiple TM packets with the same timestamp, but the output
        # synthetics table should only have one row per timestamp, so we accumulate new
        # synthetics values until the sensing time changes
        if sensing_time != old_sensing_time and len(new_params) > 0:
            flush()
            output_rows += 1

        old_sensing_time = sensing_time

    if len(new_params) > 0:
        flush()
        output_rows += 1

    logger.info('Read {tot} packets, used {used}, wrote {out} synthetics rows'.format(
        tot=total_packets, used=used_packets, out=output_rows))

    return [ResultTable(table=output_table,
                        sensing_start=start_time,
                        sensing_stop=stop_time,
                        sid=sid)]


def build_triggers():
    """Create triggers (input -> functions) by reversing reverse_triggers (output -> function)."""
    triggers = defaultdict(list)
    for sfi in reverse_triggers.values():
        for inp in sfi.inputs:
            triggers[inp].append(sfi)

    return triggers
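
# For example (hypothetical names): a synthetic 'S' computed from inputs 'A' and 'B'
# appears in reverse_triggers as {'S': sfi} and build_triggers() then returns
# {'A': [sfi], 'B': [sfi]}, where sfi is the SynthFunctionInfo describing 'S'.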

# def is_valid(field:ParamInfo, state) -> bool:
    # """Handle normal and conditional validity."""
    # return True

def synthetic(output: str, inputs: Iterable[str]) -> Callable:
    """Decorator to mark a function as one that computes synthetics.

    `inputs` lists the input telemetry and synthetic parameters.
    Values are supplied raw and the function must call `cal()` if needed.
    `output` names the single output value.

    A function entry is added to the reverse_triggers data structure.
    """
    def decorator(user_function):
        reverse_triggers[output] = SynthFunctionInfo(output=output,
                                                     inputs=inputs,
                                                     function=user_function)
        return user_function

    return decorator
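
# Example usage (hypothetical parameter names; real definitions live in the
# project-specific functions file loaded by dispatch() or imported in main()):
#
#     @synthetic(output='SYNTH_TEMP_MEAN', inputs=('TEMP_A', 'TEMP_B'))
#     def temp_mean(sid, temp_a, temp_b):
#         """Mean of two calibrated temperatures."""
#         return (cal(sid, 'TEMP_A', temp_a) + cal(sid, 'TEMP_B', temp_b)) / 2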


def dispatch(wo, resultfile, _):  # (unused var) pylint: disable=W0613
    """We have been called by the dispatcher tool."""
    for job in wo.read_jobs():
        # Initialise the global cross-module reverse_triggers structure
        # with our project-specific synthetics functions
        # functions_file = 'chartmtg/alg/synthetic_functions.py'
        functions_file = settings.PROJECT_ROOT_DIR.joinpath(
            job.activity.configuration.functions_file)
        if functions_file.is_file():
            runpy.run_path(str(functions_file), init_globals=globals())

        else:
            raise ValueError('{fn} not found'.format(fn=functions_file))

        tables = compute_synthetics(job.sid, job.sensing_start, job.sensing_stop)
        resultfile.add_job(job, JobStatus.COMPLETED, tables)


def main():
    """Command line entry point."""
    try:
        # run from inside dispatcher
        dispatch(*init_algorithm())
        return

    except init_algorithm.NotDispatcher:
        # run as standalone tool
        pass

    from chart.common.args import ArgumentParser
    parser = ArgumentParser()
    parser.add_argument('--db', '--conn', '-d',
                        metavar='CONNECTION',
                        help='Use database connection CONNECTION')
    parser.add_argument('--sid', '-s',
                        type=ArgumentParser.sid,
                        help='Source ID to query for')
    parser.add_argument('--start', '--start-time',
                        type=ArgumentParser.start_time,
                        help='Start of time range')
    parser.add_argument('--stop', '--stop-time',
                        type=ArgumentParser.stop_time,
                        help='Stop of time range')
    parser.add_argument('--show-triggers',
                        help='Show network of parameters triggering other parameters',
                        action='store_true')
    parser.add_argument('--show-state',
                        help=('Show satellite state featuring all source parameters at the '
                              'given start time'),
                        action='store_true')
    args = parser.parse_args()
    # needs updating to use activity.configuration
    import chartmtg.alg.synthetic_functions
    if args.show_triggers:
        from chart.common.prettyprint import Table
        t = Table(headings=('Trigger', 'Output', 'Inputs', 'Function'))
        triggers = build_triggers()
        for trigger, sfis in triggers.items():
            for sfi in sfis:
                t.append((trigger, sfi.output, ', '.join(sfi.inputs),
                          sfi.function.__name__ + '()'))

        t.write()

        parser.exit()

    if args.db:
        settings.set_db_name(args.db)

    if args.show_state:
        triggers = build_triggers()
        source_params = triggers.keys()
        state = build_state(args.sid, args.start, source_params)[0]
        print('state', state)
        parser.exit()

    compute_synthetics(args.sid, args.start, args.stop)


if __name__ == '__main__':
    main()