1#!/usr/bin/env python3
2
3"""Wrapper around event class XML definitions."""
4
5import re
6import fnmatch
7import logging
8import operator
9import importlib
10import collections
11from enum import Enum
12
13from chart.common.path import Path
14from chart.common.decorators import memoized2
15from chart.common.decorators import memoized
16from chart.common.exceptions import ConfigError
17from chart.common.xml import XMLElement
18from chart.project import settings
19from chart.backend.activity import Activity
20from chart.common.traits import is_listlike
21from chart.browse.make_url import make_url
22from chart.events.exceptions import NoSuchEventClass
23
# Names of the XML child elements looked up by this module.

# Tags used at more than one level of a class definition
ELEM_NAME = 'name'
ELEM_VALUE = 'value'
ELEM_DESCRIPTION = 'description'

# Tags read directly from a <class> element
ELEM_CLASS_PROPERTY = 'class-property'
ELEM_INSTANCE_PROPERTY = 'instance-property'
ELEM_RENDERER = 'renderer'
ELEM_DB_TABLE = 'db-table'
ELEM_DECODER = 'decoder'
ELEM_TEMPLATE = 'template'
ELEM_URL = 'url'
ELEM_TEST = 'test'
ELEM_EMAIL_FUNCTION = 'email-function'
ELEM_EMAIL_NOTIFICATION = 'email-notification'

# Tags read from an <instance-property> element
ELEM_OPTIONAL = 'optional'
ELEM_DEFAULTVIEW = 'defaultview'
ELEM_UNIT = 'unit'
ELEM_CHOICES = 'choices'

# Tags read from the entries of a <choices> list
ELEM_ITEM = 'item'
ELEM_COLOUR = 'colour'
ELEM_COLOUR_PARAM = 'use-colour'

# Called without a name, so this is the root logger
logger = logging.getLogger()
46
47
class StringProp(str):
    """A `str` whose instances accept extra attributes.

    Plain `str` objects cannot be given new attributes; instances of this
    subclass can, so a string can carry companion values alongside its text.
    """
51
52
@memoized
def get_event_xml():
    """Load the event class definition file named by `settings.EVENT_CLASSES_FILENAME`.

    The file is opened with `xinclude=True` and the result is memoized.
    Returns the `XMLElement` wrapping the parsed document.
    """
    return XMLElement(filename=settings.EVENT_CLASSES_FILENAME, xinclude=True)
58
59
class EventRenderer(Enum):
    """Renderers selectable for producing the text representation of an event.

    Each member's value is the string passed to `EventRenderer(...)` to select it.
    """

    DEFAULT = 'default'
    TELECOMMAND = 'telecommand'
    OOL_EVENT = 'ool_event'
    TELEMETRY = 'telemetry'
66
67
class EventTimeFilter(Enum):
    """Time attribute used to order (and possibly filter) events in the event viewer."""

    # The member values are the strings encoded in event viewer URLs
    SENSING_TIME = 'sensing'
    EXECUTION_TIME = 'exec'
73
74# This bit is not used - just an idea to pass the GUI labels through from the Python code
75# instead of hard-coding them into the eventview HTML, to make them project-configurable
76# EventTimeFilter.label = 'TC View'
77# EventTimeFilter.SENSING_TIME.label = 'TC Stack'
78# EventTimeFilter.SENSING_TIME.label = 'TC Hist'
79
80
class EventClassMeta(type):
    """Metaclass that gives its classes a cached constructor.

    Calling a class that uses this metaclass with an argument seen before
    returns the instance built the first time instead of a new one.
    """

    # Maps constructor argument -> instance. Shared by every class using this
    # metaclass because it lives on the metaclass itself.
    cache = {}

    def __call__(cls, name):
        """Return the cached instance for `name`, building it on first use.

        Args:
            name: The single constructor argument, forwarded to `__init__` as
                `name_or_elem`. Must be hashable as it is used as the cache key.

        Returns:
            The instance of `cls` associated with `name`.

        Any exception raised by `__init__` propagates and nothing is cached.
        """
        # String arguments are looked up case-insensitively so that 'foo' and
        # 'FOO' share one instance rather than building two equivalent ones.
        key = name.upper() if isinstance(name, str) else name
        try:
            return EventClassMeta.cache[key]
        except KeyError:
            pass

        # Build through `cls` so the metaclass does not depend on one
        # particular class name being defined at module level.
        result = cls.__new__(cls)
        result.__init__(name_or_elem=name)
        EventClassMeta.cache[key] = result
        return result
