1"""Perform stats calculation."""
2
3import logging
4from datetime import datetime
5from typing import Iterable
6
7from chart.db import ts
8from chart.db.func import Count
9from chart.db.connection import db_connect
10from chart.db.exceptions import IntegrityError
11from chart.db.model.table import TableInfo
12from chart.common.traits import Datatype
13from chart.db.drivers.oracle import oracle_typemap
14from chart.common.timerange import TimeRange
15from chart.project import SID
16from chart.plots.sampling import Sampling
17from chart.db.func import UNORDERED
18from chart.project import settings
19
# Module logger; no name given so this is the root logger — handlers are
# presumably configured by the calling application (TODO confirm).
logger = logging.getLogger()

# When True the combined stats INSERT includes an ORBIT column (filled with -1).
# Hard-coded for JCS; looks like it should eventually come from
# settings.ORBIT_IN_JOBS_TABLE — NOTE(review): confirm before wiring up.
ADD_ORBITS = True # for JCS
24
def compute_stats_flat(sid: SID,
                       table_info: TableInfo,
                       start_time: datetime,
                       stop_time: datetime,
                       samplings: Iterable[Sampling]):
    """For each sampling in `samplings` compute all stats across time range.

    Args:
        sid: Source ID whose data is aggregated.
        table_info: Source table to compute statistics over.
        start_time: Inclusive start of the overall period.
        stop_time: Exclusive end of the overall period.
        samplings: Sampling periods (ORBITAL, DAILY, fixed durations) to compute.
    """
    for sampling in samplings:
        if sampling is Sampling.ORBITAL:
            # One stats period per orbit whose start falls inside [start, stop)
            for orbit in sid.orbit.orbits_in_range(start_time, stop_time):
                if start_time <= orbit['start_time'] < stop_time:
                    compute_stats(sampling,
                                  sid,
                                  table_info,
                                  TimeRange(orbit['start_time'], orbit['stop_time']))

        elif sid.orbit is not None and sampling is Sampling.DAILY:
            # Daily stats aligned to orbit boundaries: accumulate orbits until
            # the calendar date changes, then flush one stats period.
            prev_date = None
            orbit_start = None
            orbit = None
            for orbit in sid.orbit.orbits_in_range(start_time, stop_time):
                if not (start_time <= orbit['start_time'] < stop_time):
                    continue

                if orbit['start_time'].date() != prev_date:
                    if orbit_start is not None:
                        # Flush the day that just ended.
                        # NOTE(review): the period ends at the stop_time of the
                        # first orbit of the NEW day (original behaviour) —
                        # confirm this overlap is intentional.
                        compute_stats(sampling,
                                      sid,
                                      table_info,
                                      TimeRange(orbit_start, orbit['stop_time']))

                    orbit_start = orbit['start_time']
                    prev_date = orbit['start_time'].date()

            # Flush the final (possibly partial) day using the last orbit seen
            if orbit_start is not None and orbit is not None:
                compute_stats(sampling,
                              sid,
                              table_info,
                              TimeRange(orbit_start, orbit['stop_time']))

        else:
            # Fixed-duration stats: march through the range in nominal steps,
            # clamping the final period to `stop_time`
            position = start_time
            while position < stop_time:
                next_time = min(stop_time, position + sampling.nominal_duration)
                compute_stats(sampling,
                              sid,
                              table_info,
                              TimeRange(position, next_time))
                position = next_time
