1#!/usr/bin/env python3
2
3"""Wrapper around the XML events file. These are generated by algorithms which raise events,
4and the file is then read back by the `dispatcher` module to create rows in the EVENTS and
5EVENT_PROPERTIES tables.
6
7The events.xml file header states the validity of the file: the SID, time range and
8list of event types included in the file."""
9
10import logging
11from collections import defaultdict
12from chart.common.xml import Element
13from chart.common.xml import SubElement
14
15import chart.alg.settings
16from chart.common.xml import datetime_to_xml
17from chart.common.xml import write_xml
18from chart.common.xml import XMLElement
19from chart.events.event import Event
20from chart.common.xml import parsechilddatetime
21from chart.common.util import nvl_min
22from chart.common.util import nvl_max
23
# Passed as the final argument to write_xml() when EventsFile.close() writes the document.
pretty_print = True

# Element names used inside the events.xml document.
# Root element created by EventsFile.
ELEM_EVENTS = 'events'
# Used for the event class names listed in the validity header, and as the tag
# EventsFileReader searches for when yielding events.
ELEM_EVENT = 'event'
# Header element holding start/stop times, SID and allowed event class names.
ELEM_VALIDITY = 'validity'
ELEM_START_TIME = 'start-time'
ELEM_STOP_TIME = 'stop-time'

