1#!/usr/bin/env python3
2
3"""Compute stats for a table over a time range.
4
5If called by the SHORT_TERM_STATS activity, we generate orbital stats
6plus, in future, any higher frequency sub-orbital stats.
7
8If called by the LONG_TERM_STATS activity, we generate lower frequency stats such as
9- multiple-orbit stats
- daily stats (note: this one is tricky because it may need to collect stats over all orbits
  that begin in a day, not over all points for a calendar day. This is to avoid introducing
  artificial variations into the data, since each daily stat should include a whole number
  of orbits and no partial orbits).
14- Lower frequency stats like 3-day stats. This won't be needed until several years after
15 launch
16"""
17
18import logging
19
20from chart.alg import init_algorithm
21from chart.plots.sampling import Sampling
22from chart.alg.jsonb_stats import compute_stats_jsonb
23from chart.alg.flat_stats import compute_stats_flat
24from chart.backend.jobs import JobStatus
25from chart.common.timerange import TimeRange
26from chart.db.exceptions import IntegrityError
27from chart.db.model.table import TableStorage
28
29
# No name is passed to getLogger(), so this is the root logger rather than a
# per-module logger.
logger = logging.getLogger()
31
def dispatch(wo, resultfile, _):
    """Compute stats for every job of a work order (entry point for the dispatcher tool).

    The activity name of each job selects which samplings are generated:

    - SHORT_TERM_STATS: SEC_10, MIN_1, MIN_5 and MIN_20
    - LONG_TERM_STATS: ORBITAL, HALF_DAILY and DAILY
    - EXTENDED_STATS: THREE_DAILY

    The stats are computed over the job's `sensing_start` to `sensing_stop` range by
    `compute_stats_jsonb` or `compute_stats_flat`, depending on the storage type of the
    job's table. Every job that was processed is then added to `resultfile` as COMPLETED.

    Args:
        wo: Work order; its `read_jobs()` is iterated to obtain the jobs.
        resultfile: Result file; its `add_job()` is called for each processed job.
        _: Unused third value supplied by the caller.

    Raises:
        NotImplementedError: If a job's table storage is neither JSONB nor FLAT.
    """
    logger.info('Startup')
    for job in wo.read_jobs():
        logger.debug('job %s', job)

        # Select the samplings (and a label for the log) from the activity
        activity = job.activity.name
        if activity == 'SHORT_TERM_STATS':
            # Short, fixed length stats generated in response to a product arriving
            description = 'short term stats'
            samplings = [Sampling.SEC_10, Sampling.MIN_1, Sampling.MIN_5, Sampling.MIN_20]

        elif activity == 'LONG_TERM_STATS':
            # Longer orbit based stats generated in response to a daily trigger
            description = 'Long Term ORBITAL and DAILY stats'
            samplings = [Sampling.ORBITAL, Sampling.HALF_DAILY, Sampling.DAILY]

        elif activity == 'EXTENDED_STATS':
            # Lowest frequency stats; logged under their own label so they are not
            # mistaken for the LONG_TERM_STATS run
            description = 'Extended THREE_DAILY stats'
            samplings = [Sampling.THREE_DAILY]

        else:
            # Processing stops here: this job and any jobs after it are not added
            # to resultfile
            logger.error('Unhandled activity %s', activity)
            break

        logger.info('Begin %s for %s from %s to %s',
                    description,
                    job.table,
                    job.sensing_start,
                    job.sensing_stop)

        # Now generate the required set of stats. The two implementations take the
        # time range under different keyword names.
        storage = job.table.storage
        if storage is TableStorage.JSONB:
            compute_stats_jsonb(sid=job.sid,
                                table_info=job.table,
                                start=job.sensing_start,
                                stop=job.sensing_stop,
                                samplings=samplings)

        elif storage is TableStorage.FLAT:
            compute_stats_flat(sid=job.sid,
                               table_info=job.table,
                               start_time=job.sensing_start,
                               stop_time=job.sensing_stop,
                               samplings=samplings)

        else:
            raise NotImplementedError(
                'Stats not implemented for table storage {storage} (table {table})'.format(
                    storage=storage,
                    table=job.table))

        resultfile.add_job(job, JobStatus.COMPLETED)

    logger.info('All done')
87
88
def main():
    """Command line entry point.

    Works both standalone and when launched via `dispatcher`: whatever
    `init_algorithm` returns is unpacked into the positional arguments of `dispatch`.
    """
    algorithm_args = init_algorithm()
    dispatch(*algorithm_args)
93
# Run the algorithm only when executed as a script, not when imported.
if __name__ == '__main__':
    main()