1#!/usr/bin/env python3
2
3"""Ingest events XML files."""
4
5import logging
6from collections import defaultdict
7from itertools import chain
8
9from chart.db.connection import db_connect
10from chart.common.xml import load_xml
11from chart.common.xml import parsechildstrs
12from chart.common.xml import parsechilddatetime
13from chart.events.db import find_events
14from chart.events.db import store_event
15from chart.events.db import delete_event
16from chart.events.db import update_event
17from chart.events.db import delete_events
18from chart.events.emails import event_notify
19from chart.backend.eventsfile import EventsFileReader
20from chart.project import SID
21from chart import settings
22
23ELEM_VALIDITY = 'validity'
24ELEM_START_TIME = 'start-time'
25ELEM_STOP_TIME = 'stop-time'
26ELEM_EVENT = 'event'
27
28db_conn = db_connect('EVENTS')
29
30logger = logging.getLogger()
31
32
33def find_match_and_mark(event, old_events):
34 """Find matching event in the list.
35
36 If `event` is a perfect match to one of `old_events`, mark the old
37 event by settings a 'keep' attribute and return 'same'
38 If we find a partial match where the only different is the value of a single instance property
39 return 'similar'
40 Otherwise return 'nomatch'
41
42 Args:
43 event: Event to match to the old_events list
44 old_events: List of events already present in the db for this scid and time range
45
46 Returns:
47 A tuple of match type ('same' | 'similar' | 'nomatch') and matched old event or None
48 if 'nomatch'
49
50 """
51 candidate = None
52 for e in old_events[event.start_time]:
53 # if this event was already matched, skip it
54 if hasattr(e, 'keep'):
55 continue
56
57 if (e.event_classname != event.event_classname or
58 e.start_time != event.start_time or
59 # e.stop_time != event.stop_time or
60 # e.scid != event.scid or
61 e.sid != event.sid or
62 e.instance_properties.get('component') != event.instance_properties.get('component')):
63
64 continue
65
66 # if here we have at least a partial match
67 # compare instance properties
68 prop_diff = set(event.instance_properties.keys()).symmetric_difference(
69 iter(e.instance_properties.keys()))
70
71 # if we find any difference in property names, this isn't a match
72 if len(prop_diff) > 0:
73 continue
74
75 # do the detailed comparison, allow no more than 1 instance property to differ
76 mismatches = 0
77 for k, v in event.instance_properties.items():
78 if e.instance_properties.get(k) != v:
79 mismatches += 1
80
81 if e.stop_time != event.stop_time:
82 mismatches += 1
83
84 if mismatches == 0:
85 e.keep = True
86 # if the new event
87 # if event.gen_time != e.gen_time:
88 # return 'similar', e
89
90 # else:
91 return 'same', e
92
93 if mismatches == 1:
94 # this is a candidate for partial match, but we keep looking for a perfect match
95 candidate = e
96
97 if candidate is not None:
98 candidate.keep = True
99 return 'similar', candidate
100
101 return 'nomatch', None
102
103
104def gen_events(input_file, gen_method):
105 """Yield events from `input_file` with basic preprocessing before ingestion."""
106 for event in EventsFileReader(input_file):
107
108 if settings.USE_GEN_METHOD:
109 event.gen_method = gen_method
110 else:
111 event.gen_method = None
112
113 if event.event_classname.startswith('OPERATOR'):
114 duration = event.instance_properties.get('duration')
115 if event.stop_time is None and duration is not None:
116 event.stop_time = event.start_time + duration
117
118 yield event
119
120 # logger.info('read {cc} events'.format(cc=len(new_events)))
121
122
123def ingest_events(input_file,
124 clear_existing=False,
125 sendmails=True,
126 cutoff_duration=None,
127 gen_method=None,
128 quiet=False):
129 """Ingest events from an events.xml file into the database.
130
131 Args:
132 input_file (str or file handle): XML filename or handle to read
133 clear_existing (bool): Delete existing events before ingestion
134 sendmail (bool): send email event notifications if configured
135 force_sendmail (bool): send notifications even if the event is older than 3 days
136 gen_method (str): Override gen_method values
137 quiet (bool): Don't show information about every event
138
139 Returns:
140 A tuple of event counters
141
142 Raises:
143 IOError: If `filename` cannot be opened as a file
144 ParserError: If `filename` is not valid XML
145 ValueError: If `input_file` contains element values which cannot be parsed in context
146
147 """
148 # Remove circular import from dispatcher.py
149 from chart.users.role import RoleManager
150
151 if cutoff_duration is None:
152 cutoff_duration = settings.EMAIL_CUTOFF
153
154 logger.info('Reading events file {name}'.format(name=input_file))
155
156 # build a list of events to be ingested
157 new_events = gen_events(input_file, gen_method)
158
159 # for reporting change summary back to client
160 input_counter = 0
161 deleted_counter = 0
162 updated_counter = 0
163 unchanged_counter = 0
164 stored_counter = 0
165
166 root_elem = load_xml(input_file)
167 validity_elem = root_elem.find(ELEM_VALIDITY)
168 if validity_elem is not None:
169 alg_start = parsechilddatetime(validity_elem, ELEM_START_TIME)
170 alg_stop = parsechilddatetime(validity_elem, ELEM_STOP_TIME)
171 alg_sid = SID.from_xml(validity_elem)
172
173 # New role-based notifications
174 role_manager = RoleManager.instance()
175
176 if validity_elem is not None and clear_existing:
177 raised_classes = parsechildstrs(validity_elem, ELEM_EVENT)
178 logger.info('Deleting {c} event classes for {s} from {strt} to {stop}'.format(
179 c=len(raised_classes), s=alg_sid, strt=alg_start, stop=alg_stop))
180 deleted_counter = delete_events(
181 sid=alg_sid,
182 start_time_ge=alg_start,
183 start_time_le=alg_stop,
184 event_class=raised_classes,
185 commit=False)
186 db_conn.commit()
187
188 for event in new_events:
189 store_event(event, commit=False, quiet=quiet)
190 if event.event_id is not None:
191 event_notify(event, sendmails=sendmails, cutoff_duration=cutoff_duration)
192 role_manager.handle_event(
193 event, sendmails=sendmails)
194
195 stored_counter += 1
196 input_counter += 1
197
198 db_conn.commit()
199
200 elif validity_elem is not None:
201 raised_classes = parsechildstrs(validity_elem, ELEM_EVENT)
202
203 # prevent any other workers from starting an ingestion until we're done
204 db_conn.execute('LOCK TABLE EVENTS IN EXCLUSIVE MODE')
205
206 # build a list of existing events in our time range
207 logger.debug('Scanning for old events')
208 old_events = defaultdict(list)
209 for old_event in find_events(sid=alg_sid,
210 start_time=alg_start,
211 start_time_lt=alg_stop,
212 event_class=raised_classes):
213 old_events[old_event.start_time].append(old_event)
214
215 logger.debug('Found {cc} unique times in old events'.format(cc=len(old_events)))
216
217 # try to match new events to old ones
218 for new_event in new_events:
219 # logger.debug('processing new event stored {stored}'.format(stored=stored_counter))
220 result, old_event = find_match_and_mark(new_event, old_events)
221 # if new event is the same as old, just keep the old one
222 if result == 'same':
223 # even if they look the same, we update the gen_time of the old event
224 # This is not reported to the user as a change.
225 # We could also update gen_method here.
226 unchanged_counter += 1
227 # old_event.gen_time = new_event.gen_time
228 new_event.event_id = old_event.event_id
229 # update_event(new_event, properties=('gen_time',))
230
231 # if new event is similar to old, update the old one in place
232 elif result == 'similar':
233 updated_counter += 1
234 new_event.event_id = old_event.event_id
235 update_event(new_event)
236
237 # if new event is really new, store it in db
238 elif result == 'nomatch':
239 store_event(new_event, commit=False, quiet=quiet)
240 stored_counter += 1
241 event_notify(new_event, sendmails=sendmails, cutoff_duration=cutoff_duration)
242 role_manager.handle_event(new_event, sendmails=sendmails)
243
244 else:
245 raise ValueError('unknown response from find_match')
246
247 input_counter += 1
248 if input_counter % 10000 == 0:
249 logger.debug('Processed {cc} events'.format(cc=input_counter))
250
251 # delete old events that were neither updated nor kept as is
252 for purgee in chain(*old_events.values()):
253 if not hasattr(purgee, 'keep'):
254 delete_event(purgee.event_id, commit=False)
255 deleted_counter += 1
256
257 db_conn.commit()
258
259 else:
260 # it would be nice to detect unchanged and updated events, as then we could
261 # ingest more cleanly and without creating new event numbers
262
263 # but for now we just delete existing ones and re-ingest
264 for event in new_events:
265 existings = find_events(start_time=event.start_time,
266 stop_time=event.start_time,
267 event_class=event.event_class)
268 for existing in existings:
269 delete_event(existing.event_id)
270 deleted_counter += 1
271
272 store_event(event, commit=False, quiet=quiet)
273 if event.event_id is not None:
274 event_notify(event, sendmails=sendmails, cutoff_duration=cutoff_duration)
275 role_manager.handle_event(event, sendmails=sendmails)
276
277 stored_counter += 1
278 input_counter += 1
279
280 db_conn.commit()
281
282 logger.info('Ingested {input} events: {stored} new, {deleted} deleted, {updated} updated, '
283 '{unchanged} unchanged'.format(input=input_counter,
284 stored=stored_counter,
285 deleted=deleted_counter,
286 updated=updated_counter,
287 unchanged=unchanged_counter))
288
289 return input_counter, stored_counter, deleted_counter, updated_counter, unchanged_counter