# getLogger() is called without a name, so this is the root logger.
logger = logging.getLogger()
33
34
class EventsFile:
    """Build an events.xml document in memory and write it out on `close`.

    The document root is an <events> element. A <validity> header child holds
    the optional start/stop times, the SID and the names of the event classes
    the activity may raise; it is filled in by `set_validity` or
    `set_work_order`. Events are appended with `add`. `close` writes the file
    and is also invoked from `__del__`.
    """

    def __init__(self, wo=None):
        """Create an empty document, optionally initialising the header from `wo`.

        Args:
            wo: Optional work order; when given it is passed to `set_work_order`.
        """
        logger.info('eventsfile constructor wo {wo}'.format(wo=str(wo)))
        # Remember the file SID as set by set_validity, set_work_order or set_sid
        # so that a later attempt to use a different SID can be detected
        # (set_sid logs an error, set_work_order raises ValueError). Most code
        # assumes all events in one file share the same SID.
        self.sid = None
        self.elem = Element(ELEM_EVENTS)
        self.activity = None
        # Number of events added, keyed by event class name.
        self.event_stats = defaultdict(int)
        self.validity_elem = None

        if wo is not None:
            self.set_work_order(wo)

    def _ensure_validity(self):
        """Return the <validity> header, creating an empty one if none exists yet."""
        if self.validity_elem is None:
            self.validity_elem = SubElement(self.elem, ELEM_VALIDITY)

        return self.validity_elem

    def _fill_validity(self, sid, start_time, stop_time):
        """Write times, SID and allowed event names into the current header.

        Expects `self.validity_elem` and `self.activity` to have been set by the
        caller. Values that are None are omitted from the header.
        """
        if start_time is not None:
            SubElement(self.validity_elem, ELEM_START_TIME).text = datetime_to_xml(
                start_time, include_us=True)

        if stop_time is not None:
            SubElement(self.validity_elem, ELEM_STOP_TIME).text = datetime_to_xml(
                stop_time, include_us=True)

        if sid is not None:
            sid.to_xml(self.validity_elem)
            self.sid = sid

        for event_classname in self.activity.eventnames:
            SubElement(self.validity_elem, ELEM_EVENT).text = event_classname

    def _set_validity_time(self, tag, when):
        """Create or overwrite the header child `tag` with the time `when`."""
        validity = self._ensure_validity()
        time_elem = validity.find(tag)
        if time_elem is None:
            time_elem = SubElement(validity, tag)

        # NOTE(review): the header writers pass include_us=True whereas this uses
        # datetime_to_xml's default; confirm the difference is intended.
        time_elem.text = datetime_to_xml(when)

    def set_validity(self, activity, sid=None, start_time=None, stop_time=None):
        """Create a <validity> header element.

        Each call appends a new <validity> child to the document root.

        Args:
            activity: Object whose `eventnames` lists the event classes allowed
                in this file; also used by `add` to validate events.
            sid: Optional source ID; must provide `to_xml(parent)`.
            start_time: Optional validity start time.
            stop_time: Optional validity stop time.
        """
        self.validity_elem = SubElement(self.elem, ELEM_VALIDITY)
        self.activity = activity
        self._fill_validity(sid, start_time, stop_time)

    def set_sid(self, sid):
        """Set source ID if not done in set_validity().

        If no header exists yet an empty one is created to hold the SID. If a
        different SID was already recorded an error is logged and the recorded
        SID is left unchanged.
        """
        if self.sid is None:
            sid.to_xml(self._ensure_validity())
            self.sid = sid

        elif sid != self.sid:
            logger.error('Multiple SIDs detected in the same events file')

    def set_work_order(self, wo):
        """Set our header start, stop times and list of included events based on
        work order file.

        The validity range is the minimum `sensing_start` to the maximum
        `sensing_stop` over all jobs returned by `wo.read_jobs()`.

        Raises:
            ValueError: If the jobs do not all share the same SID.
        """
        self.validity_elem = SubElement(self.elem, ELEM_VALIDITY)
        self.activity = wo.activity

        start_time = None
        stop_time = None
        sid = None
        for job in wo.read_jobs():
            start_time = nvl_min(start_time, job.sensing_start)
            stop_time = nvl_max(stop_time, job.sensing_stop)

            if sid is None:
                sid = job.sid

            elif sid != job.sid:
                raise ValueError('All jobs must have the same SID')

        self._fill_validity(sid, start_time, stop_time)

    def __del__(self):
        # Make sure the file gets written even if the client never calls close().
        self.close()

    def add(self, event):
        """Insert a single event into the file.

        Raises:
            ValueError: If an activity is set and the event's class name is not
                listed in its `eventnames`.
        """
        if self.activity is not None and event.event_classname not in self.activity.eventnames:
            raise ValueError(
                'This activity is not allowed to raise events of class {event}'.format(
                    event=event.event_classname))

        event.check()
        event.to_xml(self.elem)
        self.event_stats[event.event_classname] += 1

    @property
    def filename(self):
        """Return default filename for this file."""
        return chart.alg.settings.EVENTS_FILENAME

    def close(self):
        """Write the final file to disk.

        Nothing is written if the file was already written, if no header was
        ever created, or if the header has neither start nor stop time and no
        events were added (this happens in S3 TCHIST_SCAN if no files ingested).
        """
        if self.elem is None or self.validity_elem is None:
            return

        if (self.get_start_time() is None
                and self.get_stop_time() is None
                and self.get_event_count() == 0):
            return

        write_xml(self.elem, self.filename, pretty_print)
        # Drop the tree so a later close() (e.g. from __del__) does not write again.
        self.elem = None

    def get_event_count(self):
        """Return total number of events raised."""
        return sum(self.event_stats.values())

    def log_report(self, target=None):
        """Log a summary of counts of events raised.

        Args:
            target: Callable taking one message string. Defaults to the module
                logger's `info`, resolved at call time.
        """
        if target is None:
            target = logger.info

        target('Raised {cc} events in total'.format(cc=self.get_event_count()))
        for name, count in self.event_stats.items():
            target(' {name}: {cc}'.format(cc=count, name=name))

    def get_start_time(self):
        """Retrieve the validity start time, or None if there is no header."""
        if self.validity_elem is None:
            return None

        return parsechilddatetime(self.validity_elem, ELEM_START_TIME, None)

    def set_start_time(self, start_time):
        """Force a shifted start validity time, creating the header if needed."""
        self._set_validity_time(ELEM_START_TIME, start_time)

    def get_stop_time(self):
        """Retrieve the validity stop time, or None if there is no header."""
        if self.validity_elem is None:
            return None

        return parsechilddatetime(self.validity_elem, ELEM_STOP_TIME, None)

    def set_stop_time(self, stop_time):
        """Force a shifted stop validity time, creating the header if needed."""
        self._set_validity_time(ELEM_STOP_TIME, stop_time)

    start_time = property(get_start_time, set_start_time)
    stop_time = property(get_stop_time, set_stop_time)
184
185
class EventsFileReader:
    """Open an events.xml file for reading and extract Events from it.

    Iterating the reader yields one Event for each element returned by
    `findall(ELEM_EVENT)` on the loaded document.
    """

    def __init__(self, filename=None, buff=None):
        """Load the document from a file or from a text buffer.

        Args:
            filename: File to load. Takes precedence over `buff` if both are given.
            buff: XML text to parse when no filename is given.
        """
        # Always define both attributes so a reader constructed without a source
        # fails with a clear error on iteration instead of an AttributeError.
        self.filename = None
        self.elem = None

        if filename is not None:
            self.filename = filename
            self.elem = XMLElement(filename=filename)

        elif buff is not None:
            self.elem = XMLElement(from_text=buff)

    def __iter__(self):
        return self.gen_events()

    def gen_events(self):
        """Yield all Events from our file.

        Raises:
            ValueError: On first iteration, if the reader was constructed with
                neither `filename` nor `buff`.
        """
        if self.elem is None:
            raise ValueError('EventsFileReader has no source: pass filename or buff')

        for event_elem in self.elem.findall(ELEM_EVENT):
            yield Event.build_from_xml(event_elem)