93
94
95class EventClass(metaclass=EventClassMeta):
96 """Representation of an event class as specified in the event_classes.xml file."""
97
98 def __init__(self, name_or_elem):
99 """Construct with either the class name or defining element."""
100 if isinstance(name_or_elem, str):
101 # logger.debug('Building EventClass for {n}'.format(n=name_or_elem))
102 name = name_or_elem.upper()
103 elems = get_event_xml().elem.xpath("//class[name/.='{name}']".format(name=name))
104 if len(elems) == 0:
105 raise NoSuchEventClass(name)
106
107 if len(elems) > 1:
108 raise ConfigError('Multiple event classes for ' + name)
109
110 self.elem = XMLElement(elems[0])
111
112 else:
113 self.elem = name_or_elem
114
115 @property
116 def name(self):
117 """The name of this event class."""
118 return self.elem.parse_str('name')
119
120 @property
121 @memoized2
122 def instance_properties(self):
123 """Return an ordered dictionary of the instance properties defined for this event class.
124 Each dictionary key is the property name, each value is a dict containing:
125 `type`: A string giving the data type
126 `optional`: Defined for properties which are optional.
127
128 The ordering matches the order the properties are defined in in the events
129 definition file.
130 """
131
132 scan_elem = self.elem
133 instance_properties = collections.OrderedDict()
134 while True:
135 for prop_elem in scan_elem.findall(ELEM_INSTANCE_PROPERTY):
136 prop_name = prop_elem.parse_str('name')
137 # derived classes may override instance properties from parent classes
138 if prop_name not in instance_properties:
139 prop_type = prop_elem.parse_str('type')
140
141 # we include name in the value dictionary so the value dictionary
142 # can be passed around by itself i.e. if a widget property has type
143 # eventproperty.
144 new_property = {'name': prop_name,
145 'type': prop_type}
146
147 if prop_elem.find(ELEM_OPTIONAL):
148 new_property['optional'] = prop_elem.parse_bool(ELEM_OPTIONAL, True)
149
150 if prop_elem.find(ELEM_DEFAULTVIEW):
151 new_property['defaultview'] = prop_elem.parse_bool(ELEM_DEFAULTVIEW, True)
152
153 unit = prop_elem.parse_str(ELEM_UNIT, None)
154 if unit is not None:
155 new_property['unit'] = unit
156
157 description = prop_elem.parse_str(ELEM_DESCRIPTION, None)
158 if description:
159 new_property['description'] = description
160
161 choices_elem = prop_elem.find(ELEM_CHOICES)
162 if choices_elem is not None:
163 choices = []
164 for item_elem in choices_elem.findall(ELEM_ITEM):
165 new_choice = {'name': item_elem.parse_str(ELEM_NAME)}
166 if item_elem.find(ELEM_DESCRIPTION) is not None:
167 desc = item_elem.parse_str(ELEM_DESCRIPTION)
168 if desc is not None and len(desc) > 0: # we have some wrong
169 # and bad <choice> elements in, i.e. CHART-EPS ASCAT-MODE-CHANGE
170 # but we'll just allow it here instead of fixing the XML
171 # TBD: fix the XML
172 new_choice['description'] = desc
173
174 if item_elem.find(ELEM_VALUE) is not None:
175 new_choice['value'] = item_elem.parse_str(ELEM_VALUE)
176 # values were originally int only but for PUS OOL events
177 # they can be strings, which are mapped to other strings for
178 # display
179 try:
180 new_choice['value'] = int(new_choice['value'])
181 except ValueError:
182 # Leave as string
183 pass
184
185 if item_elem.find(ELEM_COLOUR) is not None:
186 new_choice['colour'] = item_elem.parse_str(ELEM_COLOUR)
187
188 if item_elem.find(ELEM_COLOUR_PARAM) is not None:
189 new_choice['use-colour'] = item_elem.parse_str(ELEM_COLOUR_PARAM)
190
191 choices.append(new_choice)
192
193 new_property['choices'] = choices
194
195 instance_properties[prop_name] = new_property
196
197 parents = scan_elem.elem.xpath('parent::class')
198 if len(parents) == 0:
199 break
200
201 scan_elem = XMLElement(parents[0])
202
203 return instance_properties
204
205 @property
206 @memoized2
207 def class_properties(self):
208 """Return dictionary of class properties."""
209 # Note that the same event cannot be raised by different activities.
210 # activity = None
211 # for a in Activity.all():
212 # if self.name in a.events:
213 # activity = a
214
215 class_properties = collections.OrderedDict()
216
217 # ascend back to root
218 # parent class's class properties do not overwrite same properties of the child class
219 parent = self.elem
220 while parent is not None:
221 for cls_prop in parent.findall(ELEM_CLASS_PROPERTY):
222 prop_name = cls_prop.parse_str(ELEM_NAME)
223 if prop_name not in class_properties:
224 class_properties[prop_name] = cls_prop.parse_str(ELEM_VALUE)
225
226 parents = parent.elem.xpath('parent::class')
227 if len(parents) == 0:
228 parent = None
229
230 elif parents[0] is None:
231 parent = None
232
233 else:
234 parent = XMLElement(parents[0])
235
236 return class_properties
237
238 @property
239 def description(self):
240 """Return class <description>."""
241 return self.elem.parse_str(ELEM_DESCRIPTION, None)
242
243 @property
244 def renderer(self):
245 """Optional special rendered for drawing us as HTML. Child inherits parents setting if not set."""
246 # parent class's class properties do not overwrite same properties of the child class
247 # TBD: The initial read is not needed, we could start with the while loop.
248 # TBD: Just use self.elem.parent instead of messing around with xpath and XMLElement constructors
249 renderer = self.elem.parse_str(ELEM_RENDERER, None)
250 if renderer is None:
251 # renderer not set, check parents...
252 parent = self.elem
253 while parent is not None:
254 parents = parent.elem.xpath('parent::class')
255 if len(parents) == 0:
256 parent = None
257
258 elif parents[0] is None:
259 parent = None
260
261 else:
262 parent = XMLElement(parents[0])
263 renderer = parent.parse_str(ELEM_RENDERER, None)
264
265 if renderer is not None:
266 break
267
268 if renderer is None:
269 return None
270
271 return EventRenderer(renderer)
272
273 @property
274 def db_table(self):
275 """Optional Database table to use to when building Event on the fly.
276 Otherwise, and by default, use generated event stored in EVENTS table.
277 """
278 return self.elem.parse_str(ELEM_DB_TABLE, None)
279
280 @property
281 def decoder(self):
282 """Return the decoder module to use for building events from a custom db table."""
283 decoder_value = self.elem.parse_str(ELEM_DECODER, None)
284 if decoder_value is None:
285 return None
286 else:
287 return importlib.import_module(decoder_value)
288
289 @property
290 def template(self):
291 """Return the template used to generate specific per-instance description strings."""
292 def inner(elem):
293 """Look for a <template> member of `elem`, recursively checking parent elements
294 if we don't have one."""
295 template = elem.parse_str(ELEM_TEMPLATE, None)
296 if template is None:
297 parent = elem.parent()
298 if parent is None:
299 return None
300
301 else:
302 return inner(parent)
303
304 else:
305 # According to
306 # http://stackoverflow.com/questions/1898656/remove-whitespace-in-python-using-\
307 # string-whitespace this is the fastest way to consolidate multiple whitespace
308 # chars
309 return ' '.join(template.split())
310
311 return inner(self.elem)
312
313 @property
314 def url(self):
315 """Event class URL."""
316 res = self.elem.parse_str(ELEM_URL, None, expand_settings=True)
317 if res is None:
318 return None
319
320 else:
321 res = StringProp(res)
322 # string off all text before the final '/'
323 res.text = re.sub('.*/([^/]+)', '\\1', res)
324 return res
325
326 @property
327 def test(self):
328 """Return the content of the <test> element giving the location of an automated
329 test verifying this activity."""
330 val = self.elem.parse_str(ELEM_TEST, None)
331 if val is None:
332 return None
333
334 else:
335 return Path(val)
336
337 @property
338 def email_function(self):
339 """If the class has an <email-function> defined return the function object.
340 Otherwise return None.
341 """
342 full_name = self.elem.parse_str(ELEM_EMAIL_FUNCTION, None)
343 if full_name is None:
344 return None
345
346 module_name, _, fn_name = full_name.rpartition('.')
347 module_obj = importlib.import_module(module_name, __package__)
348 return getattr(module_obj, fn_name)
349
350 @property
351 def email_notification(self):
352 """Return True if this is a notification email.
353 Subscribers to notification emails receive an email when the first event
354 is raised, and a second email when it is denotified.
355 Any events raised between the initial creation and the denotification
356 call do not result in further emails being sent."""
357 return self.elem.parse_bool(ELEM_EMAIL_NOTIFICATION, False)
358
359 def browse_url(self):
360 """Return a URL showing a description of this event class."""
361 from django.urls import reverse
362 return reverse('events:single', kwargs=dict(eventclass=self.name))
363
364 def raised_by(self):
365 """Yield a list of activities which can raise this event.
366 Do not use yield it messes up {%for%}{%empty%} in Django templates.
367 """
368 res = []
369 for a in Activity.all():
370 for e in a.eventnames:
371 if e == self.name:
372 res.append(a)
373
374 return res
375
376 def no_raisers(self):
377 """True is nothing raises this event. Only used by Django single_eventclass.html
378 template. There is probably a way to eliminate this and do it in Django instead."""
379 # for some bizarre reason this function is invisible in Django templates !?!
380 return len(self.raised_by) == 0
381
382 def is_operator(self):
383 """Return true is we are an operator event manually ingested."""
384 return self.name.startswith('OPERATOR-')
385
386 def browse_test(self):
387 """Link to the defined unit test."""
388 if self.test is None:
389 return None
390
391 rel_path = self.test.relative_to(settings.PROJECT_HOME_DIR)
392 return {'text': self.test.name,
393 'url': '{root}/{filename}'.format(
394 root=settings.PROJECT_BROWSE_URL,
395 filename=rel_path)}
396
397 @property
398 @memoized2
399 def browse_source_url(self):
400 """Return a dictionary describing a URL to a page giving information about this event.
401 The dictionary contains 'text' (a text label for the URL) and 'url' (the URL itself)."""
402
403 def seek(filename):
404 """Find which actual file we were defined in, following xinclude links as
405 needed."""
406 logger.debug('Examing file {path}'.format(path=filename))
407 root_elem = XMLElement(filename=filename)
408
409 for class_elem in root_elem.elem.xpath('//class'):
410 # logger.debug('got name ' + parsechildstr(class_elem, 'name'))
411 if XMLElement(class_elem).parse_str(ELEM_NAME) == self.name:
412 return filename
413
414 for xi_elem in root_elem.elem.findall('{http://www.w3.org/2001/XInclude}include'):
415 # logger.debug('link ' + xi_elem.get('href'))
416 res = seek(filename.parent.child(xi_elem.get('href')))
417 if res is not None:
418 return res
419
420 filename = seek(settings.EVENT_CLASSES_FILENAME)
421 return make_url(filename, fmt=dict)
422
423 @property
424 def instrument(self):
425 """For event classes which are clearly associated with a specific instrument, return that
426 instrument name. Possible return values:
427
428 ADCS
429 AMSU-A1
430 AMSU-A2
431 ASCAT
432 AVHRR
433 GOME
434 GRAS
435 HIRS
436 IASI
437 MHS
438 NIU
439 SARP
440 SARR
441 SEM
442
443 or None.
444
445 """
446
447 if self.name.endswith('-MODE-CHANGE'):
448 return self.name[:-12]
449
450 @staticmethod
451 def expand(source):
452 """We take `source` which is either a simple string or a list of strings.
453 Each one is either an actual event class name or a wildcard which expands into
454 one or more event class names. Return a list of EventClasses."""
455 result = set()
456
457 def imp(instr):
458 """Process a single string."""
459 new_classes = []
460 for e in EventClass.all(wildcard=instr.upper()):
461 new_classes.append(e)
462
463 if len(new_classes) == 0:
464 for e in EventClass.all(wildcard=instr.upper().replace('_', '-')):
465 raise ValueError('In the event classname Did you mean to use "-"'
466 'as a word delimiter not "_"?')
467
468 raise ValueError('Could not find any event classes matching {s}'.format(s=instr))
469
470 else:
471 print('Expanded {orig} to {dest}'.format(
472 orig=instr, dest=', '.join(c.name for c in new_classes)))
473
474 for nc in new_classes:
475 result.add(nc)
476
477 if is_listlike(source):
478 for s in source:
479 imp(s)
480
481 else:
482 imp(source)
483
484 return result
485
486 @staticmethod
487 def _gen_eventclasses(prefix=None, wildcard=None):
488 """Yield a list of all EventClasses rendered from events XML file.
489 `wildcard` can use normal glob style chars (*, ?) and we also allow
490 SQL-style '%' since that's easier to enter from the command line."""
491 if wildcard is not None and '%' in wildcard:
492 wildcard = wildcard.replace('%', '*')
493
494 for e in get_event_xml().elem.xpath('//class'):
495 # test for derived classes
496 if len(e.findall('class')) > 0:
497 # we don't return this class if it is abstract
498 continue
499
500 # test against the prefix if given
501 if prefix is not None and not XMLElement(e).parse_str(ELEM_NAME).startswith(prefix):
502 continue
503
504 if wildcard is not None:
505 if not fnmatch.fnmatch(XMLElement(e).parse_str(ELEM_NAME), wildcard):
506 continue
507
508 yield EventClass(XMLElement(e))
509
510 @staticmethod
511 def all(prefix=None, wildcard=None):
512 """Return a list of all final Event Classes.
513 Only includes concrete classes, not abstract classes which are subclassed
514 as they are not meant to be instantiated.
515 Call via EventClass.all()
516 Non memoizable due to default argument.
517 """
518
519 assert not (wildcard is not None and prefix is not None), 'Cannot use both "prefix" and '\
520 '"wildcard" parameters to EventClass.all()'
521
522 if wildcard is not None:
523 return EventClass._gen_eventclasses(wildcard=wildcard)
524
525 elif prefix is None:
526 return EventClass._all_eventclasses_imp()
527
528 else:
529 return EventClass._all_eventclasses_imp_prefix(prefix)
530
531 @staticmethod
532 @memoized
533 def _all_eventclasses_imp():
534 """Return all EventClasses (memoized)."""
535 # return sorted(_gen_eventclasses(), key=operator.attrgetter('name'))
536 res = list(EventClass._gen_eventclasses())
537 res.sort(key=operator.attrgetter('name'))
538 return res
539
540 @staticmethod
541 @memoized
542 def _all_eventclasses_imp_prefix(prefix):
543 """Return all EventClasses matching `prefix`."""
544 # return sorted(_gen_eventclasses(prefix), key=operator.attrgetter('name'))
545 res = list(EventClass._gen_eventclasses(prefix))
546 res.sort(key=operator.attrgetter('name'))
547 return res