1#!/usr/bin/env python3
  2
  3"""Ingest SF00 files, algorithm working directories or events XML files to
  4Oracle DB and archive directories."""
  5
  6import os
  7import imp
  8import time
  9import logging
 10from datetime import timedelta
 11
 12from chart.project import settings
 13from chart.products.eps.gpfs import time_filter
 14from chart.db.model.table import TableInfo
 15from chart.db.connection import db_connect
 16from chart.db.settings import DatabaseEngine
 17from chart.project import SID
 18from chart.db.model.exceptions import NoSuchTable
 19
 20
 21class CannotIngest(Exception):
 22    """Exception to be raised if an ingestion failed.
 23    Usually this is because it violates a uniqueness constraint.
 24    """
 25
 26    def __init__(self, message):
 27        super(CannotIngest, self).__init__()
 28        self.message = message
 29
 30    def __str__(self):
 31        return self.message
 32
 33
 34def load_rdr_py():
 35    """Rebuild rdr.py if the source XML files have changed.
 36    The loaded module is returned.
 37    """
 38    # Hack, we should scan all TS_TABLE_DIRS but it probably doesn't matter for EPS
 39    latest_xml_time = max(
 40        os.path.getmtime(str(f)) for f in settings.TS_TABLE_DIRS[0].glob('*.xml'))
 41
 42    # Look for compiled .pyc file
 43    pyc_name = str(settings.RDR_PY_FILENAME) + 'c'
 44    if os.path.exists(pyc_name) and os.path.getmtime(pyc_name) < latest_xml_time:
 45        logging.info('Removing potentially out of date {pyc_name}'.format(pyc_name=pyc_name))
 46        os.unlink(pyc_name)
 47
 48    py_name = settings.RDR_PY_FILENAME
 49    if py_name.exists() and os.path.getmtime(str(py_name)) < latest_xml_time:
 50        logging.info('Removing potentially out of date {py_name}'.format(py_name=py_name))
 51        py_name.unlink()
 52
 53    try:
 54        # pylint complains this import is not used,
 55        # but it is only here to confirm whether the code file exists or not
 56        # from chart.backend import rdr  # (unused variable) pylint: disable=W0612
 57        return imp.load_source('rdr', str(py_name))
 58    except (ImportError, IOError):
 59        from chart.products.eps import rdr_gen
 60        rdr_gen.make_rdr_py()
 61        return imp.load_source('rdr', str(py_name))
 62
 63
 64def ingest_sf00(sf00,
 65                replace,
 66                replace_delete_post_window=timedelta(0),
 67                fuzzy_replace=None):
 68    """Ingest a single SF00 file (from disk or memory object).
 69    Don't forget to commit() afterwards.
 70
 71    Args:
 72        `replace` (bool): Delete existing data ranges before ingesting new data.
 73        `replace_delete_post_window` (timedelta): When `replace` is specified, delete any existing
 74            data within a window starting at the start time of the new data and ending at the stop
 75            time of the new data plus `replace_delete_post_window` seconds.
 76        `fuzzy_replace` (timedelta): Before ingesting each individual new row delete any existing
 77            rows with a timestamp within +/- `fuzzy_replace`.
 78
 79    Returns:
 80        List of dictionaries containing:
 81            table
 82            sensing_start
 83            sensing_stop
 84
 85        describing which AP tables were written to
 86    """
 87
 88    # if settings.REINGEST:
 89    replace = True
 90
 91    rdr = load_rdr_py()
 92
 93    sid = sf00.sid
 94    # assert isinstance(sid, SID)
 95
 96    # patch the OGSID to match the CFID
 97    # We should determine the ogsid instead from the switch log instead
 98    # if hasattr(sid, 'cfid'):
 99        # disable 'unused imports' here otherwise the jenkins core pylint check fails
100        # from chart.sids.sid_msg import CFID  # pylint: disable=F0401
101        # from chart.sids.sid_msg import OGSID  # pylint: disable=F0401
102        # if sid.cfid is CFID.OPER:
103            # sid.ogsid = OGSID.PRIM
104
105        # elif sid.cfid is CFID.VALI:
106            # sid.ogsid = OGSID.BACK
107
108    extractors = rdr.make_extractors(sf00.get_assembly_id())
109
110    #db.connection().stmtcachesize = 1000  # seems to slow things down
111
112    if len(extractors) == 0:
113        if sf00.get_assembly_id() == 19:  # disable check on HKTM FRAME for now
114            return
115
116        raise ValueError('No suitable conversions found for input SF00 with ASSY {id}'.format(
117            id=sf00.get_assembly_id()))
118
119    for extractor in extractors:
120        try:
121            table = TableInfo(extractor.__class__.name)
122        except NoSuchTable:
123            logging.info('Disabled table {tbl}'.format(tbl=extractor.__class__.name))
124            return []
125        db_conn = db_connect(table)
126        if db_conn.engine is DatabaseEngine.ORACLE:
127            from chart.products.sf00.ingester_oracle import OracleBuffer
128            extractor.buffer = OracleBuffer(
129                db_conn,
130                table,
131                None if replace is False else replace_delete_post_window,
132                fuzzy_replace)
133                # sf00.get_sensing_start())
134
135        elif db_conn.engine is DatabaseEngine.POSTGRESQL:
136            from chart.products.sf00.ingester_oracle import OracleBuffer
137            # from chart.products.ingester_postgres import PostgresBuffer
138            # extractor.buffer = PostgresBuffer(
139            extractor.buffer = OracleBuffer(
140                db_conn,
141                table,
142                None if replace is None or replace is False else replace_delete_post_window,
143                fuzzy_replace)
144
145    ingest_start = time.time()
146
147    logging.debug('Input identified, creating {0} extractors'.format(len(extractors)))
148    prev_timestamp = None
149
150    first_time = None
151    last_time = None
152
153    for timestamp, payload in time_filter(sf00.gen_snacks()):
154        if prev_timestamp == timestamp:
155            logging.error('Dropping snack with timestamp {ts} due to duplicate timestamp'.format(
156                    ts=timestamp))
157            continue
158
159        for extractor in extractors:
160            extractor.buffer.insert(extractor.handle(sid, timestamp, payload))
161
162        prev_timestamp = timestamp
163
164        if first_time is None:
165            first_time = timestamp
166
167        last_time = timestamp
168
169    conns = set()
170    for extractor in extractors:
171        extractor.buffer.flush()
172        # extractor.buffer.commit()
173        conns.add(extractor.buffer.db_conn)
174
175    # just commit each DB connection once - some SF00 types write to 60+ tables
176    # so it might be a problem to overcommit.
177    for c in conns:
178        c.commit()
179
180    ingest_time = time.time() - ingest_start
181
182    logging.debug('Timing assy {assy} tables {tables} ingest {ingest:.3} insert {insert:.3}'.format(
183            assy=sf00.get_assembly_id(),
184            tables=len(extractors),
185            ingest=ingest_time,
186            insert=sum(e.buffer.insert_time for e in extractors)))
187
188    return [{'table': e.name,
189             'sensing_start': first_time,
190             'sensing_stop': last_time} for e in extractors]