1#!/usr/bin/env python3
  2
  3"""Representation of XML algorithm result file.
  4For creating and reading files.
  5
  6Sample file (from SF00_INGESTER):
  7
  8<?xml version="1.0"?>
  9<result>
 10    <execution-start>2012-06-24T13:27:37Z</execution-start>
 11    <job>
 12        <id>1</id>
 13        <status>COMPLETED</status>
 14        <table>
 15            <name>MSG_SY_0</name>
 16            <sensing-start>2012-03-31T00:00:00</sensing-start>
 17            <sensing-stop>2012-03-31T01:00:00</sensing-stop>
 18        </table>
 19    </job>
 20    <execution-stop>2012-06-24T13:27:38Z</execution-stop>
 21</result>
 22"""
 23
 24import logging
 25from datetime import datetime
 26
 27from lxml.etree import Element, SubElement
 28
 29import chart.alg.settings
 30from chart.project import SID
 31from chart.common.xml import XMLElement
 32from chart.common.xml import datetime_to_xml
 33from chart.common.xml import ParseException
 34from chart.db.model.table import TableInfo
 35from chart.common.traits import name_of_thing
 36from chart.common.traits import is_listlike
 37from chart.backend.job import Job
 38from chart.backend.job import JobStatus
 39from chart.backend.activity import Activity
 40
# Passed by Result.close() as the second argument to XMLElement.write().
# Presumably selects indented, human-readable output - confirm against chart.common.xml.
pretty_print = True

# Tag names of the XML elements that make up a result file.
ELEM_ACTIVITY = 'activity'
ELEM_AUX_OUTPUT = 'aux-output'
ELEM_EXECUTION_START = 'execution-start'
ELEM_EXECUTION_STOP = 'execution-stop'
ELEM_ID = 'id'
ELEM_JOB = 'job'
ELEM_NAME = 'name'
ELEM_PRIMARY_OUTPUT = 'primary-output'
ELEM_RESULT = 'result'
ELEM_SENSING_START = 'sensing-start'
ELEM_SENSING_STOP = 'sensing-stop'
ELEM_STATUS = 'status'
ELEM_TABLE = 'table'

# NOTE(review): logging.getLogger() with no name returns the root logger, so messages from
# this module are not attributed to it; logging.getLogger(__name__) is the usual choice.
logger = logging.getLogger()
 58
 59class ResultTable:
 60    """Representation of a range of data written to by an algorithm."""
 61    def __init__(self, table=None, sensing_start=None, sensing_stop=None, sid=None):
 62        self.table = table
 63        self.sensing_start = sensing_start
 64        self.sensing_stop = sensing_stop
 65        self.sid = sid
 66
 67    def __str__(self):
 68        return 'ResultTable(sid={sid}, table={table}, start={start}, stop={stop}'.format(
 69            sid=self.sid, table=self.table.name, start=self.sensing_start, stop=self.sensing_stop)
 70
 71    def __getitem__(self, arg):
 72        """Allow result file table attributes to be accesses like a dictionary for old code."""
 73        return self.__dict__[arg]
 74
 75class Result:
 76    """Representation of XML results file.
 77
 78    See chart.alg.settings.RESULT_FILENAME for default filename.
 79    """
 80
 81    def __init__(self,
 82                 filename,
 83                 mode='r',
 84                 activity=None):
 85        """
 86        Args:
 87            filename (str): File to read or write
 88            mode (str:'w'|'r'): Read or write mode
 89            activity (Activity): Activity for the job batch. This must be specified in write mode.
 90        """
 91        self.mode = mode
 92        if mode == 'w':
 93            self.elem = XMLElement(tag=ELEM_RESULT)
 94            self.elem.add(tag=ELEM_EXECUTION_START, text=datetime_to_xml(datetime.utcnow()))
 95
 96            if activity is not None:
 97                self.elem.add(tag=ELEM_ACTIVITY, text=activity.name)
 98
 99        elif mode == 'r':
