1#!/usr/bin/env python3
  2
  3"""Ingest events XML files."""
  4
  5import logging
  6from collections import defaultdict
  7from itertools import chain
  8
  9from chart.db.connection import db_connect
 10from chart.common.xml import load_xml
 11from chart.common.xml import parsechildstrs
 12from chart.common.xml import parsechilddatetime
 13from chart.events.db import find_events
 14from chart.events.db import store_event
 15from chart.events.db import delete_event
 16from chart.events.db import update_event
 17from chart.events.db import delete_events
 18from chart.events.emails import event_notify
 19from chart.backend.eventsfile import EventsFileReader
 20from chart.project import SID
 21from chart import settings
 22
# XML element names read from the events file's <validity> header.
ELEM_VALIDITY = 'validity'
ELEM_START_TIME = 'start-time'
ELEM_STOP_TIME = 'stop-time'
ELEM_EVENT = 'event'

# Module-level connection to the EVENTS database, opened at import time.
# NOTE(review): this side effect runs for every importer of this module -
# confirm that an eager connection is intended here.
db_conn = db_connect('EVENTS')

# Root logger (no name argument). NOTE(review): logging.getLogger(__name__)
# is the usual convention; left unchanged since switching would alter
# handler/level routing for existing deployments.
logger = logging.getLogger()
 31
 32
 33def find_match_and_mark(event, old_events):
 34    """Find matching event in the list.
 35
 36    If `event` is a perfect match to one of `old_events`, mark the old
 37        event by settings a 'keep' attribute and return 'same'
 38    If we find a partial match where the only different is the value of a single instance property
 39        return 'similar'
 40    Otherwise return 'nomatch'
 41
 42    Args:
 43        event: Event to match to the old_events list
 44        old_events: List of events already present in the db for this scid and time range
 45
 46    Returns:
 47        A tuple of match type ('same' | 'similar' | 'nomatch') and matched old event or None
 48            if 'nomatch'
 49
 50    """
 51    candidate = None
 52    for e in old_events[event.start_time]:
 53        # if this event was already matched, skip it
 54        if hasattr(e, 'keep'):
 55            continue
 56
 57        if (e.event_classname != event.event_classname or
 58            e.start_time != event.start_time or
 59            # e.stop_time != event.stop_time or
 60            # e.scid != event.scid or
 61            e.sid != event.sid or
 62            e.instance_properties.get('component') != event.instance_properties.get('component')):
 63
 64            continue
 65
 66        # if here we have at least a partial match
 67        # compare instance properties
 68        prop_diff = set(event.instance_properties.keys()).symmetric_difference(
 69            iter(e.instance_properties.keys()))
 70
 71        # if we find any difference in property names, this isn't a match
 72        if len(prop_diff) > 0:
 73            continue
 74
 75        # do the detailed comparison, allow no more than 1 instance property to differ
 76        mismatches = 0
 77        for k, v in event.instance_properties.items():
 78            if e.instance_properties.get(k) != v:
 79                mismatches += 1
 80
 81        if e.stop_time != event.stop_time:
 82            mismatches += 1
 83
 84        if mismatches == 0:
 85            e.keep = True
 86            # if the new event
 87            # if event.gen_time != e.gen_time:
 88                # return 'similar', e
 89
 90            # else:
 91            return 'same', e
 92
 93        if mismatches == 1:
 94            # this is a candidate for partial match, but we keep looking for a perfect match
 95            candidate = e
 96
 97    if candidate is not None:
 98        candidate.keep = True
 99        return 'similar', candidate
100
101    return 'nomatch', None
102
103
def gen_events(input_file, gen_method):
    """Yield events from `input_file` with basic preprocessing before ingestion."""
    reader = EventsFileReader(input_file)
    for event in reader:
        # honour the configured policy on recording generation methods
        event.gen_method = gen_method if settings.USE_GEN_METHOD else None

        # operator events may carry their length as a 'duration' instance
        # property instead of an explicit stop time; synthesise one here
        if event.event_classname.startswith('OPERATOR'):
            duration = event.instance_properties.get('duration')
            if event.stop_time is None and duration is not None:
                event.stop_time = event.start_time + duration

        yield event
