1#!/usr/bin/env python3
  2
  3# SK 22/06/2023: Created
  4
  5import logging
  6from chart.db import ts
  7from chart.events.event import Event, include_event_check
  8from chart.products.pus.packetdef import PacketDef
  9from chart.db.model.table import TableInfo
 10from chart.common.traits import is_listlike
 11from chart.web.user import User
 12from chart.db.ts import flat as ts_flat
 13from chart.db.ts import kv as ts_kv
 14from chart.db.ts import jsonb as ts_jsonb
 15from chart.db.connection import db_connect
 16from chart.db.func import Func
 17from chart.db.model.field import RowcountFieldInfo
 18from chart.db.model.field import FieldInfo
 19
# Identifier of this decoder within the events framework
DECODER_ID = 'PRODUCTS-EVENTS'
# Database table read by custom_select() / find_events()
TABLE = 'PRODUCTS'
# Event classname attached to Event objects built by find_events()
EVENT_CLASSNAME = 'PRODUCTS-EVENTS'

# NOTE(review): this fetches the root logger; the usual module convention is
# logging.getLogger(__name__) — confirm before changing
logger = logging.getLogger()
 25
 26def get_fields(fields, stat):
 27    """Yield columns to retrieve based on `fields`.
 28
 29    Elements of `fields` can be strings (field names) or FieldInfo objects.
 30    Column names will be converted to stats (NEDT_H1_AVG etc.) if needed.
 31    If `strip` == True, all fields with Func will
 32    be stripped of Func, but suffix will be retained:
 33        db.Min('NEDT_H1_MIN') -> 'NEDT_H1_MIN'
 34    This is needed by retrievals from stats tables with subsampling (modulus > 1).
 35    """
 36    for f in fields:
 37        yield from handle_field(f, stat)
 38
 39
 40def handle_field(f, stat):
 41    if isinstance(f, Func):
 42        yield from handle_func_field(f, stat)
 43    elif isinstance(f, RowcountFieldInfo):
 44        yield from handle_rowcount_field(f, stat)
 45    elif f is 'INGESTION_TIME':
 46        yield 'INGESTION_TIME'
 47    elif isinstance(f, FieldInfo):
 48        yield from handle_field_info(f, stat)
 49    else:
 50        yield from handle_string_field(f, stat)
 51
 52
 53def handle_func_field(f, stat):
 54    if stat is not None and name_of_thing(f.field).upper() not in COMMON_TS_FIELDS:
 55        yield f.sql_stat(stat.upper())
 56    else:
 57        yield f.sql()
 58
 59
 60def handle_rowcount_field(f, stat):
 61    if stat is not None:
 62        yield 'ROWCOUNT'
 63    else:
 64        yield 'count(*)'
 65
 66
 67def handle_field_info(f, stat):
 68    field_name = f.name
 69    if stat is not None:
 70        field_name = f"{field_name}_{stat.upper()}"
 71    yield field_name
 72
 73
 74def handle_string_field(f, stat):
 75    if stat is None or stat == 'STATS' or f.upper() in COMMON_TS_FIELDS or '(' in f:
 76        yield f
 77    else:
 78        yield f"{f}_{stat.upper()}"
 79
 80
 81def custom_select(table_info,  # (too many arguments) pylint: disable=R0913
 82           fields,
 83           sensing_start=None,
 84           sensing_stop=None,
 85           ingestion_time=None,
 86           sid=None,
 87           region_start=None,
 88           region_stop=None,
 89           include_previous=False,
 90           calibrated=True,
 91           where=None,
 92           bindvars=None,
 93           ordering='INGESTION_TIME',
 94           limit=None,
 95           region=None,
 96           stat=None,
 97           rowcount_threshold=None,
 98           modulus=None,
 99           gap_threshold=None):