74
75
def compute_stats(region, sid, table, time_range):
    """Low level function to compute stats for a single period of a table.

    Args:
        region: `Sampling` member; only its `region_num` is stored in the
            stats rows.
        sid: Source ID whose data is aggregated.
        table: `TableInfo` of the source table.
        time_range: Half-open period [start, stop) to aggregate.

    Raises:
        IntegrityError: re-raised after logging if an insert fails.
    """
    region = region.region_num

    # Skip empty periods entirely - no rows means no stats to write
    if not has_data(sid, table, time_range):
        return

    logger.info('Computing stats for {source} {scid} from {time_range}'.format(
        source=table.name, scid=sid.name, time_range=time_range))

    db_conn = db_connect(table)
    for stats_table in table.stats_tables():
        # ESTATS tables are not handled by this function
        if 'ESTATS' in stats_table['tablename']:
            continue

        # Delete any previously computed stats for this period first so the
        # insert below cannot create duplicates
        del_sql = make_del_sql(db_conn, region, stats_table['tablename'])
        del_cur = db_conn.query(del_sql,
                                sid_num=sid.sid_num,
                                start_time=time_range.start,
                                stop_time=time_range.stop)
        del_cur.close()
        db_conn.commit()

        if stats_table['tablename'].endswith('_STATS'):
            # Combined table holding all aggregates in one row
            ins_sql = make_combined_sql(region, stats_table['tablename'])
        else:
            # Split table holding a single aggregate type (_MINS, _MAXS, ...)
            ins_sql = make_split_sql(region, stats_table['tablename'])

        gen_time = datetime.utcnow()
        try:
            ins_cur = db_conn.query(ins_sql,
                                    sid_num=sid.sid_num,
                                    start_time=time_range.start,
                                    stop_time=time_range.stop,
                                    gen_time=gen_time)
        except IntegrityError:
            # Report the stats table we were actually inserting into
            # (previously logged the source table name, which was misleading)
            logger.error('Cannot insert into table {tbl} using parameters {scid} from {time_range}'
                         .format(tbl=stats_table['tablename'],
                                 scid=sid.name,
                                 time_range=time_range))
            raise

        ins_cur.close()
        db_conn.commit()
118
119
def has_data(sid, table, time_range):
    """Check if there is data in the given table and time range for computing the statistics.

    Args:
        sid: Source ID to filter on.
        table: `TableInfo` of the source table.
        time_range: Half-open period [start, stop) to check.

    Returns:
        True if at least one row exists in the period, else False.
    """
    if table == TableInfo('TC'):
        # TBD Json table, so count using ts.count,
        # maybe we can always use the ts.count for this check
        rec_count = ts.count(table=table.storage_table,
                             sid=sid,
                             field='SENSING_TIME',
                             sensing_start=time_range.start,
                             sensing_stop=time_range.stop)
        empty = rec_count is None or rec_count == 0

    else:
        # NOTE(review): `(Count())` is a parenthesised expression, not a
        # 1-tuple - confirm ts.select accepts a bare field object here
        row = ts.select(table=table,
                        fields=(Count()),
                        sid=sid,
                        sensing_start=time_range.start,
                        sensing_stop=time_range.stop,
                        calibrated=False,
                        ordering=UNORDERED).fetchone()
        empty = row is None or row[0] == 0

    if empty:
        # use the module logger, not the root logging module, so this message
        # follows the same configuration as the rest of the file
        logger.debug('No data to create stats for; table {table} from {time_range}'
                     .format(table=table.name, time_range=time_range))
        return False

    return True
152
153
def make_del_sql(db_conn, region, tablename):
    """Build the DELETE statement removing existing stats rows for one period.

    Run before inserting freshly computed stats so reruns do not duplicate
    rows. Bind variables :sid_num, :start_time and :stop_time are supplied by
    the caller at execution time.

    Args:
        db_conn: Database connection (unused here; kept for interface
            compatibility with callers).
        region: Region number identifying the sampling period type.
        tablename: Stats table to delete from.

    Returns:
        The DELETE SQL string.
    """
    # Note the trailing spaces on each fragment: without them the concatenated
    # statement previously read "REGION='x'AND sid_num=..." (missing space)
    del_sql = ('DELETE FROM {table} '
               'WHERE REGION=\'{region}\' '
               'AND sid_num=:sid_num '
               'AND sensing_time>=:start_time '
               'AND sensing_time<:stop_time'.format(
                   region=region,
                   table=tablename))
    return del_sql
