#!/usr/bin/env python3

"""Ingest SF00 files, algorithm working directories or events XML files to
Oracle DB and archive directories."""

import os
import imp
import time
import logging
from datetime import timedelta

from chart.project import settings
from chart.project import SID
from chart.products.eps.gpfs import time_filter
from chart.db.model.table import TableInfo
from chart.db.connection import db_connect
from chart.db.settings import DatabaseEngine
from chart.db.model.exceptions import NoSuchTable


class CannotIngest(Exception):
    """Exception raised if an ingestion fails,
    usually because it violates a uniqueness constraint.
    """

    def __init__(self, message):
        super().__init__()
        self.message = message

    def __str__(self):
        return self.message


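# A sketch of intended use by ingestion code (hypothetical example; per the
# docstring above this is raised on uniqueness constraint violations):
#
#     if row_already_exists(table, timestamp):  # hypothetical check
#         raise CannotIngest('duplicate row in {t} at {ts}'.format(t=table, ts=timestamp))

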
def load_rdr_py():
    """Rebuild rdr.py if the source XML files have changed.
    The loaded module is returned.
    """
    # Hack: we should scan all TS_TABLE_DIRS, but it probably doesn't matter for EPS
    latest_xml_time = max(
        os.path.getmtime(str(f)) for f in settings.TS_TABLE_DIRS[0].glob('*.xml'))

    # Look for a compiled .pyc file that predates the newest XML file
    pyc_name = str(settings.RDR_PY_FILENAME) + 'c'
    if os.path.exists(pyc_name) and os.path.getmtime(pyc_name) < latest_xml_time:
        logging.info('Removing potentially out of date {pyc_name}'.format(pyc_name=pyc_name))
        os.unlink(pyc_name)

    py_name = settings.RDR_PY_FILENAME
    if py_name.exists() and os.path.getmtime(str(py_name)) < latest_xml_time:
        logging.info('Removing potentially out of date {py_name}'.format(py_name=py_name))
        py_name.unlink()

    try:
        # pylint complains this import is not used,
        # but it is only here to confirm whether the code file exists or not
        # from chart.backend import rdr  # (unused variable) pylint: disable=W0612
        return imp.load_source('rdr', str(py_name))
    except (ImportError, IOError):
        # rdr.py is missing or stale; regenerate it from the XML table definitions
        from chart.products.eps import rdr_gen
        rdr_gen.make_rdr_py()
        return imp.load_source('rdr', str(py_name))
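
# Note: the imp module is deprecated (and removed in Python 3.12). A sketch of
# an equivalent load using importlib, assuming Python 3.5+:
#
#     import importlib.util
#     spec = importlib.util.spec_from_file_location('rdr', str(py_name))
#     rdr = importlib.util.module_from_spec(spec)
#     spec.loader.exec_module(rdr)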


def ingest_sf00(sf00,
                replace,
                replace_delete_post_window=timedelta(0),
                fuzzy_replace=None):
    """Ingest a single SF00 file (from disk or a memory object).
    Don't forget to commit() afterwards.

    Args:
        `replace` (bool): Delete existing data ranges before ingesting new data.
        `replace_delete_post_window` (timedelta): When `replace` is set, delete any
            existing data within a window starting at the start time of the new data
            and ending at the stop time of the new data plus `replace_delete_post_window`.
            For example, with new data spanning 10:00-10:05 and a window of
            timedelta(minutes=2), existing rows from 10:00 to 10:07 are deleted first.
        `fuzzy_replace` (timedelta): Before ingesting each individual new row, delete
            any existing rows with a timestamp within +/- `fuzzy_replace` of it.

    Returns:
        A list of dictionaries, one per extractor, each containing `table`,
        `sensing_start` and `sensing_stop`, describing which AP tables were
        written to and the time range covered.
    """

    if settings.REINGEST:
        replace = True

    rdr = load_rdr_py()

    sid = sf00.sid
    # assert isinstance(sid, SID)

    # Patch the OGSID to match the CFID.
    # We should determine the ogsid from the switch log instead.
    # if hasattr(sid, 'cfid'):
    #     # disable 'unused imports' here otherwise the jenkins core pylint check fails
    #     from chart.sids.sid_msg import CFID  # pylint: disable=F0401
    #     from chart.sids.sid_msg import OGSID  # pylint: disable=F0401
    #     if sid.cfid is CFID.OPER:
    #         sid.ogsid = OGSID.PRIM
    #
    #     elif sid.cfid is CFID.VALI:
    #         sid.ogsid = OGSID.BACK

    extractors = rdr.make_extractors(sf00.get_assembly_id())

    # db.connection().stmtcachesize = 1000  # seems to slow things down

    if not extractors:
        if sf00.get_assembly_id() == 19:  # disable check on HKTM FRAME for now
            return []

        raise ValueError('No suitable conversions found for input SF00 with ASSY {id}'.format(
            id=sf00.get_assembly_id()))
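
    # Each generated extractor is expected to look roughly like this (a sketch;
    # the real classes are generated into rdr.py by rdr_gen, names hypothetical):
    #
    #     class SomeTableExtractor:
    #         name = 'SOME_TABLE'  # target table name
    #
    #         def handle(self, sid, timestamp, payload):
    #             """Convert one snack payload into rows for `name`."""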

    for extractor in extractors:
        try:
            table = TableInfo(extractor.__class__.name)
        except NoSuchTable:
            # a disabled table suppresses ingestion of the whole product
            logging.info('Disabled table {tbl}'.format(tbl=extractor.__class__.name))
            return []
        db_conn = db_connect(table)
        if db_conn.engine is DatabaseEngine.ORACLE:
            from chart.products.sf00.ingester_oracle import OracleBuffer
            extractor.buffer = OracleBuffer(
                db_conn,
                table,
                replace_delete_post_window if replace else None,
                fuzzy_replace)
            # sf00.get_sensing_start())

        elif db_conn.engine is DatabaseEngine.POSTGRESQL:
            # PostgresBuffer is not used yet; OracleBuffer serves PostgreSQL too
            from chart.products.sf00.ingester_oracle import OracleBuffer
            # from chart.products.ingester_postgres import PostgresBuffer
            # extractor.buffer = PostgresBuffer(
            extractor.buffer = OracleBuffer(
                db_conn,
                table,
                replace_delete_post_window if replace else None,
                fuzzy_replace)
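
    # Each buffer must expose the small interface used below (a sketch with
    # hypothetical annotations; OracleBuffer is the real implementation):
    #
    #     class Buffer:
    #         db_conn       # connection the buffer writes through
    #         insert_time   # cumulative seconds spent inserting
    #
    #         def insert(self, rows): ...  # queue converted rows
    #         def flush(self): ...         # write remaining queued rows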

    ingest_start = time.time()

    logging.debug('Input identified, created {0} extractors'.format(len(extractors)))
    prev_timestamp = None

    first_time = None
    last_time = None

    for timestamp, payload in time_filter(sf00.gen_snacks()):
        if prev_timestamp == timestamp:
            logging.error('Dropping snack with duplicate timestamp {ts}'.format(
                ts=timestamp))
            continue

        for extractor in extractors:
            extractor.buffer.insert(extractor.handle(sid, timestamp, payload))

        prev_timestamp = timestamp

        if first_time is None:
            first_time = timestamp

        last_time = timestamp

    conns = set()
    for extractor in extractors:
        extractor.buffer.flush()
        # extractor.buffer.commit()
        conns.add(extractor.buffer.db_conn)

    # Commit each DB connection just once - some SF00 types write to 60+ tables,
    # so a commit per table would be excessive.
    for c in conns:
        c.commit()

    ingest_time = time.time() - ingest_start

    logging.debug('Timing assy {assy} tables {tables} ingest {ingest:.3} insert {insert:.3}'.format(
        assy=sf00.get_assembly_id(),
        tables=len(extractors),
        ingest=ingest_time,
        insert=sum(e.buffer.insert_time for e in extractors)))

    return [{'table': e.name,
             'sensing_start': first_time,
             'sensing_stop': last_time} for e in extractors]
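

# A minimal caller sketch (illustrative only; `sf00` stands for any SF00
# object of the kind ingest_sf00 accepts):
#
#     results = ingest_sf00(sf00, replace=True)
#     for r in results:
#         logging.info('Wrote {table} covering {sensing_start} - {sensing_stop}'.format(**r))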