1#!/usr/bin/env python3
2
3"""Representation of XML algorithm result file.
4For creating and reading files.
5
6Sample file (from SF00_INGESTER):
7
8<?xml version="1.0"?>
9<result>
10 <execution-start>2012-06-24T13:27:37Z</execution-start>
11 <job>
12 <id>1</id>
13 <status>COMPLETED</status>
14 <table>
15 <name>MSG_SY_0</name>
16 <sensing-start>2012-03-31T00:00:00</sensing-start>
17 <sensing-stop>2012-03-31T01:00:00</sensing-stop>
18 </table>
19 </job>
20 <execution-stop>2012-06-24T13:27:38Z</execution-stop>
21</result>
22"""
23
24import logging
25from datetime import datetime
26
27from lxml.etree import Element, SubElement
28
29import chart.alg.settings
30from chart.project import SID
31from chart.common.xml import XMLElement
32from chart.common.xml import datetime_to_xml
33from chart.common.xml import ParseException
34from chart.db.model.table import TableInfo
35from chart.common.traits import name_of_thing
36from chart.common.traits import is_listlike
37from chart.backend.job import Job
38from chart.backend.job import JobStatus
39from chart.backend.activity import Activity
40
# Passed to XMLElement.write() when a result file is saved in Result.close()
pretty_print = True

# XML element names used in the result file, grouped by where they appear.

# Root element and its direct children
ELEM_RESULT = 'result'
ELEM_EXECUTION_START = 'execution-start'
ELEM_EXECUTION_STOP = 'execution-stop'
ELEM_ACTIVITY = 'activity'
ELEM_JOB = 'job'

# Children of each <job>
ELEM_ID = 'id'
ELEM_STATUS = 'status'
ELEM_PRIMARY_OUTPUT = 'primary-output'
ELEM_AUX_OUTPUT = 'aux-output'
ELEM_TABLE = 'table'

# Children of each <table>
ELEM_NAME = 'name'
ELEM_SENSING_START = 'sensing-start'
ELEM_SENSING_STOP = 'sensing-stop'

# This module logs through the root logger
logger = logging.getLogger()
58
class ResultTable:
    """Representation of a range of data written to by an algorithm.

    All four values are stored as plain instance attributes and every one of
    them defaults to None. The attributes can also be read with dictionary
    syntax (`result_table['sensing_start']`) for the benefit of old code.
    """

    def __init__(self, table=None, sensing_start=None, sensing_stop=None, sid=None):
        """
        Args:
            table: Table that was written to.
            sensing_start: Start of the time range written.
            sensing_stop: End of the time range written.
            sid: Source ID the data belongs to.
        """
        self.table = table
        self.sensing_start = sensing_start
        self.sensing_stop = sensing_stop
        self.sid = sid

    def __str__(self):
        # `table` may be unset (None) or a bare table name rather than an object
        # carrying a `name` attribute, so fall back to the value itself instead
        # of failing while building a log or error message.
        table_name = getattr(self.table, 'name', self.table)
        return 'ResultTable(sid={sid}, table={table}, start={start}, stop={stop})'.format(
            sid=self.sid, table=table_name, start=self.sensing_start, stop=self.sensing_stop)

    def __getitem__(self, arg):
        """Allow result file table attributes to be accessed like a dictionary for old code.

        Raises:
            KeyError: If `arg` is not the name of an instance attribute.
        """
        return self.__dict__[arg]
