#!/usr/bin/env python3

  3"""Compute timeseries table statistics.
  4
  5If a specific sampling is given, just compute that,
  6otherwise the input timerange is expanded to multiples of 3 days and
  7all samplings computed.
  8
  9Flat tables not supported.
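
Example invocation (illustrative; the script name, SID and table name are
placeholders):

    compute_stats.py --sid SAT1 --tablename TM --start 2024-01-01 --stop 2024-01-04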
 10"""

from datetime import timedelta
import logging
from collections import defaultdict

from chart.alg.jsonb_stats import compute_stats_jsonb
from chart.common.args import ArgumentParser
# TimeRange and IntegrityError are only used by the commented-out
# flat-table code below
from chart.common.timerange import TimeRange
from chart.db.exceptions import IntegrityError
from chart.db.model.table import TableInfo, TableStorage
from chart.plots.sampling import Sampling, sampling_from_name
from chart.backend.worker_job_chains import threeday_start_time
from chart.project import settings

HOUR = timedelta(hours=1)
DAY = timedelta(days=1)

logger = logging.getLogger()

# It is fairly arbitrary how we split the samplings into groups. It is a
# compromise between high memory usage if too many samplings are combined
# (the worst case, processing everything together, reads 3 days of 10s stats
# into memory at once, which needs too much memory on a normal server) and
# the overhead of running more passes over the source data.
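# Rough scale (illustrative, assuming one row per sampling interval):
# 3 days of 10-second stats is 3 * 86400 / 10 = 25,920 rows per field.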
SPLIT_SAMPLINGS = {
    'short': [
        Sampling.SEC_10,
        Sampling.MIN_1,
        Sampling.MIN_5,
        Sampling.MIN_20,
        Sampling.HOURLY,
    ],
    'long': [
        Sampling.QUARTER_DAILY,
        Sampling.DAILY,
        Sampling.THREE_DAILY,
    ],
}

# Compute long periods of stats in batches to keep memory use down
MIN_BLOCK = timedelta(hours=1)
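# Illustrative: with MIN_BLOCK of one hour, the 'short' group is processed in
# one-hour blocks, while the 'long' group uses its largest sampling (3 days)
# as its block size (see the main loop below).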


# def compute_stats_flat(sid, table, start_time, stop_time):
#     """Compute orbital stats for spacecraft in flat tables."""
#     batch_size = 21
#     commit_count = 1
#     logger.info('Table {t}'.format(t=table.name))
#     if sid.orbit:
#         for o in sid.orbit.orbits_in_range(start_time, stop_time):
#             try:
#                 chartjcs.alg.orbital_stats.compute_stats(
#                     sid,
#                     o['number'],
#                     TimeRange(o['start_time'], o['stop_time']),
#                     table,
#                     commit=commit_count % batch_size == 0)
#             except IntegrityError:
#                 logger.warning('Integrity error {t} {s} {strt} {stop}'.format(
#                     t=table, s=sid, strt=o['start_time'], stop=o['stop_time']))
#                 continue
#
#             commit_count += 1
#
#     else:
#         logger.error('SID has no orbits: {sid}'.format(sid=sid))
#
#     chartjcs.alg.orbital_stats.commit_all()


def main():
    """Entry point."""
    parser = ArgumentParser()
    parser.add_argument('--db',
                        metavar='CONN',
                        help='Use database connection CONN')
    parser.add_argument('--sid', '-s',
                        type=ArgumentParser.sid,
                        nargs='+',
                        required=True,
                        help='Spacecraft(s) to process')
    parser.add_argument('--start',
                        type=ArgumentParser.start_time,
                        required=True,
                        help='Start of processing period')
    parser.add_argument('--stop',
                        type=ArgumentParser.stop_time,
                        required=True,
                        help='Stop of processing period')
    parser.add_argument('--sampling',
                        nargs='+',
                        help='Compute given samplings')
    parser.add_argument('--tablename', '--table', '-t',
                        nargs='+',
                        required=True,  # maybe allow '%' wildcards
                        help='Table(s) to process')
    args = parser.parse_args()

    if args.db:
        settings.set_db_name(args.db)

    if args.sampling is not None:
        samplings = [sampling_from_name(s) for s in args.sampling]

    else:
        # take from sid instead?
        samplings = [
            Sampling.SEC_10,
            Sampling.MIN_1,
            Sampling.MIN_5,
            Sampling.MIN_20,
            Sampling.HOURLY,
            Sampling.QUARTER_DAILY,
            Sampling.DAILY,
            Sampling.THREE_DAILY,
        ]
        logger.info('Selecting default samplings: {s}'.format(
            s=', '.join(s.name for s in samplings)))

    sampling_groups = defaultdict(list)  # group name against list of samplings in use
    for sampling in samplings:
        for group_name, group_samplings in SPLIT_SAMPLINGS.items():
            if sampling in group_samplings:
                sampling_groups[group_name].append(sampling)
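
    # Illustrative example: requesting MIN_1 and DAILY produces two groups,
    # sampling_groups == {'short': [Sampling.MIN_1], 'long': [Sampling.DAILY]}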

    # now process each group of samplings separately

    for sampling_group in sampling_groups.values():
        # take the longest sampling in the group rather than relying on list order
        longest = max(s.nominal_duration for s in sampling_group)
        block_length = max(MIN_BLOCK, longest)
        # Expand the time range, in a lazy way to keep it simple to implement.
        # If the longest sampling is hourly or less, expand to whole hours
        if longest <= HOUR:
            start = args.start.replace(minute=0, second=0, microsecond=0)

        # up to daily, expand to whole days
        elif longest <= DAY:
            start = args.start.replace(hour=0, minute=0, second=0, microsecond=0)

        # otherwise align to the 3-day processing boundary
        else:
            start = threeday_start_time(args.start)
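
        # Illustrative: with an hourly group, a --start of 2024-01-01T10:30 is
        # rounded down to 2024-01-01T10:00; daily and 3-day groups are aligned
        # to day or 3-day boundaries by the branches above.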

        while start < args.stop:
            logger.info('Processing group {g}, shifted start time to {strt}'.format(
                g=' '.join(str(s) for s in sampling_group), strt=start))

            for sid in args.sid:
                for tablename in args.tablename:
                    table = TableInfo(name=tablename, sid=sid)
                    if table.storage is TableStorage.JSONB:
                        compute_stats_jsonb(sid,
                                            table,
                                            start,
                                            # clamp the final block to the requested stop
                                            min(args.stop, start + block_length),
                                            sampling_group)

                    elif table.storage is TableStorage.FLAT:
                        # see the commented-out compute_stats_flat() above
                        raise NotImplementedError('flat tables are not supported')

                    elif table.storage is TableStorage.KEYVALUE:
                        raise NotImplementedError(
                            'keyvalue stores are not supported in this tool')

            start += block_length


if __name__ == '__main__':
    main()