1#!/usr/bin/env python3
2
3# SK 22/06/2023: Created
4
5import logging
6from chart.db import ts
7from chart.events.event import Event, include_event_check
8from chart.products.pus.packetdef import PacketDef
9from chart.db.model.table import TableInfo
10from chart.common.traits import is_listlike
11from chart.web.user import User
12from chart.db.ts import flat as ts_flat
13from chart.db.ts import kv as ts_kv
14from chart.db.ts import jsonb as ts_jsonb
15from chart.db.connection import db_connect
16from chart.db.func import Func
17from chart.db.model.field import RowcountFieldInfo
18from chart.db.model.field import FieldInfo
19
# Identifiers tying this module's events to the PRODUCTS table:
# DECODER_ID / EVENT_CLASSNAME name the event source, TABLE is the DB table read.
DECODER_ID = 'PRODUCTS-EVENTS'
TABLE = 'PRODUCTS'
EVENT_CLASSNAME = 'PRODUCTS-EVENTS'

# NOTE(review): this grabs the ROOT logger, not getLogger(__name__) —
# confirm that is intended before changing, as it affects log routing.
logger = logging.getLogger()
25
def get_fields(fields, stat):
    """Yield the SQL column expressions to retrieve for `fields`.

    Elements of `fields` can be strings (field names), Func wrappers or
    FieldInfo objects.  When `stat` is given, plain column names are
    converted to their stats-table counterparts (NEDT_H1_AVG etc.) via
    handle_field.
    """
    for field_spec in fields:
        yield from handle_field(field_spec, stat)
38
39
def handle_field(f, stat):
    """Dispatch a single field spec to the appropriate column-name generator.

    Args:
        f: a Func wrapper, a RowcountFieldInfo, a FieldInfo, the literal
           string 'INGESTION_TIME', or a plain column-name string.
        stat (str or None): stat suffix ('MIN', 'MAX', 'AVG', 'STD') when
            reading from stats tables, otherwise None.

    Yields:
        str: SQL column expression(s) for the field.
    """
    if isinstance(f, Func):
        yield from handle_func_field(f, stat)
    elif isinstance(f, RowcountFieldInfo):
        yield from handle_rowcount_field(f, stat)
    elif f == 'INGESTION_TIME':
        # Bug fix: was `f is 'INGESTION_TIME'` — identity comparison against a
        # string literal relies on interning and is not guaranteed to match.
        yield 'INGESTION_TIME'
    elif isinstance(f, FieldInfo):
        yield from handle_field_info(f, stat)
    else:
        yield from handle_string_field(f, stat)
51
52
def handle_func_field(f, stat):
    """Yield the SQL for a Func-wrapped field, applying `stat` when relevant.

    Common timeseries fields (COMMON_TS_FIELDS) never receive a stat suffix.
    """
    use_stat = (stat is not None
                and name_of_thing(f.field).upper() not in COMMON_TS_FIELDS)
    if use_stat:
        yield f.sql_stat(stat.upper())
    else:
        yield f.sql()
58
59
def handle_rowcount_field(f, stat):
    """Yield the row-count column: the precomputed ROWCOUNT column on stats
    tables, or a count(*) aggregate for all-points tables.
    """
    yield 'ROWCOUNT' if stat is not None else 'count(*)'
65
66
def handle_field_info(f, stat):
    """Yield the column name for a FieldInfo, suffixed with `stat` if given."""
    if stat is None:
        yield f.name
    else:
        yield f"{f.name}_{stat.upper()}"
72
73
def handle_string_field(f, stat):
    """Yield a plain column name, appending the `stat` suffix unless exempt.

    Exemptions: no stat requested, the pseudo-stat 'STATS', common timeseries
    columns, or names already containing a SQL expression ('(' present).
    """
    keep_as_is = (stat is None
                  or stat == 'STATS'
                  or f.upper() in COMMON_TS_FIELDS
                  or '(' in f)
    if keep_as_is:
        yield f
    else:
        yield f"{f}_{stat.upper()}"
