#!/usr/bin/env python3

"""Compute timeseries table statistics.

If specific samplings are given, compute only those; otherwise the input
time range is expanded to multiples of 3 days and all samplings are
computed.

Flat tables are not supported.
"""

from datetime import timedelta
import logging
from collections import defaultdict

from chart.alg.jsonb_stats import compute_stats_jsonb
from chart.common.args import ArgumentParser
from chart.common.timerange import TimeRange  # used by the disabled compute_stats_flat
from chart.db.exceptions import IntegrityError  # used by the disabled compute_stats_flat
from chart.db.model.table import TableInfo, TableStorage
from chart.plots.sampling import Sampling, sampling_from_name
from chart.project import settings
from chart.backend.worker_job_chains import threeday_start_time

HOUR = timedelta(hours=1)
DAY = timedelta(days=1)

logger = logging.getLogger()

# It is fairly arbitrary how we split the stats up. It is a compromise:
# combining too many stats drives memory use up (the worst case, processing
# everything together, reads 3 days of 10-second stats into memory at once,
# which is too much for a normal server).
SPLIT_SAMPLINGS = {
    'short': [
        Sampling.SEC_10,
        Sampling.MIN_1,
        Sampling.MIN_5,
        Sampling.MIN_20,
        Sampling.HOURLY],
    'long': [
        Sampling.QUARTER_DAILY,
        Sampling.DAILY,
        Sampling.THREE_DAILY,
    ]
}
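
# For example (illustrative): requesting MIN_5 and DAILY yields two groups,
# 'short' -> [MIN_5] and 'long' -> [DAILY], and each group is processed with
# its own block length below.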

# Compute long periods of stats in batches to keep memory use down
MIN_BLOCK = timedelta(hours=1)
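# With these defaults (assuming nominal durations of one hour for HOURLY and
# three days for THREE_DAILY), the 'short' group is processed in one-hour
# blocks, so a 3-day run becomes 72 blocks, while the 'long' group runs as a
# single 3-day block.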


# def compute_stats_flat(sid, table, start_time, stop_time):
#     """Compute orbital stats for spacecraft in flat tables."""
#     batch_size = 21
#     commit_count = 1
#     logger.info('Table {t}'.format(t=table.name))
#     if sid.orbit:
#         for o in sid.orbit.orbits_in_range(start_time, stop_time):
#             try:
#                 chartjcs.alg.orbital_stats.compute_stats(
#                     sid,
#                     o['number'],
#                     TimeRange(o['start_time'], o['stop_time']),
#                     table,
#                     commit=commit_count % batch_size == 0)
#             except IntegrityError:
#                 logger.warning('Integrity error {t} {s} {strt} {stop}'.format(
#                     t=table, s=sid, strt=o['start_time'], stop=o['stop_time']))
#                 continue

#             commit_count += 1

#     else:
#         logger.error('SID has no orbits: {sid}'.format(sid=sid))

#     chartjcs.alg.orbital_stats.commit_all()


def main():
    """Entry point."""
    parser = ArgumentParser()
    parser.add_argument('--db',
                        metavar='CONN',
                        help='Use database connection CONN')
    parser.add_argument('--sid', '-s',
                        type=ArgumentParser.sid,
                        nargs='+',
                        required=True,
                        help='Spacecraft(s) to process')
    parser.add_argument('--start',
                        type=ArgumentParser.start_time,
                        required=True,
                        help='Start of processing period')
    parser.add_argument('--stop',
                        type=ArgumentParser.stop_time,
                        required=True,
                        help='Stop of processing period')
    parser.add_argument('--sampling',
                        nargs='+',
                        help='Compute given samplings')
    parser.add_argument('--tablename', '--table', '-t',
                        nargs='+',
                        required=True,  # maybe allow '%' wildcards
                        help='Table(s) to process')
    args = parser.parse_args()

    if args.db:
        settings.set_db_name(args.db)

    if args.sampling is not None:
        samplings = [sampling_from_name(s) for s in args.sampling]

    else:
        # take from sid instead?
        samplings = [
            Sampling.SEC_10,
            Sampling.MIN_1,
            Sampling.MIN_5,
            Sampling.MIN_20,
            Sampling.HOURLY,
            Sampling.QUARTER_DAILY,
            Sampling.DAILY,
            Sampling.THREE_DAILY,
        ]
        logger.info('Selecting default samplings: {s}'.format(
            s=', '.join(s.name for s in samplings)))

    sampling_groups = defaultdict(list)  # group name against list of samplings in use
    for sampling in samplings:
        for k, v in SPLIT_SAMPLINGS.items():
            if sampling in v:
                sampling_groups[k].append(sampling)

    # now process each group of samplings separately

    for sampling_group in sampling_groups.values():
        block_length = max(MIN_BLOCK, sampling_group[-1].nominal_duration)
        # Expand the time range, in a lazy way, to keep it simple to implement.
        # If the biggest sampling is hourly or less, expand to whole hours
        if sampling_group[-1].nominal_duration <= HOUR:
            start = args.start.replace(minute=0, second=0, microsecond=0)

        elif sampling_group[-1].nominal_duration <= DAY:
            start = args.start.replace(hour=0, minute=0, second=0, microsecond=0)

        else:
            start = threeday_start_time(args.start)

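        # start is now aligned to a sampling boundary. For example (assuming
        # HOURLY's nominal duration is one hour), a start of 2024-06-01T10:23:45
        # rounds down to 10:00:00 for the 'short' group, to midnight when the
        # largest requested sampling is daily or shorter, and to the project's
        # 3-day boundary (via threeday_start_time) for the 'long' group.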
        while start < args.stop:
            logger.info('Processing group {g}, start time shifted to {strt}'.format(
                g=' '.join(str(s) for s in sampling_group), strt=start))

            for sid in args.sid:
                for tablename in args.tablename:
                    table = TableInfo(name=tablename, sid=sid)
                    if table.storage is TableStorage.JSONB:
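                        # Compute one block of stats; min() clamps the final
                        # block so nothing runs past the requested stop time.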
                        compute_stats_jsonb(sid,
                                            table,
                                            start,
                                            min(args.stop, start + block_length),
                                            sampling_group)

                    elif table.storage is TableStorage.FLAT:
                        raise NotImplementedError()
                        # compute_stats_flat(sid,
                        #                    table,
                        #                    args.start,
                        #                    args.stop)

                    elif table.storage is TableStorage.KEYVALUE:
                        raise NotImplementedError()
                        # logger.info('Keyvalue stores not supported in this tool')
                        # continue

            start += block_length


if __name__ == '__main__':
    main()