74
class Result:
    """Representation of XML results file.

    A Result is opened either in write mode, where jobs are accumulated with
    `add_job` and the document is saved by `close`, or in read mode, where an
    existing file is parsed and exposed through `read_jobs` and the
    `execution_start`, `execution_stop` and `activity` properties.

    Instances can be used as context managers; leaving the `with` block calls
    `close`.

    See chart.alg.settings.RESULT_FILENAME for default filename.
    """

    # Stays True until __init__ has finished, so that close() (and therefore
    # __del__) is a no-op on an instance whose construction failed part-way.
    _closed = True

    def __init__(self,
                 filename,
                 mode='r',
                 activity=None):
        """
        Args:
            filename (str): File to read. Not used in write mode, where `close`
                always writes to chart.alg.settings.RESULT_FILENAME.
            mode (str:'w'|'r'): Read or write mode
            activity (Activity): Activity for the job batch. In write mode its name is
                recorded in an <activity> element; the element is omitted if None.

        Raises:
            ValueError: If `mode` is neither 'r' nor 'w'.
        """
        self.mode = mode
        if mode == 'w':
            self.elem = XMLElement(tag=ELEM_RESULT)
            self.elem.add(tag=ELEM_EXECUTION_START, text=datetime_to_xml(datetime.utcnow()))

            if activity is not None:
                self.elem.add(tag=ELEM_ACTIVITY, text=activity.name)

        elif mode == 'r':
            self.elem = XMLElement(filename=filename)

        else:
            raise ValueError('Mode must be "r" or "w"')

        # Only a fully constructed object has anything to flush
        self._closed = False

    def __del__(self):
        """Save a write-mode result that was never closed explicitly."""
        self.close()

    def __enter__(self):
        """Return self so the result can be used in a `with` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the result on leaving a `with` block; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Write final file to disk.

        In write mode this appends the <execution-stop> element and writes the document
        to chart.alg.settings.RESULT_FILENAME. In read mode nothing is written.
        Only the first call has any effect, so an explicit close() followed by
        garbage collection does not add a second <execution-stop> or rewrite the file.
        """
        if self._closed:
            return

        # Mark as closed before writing so a failed write is not retried from __del__
        self._closed = True
        if self.mode == 'w':
            self.elem.add(tag=ELEM_EXECUTION_STOP, text=datetime_to_xml(datetime.utcnow()))
            self.elem.write(chart.alg.settings.RESULT_FILENAME, pretty_print)

    @property
    def path(self):
        """Return Path of original XML file, if opened from file."""
        return self.elem.filename()

    @staticmethod
    def _status_name(status, job_id):
        """Return the JobStatus member name to record for `status`.

        `status` may be a JobStatus member or the name of one.

        Raises:
            ValueError: If `status` does not identify a JobStatus member.
        """
        if isinstance(status, JobStatus):
            return status.name

        if status == 'COMPLETED':
            logger.warning('Pass JobStatus.COMPLETED into add_job not a string value')

        # Allow status to be a string naming a member of the enum
        try:
            return JobStatus[status].name

        except (KeyError, TypeError):
            raise ValueError('No such status {status} for job {jobid}'.format(
                status=status, jobid=job_id)) from None

    @staticmethod
    def _add_table(job, job_elem, table):
        """Insert a table write description from the client parameter `tables`.

        `table` should be a ResultTable object but we also accept a dict of:

        - table
        - sid
        - sensing_start
        - sensing_stop

        An entry whose start and stop times are both None is accepted and
        writes nothing.

        Raises:
            ValueError: If the entry has no table, has no SID and the job has
                none either, or has only one end of its time range set.
        """
        # SID should really be set because some ingestion jobs don't set the SID
        # field properly in the JOBS table first so this is the only way for ingestion
        # code to tell the job chaining system the right source ID.
        if isinstance(table, dict):
            # Allow clients to pass in a dict
            table = ResultTable(table=table.get('table'),
                                sensing_start=table.get('sensing_start'),
                                sensing_stop=table.get('sensing_stop'),
                                sid=table.get('sid'))

        if table.table is None:
            raise ValueError('Jobs result has no table set')

        if table.sid is None:
            if job.sid is None:
                raise ValueError('Set the sid attribute of each table when calling add_job')

            logger.warning('Setting table SID to {sid} from job'.format(sid=job.sid))
            table.sid = job.sid

        # accept a time range of None/None entry without complaint for algs that didn't find
        # anything to ingest
        if table.sensing_start is None and table.sensing_stop is None:
            return

        # make sure the table structure is well formed
        if table.sensing_start is None:
            raise ValueError('add_job() called with tables attribute but no start time')

        if table.sensing_stop is None:
            raise ValueError('add_job() called with tables attribute but no stop time')

        table_elem = job_elem.add(tag=ELEM_TABLE)
        table_elem.add(tag=ELEM_NAME, text=name_of_thing(table.table))
        table_elem.add(tag=ELEM_SENSING_START,
                       text=datetime_to_xml(table.sensing_start, include_us=True))
        table_elem.add(tag=ELEM_SENSING_STOP,
                       text=datetime_to_xml(table.sensing_stop, include_us=True))

        # SID XML handling only uses raw Element objects
        table.sid.to_xml(table_elem.elem)

    def add_job(self, job, status=None, tables=None):
        """Add a completed job to output result file.

        Args:
            job (Job): Job to record. Its `job_id`, `status`, `sid`, `primary_output`
                and `aux_outputs` attributes are read.
            status (JobStatus or str): Exit status, either a JobStatus member or the
                name of one. Defaults to `job.status`.
            tables (ResultTable, dict or list of these): Data ranges written by the job.
                A dict holds 'table', 'sensing_start', 'sensing_stop' and 'sid' entries.

        Returns:
            None

        Raises:
            ValueError: If no status is available, the status is not a known
                JobStatus, or an entry of `tables` is incomplete.
        """
        if status is None:
            status = job.status

        if status is None:
            raise ValueError('Status not set for job {jobid}'.format(jobid=job.job_id))

        # Validate before touching the document so a bad status does not leave
        # a half-built <job> element behind
        status_name = self._status_name(status, job.job_id)

        job_elem = self.elem.add(tag=ELEM_JOB)

        if job.job_id is not None:
            job_elem.add(tag=ELEM_ID, text=job.job_id)

        job_elem.add(tag=ELEM_STATUS, text=status_name)

        if job.primary_output is not None:
            job_elem.add(tag=ELEM_PRIMARY_OUTPUT, text=job.primary_output)

        for aux_output in job.aux_outputs or ():
            job_elem.add(tag=ELEM_AUX_OUTPUT, text=aux_output)

        if tables is None:
            return

        # A single table description is accepted as well as a list of them
        if not is_listlike(tables):
            tables = [tables]

        for table in tables:
            self._add_table(job, job_elem, table)

    def read_jobs(self):
        """Yield a Job for each <job> element of the result file.

        Each yielded Job has `tables` (list of ResultTable), `primary_output` and
        `aux_outputs` set from the file. `job_id` and `status` are None if the
        corresponding elements are absent.
        """
        for job_elem in self.elem.findall(ELEM_JOB):
            tables = [
                ResultTable(table=TableInfo(table_elem.parse_str(ELEM_NAME)),
                            sensing_start=table_elem.parse_datetime(ELEM_SENSING_START),
                            sensing_stop=table_elem.parse_datetime(ELEM_SENSING_STOP),
                            sid=SID.from_xml(table_elem.elem))
                for table_elem in job_elem.findall(ELEM_TABLE)]

            status_str = job_elem.parse_str(ELEM_STATUS, None)
            job = Job(activity=self.activity,
                      job_id=job_elem.parse_int(ELEM_ID, None),
                      status=JobStatus[status_str] if status_str is not None else None)

            job.tables = tables
            job.primary_output = job_elem.parse_str(ELEM_PRIMARY_OUTPUT, None)
            job.aux_outputs = job_elem.parse_strs(ELEM_AUX_OUTPUT)
            yield job

    @property
    def execution_start(self):
        """Read execution start time."""
        return self.elem.parse_datetime(ELEM_EXECUTION_START)

    @property
    def execution_stop(self):
        """Read execution stop time."""
        return self.elem.parse_datetime(ELEM_EXECUTION_STOP)

    @property
    def activity(self):
        """Read our <activity> as an Activity, or None if the element is absent."""
        # there is an odd case where we have a result file with a null activity - when
        # the user has written a local report template, generated the report and is uploading
        # it.
        # This is a valid op since the report template itself contains all metadata required
        # by ucm
        name = self.elem.parse_str(ELEM_ACTIVITY, None)
        return None if name is None else Activity(name)