79
80
def custom_select(table_info,  # (too many arguments) pylint: disable=R0913
                  fields,
                  sensing_start=None,
                  sensing_stop=None,
                  ingestion_time=None,
                  sid=None,
                  region_start=None,
                  region_stop=None,
                  include_previous=False,
                  calibrated=True,
                  where=None,
                  bindvars=None,
                  ordering='INGESTION_TIME',
                  limit=None,
                  region=None,
                  stat=None,
                  rowcount_threshold=None,
                  modulus=None,
                  gap_threshold=None):
    """Retrieve from `table` (string or TableInfo) all data in `fields`, applying filters.

    Results are ordered by ingestion_time and are cursor row objects that can generally be treated
    as tuple generators.
    `start_time` and `stop_time` can be None but will cause an unbounded scan of the table
    (very bad especially with partitioned tables under Oracle).

    Args:
        `table` (TableInfo): Identify the table to be read from. If None use `fields`
            to find the table.
            Note `table` may be non-AP i.e. "AMSUA1_AVGS" (from retrieve_histogram only) but
            this should not be done - use `region` and `stat` instead.
            The '_CAL' prefix should not be present, use `calibrated` instead.

        `fields` (list of str or FieldInfo): Fields to read from.

        `sid` (SID): Data source ID.

        `sensing_start` (datetime): If given, read a range of fields with sensing time greater
            than or equal to `sensing_start`.

        `sensing_stop` (datetime): If given, read a range of fields with sensing time less than
            `sensing_start`.

        `ingestion_time` (datetime): If given, read a single row with exactly matching sensing time.

        `include_previous` (bool): When reading a range of values include the immediately preceding
            row in the result set.

        `calibrated` (bool): Read from calibrated raw tables.

        `where` (str or list of str): Optional WHERE clauses to be added to query.

        `bindvars` (dict): Bind variables to support `where`.

        `ordering` (str): Order of results.

        `region` (Sampling object or str): If reading from stats tables, name the value of the
            REGION column to search from if a string, otherwise if a Sampling object use it to
            determine the field and value to search for. This is for activating stats lookup only,
            not AP subsampling.

        `stat` (str[min, max, avg or std]): Read from stats tables instead of all-points.
            When used the query will automatically read from the correct stats table and will
            modify all field names to include the '_<stat>' suffix.
            If `table` has no stats this field is ignored.

        `region_start` (int): Minimum region to read from if reading stats.

        `region_stop` (int): Maximum-1 region to read from if reading stats.

        `rowcount_threshold` (float): For stats retrievals only, filter results to those whose
            rowcount is more than this proportion of the nominal rowcount.

        `modulus` (int): If modulus is not None or 1, it defines the size of the bucket used to
            average returned data. Time-ordered data is grouped into buckets of
            `modulus` size and respective averages, minima and maxima of the bucket
            can be returned via aggregate functions. Sensing time is returned as
            min(ingestion_time) i.e. the earliest timestamp in the bucket.

        `gap_threshold` (timedelta): If set, ignore `fields` and return rows of
            (gap_start, gap_stop) for all gaps over threshold for our time interval.
    """
    # local import to avoid a circular dependency with chart.db.ts
    from chart.db.ts import adjust_to_previous

    if not is_listlike(fields):
        # allow `fields` to be a single field
        fields = [fields]

    # if 'include_previous' is set adjust start_time to include the previous scan line.
    # (Previously an if/else on `sid is not None` with two identical branches;
    # collapsed since both called adjust_to_previous with the same arguments.)
    if include_previous:
        sensing_start = adjust_to_previous(sid, sensing_start, table_info)

    if bindvars is None:
        bindvars = {}

    # accumulate WHERE clauses; sid filter is always present
    clauses = [sid.sql_where()]

    if where is not None:
        if is_listlike(where):
            clauses.extend(where)

        else:
            clauses.append(where)

    if sensing_start is not None:
        clauses.append('INGESTION_TIME>=:start_time')
        bindvars['start_time'] = sensing_start

    if sensing_stop is not None:
        clauses.append('INGESTION_TIME<:stop_time')
        bindvars['stop_time'] = sensing_stop

    if ingestion_time is not None:
        clauses.append('INGESTION_TIME=:ingestion_time')
        bindvars['ingestion_time'] = ingestion_time

    if region is not None and table_info.has_stats:
        if isinstance(region, str):
            # compatibility mode; allow user to pass in a string
            # do not remove this option as there are custom scripts using it
            region_field = 'ORBIT'  # ? this is weird
            # old projects used string for region column, new projects use id code
            bindvars['region'] = region

        else:
            region_field = region.field
            if settings.STATS_USE_NUMERICAL_REGIONS:
                bindvars['region'] = region.region_num

            else:
                bindvars['region'] = region.region.upper()

        clauses.append('REGION=:region')

        if region_field is None and (region_start is not None or region_stop is not None):
            raise ValueError('Cannot use region start/stop with region_field unavailable')

        if region_start is not None:
            clauses.append('{region_field}>=:region_start'.format(region_field=region_field))
            bindvars['region_start'] = region_start

        if region_stop is not None:
            clauses.append('{region_field}<:region_stop'.format(region_field=region_field))
            bindvars['region_stop'] = region_stop

    if rowcount_threshold is not None and table_info.has_stats:
        if stat is None and region is None:
            raise ValueError('Cannot use rowcount_threshold when reading from all-points tables')

        if sid.satellite is not None and sid.satellite.orbit_duration is not None\
           and table_info.period is not None and table_info.regular:

            # for irregular tables we don't do thresholding
            # nor do we threshold (yet) for MSG
            min_rowcount = int(timedelta_div(sid.satellite.orbit_duration, table_info.period) *
                               rowcount_threshold)
            clauses.append('rowcount>=:rowcount')
            bindvars['rowcount'] = min_rowcount

    sql_table_name = table_info.get_cal_name(calibrated)
    # Read from stats table if requested and exists
    if region is not None and table_info.has_stats:
        # force stats when looking for gaps with region subsampling
        if gap_threshold is not None and stat is None:
            stat = 'AVG'

        if table_info.stats_storage is StatsStorage.SPLIT:
            sql_table_name = '{base}_{stat}S'.format(base=sql_table_name, stat=stat.upper())

        else:
            sql_table_name = '{base}_STATS'.format(base=sql_table_name)

    # Build the ORDER BY fragment.  `ordering` may be a single clause, a list of
    # clauses, or callables producing clauses.
    # (Removed a redundant nested `if ordering is not None` that could never be False.)
    if ordering is None:
        sql_ordering = ''

    else:
        ordering_clauses = ordering if is_listlike(ordering) else [ordering]
        ordering_clauses = [o() if callable(o) else o for o in ordering_clauses]
        sql_ordering = ' ORDER BY {clauses}'.format(clauses=','.join(ordering_clauses))

    # Allow specified tables to be read from an alternate database connection.
    # (A duplicate, unused db_connect call earlier in the function was removed.)
    db_conn = db_connect(table_info.name)

    if gap_threshold is not None:
        # oracle
        # bindvars['gap'] = gap_threshold.total_seconds()
        # return db_conn.query((
        # 'SELECT gstart, gstop FROM ('
        # '  SELECT ingestion_time AS gstop, '
        # '  LAG(ingestion_time) OVER (ORDER BY ingestion_time) AS gstart, '
        # '  ingestion_time - LAG(ingestion_time) OVER (ORDER BY ingestion_time) AS gap '
        # '  FROM {table} WHERE {where}) q '
        # 'WHERE gap > NUMTODSINTERVAL(:gap, \'SECOND\')'
        # ).format(
        # table=sql_table_name,
        # where=' AND '.join(clauses)),
        # **bindvars)

        # Use LAG() to pair each row's timestamp with its predecessor, then keep
        # only pairs further apart than the threshold.
        bindvars['gap'] = gap_threshold
        return db_conn.query((
            'SELECT gstart, gstop FROM ('
            '  SELECT INGESTION_TIME AS gstop, '
            '  LAG(INGESTION_TIME) OVER (ORDER BY INGESTION_TIME) AS gstart, '
            '  ingestion_time - LAG(INGESTION_TIME) OVER (ORDER BY INGESTION_TIME) AS gap '
            '  FROM {table} WHERE {where}) q '
            'WHERE gap > :gap').format(
                table=sql_table_name,
                where=' AND '.join(clauses)),
                                **bindvars)

    if modulus is None:
        modulus = 1

    if region == 'ORB' and (sid.satellite is None or sid.satellite.orbit_duration is None):
        raise ValueError('No orbital stats for table {}'.format(sql_table_name))

    if modulus == 1:
        # Limit only supported for non-subsampled retrievals from PostgreSQL databases
        sql_limit = ''
        if limit is not None:
            # to limit number of rows
            sql_limit = ' LIMIT {limit}'.format(limit=limit)

        return db_conn.query(('SELECT {fields} FROM {table} WHERE {where}{ordering}{limit}').format(
            table=sql_table_name,
            fields=','.join(get_fields(fields, stat)),
            where=' AND '.join(clauses),
            ordering=sql_ordering,
            limit=sql_limit),
                             **bindvars)

    elif db_conn.engine is DatabaseEngine.SQLITE:
        # be warned this returns the rn value as an additional unwanted field.
        # It's not easy to get rid of it efficiently
        return db_conn.query('SELECT {fields}, (SELECT count(*) FROM {table} t2 WHERE {where2}) rn '
                             'FROM {table} t1 '
                             'WHERE {where} '
                             'GROUP BY rn%{modulus} '
                             '{ordering}'.format(
                                 fields=','.join(get_fields(fields, stat)),
                                 table=sql_table_name,
                                 where=' AND '.join(clauses),
                                 where2=' AND '.join(clauses + ['t2.INGESTION_TIME<t1.INGESTION_TIME']),
                                 ordering=sql_ordering,
                                 modulus=modulus),
                             **bindvars)

    else:
        # bucket rows by a row counter so aggregates collapse `modulus` rows at a time
        grouping = 'cast(counter/{ratio} as integer)'.format(ratio=modulus)
        # all_fields - fields for the inner query
        # Inner query only selected field - ordering doesn't matter
        inner_fields = list(set((f for f in get_inner_fields(fields, stat))))

        if 'INGESTION_TIME' not in inner_fields:
            inner_fields.append('INGESTION_TIME')

        if region is not None or stat is not None:
            if 'ROWCOUNT' not in inner_fields:
                inner_fields.append('ROWCOUNT')

        return db_conn.query(
            ('SELECT '
             '  {fields}'
             ' FROM ( '
             '  SELECT {inner_fields}, row_number() over (ORDER BY INGESTION_TIME) AS counter '
             '  FROM {table} '
             '  WHERE {where} '
             '  ORDER BY INGESTION_TIME) q '
             ' GROUP BY {grouping} ORDER BY min(INGESTION_TIME) '
             ).format(
                 fields=','.join(get_fields(fields, stat)),
                 table=sql_table_name,
                 inner_fields=','.join(inner_fields),
                 where=' AND '.join(clauses),
                 grouping=grouping),
            **bindvars)