167
168
def make_combined_sql(region, dest_tablename):
    """Generate engine-specific SQL code to write all statistics for a single
    period to a _STATS or _CAL_STATS table.

    Args:
        region: Region number identifying the sampling period type.
        dest_tablename: Destination stats table (ends with '_STATS').

    Returns:
        INSERT ... SELECT SQL string with :sid_num, :gen_time, :start_time
        and :stop_time bind variables.
    """
    # FOO_STATS -> FOO, FOO_CAL_STATS -> FOO_CAL
    source_tablename = dest_tablename[:-6]

    # Calibrated tables read from FOO_CAL but share FOO's field definitions
    if source_tablename.endswith('_CAL'):
        info_tablename = source_tablename[:-4]
    else:
        info_tablename = source_tablename

    table_info = TableInfo(info_tablename)

    # Only numeric fields get statistics
    numeric_fields = [name for name, field in table_info.fields.items()
                      if field.datatype in (Datatype.UINT, Datatype.INT, Datatype.FLOAT)]

    source_fields = []
    dest_fields = []

    # Aggregate ordering matters: destination columns are MIN, AVG, MAX, STD
    # for every numeric field, optionally followed by CNT for sparse tables
    for suffix, template in (('_MIN', 'min({f})'),
                             ('_AVG', 'avg({f})'),
                             ('_MAX', 'max({f})'),
                             ('_STD', 'coalesce(stddev({f}),0)')):
        for field in numeric_fields:
            source_fields.append(template.format(f=field))
            dest_fields.append(field + suffix)

    if table_info.sparse:
        # sparse table so also contains per-field _CNT columns
        for field in numeric_fields:
            source_fields.append('count({f})'.format(f=field))
            dest_fields.append(field + '_CNT')

    sql = ('INSERT INTO {dest_tablename} (SID_NUM,REGION,GEN_TIME,SENSING_TIME,ROWCOUNT{orbit_field},'
           ' {dest_fields})'
           ' SELECT :sid_num,\'{region}\',:gen_time, :start_time,count(*){orbit_value},'
           ' {source_fields}'
           ' FROM {source_tablename}'
           ' WHERE sid_num=:sid_num'
           ' AND sensing_time>=:start_time AND sensing_time<:stop_time'.format(
               dest_tablename=dest_tablename,
               orbit_field=',orbit' if ADD_ORBITS else '',
               dest_fields=','.join(dest_fields),
               region=region,
               orbit_value=',-1' if ADD_ORBITS else '',
               source_fields=','.join(source_fields),
               source_tablename=source_tablename))

    return sql
250
def make_split_sql(region, dest_tablename):
    """Generate engine-specific SQL code to write to a single split statistics table.

    Args:
        region: Region number identifying the sampling period type.
        dest_tablename: Destination table ending in one of _MINS, _MAXS,
            _CNTS, _AVGS or _STDS.

    Returns:
        INSERT ... SELECT SQL string with :sid_num, :gen_time, :start_time
        and :stop_time bind variables.

    Raises:
        ValueError: if `dest_tablename` has no recognised aggregate suffix.
    """
    # FOO_MINS -> FOO, FOO_CAL_MINS -> FOO_CAL
    source_tablename = dest_tablename[:-5]

    # Calibrated tables read from FOO_CAL but share FOO's field definitions
    if source_tablename.endswith('_CAL'):
        info_tablename = source_tablename[:-4]
    else:
        info_tablename = source_tablename

    table_info = TableInfo(info_tablename)

    # table-name suffix -> (destination field suffix, SQL aggregate template)
    aggregates = {
        '_MINS': ('_MIN', 'min({f})'),
        '_MAXS': ('_MAX', 'max({f})'),
        '_CNTS': ('_CNT', 'count({f})'),
        '_AVGS': ('_AVG', 'avg({f})'),
        '_STDS': ('_STD', 'coalesce(stddev({f}),0)'),
    }
    for table_suffix, (field_suffix, template) in aggregates.items():
        if dest_tablename.endswith(table_suffix):
            break
    else:
        raise ValueError('Cannot identify function type for parameter {table}'.format(
            table=dest_tablename))

    source_fields = []
    dest_fields = []
    for field, field_info in table_info.fields.items():
        # _CNTS tables skip the housekeeping fields. This was previously a
        # substring test (`field in 'TIME_STATUS OBT SPID'`) which also
        # wrongly skipped any field whose name is a substring, e.g. 'ID'
        if table_suffix == '_CNTS' and field in ('TIME_STATUS', 'OBT', 'SPID'):
            continue

        # only calculate stats for numerical types
        if field_info.datatype in (Datatype.UINT, Datatype.INT, Datatype.FLOAT):
            source_fields.append(template.format(f=field))
            dest_fields.append(field + field_suffix)

    sql = ('INSERT INTO {dest_tablename} (SID_NUM,REGION,GEN_TIME,SENSING_TIME,ROWCOUNT, '
           ' {dest_fields})'
           ' SELECT :sid_num,\'{region}\',:gen_time, :start_time,count(*),'
           ' {source_fields}'
           ' FROM {source_tablename}'
           ' WHERE sid_num=:sid_num'
           ' AND sensing_time>=:start_time AND sensing_time<:stop_time'.format(
               region=region,
               source_tablename=source_tablename,
               source_fields=','.join(source_fields),
               dest_tablename=dest_tablename,
               dest_fields=','.join(dest_fields)))

    return sql