100            self.elem = XMLElement(filename=filename)
101
102        else:
103            raise ValueError('Mode must be "r" or "w"')
104
105    def __del__(self):
106        self.close()
107
108    def close(self):
109        """Write final file to disk."""
110        if self.mode == 'w':
111            self.elem.add(tag=ELEM_EXECUTION_STOP, text=datetime_to_xml(datetime.utcnow()))
112            self.elem.write(chart.alg.settings.RESULT_FILENAME, pretty_print)
113
114    @property
115    def path(self):
116        """Return Path of original XML file, if opened from file."""
117        return self.elem.filename()
118
119    def add_job(self, job, status=None, tables=None):
120        """Add a completed job to output result file.
121
122        Args:
123            job (int): Job ID
124            status (str): Exit status ('completed', 'failed', 'retry', 'timeout')
125            tables (list of dict): Each containing 'table' -> TableInfo,
126                'sensing_start' -> datetime, 'sensing_stop' -> datetime.
127
128        Returns:
129            None
130
131        Raises:
132            None
133        """
134        if status is None:
135            status = job.status
136
137        if status is None:
138            raise ValueError('Status not set for job {jobid}'.format(jobid=job.id))
139
140        job_elem = self.elem.add(tag=ELEM_JOB)
141
142        if job.job_id is not None:
143            job_elem.add(tag=ELEM_ID, text=job.job_id)
144
145        if status == 'COMPLETED':
146            logger.warning('Pass JobStatus.COMPLETED into add_job not a string value')
147            status = JobStatus.COMPLETED
148
149        # Allow status to be either a string or the new enum
150        if status in JobStatus:
151            job_elem.add(tag=ELEM_STATUS, text=status.name)
152
153        else:
154            if status not in JobStatus:
155                raise ValueError('No such status {status} for job {jobid}'.format(
156                        status=status, jobid=job.job_id))
157
158            job_elem.add(tag=ELEM_STATUS, text=status)
159
160        if job.primary_output is not None:
161            job_elem.add(tag=ELEM_PRIMARY_OUTPUT, text=job.primary_output)
162
163        for o in job.aux_outputs:
164            job_elem.add(tag=ELEM_AUX_OUTPUT, text=o)
165
166        if tables is not None:
167            def add_table(table):
168                """Insert a table write description from the client parameter `tables`.
169                table should be a ResultTable object but we also accept a dict of:
170
171                - name
172                - sid
173                - sensing_start
174                - sensing_stop
175                """
176                # SID should really be set because some ingestion jobs don't set the SID
177                # field properly in the JOBS table first so this is the only way for ingestion
178                # code to tell the job chaining system the right source ID.
179                if isinstance(table, dict):
180                    # Allow clients to pass in a dict
181                    table = ResultTable(table=table.get('table'),
182                                        sensing_start=table.get('sensing_start'),
183                                        sensing_stop=table.get('sensing_stop'),
184                                        sid=table.get('sid'))
185
186                if table.table is None:
187                    raise ValueError('Jobs result has no table set')
188
189                if table.sid is None:
190                    if job.sid is not None:
191                        logger.warning('Setting table SID to {sid} from job'.format(sid=job.sid))
192                        table.sid = job.sid
193
194                    else:
195                        raise ValueError('Set the sid attribute of each table when calling add_job')
196
197                # accept a time range of None/None entry without complaint for algs that didn't find
198                # anything to ingest
199                if table.sensing_start is None and table.sensing_stop is None:
200                    return
201
202                # make sure the table structure is well formed
203                if table.sensing_start is None:
204                    raise ValueError('add_job() called with tables attribute but no start time')
205
206                if table.sensing_stop is None:
207                    raise ValueError('add_job() called with tables attribute but no stop time')
208
209                # Not sure why we allow this actually
210                # if table.sensing_start is None or table.sensing_stop is None:
211                    # raise ValueError('add_job() must be called with a valid time range')
212                    # return
213
214                table_elem = job_elem.add(tag=ELEM_TABLE)
215                table_elem.add(tag=ELEM_NAME, text=name_of_thing(table.table))
216                table_elem.add(tag=ELEM_SENSING_START,
217                               text=datetime_to_xml(table.sensing_start, include_us=True))
218                table_elem.add(tag=ELEM_SENSING_STOP,
219                               text=datetime_to_xml(table.sensing_stop, include_us=True))
220
221                # if table.sid is not None:
222                table.sid.to_xml(table_elem.elem)  # SID XML handling only uses raw
223                # Element objects
224
225            if is_listlike(tables):
226                for table in tables:
227                    add_table(table)
228
229            else:
230                add_table(tables)
231
232    def read_jobs(self):
233        """For report files opened in read mode only, read in list of reported jobs."""
234        for job_elem in self.elem.findall(ELEM_JOB):
235            tables = []
236            for table_elem in job_elem.findall(ELEM_TABLE):
237                t = ResultTable(table=TableInfo(table_elem.parse_str(ELEM_NAME)),
238                                sensing_start=table_elem.parse_datetime(ELEM_SENSING_START),
239                                sensing_stop=table_elem.parse_datetime(ELEM_SENSING_STOP))
240                sid = SID.from_xml(table_elem.elem)
241                if sid is not None:
242                    t.sid = sid
243
244                tables.append(t)
245
246            status_str = job_elem.parse_str(ELEM_STATUS, None)
247            job = Job(activity=self.activity,
248                      job_id=job_elem.parse_int(ELEM_ID, None),
249                      status=JobStatus[status_str] if status_str is not None else None)
250
251            job.tables = tables
252            job.primary_output = job_elem.parse_str(ELEM_PRIMARY_OUTPUT, None)
253            job.aux_outputs = job_elem.parse_strs(ELEM_AUX_OUTPUT)
254            yield job
255
256    @property
257    def execution_start(self):
258        """Read execution start time."""
259        return self.elem.parse_datetime(ELEM_EXECUTION_START)
260
261    @property
262    def execution_stop(self):
263        """Read execution stop time."""
264        return self.elem.parse_datetime(ELEM_EXECUTION_STOP)
265
266    @property
267    def activity(self):
268        """Read our <activity> as an Activity."""
269        # there is an odd case where we have a result file with a null activity - when
270        # the user has written a local report template, generated the report and is uploading
271        # it.
272        # This is a valid op since the report template itself contains all metadata required
273        # by ucm
274        name = self.elem.parse_str(ELEM_ACTIVITY, None)
275        if name is None:
276            return None
277
278        else:
279            return Activity(name)