1#!/usr/bin/env python3
2
3"""Implementation of functions to find PUS satellite state at time."""
4
5from datetime import datetime
6from datetime import timedelta
7from typing import Iterable
8from typing import Tuple
9from typing import Dict
10import logging
11
# Module-level logger named after this module (propagates to root handlers,
# so existing root-level configuration still applies)
logger = logging.getLogger(__name__)
13
14from chart.project import SID
15from chart.db.model.table import TableInfo
16from chart.db.model.field import FieldInfo as ParamInfo
17from chart.db import ts
18from chart.db.func import SensingTime
19from chart.db.func import Reversed
20from chart.plots.sampling import Sampling
21
def build_state(
        source_table: TableInfo,
        sid: SID,
        sensing_time: datetime,
        params: Iterable[ParamInfo],
        details: bool = False) -> Tuple[Dict[ParamInfo, object],
                                        Dict[ParamInfo, datetime],
                                        Dict[ParamInfo, int]]:
    """Build a subset of the satellite's status at a particular time.

    The function scans back through time, to launch if needed, until it finds the
    last known value of each requested parameter. If a parameter can't be found
    anywhere in the database prior to `sensing_time` it is simply absent from the
    result dicts (no ``None`` placeholder is inserted).

    Returns a tuple of three dicts: param:raw_value, param:sensing_time and
    param:spid.

    NOTE(review): `details` is documented as controlling whether the times/spids
    dicts are filled in, but the current implementation always records them and
    never reads `details` - confirm the intended contract.
    """
    # here we make a dumb-simple algorithm that just scans through 20 minutes of AP
    # data. What we should do though is use stats to quickly scan back to launch
    remaining = list(params)
    logger.info(
        'Building machine state for %s at %s for %d: %s',
        sid,
        sensing_time,
        len(remaining),
        ', '.join(r if source_table.fields[r].calibration_name is None
                  else '{r} ({c})'.format(
                      r=r, c=source_table.fields[r].calibration_name)
                  for r in remaining))
    # Record the most recent value found for each parameter
    state = {}
    # (for API use only) record the timestamp and SPID each parameter used
    spids = {}
    times = {}
    # We scan backwards from the starting time, one block at a time, looking for the
    # most recent packet containing each of the requested parameters
    BLOCK_LENGTH = timedelta(minutes=1)
    # If the first block doesn't contain everything we check a few more before
    # falling back to the stats scan below.
    # 20 is nice and fast and used to work
    # in v3.21 we added param F88D080K with inputs P99EA57X, P99EA58X, P99EA59V
    # which are not sent often but are sent on 2024-02-02
    BLOCK_COUNT = 20
    # NOTE(review): this set is never pruned as parameters are found, so a
    # parameter appearing in several packets is overwritten with progressively
    # OLDER values while a block is scanned (the original comment below
    # acknowledges the replacement) - confirm this is intended
    remaining_set = set(remaining)
    # first phase - scan back BLOCK_COUNT iterations of BLOCK_LENGTH to quickly fill
    # about 90% of the required parameters, before we try anything clever with stats
    for _ in range(BLOCK_COUNT):
        logger.debug('Building initial state need %d scan %s',
                     len(remaining), sensing_time - BLOCK_LENGTH)

        # examine the payload of every packet in the block.
        # NOTE: the loop variable deliberately rebinds `sensing_time`, so after the
        # loop it holds the timestamp of the last (oldest) packet examined and the
        # next block is anchored there
        for sensing_time, spid, payload in ts.select(
                sid=sid,
                sensing_start=sensing_time - BLOCK_LENGTH,
                sensing_stop=sensing_time,
                table=source_table.storage_table,  # to access PAYLOAD field of raw packets
                fields=(SensingTime,
                        source_table.storage_table.fields['SPID'],
                        source_table.storage_table.fields['PAYLOAD']),
                calibrated=False,  # the individual funcs apply calibration as needed
                ordering=Reversed('sensing_time')):
            # Quickly identify which if any of the remaining required parameters
            # are in this packet
            found = remaining_set.intersection(payload.keys())
            for f in found:
                if f not in state:
                    # Only debug log the first time we find each parameter
                    # Subsequent values will replace the initial one in the result,
                    # but we don't bother logging that
                    logger.debug('Found initial %s val %s at %s',
                                 f, payload[f], sensing_time)

                state[f] = payload[f]
                times[f] = sensing_time
                spids[f] = spid
                # trap the case of a row containing multiple triggers but only a
                # subset are needed to complete the state
                if f in remaining:
                    remaining.remove(f)

            # exit the block if we found everything
            if not remaining:
                break

        # don't search any more blocks if we found everything
        if not remaining:
            break

        sensing_time -= BLOCK_LENGTH

    # second phase - for anything still missing, use 20-minute MIN stats to jump
    # straight to the most recent region that contains each parameter
    if remaining:
        logger.info('After initial scan got %d remaining %s',
                    len(state), ', '.join(remaining))
        sampling = Sampling.MIN_20
        # parameters resolved in this phase; removed from `remaining` after the
        # loop because we cannot mutate a list while iterating it
        purge = []
        for r in remaining:
            logger.debug('Stats scan for %s using %s', r, sampling.name)
            rows = ts.select(
                sid=sid,
                sensing_start=datetime(2000, 1, 1),  # TBD look up launch time
                sensing_stop=sensing_time,
                table=source_table,
                fields=(SensingTime, source_table.fields[r]),
                calibrated=False,
                ordering=Reversed('sensing_time'),
                region=sampling,
                stat='MIN',
                limit=1).fetchall()
            # now we have a serious inefficiency - we do an AP scan of the 20-min
            # block to find a value to use. Could be optimised by a) read the MAX
            # stat and if the same as MIN and `details` not set, we just use that
            # value and b) cluster together the fields in each 20-minute block so we
            # only do a single sweep per block instead of a sweep per parameter
            if not rows:
                logger.warning('Failed to find value for %s in stats scan', r)
                continue

            start_time = rows[0][0]
            stop_time = start_time + sampling.nominal_duration

            logger.debug(' got min %s, running packet scan from %s to %s',
                         rows[0][1], start_time, stop_time)
            for sensing_time, spid, payload in ts.select(
                    sid=sid,
                    sensing_start=start_time,
                    sensing_stop=stop_time,
                    table=source_table.storage_table,  # to access PAYLOAD field of raw packets
                    fields=(SensingTime,
                            source_table.storage_table.fields['SPID'],
                            source_table.storage_table.fields['PAYLOAD']),
                    calibrated=False,  # the individual funcs apply calibration as needed
                    ordering=Reversed('sensing_time')):
                if r in payload:
                    logger.debug(' found %s:%s at %s', r, payload[r], sensing_time)
                    state[r] = payload[r]
                    times[r] = sensing_time
                    spids[r] = spid
                    purge.append(r)
                    break

            if r not in purge:
                # the stats bucket claimed a value exists but the packet sweep of
                # that region found none
                logger.warning('AP scan failed to find value in stats')

        for p in purge:
            remaining.remove(p)

    logger.info('Found initial state of %d with %s remaining',
                len(state),
                ', '.join(remaining) if remaining else 'none')
    return state, times, spids