1#!/usr/bin/env python3
2
3"""Implementation of Calibration class."""
4
5import logging
6from collections import namedtuple
7import math
8
9from chart.common.xml import parsechildstrs
10from chart.common.xml import parsechildstr
11from chart.common.xml import parsechildfloats
12from chart.common.xml import parsechildfloat
13from chart.common.xml import xml_filename
14from chart.common.xml import XMLElement
15from chart.common.xml import EXT_XML
16from chart.browse.make_url import make_url
17from chart.db.connection import db_connect
18from chart.common.exceptions import ConfigError
19from chart.common.xml import SubElement
20from chart.project import settings
21from chart.project import SID
22from chart.common.traits import to_str
23from chart.common.path import Path
24
25ELEM_CALIBRATION = 'calibration'
26ELEM_NAME = 'name'
27ELEM_EXTRAPOLATE = 'extrapolate'
28ELEM_POLY = 'poly'
29ELEM_LOG = 'log'
30ELEM_COEFF = 'coeff'
31ELEM_LINEAR = 'linear'
32ELEM_SQL = 'sql'
33ELEM_PAIR = 'pair'
34ELEM_RAW = 'raw'
35ELEM_CAL = 'cal'
36
37SCHEMA_CALIBRATIONS = 'calibrations'
38
39# Calibration value to return if the raw value is outside defined range
40# of a linear calibration and <extrapoate> is not set
41DEFAULT_INVALID_ENG_VALUE = 0.0
42
43# If no <extrapolate> element is found in a linear calibration, control whether
44# we extrapolate values outside the given points or return DEFAULT_INVALID_ENG_VALUE
45DEFAULT_EXTRAPOLATE_LINEAR_CALIBRATION = True
46
47# File extension for XML files
48EXT_XML = '.xml'
49
50logger = logging.getLogger()
51
52
53# class Calibrations(object):
54# """Temporary retain original "trait.calibrations.calibrated" function
55
56# until Item <> WidgetOptionsChoice <> EventClass.choices merge is complete.
57
58# This class should not be used by any new code and will be removed soon.
59# """
60
61# def __init__(self, choices):
62# self.choices = choices
63
64# def calibrated(self, value):
65# """Temp"""
66# if self.choices is not None:
67# return nvl(self.choices.get_name(value), '')
68
69# else:
70# return ''
71
72# def raw(self, name):
73# """Temp"""
74# if self.choices is not None:
75# return nvl(self.choices.get_values(name), '')
76
77# else:
78# return ''
79
80class CalibrationFile:
81 """Calibration storage is a bit more complicated than other objects. The scheme is:
82
83 - All are read from the db/cal/ directory
84 - If a <field> has a <calibration-name> element, we look for a file [name].xml
85 which will contain one or more <calibration> elements, with an optional default
86 calibration plus a number of sid-specific calibrations.
87 - Without a calibration name we look for files called [tablename]*.xml. Each contains
88 all calibrations for a table. Each file contains calibrations for only one source,
89 or the default calibrations. The * can be anything, the code just reads them all
90 and examines <source> elements. In this case every <calibration> element needs one or more
91 <field> elements to match them up.
92
93 Note due to a mis-step in the code, this class is used only to create and store
94 calibration files from the PUS code. The Table class handles reading them in all cases.
95 A future task might be to rework it so this class handles all i/o.
96
97 <calibrations>
98 <calibration>
99 <source>
100 <scid>MSG4</>
101 <ogsid>PRIM</>
102 </>
103 </>
104 </>
105 """
106
107 def __init__(self, filename):
108 """Read from `filename` if set."""
109 self.cals = [] # CalibrationCluster
110
111 def store(self, filename=None):
112 """Deduce correct filename if not given. Pass '-' for stdout."""
113 pass
114
115 @staticmethod
116 def get_cals(self, field):
117 # lookup based on field.name or field.calibration_name as needed
118 pass
119
120
121class CalibrationCluster:
122 """Base class to calibration handling."""
123 def __init__(self, field_info=None, calibration_name=None):
124 self.field_info = field_info
125 self.calibration_name = calibration_name
126 self.cals = {}
127
128 def __setitem__(self, sid, cal):
129 self.cals[sid] = cal
130
131 def __getitem__(self, sid):
132 if sid in self.cals:
133 return self.cals[sid]
134
135 if None in self.cals:
136 return self.cals[None]
137
138 return None
139
140 def __len__(self):
141 return len(self.cals)
142
143 def sids(self):
144 return self.cals.keys()
145
146 def get_cal(self, sid):
147 if sid is None:
148 if None in self.cals:
149 return self.cals[None]
150
151 else:
152 return self.cals[self.keys()[0]]
153
154 else:
155 return self.cals.get(sid)
156
157
158class SplitCalibrationCluster(CalibrationCluster):
159 """Legacy calibration support, using anonymous calibration functions in separate dirs.
160
161 For legacy projects Calibrations are constructed from the db/cal directory where each
162 definition begins with a list of <field> nodes it applies to. Calibrations are not
163 named."""
164 def browse_urls(self):
165 pass
166
167def get_cal(filename):
168 """Populate our `calibration` member."""
169
170 node = XMLElement(filename=filename)
171 linear_node = node.find(ELEM_LINEAR)
172 poly_node = node.find(ELEM_POLY)
173 sql_node = node.find(ELEM_SQL)
174 log_node = node.find(ELEM_LOG)
175
176 if linear_node is not None:
177 cal = LinearCalibration(node=linear_node)
178
179 elif poly_node is not None:
180 cal = PolyCalibration(node=poly_node)
181
182 elif sql_node is not None:
183 cal = SQLCalibration(node=sql_node)
184
185 elif log_node is not None:
186 cal = LogCalibration(node=log_node)
187
188 else:
189 assert False
190
191 return cal
192
193
194class NamedCalibrationCluster(CalibrationCluster):
195 """Represent all the calibrations available for various SID (sources).
196
197 These objects will be pointed to by the "calibration" member of a FieldInfo object.
198
199
200 For new PUS-based missions we read named Calibrations from db/named_cal."""
201
202 def load_xml(self):
203 """Populate our `cals` member.
204
205 Using either the `field_info` or `calibration_name` member."""
206
207 if settings.SID_SPECIFIC_DIR is None:
208 # This is for non-PUS projects only
209 for d in settings.CALIBRATION_DIR.glob('*'):
210 # logger.debug('Scanning cal subdir ' + str(d))
211 # named cal dir subfolders must map to SID short names
212 sid = SID(d.name)
213 # logger.debug('sid ' + str(sid))
214 # complete path including our name
215 if self.calibration_name is not None:
216 filename = d.joinpath(self.calibration_name + EXT_XML)
217 else:
218 filename = d.joinpath(self.field_info.calibration_name + EXT_XML)
219
220 if filename.exists():
221 self.cals[sid] = get_cal(filename)
222
223 else:
224 # PUS projects are always multi-SRDB
225 for sid in SID.all():
226 cal_dir = sid.db_dir(settings.CALIBRATION_SUBDIR)
227 if self.calibration_name is not None:
228 filename = cal_dir.joinpath(self.calibration_name + EXT_XML)
229
230 else:
231 # ! why would this happen?
232 filename = cal_dir.joinpath(self.field_info.calibration_name + EXT_XML)
233
234 if filename.exists():
235 self.cals[sid] = get_cal(filename)
236
237
238 def browse_urls(self):
239 pass
240
241 def write_definition(self, output_dir:Path):
242 """Write ourselves to disk.
243
244 For named calibrations, we will create a unique file in
245 {settings.NAMED_CALIBRATION_DIR}/{SID}/{calname}.xml.
246
247 For legacy calibrations (although not implemented) we could write to
248 {settings.CALIBRATION_DIR}/{tablename}_{sid}.xml but this is a shared
249 file and we would have to write a single element of it.
250
251 If settings.SID_SPECIFIC_DIR set then sid specific directory already
252 defined by 'output'."""
253
254 for sid, cal in self.cals.items():
255 if sid is None:
256 logging.warn('Not writing SID-less calibration file')
257 continue
258
259 root_node = XMLElement(tag=ELEM_CALIBRATION)
260 root_node.set_schema(SCHEMA_CALIBRATIONS)
261 cal.write_definition(root_node)
262 root_node.write(output_dir.joinpath(self.field_info.calibration_name + EXT_XML))
263
264
265class Calibration:
266 """Representation of a calibration point for a specific, or default, SID.
267
268 These are stored in FieldInfo.cal as a map of (sid...) against Calibration object."""
269
270 # Concrete Calibration classes override this
271 calibration_type = 'base'
272
273 def __init__(self, description=None):
274 self.description = description
275 self.condition = None
276
277 # @staticmethod
278 # def load_cal(name):
279 # """Load a named calibration.
280
281 # This doesn't work well with multiple spacecraft. To handle those this function
282 # should move to the CalibrationCluster class and be handled there - the
283 # ddl.manage.load_cal() function can use a cluster to build complete DDL."""
284
285 # return Calibration(XMLElement(filename=settings.CALIBRATION_DIR.joinpath(
286 # '{name}{ext}'.format(name=name, ext=EXT_XML))))
287
288 # def calibrate_value(self, value):
289
290 # plsql = parsechildstr(self.elem, ELEM_PLSQL, None)
291 # if plsql is not None:
292 # db_conn = db_connect()
293 # # We assume the require plsql will be present in the primary database.
294 # return db_conn.query("SELECT {fn}(:value) FROM DUAL".format(fn=plsql),
295 # value=int(value)).fetchone()[0]
296
297 # return None
298
299 # def __eq__(self, other):
300 # """Allow comparisons between TableInfo objects to work as expected."""
301 # ! hope this is not used
302 # return id(self.elem) == id(other.elem)
303
304 # @property
305 # def type(self):
306 # """Calibration type.
307 # Can be 'linear', 'poly' or 'plsql'.
308 # """
309
310 # for t in ('linear', 'poly', 'plsql', 'sql'):
311 # if self.elem.find(t) is not None:
312 # return t
313
314 # raise ConfigError('Unknown or missing calibration type',
315 # elem=self.elem,)
316
317 # @property
318 # def linear_pairs(self):
319 # """Return a list of tuples giving the (`raw`, `cal`) values
320 # for each calibration point.
321 # """
322
323 # raw = []
324 # cal = []
325 # linear_elem = self.elem.find(ELEM_LINEAR)
326 # for pair_elem in linear_elem.findall(ELEM_PAIR):
327 # raw.append(parsechildfloat(pair_elem, ELEM_RAW))
328 # cal.append(parsechildfloat(pair_elem, ELEM_CAL))
329
330 # return raw, cal
331
332 # @property
333 # def poly_coeffs(self):
334 # """Return list of all polynomial coefficients."""
335
336 # poly_elem = self.elem.find('poly')
337 # return parsechildfloats(poly_elem, 'coeff')
338
339 # @property
340 # def plsql_function(self):
341 # """Return plsql function name for external calibrations."""
342
343 # return parsechildstr(self.elem, 'plsql')
344
345 # @property
346 # def sql(self):
347 # """Return content of <sql> element."""
348 # return parsechildstr(self.elem, 'sql')
349
350 @property
351 def browse_source_url(self):
352 """Return a URL which can be used to browse the source XML file for this
353 calibration function, scrolling down to the relevant line.
354 """
355 start_line = self.elem.sourceline
356 stop_line = self.elem.getnext().sourceline - 1 if self.elem.getnext() is not None \
357 else 1000000
358 # start_line = None
359 # stop_line = None
360 return make_url(xml_filename(self.elem),
361 highlight=(start_line, stop_line))
362
363
364class PolyCalibration(Calibration):
365 """Polynomial calibration."""
366
367 calibration_type = 'poly'
368
369 def __init__(self, coeffs=None, node=None, description=None):
370 super(PolyCalibration, self).__init__(description=description)
371 self.coeffs = coeffs
372
373 if node is not None:
374 self.coeffs = node.parse_floats(ELEM_COEFF)
375
376 def write_definition(self, node):
377 poly_node = node.add(tag=ELEM_POLY)
378 for coeff in self.coeffs:
379 poly_node.add(tag=ELEM_COEFF, text=coeff)
380
381 def calibrate_value(self, value):
382 """Calibrate a raw value."""
383 res = 0.0
384 for i, coeff in enumerate(self.coeffs):
385 res += (value ** i) * float(coeff)
386
387 return res
388
389
390class LogCalibration(Calibration):
391 """Logarithmic calibration."""
392
393 calibration_type = 'log'
394
395 def __init__(self, coeffs=None, node=None, description=None):
396 super(LogCalibration, self).__init__(description=description)
397 self.coeffs = coeffs
398
399 if node is not None:
400 self.coeffs = node.parse_floats(ELEM_COEFF)
401
402 def write_definition(self, node):
403 log_node = node.add(tag=ELEM_LOG)
404 for coeff in self.coeffs:
405 log_node.add(tag=ELEM_COEFF, text=coeff)
406
407 def calibrate_value(self, value):
408 """Calibrate a raw value."""
409 res = 0.0
410 for i, coeff in enumerate(self.coeffs):
411 res += coeff * math.pow(math.log(value), i)
412
413 return 1.0 / res
414
415
416class LinearCalibration(Calibration):
417 """Linear calibration."""
418
419 Pair = namedtuple('Pair', 'raw cal')
420
421 calibration_type = 'linear'
422
423 def __init__(self, count=None, node=None, description=None, extrapolate=True):
424 super(LinearCalibration, self).__init__(description=description)
425 self.pairs = []
426 self.extrapolate = extrapolate
427 if node is not None:
428 self.extrapolate = node.parse_bool(ELEM_EXTRAPOLATE,
429 DEFAULT_EXTRAPOLATE_LINEAR_CALIBRATION)
430 for pair_node in node.findall(ELEM_PAIR):
431 self.pairs.append(
432 LinearCalibration.Pair(
433 # yes, the raw values can be floating point
434 raw=pair_node.parse_str(ELEM_RAW),
435 cal=pair_node.parse_str(ELEM_CAL)))
436
437 self.count = len(self.pairs)
438
439 else:
440 self.extrapolate = extrapolate
441 self.count = count
442
443 def write_definition(self, node):
444 linear_node = node.add(tag=ELEM_LINEAR)
445 linear_node.add(tag=ELEM_EXTRAPOLATE, text=to_str(self.extrapolate))
446
447 for pair in self.pairs:
448 pair_node = linear_node.add(tag=ELEM_PAIR)
449 pair_node.add(tag=ELEM_RAW, text=pair.raw)
450 pair_node.add(tag=ELEM_CAL, text=pair.cal)
451
452 def calibrate_value(self, value):
453 """Calibrate `value`."""
454 if len(self.pairs) == 0:
455 raise ConfigError('<linear> element contains no <pair> elements')
456
457 if len(self.pairs) == 1:
458 raise ConfigError('<linear> element only contains 1 <pair> element')
459
460 def slope(i):
461 """Compute the slope between points `i` and `i`+1."""
462 return (float(self.pairs[i + 1].cal) - float(self.pairs[i].cal)) /\
463 (float(self.pairs[i + 1].raw) - float(self.pairs[i].raw))
464
465 # <pair><raw>10</raw><cal>100</cal></pair>
466 # <pair><raw>20</raw><cal>200</cal></pair>
467
468 # if `value` is below the range of <pair> elements interpolate back from
469 # the first 2
470 for i in range(len(self.pairs) - 1):
471 if value >= float(self.pairs[i].raw) and value <= float(self.pairs[i + 1].raw):
472 return float(self.pairs[i].cal) + slope(i) * (float(value) - float(self.pairs[i].raw))
473
474 if value <= float(self.pairs[0].raw):
475 if self.extrapolate:
476 return float(self.pairs[0].cal) + slope(0) * (float(value) - float(self.pairs[0].raw))
477 else:
478 return DEFAULT_INVALID_ENG_VALUE
479
480 # if `value` is above the range of <pair> elements interpolate forwards from
481 # the last 2
482 if value >= float(self.pairs[-1].raw):
483 if self.extrapolate:
484 return float(self.pairs[-2].cal) + slope(-2) * (float(value) - float(self.pairs[-2].raw))
485 else:
486 return DEFAULT_INVALID_ENG_VALUE
487
488 raise ValueError('cannot calibrate')
489
490
491class ConditionalCalibration(Calibration):
492 calibration_type = 'conditional'
493
494
495class SQLCalibration(Calibration):
496 def __init__(self, node=None):
497 self.sql_statement = node.text
498
499 calibration_type = 'SQL'
500
501 def calibrate_value(self, value):
502 """Don't bother trying to calibrate these.
503
504 It's just about possible but a lot of work and only affects a couple of parameters."""
505 # db_conn = db_connect()
506 # We assume the require plsql will be present in the primary database.
507 # This is probably Oracle-only not that's not currently a problem
508 # return db_conn.query('SELECT {fn}(:value) FROM DUAL'.format(fn=self.sql_statement),
509 # value=int(value)).fetchone()[0]
510 return None
511
512
513class PlSQLCalibration(Calibration):
514 calibration_type = 'PlSQL'
515 def __init__(self, node=None):
516 self.plsql_function = node.text
517
518 def calibrate_value(self, value):
519 """Calibrate a raw value."""
520 db_conn = db_connect()
521 # We assume the require plsql will be present in the primary database.
522 # This is probably Oracle-only not that's not currently a problem
523 return db_conn.query('SELECT {fn}(:value) FROM DUAL'.format(fn=self.plsql_function),
524 value=int(value)).fetchone()[0]