121
122
def ingest_events(input_file,
                  clear_existing=False,
                  sendmails=True,
                  cutoff_duration=None,
                  gen_method=None,
                  quiet=False):
    """Ingest events from an events.xml file into the database.

    Three strategies are used depending on the input file:

    1. <validity> header present and `clear_existing` set: delete all
       existing events of the raised classes in the validity range, then
       store every new event.
    2. <validity> header present only: merge new events against existing
       ones under a table lock - identical events are kept, near-identical
       ones updated in place, new ones inserted, and unmatched old ones
       purged.
    3. No <validity> header: per-event delete of same-time existing events
       followed by re-insertion.

    Args:
        input_file (str or file handle): XML filename or handle to read
        clear_existing (bool): Delete existing events before ingestion
        sendmails (bool): Send email event notifications if configured
        cutoff_duration: Notification cutoff forwarded to `event_notify`;
            defaults to settings.EMAIL_CUTOFF.
            NOTE(review): presumably suppresses mails for events older
            than this duration - confirm against event_notify
        gen_method (str): Override gen_method values
        quiet (bool): Don't show information about every event

    Returns:
        A tuple of (input, stored, deleted, updated, unchanged) counters

    Raises:
        IOError: If `input_file` cannot be opened as a file
        ParserError: If `input_file` is not valid XML
        ValueError: If `input_file` contains element values which cannot be parsed in context

    """
    # Remove circular import from dispatcher.py
    from chart.users.role import RoleManager

    if cutoff_duration is None:
        cutoff_duration = settings.EMAIL_CUTOFF

    logger.info('Reading events file {name}'.format(name=input_file))

    # lazy generator of preprocessed events to be ingested
    # NOTE(review): `input_file` is consumed both here and by load_xml()
    # below - this only works if it is a filename or EventsFileReader
    # re-opens it rather than sharing a handle; confirm for the
    # file-handle case documented above
    new_events = gen_events(input_file, gen_method)

    # for reporting change summary back to client
    input_counter = 0
    deleted_counter = 0
    updated_counter = 0
    unchanged_counter = 0
    stored_counter = 0

    root_elem = load_xml(input_file)
    validity_elem = root_elem.find(ELEM_VALIDITY)
    if validity_elem is not None:
        # the validity header scopes the ingestion: which sid and which
        # time range the file claims to fully describe
        alg_start = parsechilddatetime(validity_elem, ELEM_START_TIME)
        alg_stop = parsechilddatetime(validity_elem, ELEM_STOP_TIME)
        alg_sid = SID.from_xml(validity_elem)

    # New role-based notifications
    role_manager = RoleManager.instance()

    if validity_elem is not None and clear_existing:
        # strategy 1: wipe-and-replace within the validity range
        raised_classes = parsechildstrs(validity_elem, ELEM_EVENT)
        logger.info('Deleting {c} event classes for {s} from {strt} to {stop}'.format(
            c=len(raised_classes), s=alg_sid, strt=alg_start, stop=alg_stop))
        deleted_counter = delete_events(
            sid=alg_sid,
            start_time_ge=alg_start,
            start_time_le=alg_stop,
            event_class=raised_classes,
            commit=False)
        db_conn.commit()

        for event in new_events:
            store_event(event, commit=False, quiet=quiet)
            # store_event leaves event_id unset if the event was rejected;
            # only notify for events that actually reached the database
            if event.event_id is not None:
                event_notify(event, sendmails=sendmails, cutoff_duration=cutoff_duration)
                role_manager.handle_event(
                    event, sendmails=sendmails)

            stored_counter += 1
            input_counter += 1

        db_conn.commit()

    elif validity_elem is not None:
        # strategy 2: merge against existing events in the validity range
        raised_classes = parsechildstrs(validity_elem, ELEM_EVENT)

        # prevent any other workers from starting an ingestion until we're done
        db_conn.execute('LOCK TABLE EVENTS IN EXCLUSIVE MODE')

        # build a list of existing events in our time range, bucketed by
        # start time for fast lookup in find_match_and_mark
        logger.debug('Scanning for old events')
        old_events = defaultdict(list)
        for old_event in find_events(sid=alg_sid,
                                     start_time=alg_start,
                                     start_time_lt=alg_stop,
                                     event_class=raised_classes):
            old_events[old_event.start_time].append(old_event)

        logger.debug('Found {cc} unique times in old events'.format(cc=len(old_events)))

        # try to match new events to old ones
        for new_event in new_events:
            result, old_event = find_match_and_mark(new_event, old_events)
            # if new event is the same as old, just keep the old one
            if result == 'same':
                # even if they look the same, we update the gen_time of the old event
                # This is not reported to the user as a change.
                # We could also update gen_method here.
                unchanged_counter += 1
                # old_event.gen_time = new_event.gen_time
                new_event.event_id = old_event.event_id
                # update_event(new_event, properties=('gen_time',))

            # if new event is similar to old, update the old one in place
            # (reusing the old event id, so clients keep a stable reference)
            elif result == 'similar':
                updated_counter += 1
                new_event.event_id = old_event.event_id
                update_event(new_event)

            # if new event is really new, store it in db
            elif result == 'nomatch':
                store_event(new_event, commit=False, quiet=quiet)
                stored_counter += 1
                event_notify(new_event, sendmails=sendmails, cutoff_duration=cutoff_duration)
                role_manager.handle_event(new_event, sendmails=sendmails)

            else:
                raise ValueError('unknown response from find_match')

            input_counter += 1
            if input_counter % 10000 == 0:
                logger.debug('Processed {cc} events'.format(cc=input_counter))

        # delete old events that were neither updated nor kept as is
        # (find_match_and_mark sets 'keep' on every matched old event)
        for purgee in chain(*old_events.values()):
            if not hasattr(purgee, 'keep'):
                delete_event(purgee.event_id, commit=False)
                deleted_counter += 1

        db_conn.commit()

    else:
        # strategy 3: no validity header
        # it would be nice to detect unchanged and updated events, as then we could
        # ingest more cleanly and without creating new event numbers

        # but for now we just delete existing ones and re-ingest
        for event in new_events:
            # NOTE(review): this is the only place using `event.event_class`;
            # the rest of the file reads `event.event_classname` - confirm
            # the Event type really exposes both attributes
            existings = find_events(start_time=event.start_time,
                                    stop_time=event.start_time,
                                    event_class=event.event_class)
            for existing in existings:
                # NOTE(review): unlike the other branches this delete has no
                # commit=False, so it presumably commits per call - confirm
                # whether that is intended
                delete_event(existing.event_id)
                deleted_counter += 1

            store_event(event, commit=False, quiet=quiet)
            if event.event_id is not None:
                event_notify(event, sendmails=sendmails, cutoff_duration=cutoff_duration)
                role_manager.handle_event(event, sendmails=sendmails)

            stored_counter += 1
            input_counter += 1

        db_conn.commit()

    logger.info('Ingested {input} events: {stored} new, {deleted} deleted, {updated} updated, '
                 '{unchanged} unchanged'.format(input=input_counter,
                                                stored=stored_counter,
                                                deleted=deleted_counter,
                                                updated=updated_counter,
                                                unchanged=unchanged_counter))

    return input_counter, stored_counter, deleted_counter, updated_counter, unchanged_counter