#!/usr/bin/env python3

r"""This module contains functions to retrieve timeseries data, either as individual
data points (the `retrieve_all_points` function) or from orbital statistics tables
(the `retrieve_stats` function).

The return value from each function is a dictionary of dictionaries.
For example:

>>> from datetime import datetime
>>> from chart.common.tables import TableInfo
>>> from chart.project import SID
>>> sid = SID('M02')
>>> start = datetime(2010, 3, 26)
>>> stop = datetime(2010, 3, 26, 0, 0, 3)
>>> retrieve_all_points(sid=sid, sensing_start=start, sensing_stop=stop,
...     datapoints=[TableInfo('MHS_NEDT').fields['NEDT_H1']])  # doctest: +ELLIPSIS
{<chart.common.tables.FieldInfo object at ...>: {'gap_threshold': datetime.timedelta(0, 5, 333334),\
 'values': array([...]), 'times': [datetime.datetime(2010, 3, 26, 0, 0, 1, 300000)]}}

Where the '...' should look something like::

    {'NEDT_H1':
      {'sid': sid,
       'tablename': 'MHS_NEDT',
       'param': 'NEDT_H1',
       'description': 'NEDT for H1',
       'unit': 'K',
       'times': [...],
       'orbits': [...],
       'rowcounts': [...],
       'mins': [...],
       'avgs': [...],
       'maxs': [...],
       'stds': [...],
       'sensing_start': start,
       'sensing_stop': stop,
       'stats_type': 'orbital'
      }
    }

(doctest disabled due to database access)

The dictionaries can be passed to the graphing functions
in the CHART `graph` module.
"""

import logging
import itertools
from datetime import timedelta

import numpy as np

from chart.db import ts
from chart.project import SID
from chart.common.util import timedelta_div
from chart.db.model.table import TableStorage
from chart.db.model.field import RowcountFieldInfo
from chart.db.func import SensingTime
from chart.products.fdf.orbit import NoSuchOrbit
from chart.plots.sampling import Sampling
from chart.plots.render import Series
from chart.common.util import nvl_min
from chart.common.util import nvl_max
from chart.common.exceptions import ConfigError
from chart.events.db import find_events
from chart.db.func import Func

logger = logging.getLogger()

# if oversampling is used (stats plot, no trim, not disabled by widget config)
# we extend sensing start/stop by a proportion of the normal span, subject to a minimum
# expansion. This avoids fake gaps at the edges of the graph from 'missing' stats
MIN_OVERSAMPLE = timedelta(hours=2)
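
# Illustrative sketch only: the actual expansion happens in the plotting code,
# and the 10% proportion below is an assumption for illustration, not the real
# figure used there:
#
#   def oversampled_window(sensing_start, sensing_stop):
#       pad = max(MIN_OVERSAMPLE, (sensing_stop - sensing_start) // 10)
#       return sensing_start - pad, sensing_stop + pad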

OrbitParam = Func()
OrbitParam.field = 'ORBIT'
def orbit_sql_stat(_):
    return 'ORBIT'
OrbitParam.sql_stat = orbit_sql_stat
def orbit_sql():
    return 'ORBIT'
OrbitParam.sql = orbit_sql
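# OrbitParam above is a pseudo-field that requests the raw ORBIT column through
# the same machinery as real statistics fields (needed by the EPS Intelliplot
# widget; see retrieve_stats below). A tidier equivalent would be a small
# subclass rather than patching attributes onto an instance (sketch only;
# assumes Func() needs no constructor arguments):
#
#   class OrbitFunc(Func):
#       field = 'ORBIT'
#
#       def sql_stat(self, _stat):
#           return 'ORBIT'
#
#       def sql(self):
#           return 'ORBIT'
#
#   OrbitParam = OrbitFunc()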

def retrieve_series(sid,
                    sensing_start,
                    sensing_stop,
                    datapoint,
                    calibrated,
                    sampling,
                    rowcount_threshold,
                    time_limits):
    """After parsing and interpreting all the user requests, perform the actual
    data retrieval from timeseries tables or event properties.

    This function should be split into sub-functions, possibly in a different module,
    so the retrieval module can be dropped.

    Args:
        `datapoint` (dict): Config element requested by the report template
    """
    if sampling is not Sampling.ALL_POINTS and 'constraint' in datapoint:
        raise ConfigError('Constraints can only be used for all-points plots')

    # event count / properties plot
    if 'event' in datapoint:
        # here we retrieve numeric event properties
        if 'field' in datapoint:
            raise ConfigError('Cannot specify both field and event in a single datapoint')

        times = []
        values = []
        gaps = []
        acc = 1  # keep track of event count. Only used with <accumulate> option

        for e in find_events(sid=sid,
                             start_time=sensing_start,
                             stop_time=sensing_stop,
                             event_class=datapoint['event']['eventclass'],
                             ordering='start_time'):
            if datapoint['event']['property'] is not None:
                # Read the requested numeric/timedelta value from each event
                value = e.instance_properties.get(datapoint['event']['property']['name'])

            else:
                # no property specified so we just plot event positions
                # by assigning them an arbitrary value of 1
                if datapoint['accumulate']:
                    value = acc
                    acc += 1

                else:
                    value = 1

            # Strip out null values
            if value is None:
                continue

            # Keep a list of all gap durations between events
            if len(times) > 0:
                gaps.append((e.start_time - times[-1]).total_seconds())

            # Record the new data
            times.append(e.start_time)
            values.append(value)

        # Convert timedeltas to a count of seconds
        if datapoint['event']['property'] is not None and\
           datapoint['event']['property']['type'] == 'duration':
            values = [v.total_seconds() for v in values]

        # Compute the gap threshold as mean(gaps) + 2 * stddev(gaps).
        # If any pair of events has a time delta greater than this it is plotted
        # as a gap.
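        # e.g. gap durations of [10, 10, 40] seconds give a mean of 20.0 and a
        # (population) stddev of ~14.1, so any inter-event gap longer than
        # ~48.3 seconds would be drawn as a break (numbers illustrative only)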
        if len(gaps) > 0:
            gaps = np.array(gaps)
            gap_threshold = timedelta(seconds=gaps.mean() + gaps.std() * 2)

        else:
            # gap threshold is nominal as there are no gaps
            gap_threshold = timedelta(days=1)

        new_data = Series(times=times,
                          values=np.array(values),
                          gap_threshold=gap_threshold)

    # table rowcount plot
    elif datapoint['field'] is None:
        times = [row[0] for row in ts.select(
                table=datapoint['table'],
                fields=('SENSING_TIME',),
                sid=sid,
                sensing_start=sensing_start,
                sensing_stop=sensing_stop)]

        if datapoint['accumulate']:
            values = np.arange(1, len(times) + 1)

        else:
            values = np.ones((len(times),))

        new_data = Series(times=times,
                          values=values,
                          gap_threshold=datapoint['table'].period * 2)

    # elif datapoint['field'].name.upper() == 'ROWCOUNT':
        # here we retrieve coverage data
        # note, scao17_daily_avg may need special code
        # if datapoint['field'].table.name == 'SCAO17_DAILY_AVG':
            # new_data = retrieve_daycounts(sid=sid,
                                                   # sensing_start=sensing_start,
                                                   # sensing_stop=sensing_stop,
                                                   # table=datapoint['field'].table)

        # else:
            # new_data = retrieve_rowcounts(sid=sid,
                                                   # sensing_start=sensing_start,
                                                   # sensing_stop=sensing_stop,
                                                   # table=datapoint['field'].table)

        # new_data.values = new_data.rowcounts

    elif sampling is Sampling.ALL_POINTS or sampling is Sampling.FIT:
        # here we retrieve all points data
        new_datas = retrieve_all_points(
            sid,
            sensing_start,
            sensing_stop,
            [datapoint['field']],
            calibrated=calibrated,
            constraints=datapoint.get('constraint'))
        new_data = new_datas[datapoint['field']]

        if len(new_data.values) > 0:
            # all-points data
            time_limits.start = nvl_min(time_limits.start, new_data.times[0])
            time_limits.stop = nvl_max(time_limits.stop, new_data.times[-1])

    elif sampling.stats:
        # retrieve stats data
        new_data = retrieve_stats(
            sid,
            sensing_start,
            sensing_stop,
            [datapoint['field']],
            sampling=sampling,
            calibrated=calibrated,
            rowcount_threshold=rowcount_threshold)[datapoint['field']]

        if len(new_data.avgs) > 0:
            # stats data
            time_limits.start = nvl_min(time_limits.start, min(new_data.times))
            time_limits.stop = nvl_max(time_limits.stop, max(new_data.times))

    else:
        raise ConfigError('Could not render graph (sampling {sampling})'.format(
                sampling=sampling))

    return new_data


def round_time(dt, round_to):
    """Floor a datetime object to a time interval border in seconds.

    Args:
        `dt` (datetime): datetime.datetime object to round down
        `round_to` (int): Size of the rounding interval in seconds
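
    For example:

        >>> from datetime import datetime
        >>> round_time(datetime(2010, 3, 26, 0, 2, 17), 60)
        datetime.datetime(2010, 3, 26, 0, 2)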
248    """
249
250    seconds = (dt - dt.min).seconds
251    # // is a floor division, not a comment on following line:
252    rounding = (seconds) // round_to * round_to
253    return dt + timedelta(0, rounding - seconds, -dt.microsecond)


def retrieve_stats(sid,
                   sensing_start,
                   sensing_stop,
                   datapoints,
                   sampling=None,
                   calibrated=True,
                   rowcount_threshold=0.7,
                   read_orbits=False):
    """Retrieve stats data."""
    if sampling is None:
        # hack for GSAR to use 30min stats for <15day reports, else daily stats
        if SID.__name__ == 'SID_GSAR':
            if (sensing_stop - sensing_start).days < 15:
                sampling = Sampling.HALF_HOURLY

            else:
                sampling = Sampling.DAILY

        else:
            sampling = sid.auto_sampling_options()[-1]  # wild guess - take the biggest stats region

    # initial read (should come from index only) to get times and for sizing
    times = [r[0] for r in ts.select(
        table=datapoints[0].table,
        fields=(SensingTime, datapoints[0]),
        # keynum=datapoints[0],
        # sensing_time=datapoints[0],
        sid=sid,
        sensing_start=sensing_start,
        sensing_stop=sensing_stop,
        region=sampling,
        stat='MIN',
        rowcount_threshold=rowcount_threshold)]  # if r[1] is not None]

    # rowcount = len(times)
    # logging.debug('Found {cc} time rows'.format(cc=rowcount))

    # This field_one code is archaic and was written for the EPS Intelliplot widget.
    # It was probably a bad idea and should be removed.
    # It's fairly harmless though; it just makes this function look more complex than it is
    if sampling.field is not None:
        field_one = sampling.field  # ?

    elif read_orbits:
        field_one = OrbitParam  # needed by EPS Intelliplot and probably nothing else

    else:
        # Dummy value not used
        field_one = RowcountFieldInfo(field=datapoints[0])

    # Very special hack to speed up Oracle retrievals of long, full-mission orbital stats
    # plots. `pragma` is only used for flat table Oracle plots and ignored otherwise.
    # This speeds up the AMSUA1 and HIRS trending reports on M01 by about 4x
    pragma = None
    if sampling is Sampling.ORBITAL and (sensing_stop - sensing_start) > timedelta(days=3500):
        pragma = 'PARALLEL(4) NO_INDEX'

    # Standard set of arguments for every call to select() below
    select_arguments = {
        'sid': sid,
        'sensing_start': sensing_start,
        'sensing_stop': sensing_stop,
        'table': datapoints[0].table,
        'region': sampling,
        'calibrated': calibrated,
        'pragma': pragma,
        'rowcount_threshold': rowcount_threshold
    }
    # cursor_min = ts.select(stat='MIN',
    #                         fields=(field_one, 'ROWCOUNT', datapoints[0]),
    #                         **select_arguments)

    select_arguments['stat'] = 'MIN'
    select_arguments['fields'] = [field_one, RowcountFieldInfo(field=datapoints[0])] + datapoints
    cursor_min = list(ts.select(**select_arguments))

    select_arguments['stat'] = 'MAX'
    select_arguments['fields'] = datapoints
    cursor_max = list(ts.select(**select_arguments))

    select_arguments['stat'] = 'AVG'
    cursor_avg = list(ts.select(**select_arguments))

    select_arguments['stat'] = 'STD'
    cursor_std = list(ts.select(**select_arguments))

    cursor = [i[0] + i[1] + i[2] + i[3] for i in itertools.zip_longest(
        cursor_min, cursor_max, cursor_avg, cursor_std, fillvalue=(None,))]

    dtype = [('o', 'u4'), ('r', 'u4')]
    for s in ('mn', 'mx', 'av', 'st'):
        for c in range(len(datapoints)):
            dtype += (('{s}{c}'.format(s=s, c=c), 'f4'), )
            # dtype.extend([('mn{c}'.format(c=c), 'f4'),
                          # ('mx{c}'.format(c=c), 'f4'),
                          # ('av{c}'.format(c=c), 'f4'),
                          # ('st{c}'.format(c=c), 'f4')])
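
    # For two datapoints the loop above builds, in order:
    #   [('o', 'u4'), ('r', 'u4'),
    #    ('mn0', 'f4'), ('mn1', 'f4'), ('mx0', 'f4'), ('mx1', 'f4'),
    #    ('av0', 'f4'), ('av1', 'f4'), ('st0', 'f4'), ('st1', 'f4')]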

    # handle the case of retrieval from a vector stats table with no stds
    # if len(cursor_std) == 0 and len(cursor_avg) > 0:
        # vals = []

    # else:
    vals = np.fromiter(cursor, dtype=dtype, count=len(times))
        # dtype=[('o', 'u4'), ('r', 'u4'), ('mn', 'f4'), ('mx', 'f4'), ('av', 'f4'), ('st', 'f4')],
        # dtype=[('r','u4'),('mn','f8'),('mx','f8'),('av','f8'),('st','f8')],

    if datapoints[0].table.regular:
        if sampling.nominal_duration is not None:
            gap_threshold = max(sampling.nominal_duration * 2,
                                datapoints[0].table.period * 2)

        elif sampling is Sampling.ORBITAL:
            gap_threshold = sid.satellite.orbit_duration * 2

        else:
            # no nominal duration and non-orbital sampling; disable gap detection
            gap_threshold = None

    else:
        gap_threshold = None

    # parent function doesn't actually call us with multiple fields
    result = {}
    for c, dp in enumerate(datapoints):
        result[dp] = Series(times=times,
                            orbits=vals['o'],
                            rowcounts=vals['r'],
                            mins=vals['mn{c}'.format(c=c)],
                            maxs=vals['mx{c}'.format(c=c)],
                            avgs=vals['av{c}'.format(c=c)],
                            stds=vals['st{c}'.format(c=c)],
                            gap_threshold=gap_threshold,
                            sampling=sampling)

    return result
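
# Illustrative use of retrieve_stats (sketch only; table and field names are the
# ones from the module docstring, and a reachable database is assumed):
#
#   from datetime import datetime
#   from chart.common.tables import TableInfo
#   field = TableInfo('MHS_NEDT').fields['NEDT_H1']
#   series = retrieve_stats(SID('M02'),
#                           datetime(2010, 3, 1),
#                           datetime(2010, 3, 8),
#                           [field],
#                           sampling=Sampling.ORBITAL)[field]
#   # series.times, series.mins, series.avgs, series.maxs, series.stds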


def retrieve_all_points(sid,
                        sensing_start,
                        sensing_stop,
                        datapoints,
                        calibrated=True,
                        constraints=None):
    """Retrieve all points from the specified `datapoints`, which must all belong
    to the same table."""

    # ts.query(fields[0].table, fields, sid, sensing_start, sensing_stop, calibrated)

    # logging.info('Reading from ' + table_name + ': ' + ','.join(param_names))

    # can we use a single numpy structure here, possibly with embedded native timestamps?
    # if times is None:
    # allow multiple calls to retrieve_all_points to cache the times
    # for performance

    # gap_threshold = fields[0].table.period * 2

    fields = ['SENSING_TIME']
    # In the key-value case we always need a datapoint to pick relevant time points
    if datapoints[0].table.storage in (TableStorage.KEYVALUE, TableStorage.JSONB):
        fields.extend(datapoints)

    data_cursor = ts.select(
        table=datapoints[0].table,
        fields=fields,
        sid=sid,
        sensing_start=sensing_start,
        sensing_stop=sensing_stop,
        calibrated=calibrated)

    if constraints is not None:
        data_cursor = ts.retrieve_with_constraints(
            sid=sid,
            sensing_start=sensing_start,
            sensing_stop=sensing_stop,
            data_cursor=data_cursor,
            constraints=constraints)

    times = [row[0] for row in data_cursor]

    data = np.empty((len(times), len(datapoints)), dtype=np.double)

    if constraints is None:
        cur = ts.select(
                table=datapoints[0].table,
                fields=datapoints,
                sid=sid,
                sensing_start=sensing_start,
                sensing_stop=sensing_stop,
                calibrated=calibrated)
        # logging.debug('AP retrieve cursor {sql}'.format(sql=cur.statement))
        # import itertools
        # from chart.common.xml import datetime_to_xml
        # if datapoints[0].name == 'C1200HD':
            # h = open(datapoints[0].name + '.csv', 'w')

        for i, row in enumerate(cur):
            data[i, :] = row

            # if datapoints[0].name == 'C1200HD':
                # h.write(str(row[0]) + '\n')
        # if datapoints[0].name == 'C1200HD':
            # h.close()

    else:
        raw_cursor = ts.select(
            table=datapoints[0].table,
            fields=['SENSING_TIME'] + datapoints,  # prepend timestamp for constraint matching
            sid=sid,
            sensing_start=sensing_start,
            sensing_stop=sensing_stop,
            calibrated=calibrated)

        data_cursor = ts.retrieve_with_constraints(sid,
                                                   sensing_start,
                                                   sensing_stop,
                                                   raw_cursor,
                                                   constraints)

        for i, row in enumerate(data_cursor):
            data[i, :] = row[1:]  # skip prepended timestamp

    result = {}
    for cc, field in enumerate(datapoints):
        if datapoints[cc].table.regular:
            if datapoints[cc].table.period is not None:
                # a data gap of >2 times the nominal period is shown as a break in the line
                max_gap = datapoints[cc].table.period * 2
            else:
                # with no nominal period we use one long day instead
                max_gap = timedelta(hours=25)
        else:
            max_gap = None

        result[field] = Series(field=field,
                               times=times,
                               gap_threshold=max_gap,
                               values=data[:, cc],
                               calibrated=calibrated and field.cal.get_cal(sid))

    return result


def retrieve_rowcounts(sid,
                       sensing_start,
                       sensing_stop,
                       table):
    """Retrieve rowcounts from split stats tables."""
    logging.debug('Retrieve rowcounts for {sid} from {start} to {stop} table {table}'.format(
            sid=sid,
            start=sensing_start,
            stop=sensing_stop,
            table=table))

    sparse = []  # time, orbit, rowcount

    if table.has_stats:
        # read rowcounts of most tables
        for sensing_time, orbit, rowcount in ts.select_stats(
            table=table,
            fields=[SensingTime,
                    'ORBIT',
                    RowcountFieldInfo(table=table)],
            sid=sid,
            sensing_start=sensing_start,
            sensing_stop=sensing_stop,
            types=['min'])['min']:

            # times.append(sensing_time)
            # raw_ints = np.concatenate((raw_ints, np.array([[rowcount, orbit]], dtype=int)))
            sparse.append((sensing_time, orbit, rowcount))

    else:
        # return rowcounts for SCAO17_DAILY_AVG and any others with no stats
        for sensing_time, in ts.select(
            table,
            [SensingTime],
            sid,
            sensing_start,
            sensing_stop):

            # times.append(sensing_time)
            # orbit = sid.orbit.find_orbit(sensing_time)
            # raw_ints = np.concatenate((raw_ints, np.array([[1, orbit]], dtype=int)))

            sparse.append((sensing_time, sid.orbit.find_orbit(sensing_time), 1))

        # return {'times': times,
                # 'orbits': raw_ints[:, 1],
                # 'rowcounts': raw_ints[:, 0]}

    first_orbit = sid.orbit.find_orbit(sensing_start)
    last_orbit = sid.orbit.find_orbit(sensing_stop)

    cc = last_orbit - first_orbit + 1  # total number of records we shall return
    times = [None] * cc
    orbits = np.empty((cc, ), dtype=int)
    rowcounts = np.empty((cc, ), dtype=int)

    sparse_ptr = 0
    res_ptr = 0
    for orbit in range(first_orbit, last_orbit + 1):
        if sparse_ptr < len(sparse) and orbit == sparse[sparse_ptr][1]:
            times[res_ptr] = sparse[sparse_ptr][0]
            orbits[res_ptr] = orbit
            rowcounts[res_ptr] = sparse[sparse_ptr][2]
            sparse_ptr += 1

        else:
            try:
                times[res_ptr] = sid.orbit.get_orbit_times(orbit)[0]
            except NoSuchOrbit:
                continue

            orbits[res_ptr] = orbit
            rowcounts[res_ptr] = 0

        res_ptr += 1

    times = times[:res_ptr]
    orbits = orbits[:res_ptr]
    rowcounts = rowcounts[:res_ptr]

    return Series(times=times,
                  orbits=orbits,
                  gap_threshold=sid.satellite.orbit_duration,
                  rowcounts=rowcounts)
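
# e.g. if the requested range spans orbits 100-104 but the stats table only
# holds rows for orbits 100 and 103, the Series above has five entries with
# rowcounts like [n100, 0, 0, n103, 0]: missing orbits appear explicitly as
# zeros (orbit numbers illustrative only)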


def retrieve_daycounts(sid,
                       sensing_start,
                       sensing_stop,
                       table):
    """Read one row per day.

    Dangerous for any table except scao17_daily_avg.
    """
    logging.info('Retrieve daycounts for {t}'.format(t=table))

    res = {}  # day against rowcount

    for sensing_time, in ts.select(table,
                                   ['SENSING_TIME'],
                                   sid,
                                   sensing_start,
                                   sensing_stop):
        date = sensing_time.date()
        if date not in res:
            res[date] = 1

        else:
            res[date] += 1

    days_count = int(timedelta_div(sensing_stop - sensing_start,
                                   timedelta(days=1)))
    times = [None] * days_count
    orbits = np.empty((days_count, ), dtype=int)
    rowcounts = np.empty((days_count, ), dtype=int)

    for i in range(days_count):
        date = sensing_start.date() + timedelta(days=i)
        times[i] = date
        orbits[i] = sid.orbit.find_orbit(date)
        rowcounts[i] = res.get(date, 0)

    return Series(times=times,
                  orbits=orbits,
                  gap_threshold=timedelta(days=1),
                  rowcounts=rowcounts)