378
379
def find_events(sid,
                start_time,
                start_time_eq,
                stop_time,
                event_classes,
                ordering,
                from_to,
                properties,
                count,
                single,
                filtering,
                multitable_extra_params,
                user: User = None):
    """Search the PRODUCTS table and return (event_count, events) for matching rows."""
    page_start = page_stop = None
    if from_to is not None:
        page_start, page_stop = from_to

    # only fetch a single row when the caller wants one event and no exact time match
    limit = 1 if single and start_time_eq is None else None

    events = []
    matched = 0

    table_info = TableInfo(TABLE, sid=sid, fast_load=True)

    if count and not single:
        # count-only request: the 'PRODUCTS' table keys on 'sensing_start'
        # rather than 'sensing_time', so build the count query directly
        db_conn = db_connect(table_info)

        bindvars = {}
        clauses = [sid.sql_where()]

        if start_time is not None:
            clauses.append('sensing_start>=:start_time')
            bindvars['start_time'] = start_time

        if stop_time is not None:
            clauses.append('sensing_start<:stop_time')
            bindvars['stop_time'] = stop_time

        total = db_conn.query('SELECT count(*) FROM {table} WHERE {where}'.format(
            table=table_info.storage_table.name,
            where=' AND '.join(clauses)), **bindvars).fetchone()[0]

        return total, events

    fields = ('ID', 'ACTIVITY', 'FILENAME', 'INGESTION_TIME', 'RESULT')

    for product_id, activity, filename, ingest_time, result in custom_select(
            sid=sid,
            sensing_start=start_time,
            sensing_stop=stop_time,
            table_info=table_info,
            fields=fields,
            ordering='ID',
            limit=limit):
        inst_properties = {
            'SID': sid,
            'ID': product_id,
            'ACTIVITY': activity,
            'FILENAME': filename,
            'INGESTION-TIME': ingest_time,
            'RESULT': result,
        }

        if not include_event_check(properties, inst_properties):
            continue

        matched += 1

        in_page = from_to is None or page_start <= matched <= page_stop
        if not count and in_page:
            events.append(Event(sid=sid,
                                event_classname=EVENT_CLASSNAME,
                                start_time=ingest_time,
                                instance_properties=inst_properties))
            if single:
                break

    return matched, events