1#!/usr/bin/env python3
2
r"""This module contains functions to retrieve timeseries data, either as individual
data points (the `retrieve_all_points` function) or from orbital statistics tables
(the `retrieve_stats` function).
6
7The return values from all functions are a dictionary of dictionaries.
8For example:
9
10>> from datetime import datetime
11>> from chart.common.tables import TableInfo
12>> from chart.project import SID
13>> sid = SID('M02')
14>> start = datetime(2010,3,26)
15>> stop = datetime(2010,3,26,0,0,3)
16>> retrieve(retrieve_all_points, sid=sid, sensing_start=start, sensing_stop=stop,\
17 datapoints=[TableInfo('MHS_NEDT').fields['NEDT_H1']]) # doctest: +ELLIPSIS
18{<chart.common.tables.FieldInfo object at ...>: {'gap_threshold': datetime.timedelta(0, 5, 333334),\
19 'values': array([...]), 'times': [datetime.datetime(2010, 3, 26, 0, 0, 1, 300000)]}}
20
21Where the '...' should look something like::
22
23 {'NEDT_H1':
24 {'sid': sid,
25 'tablename': 'MHS-NEDT',
26 'param': 'NEDT_H1',
      'description': 'NEDT for H1',
28 'unit': 'K',
29 'times': [...],
30 'orbits': [...],
31 'rowcounts': [...],
32 'mins': [...],
33 'avgs': [...],
34 'maxs': [...],
35 'stds': [...],
36 'sensing_start': start,
37 'sensing_stop': stop,
38 'stats_type': 'orbital'
39 }
40 }
41
42(doctest disabled due to database access)
43
44The dictionaries can be passed to the graphing functions
45in the CHART `graph` module.
46"""
47
48import logging
49import itertools
50from datetime import timedelta
51
52import numpy as np
53
54from chart.db import ts
55from chart.project import SID
56from chart.common.util import timedelta_div
57from chart.db.model.table import TableStorage
58from chart.db.model.field import RowcountFieldInfo
59from chart.db.func import SensingTime
60from chart.products.fdf.orbit import NoSuchOrbit
61from chart.plots.sampling import Sampling
62from chart.plots.render import Series
63from chart.common.util import nvl_min
64from chart.common.util import nvl_max
65from chart.common.exceptions import ConfigError
66from chart.events.db import find_events
67from chart.plots.sampling import Sampling
68from chart.db.func import Func
69
70logger = logging.getLogger()
71
72# if oversampling is used (stats plot, no trim, not disabled by widget config)
73# we extend sensing start/stop by a proportion of the normal amount, subject to a minimum
74# expansion. This is to avoid seeing fake gaps at the edges of the graph from 'missing' stats
75MIN_OVERSAMPLE = timedelta(hours=2)
76
77OrbitParam = Func()
78OrbitParam.field = 'ORBIT'
79def orbit_sql_stat(_):
80 return 'ORBIT'
81OrbitParam.sql_stat = orbit_sql_stat
82def orbit_sql():
83 return 'ORBIT'
84OrbitParam.sql = orbit_sql
85
86def retrieve_series(sid,
87 sensing_start,
88 sensing_stop,
89 datapoint,
90 calibrated,
91 sampling,
92 rowcount_threshold,
93 time_limits):
94 """After parsing and interpreting all the user requests, perform the actual
95 data retrieval from timeseries tables or event properties.
96 This function should be split into sub functions, possibly in a different module,
97 and drop the retrieval module.
98
99 Args:
100 `datapoints` (dict of dict): Config elements requested by report template
101 """
102 if sampling is not Sampling.ALL_POINTS and 'constraint' in datapoint:
103 raise ConfigError('Constraints can only be used for all-points plot')
104
105 # event count / properties plot
106 if 'event' in datapoint:
107 # here we retrieve numeric event properties
108 if 'field' in datapoint:
109 raise ConfigError('Cannot specify both field and event in a single datapoint')
110
111 times = []
112 values = []
113 gaps = []
114 acc = 1 # keep track of event count. Only used with <accumulate> option
115
116 for e in find_events(sid=sid,
117 start_time=sensing_start,
118 stop_time=sensing_stop,
119 event_class=datapoint['event']['eventclass'],
120 ordering='start_time'):
121 if datapoint['event']['property'] is not None:
122 # Read the requested numeric/timedelta value from each event
123 value = e.instance_properties.get(datapoint['event']['property']['name'])
124
125 else:
126 # no property specified so we will just plot event positions
127 # be assigning them an arbitrary value of 1
128 if datapoint['accumulate']:
129 value = acc
130 acc += 1
131
132 else:
133 value = 1
134
135 # Strip out null values
136 if value is None:
137 continue
138
139 # Keep a list of all gap durations between events
140 if len(times) > 0:
141 gaps.append((e.start_time - times[-1]).total_seconds())
142
143 # Record the new data
144 times.append(e.start_time)
145 values.append(value)
146
147 # Convert timedeltas to a count of seconds
148 if datapoint['event']['property'] is not None and\
149 datapoint['event']['property']['type'] == 'duration':
150 values = [v.total_seconds() for v in values]
151
152 # Compute gap threshold as 2 * stddev(gaps)
153 # If any pair of events has a time delta greater than this it is plotted
154 # as a gap.
155 if len(gaps) > 0:
156 gaps = np.array(gaps)
157 gap_threshold = timedelta(seconds=gaps.mean() + gaps.std() * 2)
158
159 else:
160 # gap threshold is nominal as there are no gaps
161 gap_threshold = timedelta(days=1)
162
163 new_data = Series(times=times,
164 values=np.array(values),
165 gap_threshold=gap_threshold)
166
167 # table rowcount plot
168 elif datapoint['field'] is None:
169 times = [row[0] for row in ts.select(
170 table=datapoint['table'],
171 fields=('SENSING_TIME',),
172 sid=sid,
173 sensing_start=sensing_start,
174 sensing_stop=sensing_stop)]
175
176 if datapoint['accumulate']:
177 values = np.arange(1, len(times) + 1)
178
179 else:
180 values = np.ones((len(times),))
181
182 new_data = Series(times=times,
183 values=values,
184 gap_threshold=datapoint['table'].period * 2)
185
186 # elif datapoint['field'].name.upper() == 'ROWCOUNT':
187 # here we retrieve coverage data
188 # note, scao17_daily_avg may need special code
189 # if datapoint['field'].table.name == 'SCAO17_DAILY_AVG':
190 # new_data = retrieve_daycounts(sid=sid,
191 # sensing_start=sensing_start,
192 # sensing_stop=sensing_stop,
193 # table=datapoint['field'].table)
194
195 # else:
196 # new_data = retrieve_rowcounts(sid=sid,
197 # sensing_start=sensing_start,
198 # sensing_stop=sensing_stop,
199 # table=datapoint['field'].table)
200
201 # new_data.values = new_data.rowcounts
202
203 elif sampling is Sampling.ALL_POINTS or sampling is Sampling.FIT:
204 # here we retrieve all points data
205 new_datas = retrieve_all_points(
206 sid,
207 sensing_start,
208 sensing_stop,
209 [datapoint['field']],
210 calibrated=calibrated,
211 constraints=datapoint.get('constraint'))
212 new_data = new_datas[datapoint['field']]
213
214 if len(new_data.values) > 0:
215 # all-points data
216 time_limits.start = nvl_min(time_limits.start, new_data.times[0])
217 time_limits.stop = nvl_max(time_limits.stop, new_data.times[-1])
218
219 elif sampling.stats:
220 # retrieve stats data
221 new_data = retrieve_stats(
222 sid,
223 sensing_start,
224 sensing_stop,
225 [datapoint['field']],
226 sampling=sampling,
227 calibrated=calibrated,
228 rowcount_threshold=rowcount_threshold)[datapoint['field']]
229
230 if len(new_data.avgs) > 0:
231 # stats data
232 time_limits.start = nvl_min(time_limits.start, min(new_data.times))
233 time_limits.stop = nvl_max(time_limits.stop, max(new_data.times))
234
235 else:
236 raise ConfigError('Could not render graph (sampling {sampling})'.format(
237 sampling=sampling))
238
239 return new_data
240
241
242def round_time(dt, round_to):
243 """Floor a datetime object to time interval border in seconds.
244
245 Arg:
246 `dt` (datetime): datetime.datetime object
247 `round_to` (int): Closest number of seconds to round to
248 """
249
250 seconds = (dt - dt.min).seconds
251 # // is a floor division, not a comment on following line:
252 rounding = (seconds) // round_to * round_to
253 return dt + timedelta(0, rounding - seconds, -dt.microsecond)
254
255
256def retrieve_stats(sid,
257 sensing_start,
258 sensing_stop,
259 datapoints,
260 sampling=None,
261 calibrated=True,
262 rowcount_threshold=0.7,
263 read_orbits=False):
264 """Retrieve stats data."""
265 if sampling is None:
266 # hack for GSAR to use 30min stats for <15day report, else daily stats
267 if SID.__name__ == 'SID_GSAR':
268 if (sensing_stop - sensing_start).days < 15:
269 sampling = Sampling.HALF_HOURLY
270
271 else:
272 sampling = Sampling.DAILY
273
274 else:
275 sampling = sid.auto_sampling_options()[-1] # wild guess - take the biggest stats region
276
277 # initial read (should come from index only) to get times and for sizing
278 times = [r[0] for r in ts.select(
279 table=datapoints[0].table,
280 fields=(SensingTime, datapoints[0]),
281 # keynum=datapoints[0],
282 # sensing_time=datapoints[0],
283 sid=sid,
284 sensing_start=sensing_start,
285 sensing_stop=sensing_stop,
286 region=sampling,
287 stat='MIN',
288 rowcount_threshold=rowcount_threshold)]# if r[1] is not None]
289
290 # rowcount = len(times)
291 # logging.debug('Found {cc} time rows'.format(cc=rowcount))
292
293 # This field_one code is archaic and written for the EPS Intelliplot widget
294 # It was probaly a bad idea and should be removed
295 # It's fairly harmless though it just makes this function look more complex than it is
296 if sampling.field is not None:
297 field_one = sampling.field # ?
298
299 elif read_orbits:
300 field_one = OrbitParam # needed by EPS intelliplot and probably nothing else
301
302 else:
303 # Dummy value not used
304 field_one = RowcountFieldInfo(field=datapoints[0])
305
306 # Very special hack to speed up Oracle retrievals of long, full mission orbital stats
307 # plots. `pragma` is only used for flat table Oracle plots and ignored otherwise
308 # This speeds up the AMSUA1 and HIRS trending reports on M01 by about 4x
309 pragma = None
310 if sampling is Sampling.ORBITAL and (sensing_stop - sensing_start) > timedelta(days=3500):
311 pragma = 'PARALLEL(4) NO_INDEX'
312
313 # Standard set of arguments for every call to select() below
314 select_arguments = {
315 'sid': sid,
316 'sensing_start': sensing_start,
317 'sensing_stop': sensing_stop,
318 'table': datapoints[0].table,
319 'region': sampling,
320 'calibrated': calibrated,
321 'pragma': pragma,
322 'rowcount_threshold': rowcount_threshold
323 }
324 # cursor_min = ts.select(stat='MIN',
325 # fields=(field_one, 'ROWCOUNT', datapoints[0]),
326 # **select_arguments)
327
328 select_arguments['stat'] = 'MIN'
329 select_arguments['fields'] = [field_one, RowcountFieldInfo(field=datapoints[0])] + datapoints
330 cursor_min = ts.select(**select_arguments)
331 cursor_min = list(cursor_min)
332
333 select_arguments['stat'] = 'MAX'
334 select_arguments['fields'] = datapoints
335 cursor_max = ts.select(**select_arguments)
336 cursor_max = list(cursor_max)
337
338 select_arguments['stat'] = 'AVG'
339 cursor_avg = ts.select(**select_arguments)
340 cursor_avg = list(cursor_avg)
341
342 select_arguments['stat'] = 'STD'
343 cursor_std = ts.select(**select_arguments)
344 cursor_std = list(cursor_std)
345
346 cursor = [i[0] + i[1] + i[2] + i[3] for i in itertools.zip_longest(
347 cursor_min, cursor_max, cursor_avg, cursor_std, fillvalue=(None,))]
348
349 dtype = [('o', 'u4'), ('r', 'u4')]
350 for s in ('mn', 'mx', 'av', 'st'):
351 for c in range(len(datapoints)):
352 dtype += (('{s}{c}'.format(s=s, c=c), 'f4'), )
353 # dtype.extend([('mn{c}'.format(c=c), 'f4'),
354 # ('mx{c}'.format(c=c), 'f4'),
355 # ('av{c}'.format(c=c), 'f4'),
356 # ('st{c}'.format(c=c), 'f4')])
357
358 # handle the case of retrieval from a vector stats table with no stds
359 # if len(cursor_std) == 0 and len(cursor_avg) > 0:
360 # vals = []
361
362 # else:
363 vals = np.fromiter(cursor, dtype=dtype, count=len(times))
364 # dtype=[('o', 'u4'), ('r', 'u4'), ('mn', 'f4'), ('mx', 'f4'), ('av', 'f4'), ('st', 'f4')],
365 # dtype=[('r','u4'),('mn','f8'),('mx','f8'),('av','f8'),('st','f8')],
366
367 if datapoints[0].table.regular:
368 if sampling.nominal_duration is not None:
369 gap_threshold = max(sampling.nominal_duration * 2,
370 datapoints[0].table.period * 2)
371
372 elif sampling is Sampling.ORBITAL:
373 gap_threshold = sid.satellite.orbit_duration * 2
374
375 else:
376 gap_threshold = None
377
378 # parent function doesn't actually call us with multiple fields
379 result = {}
380 for c, dp in enumerate(datapoints):
381 result[dp] = Series(times=times,
382 orbits=vals['o'],
383 rowcounts=vals['r'],
384 mins=vals['mn{c}'.format(c=c)],
385 maxs=vals['mx{c}'.format(c=c)],
386 avgs=vals['av{c}'.format(c=c)],
387 stds=vals['st{c}'.format(c=c)],
388 gap_threshold=gap_threshold,
389 sampling=sampling)
390
391 return result
392
393
394def retrieve_all_points(sid,
395 sensing_start,
396 sensing_stop,
397 datapoints,
398 calibrated=True,
399 constraints=None):
400 """Retrieve all points from the specified `param_names` in `table_name`."""
401
402 # ts.query(fields[0].table, fields, sid, sensing_start, sensing_stop, calibrated)
403
404 # logging.info('Reading from ' + table_name + ': ' + ','.join(param_names))
405
406 # can we use a single numpy structure here, possibly with embedded native timestamps?
407 # if times is None:
408 # allow multiple calls to retrieve_all_points to cache the times
409 # for performance
410
411 # gap_threshold = fields[0].table.period * 2
412
413 fields = ['SENSING_TIME']
414 # In key-value case we always need a datapoint to pick relevant time points
415 if datapoints[0].table.storage in (TableStorage.KEYVALUE, TableStorage.JSONB):
416 fields.extend(datapoints)
417
418 data_cursor = ts.select(
419 table=datapoints[0].table,
420 fields=fields,
421 sid=sid,
422 sensing_start=sensing_start,
423 sensing_stop=sensing_stop,
424 calibrated=calibrated)
425
426 if constraints is not None:
427 data_cursor = ts.retrieve_with_constraints(
428 sid=sid,
429 sensing_start=sensing_start,
430 sensing_stop=sensing_stop,
431 data_cursor=data_cursor,
432 constraints=constraints)
433
434 times = [row[0] for row in data_cursor]
435
436 data = np.empty((len(times), len(datapoints)), dtype=np.double)
437
438 if constraints is None:
439 cur = ts.select(
440 table=datapoints[0].table,
441 fields=datapoints, # prepend timestamp for constraint matching
442 sid=sid,
443 sensing_start=sensing_start,
444 sensing_stop=sensing_stop,
445 calibrated=calibrated)
446 # logging.debug('AP retrieve cursor {sql}'.format(sql=cur.statement))
447 # import itertools
448 # from chart.common.xml import datetime_to_xml
449 # if datapoints[0].name == 'C1200HD':
450 # h = open(datapoints[0].name + '.csv', 'w')
451
452 for i, row in enumerate(cur):
453 data[i, :] = row
454
455 # if datapoints[0].name == 'C1200HD':
456 # h.write(str(row[0]) + '\n')
457 # if datapoints[0].name == 'C1200HD':
458 # h.close()
459
460 else:
461 raw_cursor = ts.select(
462 table=datapoints[0].table,
463 fields=['SENSING_TIME'] + datapoints, # prepend timestamp for constraint matching
464 sid=sid,
465 sensing_start=sensing_start,
466 sensing_stop=sensing_stop,
467 calibrated=calibrated)
468
469 data_cursor = ts.retrieve_with_constraints(sid,
470 sensing_start,
471 sensing_stop,
472 raw_cursor,
473 constraints)
474
475 for i, row in enumerate(data_cursor):
476 data[i, :] = row[1:] # skip prepended timestamp
477
478 result = {}
479 for cc, field in enumerate(datapoints):
480 if datapoints[cc].table.regular:
481 if datapoints[cc].table.period is not None:
482 # a data gap of >2 times the nominal period is shows as a break in the line
483 max_gap = datapoints[cc].table.period * 2
484 else:
485 # with no nominal period we use one long day instead
486 max_gap = timedelta(hours=25)
487 else:
488 max_gap = None
489
490 result[field] = Series(field=field,
491 times=times,
492 gap_threshold=max_gap,
493 values=data[:, cc],
494 calibrated=calibrated and field.cal.get_cal(sid))
495
496 return result
497
498
499def retrieve_rowcounts(sid,
500 sensing_start,
501 sensing_stop,
502 table):
503 """Retrieve data from split stats tables."""
504 logging.debug('Retrieve rowcounts for {sid} from {start} to {stop} table {table}'.format(
505 sid=sid,
506 start=sensing_start,
507 stop=sensing_stop,
508 table=table))
509
510 sparse = [] # time, orbit, rowcount
511
512 if table.has_stats:
513 # read rowcounts of most tables
514 for sensing_time, orbit, rowcount in ts.select_stats(
515 table=table,
516 fields=[SensingTime,
517 'ORBIT',
518 RowcountFieldInfo(table=table)],
519 sid=sid,
520 sensing_start=sensing_start,
521 sensing_stop=sensing_stop,
522 types=['min'])['min']:
523
524 # times.append(sensing_time)
525 # raw_ints = np.concatenate((raw_ints, np.array([[rowcount, orbit]], dtype=np.int)))
526 sparse.append((sensing_time, orbit, rowcount))
527
528 else:
529 # return rowcounts for SCAO17_DAILY_AVG and any others with no stats
530 for sensing_time, in ts.select(
531 table,
532 [SensingTime],
533 sid,
534 sensing_start,
535 sensing_stop):
536
537 # times.append(sensing_time)
538 # orbit = sid.orbit.find_orbit(sensing_time)
539 # raw_ints = np.concatenate((raw_ints, np.array([[1, orbit]], dtype=np.int)))
540
541 sparse.append((sensing_time, sid.orbit.find_orbit(sensing_time), 1))
542
543 # return {'times': times,
544 # 'orbits': raw_ints[:, 1],
545 # 'rowcounts': raw_ints[:, 0]}
546
547 first_orbit = sid.orbit.find_orbit(sensing_start)
548 last_orbit = sid.orbit.find_orbit(sensing_stop)
549
550 cc = last_orbit - first_orbit + 1 # total number of record we shall return
551 times = [None] * cc
552 orbits = np.empty((cc, ), dtype=np.int)
553 rowcounts = np.empty((cc, ), dtype=np.int)
554
555 sparse_ptr = 0
556 res_ptr = 0
557 for orbit in range(first_orbit, last_orbit + 1):
558 if sparse_ptr < len(sparse) and orbit == sparse[sparse_ptr][1]:
559 times[res_ptr] = sparse[sparse_ptr][0]
560 orbits[res_ptr] = orbit
561 rowcounts[res_ptr] = sparse[sparse_ptr][2]
562 sparse_ptr += 1
563
564 else:
565 try:
566 times[res_ptr] = sid.orbit.get_orbit_times(orbit)[0]
567 except NoSuchOrbit:
568 continue
569
570 orbits[res_ptr] = orbit
571 rowcounts[res_ptr] = 0
572
573 res_ptr += 1
574
575 times = times[:res_ptr]
576 orbits = orbits[:res_ptr]
577 rowcounts = rowcounts[:res_ptr]
578
579 return Series(times=times,
580 orbits=orbits,
581 gap_threshold=sid.satellite.orbit_duration,
582 rowcounts=rowcounts)
583
584
585def retrieve_daycounts(sid,
586 sensing_start,
587 sensing_stop,
588 table):
589 """Read one row per day.
590
591 Dangerous for any table except scao17_daily_avg."""
592
593 logging.info('Retrieve daycounts for {t}'.format(t=table))
594
595 res = {} # day against (orbit, rowcount)
596
597 for sensing_time, in ts.select(table,
598 ['SENSING_TIME'],
599 sid,
600 sensing_start,
601 sensing_stop):
602 date = sensing_time.date()
603 if date not in res:
604 res[date] = 1
605
606 else:
607 res[date] += 1
608
609 days_count = int(timedelta_div(sensing_stop - sensing_start,
610 timedelta(days=1)))
611 times = [None] * days_count
612 orbits = np.empty((days_count, ), dtype=np.int)
613 rowcounts = np.empty((days_count, ), dtype=np.int)
614
615 for i in range(days_count):
616 date = sensing_start.date() + timedelta(days=i)
617 times[i] = date
618 orbits[i] = sid.orbit.find_orbit(date)
619 rowcounts[i] = res.get(date, 0)
620
621 return Series(times=times,
622 orbits=orbits,
623 gap_threshold=timedelta(days=1),
624 rowcounts=rowcounts)