100    """Retrieve from `table` (string or TableInfo) all data in `fields`, applying filters.
101
102    Results are ordered by ingestion_time and are cursor row objects that can generally be treated
103    as tuple generators.
104     `start_time` and `stop_time` can be None but will cause an unbounded scan of the table
105    (very bad especially with partitioned tables under Oracle).
106
107    Args:
108        `table` (TableInfo): Identify the table to be read from. If None use `fields`
109            to find the table.
110            Note `table` may no non-AP i.e. "AMSUA1_AVGS" (from retrieve_histogram only) but
111            this should not be done - use `region` and `stat` instead.
112            The '_CAL' prefix should not be present, use `calibrated` instead.
113
114        `fields` (list of str or FieldInfo): Fields to read from.
115
116        `sid` (SID): Data source ID.
117
118        `sensing_start` (datetime): If given, read a range of fields with sensing time greater
119            than or equal to `sensing_start`.
120
121        `sensing_stop` (datetime): If given, read a range of fields with sensing time less than
122            `sensing_start`.
123
124        `ingestion_time` (datetime): If given, read a single row with exactly matching sensing time.
125
126        `include_previous` (bool): When reading a range of values include the immediately preceding
127            row in the result set.
128
129        `calibrated` (bool): Read from calibrated raw tables.
130
131        `clauses` (list of str): Optional WHERE clauses to be added to query.
132
133        `bindvars` (dict): Bind variables to support `clauses`.
134
135        `ordering` (str): Order or results.
136
137        `region` (Sampling object or str): If reading from stats tables, name the value of the
138            REGION column to search from if a string, otherwise if a Sampling object use it to
139            determine the field and value to search for. This is for activating stats lookup only,
140            not AP subsampling.
141
142        `stat` (str[min, max, avg or std]): Read from stats tables instead of all-points.
143            When used the query will automatically read from the correct stats table and will
144            modify all field names to include the '_<stat>' prefix.
145            If `table` has no stats this field is ignored.
146
147        `region_start` (int): Minimum region to read from if reading stats.
148
149        `region_stop` (int): Maximum-1 region to read from if reading stats.
150
151        `rowcount_threshold` (float): For stats retrievals only, filter results to those whose
152            rowcount is    more than this proportion of the nominal rowcount.
153
154        `modulus' (int): If modulus is not None or 1, it defines the size of the bucket used to
155                         average returned data. Time-ordered data is grouped into buckets of
156                         `modulus` size and respective averages, minima and maxima of the bucket
157                         can be returned via aggregate functions. Sensing time is returned as
158                         min(ingestion_time) i.e. the earliest timestamp in the bucket.
159
160        `gap_threshold` (timedelta): If set, ignore `fields` and return rows of
161            (gap_start, gap_stop) for all gaps over threshold for our time interval.
162    """
163    from chart.db.ts import adjust_to_previous
164
165    # assert not (region is None and stat is None)
166    # assert not (gap_threshold is not None and fields is not None)
167
168    if not is_listlike(fields):
169        # allow `fields` to be a single field
170        fields = [fields]
171
172    db_conn = db_connect(table_info.name)
173
174    # if 'include_previous' is set adjust start_time to include the previous scan line
175    if include_previous:
176        # logger.debug('Scan previous set time of {table} to {time}'.format(
177                # table=name_of_thing(table), time=sensing_start))
178        if sid is not None:
179            sensing_start = adjust_to_previous(sid, sensing_start, table_info)
180
181        else:
182            sensing_start = adjust_to_previous(sid, sensing_start, table_info)
183
184    if bindvars is None:
185        bindvars = {}
186
187    clauses = [sid.sql_where()]
188
189    if where is not None:
190        if is_listlike(where):
191            clauses.extend(where)
192
193        else:
194            clauses.append(where)
195
196    if sensing_start is not None:
197        clauses.append('INGESTION_TIME>=:start_time')
198        bindvars['start_time'] = sensing_start
199
200    if sensing_stop is not None:
201        clauses.append('INGESTION_TIME<:stop_time')
202        bindvars['stop_time'] = sensing_stop
203
204    if ingestion_time is not None:
205        clauses.append('INGESTION_TIME=:ingestion_time')
206        bindvars['ingestion_time'] = ingestion_time
207
208    if region is not None and table_info.has_stats:
209        if isinstance(region, str):
210            # compatibility mode; allow user to pass in a string
211            # do not remove this option as there are custom scripts using it
212            region_field = 'ORBIT'  # ? this is weird
213            # old projects used string for region column, new projects use id code
214            bindvars['region'] = region
215
216        else:
217            region_field = region.field
218            if settings.STATS_USE_NUMERICAL_REGIONS:
219                bindvars['region'] = region.region_num
220
221            else:
222                bindvars['region'] = region.region.upper()
223
224        clauses.append('REGION=:region')
225
226        if region_field is None and (region_start is not None or region_stop is not None):
227            raise ValueError('Cannot use region start/stop with region_field unavailable')
228
229        if region_start is not None:
230            clauses.append('{region_field}>=:region_start'.format(region_field=region_field))
231            bindvars['region_start'] = region_start
232
233        if region_stop is not None:
234            clauses.append('{region_field}<:region_stop'.format(region_field=region_field))
235            bindvars['region_stop'] = region_stop
236
237    if rowcount_threshold is not None and table_info.has_stats:
238        if stat is None and region is None:
239            raise ValueError('Cannot use rowcount_threshold when reading from all-points tables')
240
241        if sid.satellite is not None and sid.satellite.orbit_duration is not None\
242           and table_info.period is not None and table_info.regular:
243
244            # for irregular tables we don't do thresholding
245            # now do we threshold (yet) for MSG
246            min_rowcount = int(timedelta_div(sid.satellite.orbit_duration, table_info.period) *
247                               rowcount_threshold)
248            # logger.debug('Computed actual min rowcount as {min} based on '
249            # 'threshold {thresh}'.format(min=min_rowcount, thresh=rowcount_threshold))
250            clauses.append('rowcount>=:rowcount')
251            bindvars['rowcount'] = min_rowcount
252            # logger.debug('Set rowcount threshold to {thresh}'.format(thresh=rowcount_thresh))
253
254    sql_table_name = table_info.get_cal_name(calibrated)
255    # Read from stats table if requested and exists
256    if region is not None and table_info.has_stats:
257        # force stats when looking for gaps with region subsampling
258        if gap_threshold is not None and stat is None:
259            stat = 'AVG'
260
261        if table_info.stats_storage is StatsStorage.SPLIT:
262            sql_table_name = '{base}_{stat}S'.format(base=sql_table_name, stat=stat.upper())
263
264        else:
265            sql_table_name = '{base}_STATS'.format(base=sql_table_name)
266
267    if ordering is None:
268        sql_ordering = ''
269
270    else:
271        if ordering is not None:
272            if is_listlike(ordering):
273                ordering_clauses = ordering
274
275            else:
276                ordering_clauses = [ordering]
277
278        ordering_clauses = [o() if callable(o) else o for o in ordering_clauses]
279
280        sql_ordering = ' ORDER BY {clauses}'.format(clauses=','.join(ordering_clauses))
281
282    # Allow specified tables to be read from an alternate database connection
283    db_conn = db_connect(table_info.name)
284
285    if gap_threshold is not None:
286        # oracle
287        # bindvars['gap'] = gap_threshold.total_seconds()
288        # return db_conn.query((
289            # 'SELECT gstart, gstop FROM ('
290            # '  SELECT ingestion_time AS gstop, '
291            # '  LAG(ingestion_time) OVER (ORDER BY ingestion_time) AS gstart, '
292            # '  ingestion_time - LAG(ingestion_time) OVER (ORDER BY ingestion_time) AS gap '
293            # '  FROM {table} WHERE {where}) q '
294            # 'WHERE gap > NUMTODSINTERVAL(:gap, \'SECOND\')'
295        # ).format(
296            # table=sql_table_name,
297            # where=' AND '.join(clauses)),
298                             # **bindvars)
299
300        bindvars['gap'] = gap_threshold
301        return db_conn.query((
302            'SELECT gstart, gstop FROM ('
303            '  SELECT INGESTION_TIME AS gstop, '
304            '  LAG(INGESTION_TIME) OVER (ORDER BY INGESTION_TIME) AS gstart, '
305            '  ingestion_time - LAG(INGESTION_TIME) OVER (ORDER BY INGESTION_TIME) AS gap '
306            '  FROM {table} WHERE {where}) q '
307            'WHERE gap > :gap').format(
308            table=sql_table_name,
309            where=' AND '.join(clauses)),
310                             **bindvars)
311
312    if modulus is None:
313        modulus = 1
314
315    if region == 'ORB' and (sid.satellite is None or sid.satellite.orbit_duration is None):
316        raise ValueError('No orbital stats for table {}'.format(sql_table_name))
317
318    if modulus == 1:
319        # Limit only supported for non-subsampled retrievals from PostgreSQL databases
320        sql_limit = ''
321        if limit is not None:
322            # to limit number of rows
323            sql_limit = ' LIMIT {limit}'.format(limit=limit)
324
325        return db_conn.query(('SELECT {fields} FROM {table} WHERE {where}{ordering}{limit}').format(
326            table=sql_table_name,
327            fields=','.join(get_fields(fields, stat)),
328            where=' AND '.join(clauses),
329            ordering=sql_ordering,
330            limit=sql_limit),
331                             **bindvars)
332
333    elif db_conn.engine is DatabaseEngine.SQLITE:
334        # be warned this returns the rn value as an additional unwanted field.
335        # It's not easy to get rid of it efficiently
336        return db_conn.query('SELECT {fields}, (SELECT count(*) FROM {table} t2 WHERE {where2}) rn '
337                        'FROM {table} t1 '
338                        'WHERE {where} '
339                        'GROUP BY rn%{modulus} '
340                        '{ordering}'.format(
341                    fields=','.join(get_fields(fields, stat)),
342                    table=sql_table_name,
343                    where=' AND '.join(clauses),
344                    where2=' AND '.join(clauses + ['t2.INGESTION_TIME<t1.INGESTION_TIME']),
345                    ordering=sql_ordering,
346                    modulus=modulus),
347                **bindvars)
348
349    else:
350        grouping = 'cast(counter/{ratio} as integer)'.format(ratio=modulus)
351        # all_fields - fields for the inner query
352        # Inner query only selected field - ordering doesn't matter
353        inner_fields = list(set((f for f in get_inner_fields(fields, stat))))
354
355        if 'INGESTION_TIME' not in inner_fields:
356            inner_fields.append('INGESTION_TIME')
357
358        if region is not None or stat is not None:
359            if 'ROWCOUNT' not in inner_fields:
360                inner_fields.append('ROWCOUNT')
361
362        return db_conn.query(
363            ('SELECT '
364             ' {fields}'
365             '  FROM ( '
366             '    SELECT {inner_fields}, row_number() over (ORDER BY INGESTION_TIME) AS counter '
367             '    FROM {table} '
368             '    WHERE {where} '
369             '    ORDER BY INGESTION_TIME) q '
370             ' GROUP BY {grouping} ORDER BY min(INGESTION_TIME) '
371            ).format(
372                    fields=','.join(get_fields(fields, stat)),
373                    table=sql_table_name,
374                    inner_fields=','.join(inner_fields),
375                    where=' AND '.join(clauses),
376                    grouping=grouping),
377                **bindvars)
378
379
def find_events(sid,
                start_time,
                start_time_eq,
                stop_time,
                event_classes,
                ordering,
                from_to,
                properties,
                count,
                single,
                filtering,
                multitable_extra_params,
                user: User = None):
    """Find PRODUCTS events for `sid` between `start_time` and `stop_time`.

    Returns:
        (event_count, events): `event_count` is the number of matching rows;
        `events` is a list of Event objects. When `count` is True and `single`
        is False only the count is computed and `events` is empty. When
        `from_to` is a (page_start, page_stop) pair, only events whose 1-based
        position falls inside that window are appended. When `single` is True
        at most one event is returned.

    NOTE(review): `event_classes`, `ordering`, `filtering`,
    `multitable_extra_params` and `user` are accepted but unused here —
    presumably required by a generic event-finder interface; confirm against
    other decoders before removing.
    """
    page_start = page_stop = None
    if from_to is not None:
        page_start, page_stop = from_to

    # at most one row is needed when looking for a single event without an
    # exact start-time match
    limit = 1 if single and start_time_eq is None else None

    events = []
    event_count = 0

    table_info = TableInfo(TABLE, sid=sid, fast_load=True)

    if not single and count:
        # just return count of events
        # Note: 'PRODUCTS' table has 'sensing_start' rather than 'sensing_time' as key
        # so use a direct query to get the count
        # NOTE(review): db_connect() is given the TableInfo object here but its
        # .name elsewhere in this file — confirm both forms are accepted
        db_conn = db_connect(table_info)

        clauses = [sid.sql_where()]
        bindvars = {}

        if start_time is not None:
            clauses.append('sensing_start>=:start_time')
            bindvars['start_time'] = start_time

        if stop_time is not None:
            clauses.append('sensing_start<:stop_time')
            bindvars['stop_time'] = stop_time

        event_count = db_conn.query('SELECT count(*) FROM {table} WHERE {where}'.format(
                table=table_info.storage_table.name,
                where=' AND '.join(clauses)), **bindvars).fetchone()[0]

        return event_count, events

    fields = ('ID', 'ACTIVITY', 'FILENAME', 'INGESTION_TIME', 'RESULT')

    for row in custom_select(sid=sid,
                             sensing_start=start_time,
                             sensing_stop=stop_time,
                             table_info=table_info,
                             fields=fields,
                             ordering='ID',
                             limit=limit):
        product_id, activity, filename, ingest_time, result = row
        inst_properties = {
            'SID': sid,
            'ID': product_id,
            'ACTIVITY': activity,
            'FILENAME': filename,
            'INGESTION-TIME': ingest_time,
            'RESULT': result
        }

        # skip rows that don't match the caller's property filters
        if not include_event_check(properties, inst_properties):
            continue

        event_count += 1

        # append unless only counting, respecting the paging window if given
        if not count and (from_to is None or page_start <= event_count <= page_stop):
            events.append(Event(sid=sid,
                                event_classname=EVENT_CLASSNAME,
                                start_time=ingest_time,
                                instance_properties=inst_properties))
        if single:
            break

    return event_count, events