1"""Perform stats calculation."""
  2
  3import logging
  4from datetime import datetime
  5from typing import Iterable
  6
  7from chart.db import ts
  8from chart.db.func import Count
  9from chart.db.connection import db_connect
 10from chart.db.exceptions import IntegrityError
 11from chart.db.model.table import TableInfo
 12from chart.common.traits import Datatype
 13from chart.db.drivers.oracle import oracle_typemap
 14from chart.common.timerange import TimeRange
 15from chart.project import SID
 16from chart.plots.sampling import Sampling
 17from chart.db.func import UNORDERED
 18from chart.project import settings
 19
 20logger = logging.getLogger()
 21
 22ADD_ORBITS = True  # for JCS
 23#settings.ORBIT_IN_JOBS_TABLE
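
# Statistics table layout, as assumed by the SQL builders below:
#   * <TABLE>_STATS and <TABLE>_CAL_STATS ("combined" tables) hold every aggregate
#     (MIN/AVG/MAX/STD, plus CNT for sparse tables) in one row per period.
#   * <TABLE>_MINS/_MAXS/_CNTS/_AVGS/_STDS ("split" tables) hold one aggregate each.
#   * Tables with 'ESTATS' in their name are skipped by compute_stats().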


def compute_stats_flat(sid: SID,
                       table_info: TableInfo,
                       start_time: datetime,
                       stop_time: datetime,
                       samplings: Iterable[Sampling]):
    """For each sampling in `samplings` compute all statistics across the time range."""
    for sampling in samplings:
        if sampling is Sampling.ORBITAL:
            # per-orbit stats
            for o in sid.orbit.orbits_in_range(start_time, stop_time):
                if start_time <= o['start_time'] < stop_time:
                    compute_stats(sampling,
                                  sid,
                                  table_info,
                                  TimeRange(o['start_time'], o['stop_time']))

        elif sid.orbit is not None and sampling is Sampling.DAILY:
            # daily orbital stats: group whole orbits by the calendar date they start on
            prev_date = None
            orbit_start = None
            o = None
            for o in sid.orbit.orbits_in_range(start_time, stop_time):
                if start_time <= o['start_time'] < stop_time:
                    if o['start_time'].date() != prev_date:
                        if orbit_start is not None:
                            compute_stats(sampling,
                                          sid,
                                          table_info,
                                          TimeRange(orbit_start, o['stop_time']))

                        orbit_start = o['start_time']
                        prev_date = o['start_time'].date()

            # close the final (possibly partial) day
            if orbit_start is not None and o is not None:
                compute_stats(sampling,
                              sid,
                              table_info,
                              TimeRange(orbit_start, o['stop_time']))

        else:
            # fixed duration stats
            position = start_time
            while position < stop_time:
                next_time = min(stop_time, position + sampling.nominal_duration)
                compute_stats(sampling,
                              sid,
                              table_info,
                              TimeRange(position, next_time))
                position = next_time
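
# Example usage sketch (illustrative only; the SID and TableInfo constructor
# arguments shown here are assumptions, not values taken from project configuration):
#
#     compute_stats_flat(sid=SID('M02'),
#                        table_info=TableInfo('TM'),
#                        start_time=datetime(2020, 1, 1),
#                        stop_time=datetime(2020, 1, 2),
#                        samplings=(Sampling.ORBITAL, Sampling.DAILY))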


def compute_stats(sampling, sid, table, time_range):
    """Low-level function to compute stats for a single period of a table."""
    region = sampling.region_num

    if not has_data(sid, table, time_range):
        return

    logger.info('Computing stats for {source} {scid} from {time_range}'.format(
        source=table.name, scid=sid.name, time_range=time_range))

    db_conn = db_connect(table)
    for stats_table in table.stats_tables():
        if 'ESTATS' in stats_table['tablename']:
            continue

        # remove any existing stats for this period before inserting fresh values
        del_sql = make_del_sql(db_conn, region, stats_table['tablename'])
        del_cur = db_conn.query(del_sql,
                                sid_num=sid.sid_num,
                                start_time=time_range.start,
                                stop_time=time_range.stop)
        del_cur.close()
        db_conn.commit()

        if stats_table['tablename'].endswith('_STATS'):
            ins_sql = make_combined_sql(region, stats_table['tablename'])

        else:
            ins_sql = make_split_sql(region, stats_table['tablename'])

        gen_time = datetime.utcnow()
        try:
            ins_cur = db_conn.query(ins_sql,
                                    sid_num=sid.sid_num,
                                    start_time=time_range.start,
                                    stop_time=time_range.stop,
                                    gen_time=gen_time)
        except IntegrityError:
            logger.error('Cannot insert into table {tbl} for {scid} over {time_range}'
                         .format(tbl=stats_table['tablename'], scid=sid.name, time_range=time_range))
            raise

        ins_cur.close()
        db_conn.commit()


def has_data(sid, table, time_range):
    """Check if there is data in the given table and time range for computing the statistics."""
    if table == TableInfo('TC'):
        # TC is a JSON-backed table, so count via ts.count
        # (TBD: perhaps ts.count could be used for this check in all cases)
        rec_count = ts.count(table=table.storage_table,
                             sid=sid,
                             field='SENSING_TIME',
                             sensing_start=time_range.start,
                             sensing_stop=time_range.stop)

        if rec_count is None or rec_count == 0:
            logger.debug('No data to create stats for; table {table} from {time_range}'
                         .format(table=table.name, time_range=time_range))
            return False

    else:
        rec_count = ts.select(table=table,
                              fields=(Count(),),
                              sid=sid,
                              sensing_start=time_range.start,
                              sensing_stop=time_range.stop,
                              calibrated=False,
                              ordering=UNORDERED).fetchone()

        if rec_count is None or rec_count[0] == 0:
            logger.debug('No data to create stats for; table {table} from {time_range}'
                         .format(table=table.name, time_range=time_range))
            return False

    return True


def make_del_sql(db_conn, region, tablename):
    """Build SQL to delete old stats in the time range before inserting new stats.

    `db_conn` is currently unused.
    """
    del_sql = ('DELETE FROM {table} '
               'WHERE REGION=\'{region}\' '
               'AND sid_num=:sid_num '
               'AND sensing_time>=:start_time '
               'AND sensing_time<:stop_time'.format(
                   region=region,
                   table=tablename))
    # NOTE: with the Oracle driver the caller may need to bind datetime input sizes on
    # the cursor, e.g. setinputsizes(start_time=oracle_typemap[datetime],
    #                                stop_time=oracle_typemap[datetime])
    return del_sql
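
# For example, make_del_sql(conn, 2, 'TM_STATS') produces the statement below
# (shown re-wrapped for readability; the table name and region are illustrative):
#
#     DELETE FROM TM_STATS WHERE REGION='2' AND sid_num=:sid_num
#     AND sensing_time>=:start_time AND sensing_time<:stop_time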


def make_combined_sql(region, dest_tablename):
    """Generate engine-specific SQL code to write all statistics for a single
    period to a _STATS or _CAL_STATS table.
    """
    # strip the '_STATS' suffix to get the source data table
    source_tablename = dest_tablename[:-6]

    if source_tablename.endswith('_CAL'):
        info_tablename = source_tablename[:-4]

    else:
        info_tablename = source_tablename

    table_info = TableInfo(info_tablename)

    all_fields = list(table_info.fields.keys())

    min_field_list = []
    max_field_list = []
    avg_field_list = []
    std_field_list = []
    d_min_field_list = []
    d_max_field_list = []
    d_avg_field_list = []
    d_std_field_list = []

    if table_info.sparse:
        cnt_field_list = []
        d_cnt_field_list = []

    # only calculate stats for numerical types...
    for field in all_fields:
        f = table_info.fields.get(field)
        if f.datatype in (Datatype.UINT, Datatype.INT, Datatype.FLOAT):
            min_field_list.append('min(' + field + ')')
            max_field_list.append('max(' + field + ')')
            d_min_field_list.append(field + '_MIN')
            d_max_field_list.append(field + '_MAX')

            avg_field_list.append('avg(' + field + ')')
            std_field_list.append('coalesce(stddev(' + field + '),0)')

            if table_info.sparse:
                cnt_field_list.append('count(' + field + ')')
                d_cnt_field_list.append(field + '_CNT')

            d_avg_field_list.append(field + '_AVG')
            d_std_field_list.append(field + '_STD')

    # collect source aggregates and matching destination columns in the same order
    source_fields = (min_field_list +
                     avg_field_list +
                     max_field_list +
                     std_field_list)

    dest_fields = (d_min_field_list +
                   d_avg_field_list +
                   d_max_field_list +
                   d_std_field_list)

    if table_info.sparse:
        # sparse table so it also contains _CNT fields
        source_fields += cnt_field_list
        dest_fields += d_cnt_field_list

    sql = ('INSERT INTO {dest_tablename} (SID_NUM,REGION,GEN_TIME,SENSING_TIME,ROWCOUNT{orbit_field},'
           '    {dest_fields})'
           '  SELECT :sid_num,\'{region}\',:gen_time, :start_time,count(*){orbit_value},'
           '    {source_fields}'
           '  FROM {source_tablename}'
           '  WHERE sid_num=:sid_num'
           '  AND sensing_time>=:start_time AND sensing_time<:stop_time'.format(
               dest_tablename=dest_tablename,
               orbit_field=',orbit' if ADD_ORBITS else '',
               dest_fields=','.join(dest_fields),
               region=region,
               orbit_value=',-1' if ADD_ORBITS else '',
               source_fields=','.join(source_fields),
               source_tablename=source_tablename))

    return sql
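
# Illustrative output (hypothetical non-sparse source table 'TM' with a single
# numeric field VOLTAGE, region 2, ADD_ORBITS enabled; re-wrapped for readability):
#
#     INSERT INTO TM_STATS (SID_NUM,REGION,GEN_TIME,SENSING_TIME,ROWCOUNT,orbit,
#         VOLTAGE_MIN,VOLTAGE_AVG,VOLTAGE_MAX,VOLTAGE_STD)
#       SELECT :sid_num,'2',:gen_time, :start_time,count(*),-1,
#         min(VOLTAGE),avg(VOLTAGE),max(VOLTAGE),coalesce(stddev(VOLTAGE),0)
#       FROM TM
#       WHERE sid_num=:sid_num
#       AND sensing_time>=:start_time AND sensing_time<:stop_time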


def make_split_sql(region, dest_tablename):
    """Generate engine-specific SQL code to write to a single split statistics table."""
    # strip the 5-character suffix ('_MINS', '_MAXS', '_CNTS', '_AVGS' or '_STDS')
    source_tablename = dest_tablename[:-5]

    if source_tablename.endswith('_CAL'):
        info_tablename = source_tablename[:-4]
    else:
        info_tablename = source_tablename

    table_info = TableInfo(info_tablename)
    all_fields = list(table_info.fields.keys())

    source_fields = []
    dest_fields = []

    # only calculate stats for numerical types...
    if dest_tablename.endswith('_MINS'):
        field_suffix = '_MIN'
        for field in all_fields:
            if table_info.fields.get(field).datatype in (Datatype.UINT, Datatype.INT, Datatype.FLOAT):
                source_fields.append('min(' + field + ')')
                dest_fields.append(field + field_suffix)

    elif dest_tablename.endswith('_MAXS'):
        field_suffix = '_MAX'
        for field in all_fields:
            if table_info.fields.get(field).datatype in (Datatype.UINT, Datatype.INT, Datatype.FLOAT):
                source_fields.append('max(' + field + ')')
                dest_fields.append(field + field_suffix)

    elif dest_tablename.endswith('_CNTS'):
        field_suffix = '_CNT'
        for field in all_fields:
            if field in ('TIME_STATUS', 'OBT', 'SPID'):
                continue
            if table_info.fields.get(field).datatype in (Datatype.UINT, Datatype.INT, Datatype.FLOAT):
                source_fields.append('count(' + field + ')')
                dest_fields.append(field + field_suffix)

    elif dest_tablename.endswith('_AVGS'):
        field_suffix = '_AVG'
        for field in all_fields:
            if table_info.fields.get(field).datatype in (Datatype.UINT, Datatype.INT, Datatype.FLOAT):
                source_fields.append('avg(' + field + ')')
                dest_fields.append(field + field_suffix)

    elif dest_tablename.endswith('_STDS'):
        field_suffix = '_STD'
        for field in all_fields:
            if table_info.fields.get(field).datatype in (Datatype.UINT, Datatype.INT, Datatype.FLOAT):
                source_fields.append('coalesce(stddev(' + field + '),0)')
                dest_fields.append(field + field_suffix)

    else:
        raise ValueError('Cannot identify statistics type for table {table}'.format(
            table=dest_tablename))

    sql = ('INSERT INTO {dest_tablename} (SID_NUM,REGION,GEN_TIME,SENSING_TIME,ROWCOUNT, '
           '    {dest_fields})'
           '  SELECT :sid_num,\'{region}\',:gen_time, :start_time,count(*),'
           '    {source_fields}'
           '  FROM {source_tablename}'
           '  WHERE sid_num=:sid_num'
           '  AND sensing_time>=:start_time AND sensing_time<:stop_time'.format(
               region=region,
               source_tablename=source_tablename,
               source_fields=','.join(source_fields),
               dest_tablename=dest_tablename,
               dest_fields=','.join(dest_fields)))

    return sql
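
# Illustrative output for a hypothetical TM_MINS table whose source table TM has a
# single numeric field VOLTAGE, region 2 (re-wrapped for readability):
#
#     INSERT INTO TM_MINS (SID_NUM,REGION,GEN_TIME,SENSING_TIME,ROWCOUNT,
#         VOLTAGE_MIN)
#       SELECT :sid_num,'2',:gen_time, :start_time,count(*),
#         min(VOLTAGE)
#       FROM TM
#       WHERE sid_num=:sid_num
#       AND sensing_time>=:start_time AND sensing_time<:stop_time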