1#!/usr/bin/env python3
2
3"""Handlers for MU archive format and filename."""
4
5import re
6import logging
7import zipfile
8import functools
9import codecs
10from datetime import datetime, timedelta
11
12import numpy as np
13
14from chart.common.path import Path
15from chart.common.timerange import TimeRange
16from chart.project import SID
17from chart.sids.sid_msg import CFID
18
# Big-endian unsigned dtypes used to decode the 4-byte coarse / 2-byte fine
# OBT timestamp fields from packet payloads (see MUContentReader.gen_packets).
np_uint32_be = np.dtype('>u4')
np_uint16_be = np.dtype('>u2')

# when ingesting fail if any correlated timestamps are more than SAFETY_MARGIN
# away from the nominal time range of the MU
SAFETY_MARGIN = timedelta(minutes=5)
25
26# MUTYPE = Enum('MUTYPE', 'DP', 'TM') # and full names
27
28
class BadMUFile(Exception):
    """Base error for structural problems in an MU archive.

    Carries an optional ``path`` attribute recording the offending file.
    """

    def __init__(self, msg=None, path=None):
        super().__init__(msg)
        self.path = path
35
36
class MultipleSCID(BadMUFile):
    """Raised when a product mixes data from more than one satellite."""
40
41
class BadMUContent(BadMUFile):
    """Raised on an attempt to open something that is not a recognised MU content file."""
45
46
class NotMUFilename(BadMUFile):
    """Raised when a filename does not follow the MU naming convention."""

    def __str__(self):
        return f'{self.path} is not an MU filename'
52
53
class NotAZipFile(BadMUFile):
    """Raised when the zipfile library rejects an MU archive."""

    def __str__(self):
        return 'not a Zip file'
59
60
class UnsafeTimerange(BadMUFile):
    """Raised when an MU contains data outside the range given by MU.safe_timerange()."""
65
66
class MUReader:
    """Read MU products.

    An MU is a Zip archive whose filename encodes the satellite, data type
    and nominal sensing start time, e.g. ``MSG2DP2008105_09.zip``.
    """

    # MU filename convention: <scid:4><dtype:2><year:4><doy:3>_<hour:2>.zip
    # Compiled once at class level. The dot before the extension is escaped;
    # the previous inline pattern used a bare '.' which matched any character.
    _NAME_MATCHER = re.compile(r'(?P<scid>[A-Z0-9]{4})'
                               r'(?P<dtype>[A-Z]{2})'
                               r'(?P<year>\d{4})'
                               r'(?P<doy>\d{3})'
                               r'_'
                               r'(?P<hour>\d{2})\.(zip|ZIP)')

    def __init__(self, path, cfid, handle=None):
        """Open an MU either as a local file or from a memory buffer.

        For netsat.py we must allow this to be created with a non-existent filename,
        then have handle assigned later on.

        Args:
        - `path` (Path): Either a local filename to open, or a record of the original name of
          `handle`
        - `cfid` (CFID): Configuration identifier forwarded to the SID constructor
        - `handle` (file): File handle

        Raises:
            NotMUFilename: if the filename does not follow the MU convention.
        """
        # mu ingester passes us a list of args
        if not isinstance(path, Path):
            path = Path(path)

        assert cfid is not None and cfid in CFID

        self.handle = handle
        self.path = path

        match = MUReader._NAME_MATCHER.match(path.name)
        if match is None:
            # Pass the name via `path=` so NotMUFilename.__str__ can report it;
            # passing it positionally made it the message and left `.path` unset,
            # producing "None is not an MU filename".
            raise NotMUFilename(path=path.name)

        self.sid = SID(match.group('scid'), cfid=cfid, ogsid=None)
        self.dtype = match.group('dtype')
        self.year = match.group('year')
        self.doy = match.group('doy')
        self.hour = match.group('hour')
        # nominal start: 1 Jan of <year> at <hour>:00 plus (doy - 1) whole days
        self.sensing_start = datetime(int(self.year), 1, 1, int(self.hour)) +\
            timedelta(days=int(self.doy) - 1)

        # cache for the lazily opened zipfile object (see `archive` property)
        self._archive = None

    def timerange(self):
        """Extract the nominal time range for this MU.

        This is used by the --replace option to know what time range to blank,
        and to compute the safe timerange to sanity check obtutc correlations.
        This is not accurate as there is a crossover period between 2008-03-27
        and 2008-04-14 where both 1- and 4-hour MUs were generated. We assume the
        newer format is always used where available.

        Crossovers:
            first msg1 1hour MSG1DP2008087_14.ZIP
            last msg1 4hour MSG1DP2008105_09.zip
            first msg2 1hour MSG2DP2008087_14.ZIP
            last msg2 4hour MSG2DP2008105_09.zip

        Returns:
            TimeRange spanning the nominal product duration.
        """
        if self.sensing_start >= datetime(2008, 3, 27, 14):
            # new MUs are always 1 hour
            duration = timedelta(hours=1)

        else:
            # Old MUs are nominally 4 hours, but hours=5 is used here.
            # NOTE(review): this disagrees with the stated 4-hour product
            # length - presumably deliberate slack; confirm before changing.
            duration = timedelta(hours=5)

        return TimeRange(
            self.sensing_start,
            self.sensing_start + duration)

    def safe_timerange(self, safety_timedelta=None):
        """Compute the timerange for which the MU can realistically contain data.

        If after applying OBT/UTC correlation we end up with a result outside this
        range we can reject the data or the MU.

        Args:
        - `safety_timedelta` (timedelta): Margin to widen the nominal range by;
          defaults to SAFETY_MARGIN.
        """
        res = self.timerange()
        margin = SAFETY_MARGIN if safety_timedelta is None else safety_timedelta
        res.start -= margin
        res.stop += margin
        return res

    def replace_dtype(self, dtype):
        """Alter the datatype of an MUFilename. Returns the new name (Path).

        The dtype occupies characters 4-5 of the filename, after the 4-char SCID.
        """
        return Path(self.path).parent.child(self.path.name[:4] + dtype + self.path.name[6:])

    @property
    def archive(self):
        """Return a (cached) zipfile.ZipFile over this product.

        Raises:
            IOError: if no handle was supplied and the file does not exist.
            NotAZipFile: if the zipfile library rejects the input.
        """
        if self._archive is None:
            try:
                if self.handle is None:
                    if not self.path.exists():
                        raise IOError('Input file {path} does not exist'.format(path=self.path))

                    self._archive = zipfile.ZipFile(str(self.path), 'r')

                else:
                    self._archive = zipfile.ZipFile(self.handle)

            except zipfile.BadZipfile as err:
                # store the offending path on the exception (previously it was
                # passed as the message) and keep the original cause chained
                raise NotAZipFile(path=self.path) from err

        return self._archive

    def gen_contents(self,
                     apid=None,
                     service_type=None,
                     subservice_type=None,
                     structure_id=None,
                     field=None):
        """Extract the individual files from the MU product.
        Sample content filenames:

        Derived Parameter MU:
            M5603Z$nUlL.txt (field=M5603Z, sin=nUlL)
            TTAG15$nUlL.txt (field=TTAG15, sin=nUlL)

        Telemetry MU:
            0$nUlL$3$25$2044$19.txt (sin=nUlL, service_type=3, subservice_type=25, apid=2044,
                structure_id=19)

        Args:
            `field` (list of str): only yield contents whose field is in this list
            `apid` (int): only yield contents with this APID
            `service_type` (int):
            `subservice_type` (int):
            `structure_id` (int):

        Returns:
            Yields a series of MUContentReader objects matching every given filter.
        """
        for name in self.archive.namelist():
            # the partial defers reading the archive member until the content
            # actually needs its data
            content = MUContentReader(Path(name),
                                      functools.partial(self.archive.read, name),
                                      mu=self)

            if (apid is None or apid == content.apid) and \
                    (service_type is None or service_type == content.service_type) and \
                    (subservice_type is None or subservice_type == content.subservice_type) and \
                    (structure_id is None or structure_id == content.structure_id) and \
                    (field is None or content.field in field):
                yield content
271
class MUContentReader:
    """Extract packet payloads from a text file taken from a MU archive."""

    # Suppress immediate logging of various warnings:
    # - non-zero 'checks failed'
    # - data field header flag equals zero
    quiet = False

    def __init__(self, path, source, mu=None):
        """Args:
        - `path` (Path): Name of the file (sans dir)
        - `source` (object): Callable, string or handle to get the data from
        - `mu` (MUReader): Product this came from

        Raises:
            BadMUContent: if the filename matches neither content naming scheme.
        """
        self.path = path
        self.source = source
        self.sid = None
        self.mu = mu
        # count of packets with non-zero 'checks failed' values (set by gen_packets)
        self.checks_failed = None
        # count of data field header flag 0 errors found (set by gen_packets)
        self.dfhfz = None

        parts = path.stem.split('$')

        if len(parts) == 6:
            # telemetry content, e.g. 0$nUlL$3$25$2044$12.txt
            self.sin = parts[1]
            self.service_type = int(parts[2])
            self.subservice_type = int(parts[3])
            self.apid = int(parts[4])
            self.structure_id = int(parts[5])
            self.field = None

        elif len(parts) in (2, 3):
            # derived parameter content, e.g. M5603Z$nUlL.txt
            self.field = parts[0]
            self.sin = None
            self.service_type = None
            self.subservice_type = None
            self.apid = None
            self.structure_id = None

        else:
            # str(path): `path` is a Path object, so bare '+' concatenation
            # onto a str is not guaranteed to work
            raise BadMUContent('Cannot decode name ' + str(path))

    def __str__(self):
        if self.sin is not None:
            # fixed: previously referenced the misspelled attribute 'strucuture_id'
            # which raised AttributeError for telemetry contents
            return 'MUContent({sin},{stype},{sstype},{apid},{structid})'.format(
                sin=self.sin, stype=self.service_type, sstype=self.subservice_type,
                apid=self.apid, structid=self.structure_id)

        else:
            return 'MUContent({field})'.format(field=self.field)

    def gen_lines(self):
        """Yield non-comment non-empty lines, stripped of surrounding whitespace.

        A callable `source` is invoked once and its (utf-8 decoded) result cached
        back onto `self.source`.
        """
        if callable(self.source):
            self.source = self.source().decode('utf-8')

        for line in self.source.splitlines():
            line = line.strip()
            if len(line) == 0 or line[0] == '#':
                continue

            yield line

    def gen_packets(self):
        """Yield dictionaries containing all the interpreted bits of each packet.

        Validates that all packets within the content share a single SCID, and
        maintains the `checks_failed` / `dfhfz` error counters.

        Raises:
            BadMUFile: if an illegal '****' SCID is found.
            MultipleSCID: if packets from more than one satellite are present.
            BadMUContent: if a payload is truncated.
        """
        self.checks_failed = 0
        self.dfhfz = 0
        for line in self.gen_lines():
            split = line.split()

            if len(split) != 10:
                logging.warning('Skipping line {line}'.format(
                    line=line))
                continue

            scid = split[2]
            if scid == '****':
                raise BadMUFile('Found illegal scid {scid}'.format(scid=scid))

            sid = SID(scid, cfid=self.mu.sid.cfid)

            stream_id = split[3]
            contents = split[6]
            size = split[7]
            checks_failed = split[8]
            checks_performed = split[9]

            if len(contents) % 2 != 0:
                # the hex dump must have an even number of characters
                # (fixed: previously logged `len(contents) % 2`, i.e. always 1)
                logging.warning('Found contents with odd length {len}'.format(
                    len=len(contents)))
                continue

            if len(contents) != int(size) * 2:
                logging.warning('Found contents with actual length of '
                                '{act} but expected length {exp}'.format(
                                    act=len(contents), exp=size))
                continue

            if self.sid is None:
                self.sid = sid

            else:
                if self.sid.scid != sid.scid:
                    raise MultipleSCID(
                        'Found multiple SCIDs ({scid1}, {scid2})'.format(
                            scid1=sid.scid, scid2=self.sid.scid))

            if stream_id == 'AU':
                # RADAUX packets use a 38 byte header, so the payload starts at
                # hex character 48 rather than 84. bytearray() gives a writable
                # buffer; np.frombuffer replaces the deprecated np.fromstring.
                payload = np.frombuffer(bytearray(codecs.decode(contents[48:], 'hex')),
                                        dtype=np.uint8)

                # OBT relative to packet start: 4 coarse bytes (seconds)
                # followed by 2 fine bytes (units of 1/65536 s)
                obt_coarse = int(payload[8])
                obt_coarse = int(obt_coarse) * 256
                obt_coarse += payload[9]
                obt_coarse = int(obt_coarse) * 256
                obt_coarse += payload[10]
                obt_coarse = int(obt_coarse) * 256
                obt_coarse += payload[11]
                obt_fine = payload[12]
                obt_fine = int(obt_fine) * 256
                obt_fine += payload[13]
                obt = obt_coarse + (obt_fine / 65536)

                yield {'stream_id': stream_id,
                       'payload': payload,
                       'rel_time': obt,
                       }
                continue

            else:
                # other packets carry a 42 byte header (payload from hex char 84)
                payload = np.frombuffer(bytearray(codecs.decode(contents[84:], 'hex')),
                                        dtype=np.uint8)

            if len(payload) < 8:
                raise BadMUContent('Abandoning MU content {path} as size is only {size}'.format(
                    path=self.path, size=len(payload)))

            # CCSDS primary header and data field header fields
            version_number = payload[0] >> 5
            ccsds_type = (payload[0] >> 4) & 1
            dfhf = (payload[0] >> 3) & 1  # Data Field Header Flag
            apid = ((payload[0] & 7) << 8) | payload[1]
            segmentation_flags = payload[2] >> 6
            sequence_count = ((payload[2] & 63) << 8) + payload[3]
            packet_length = (payload[4] << 8) | payload[5]
            spare = payload[6] >> 7
            checksum = (payload[6] >> 6) & 1
            service_type = payload[6] & 63
            subservice_type = payload[7]

            if len(payload) < 14:
                # bytes 8-13 are needed below for the OBT timestamp
                # (str(self.path): avoid concatenating a Path object onto a str)
                raise BadMUContent('Found line less than 14 chars in ' + str(self.path))

            # OBT coarse gives seconds; OBT fine has units of 1/65536 s
            obt_coarse = payload[8:12].view(np_uint32_be)[0]
            obt_fine = payload[12:14].view(np_uint16_be)[0]
            obt = float(obt_coarse + (obt_fine / 0x10000))

            if version_number != 4:
                if not MUContentReader.quiet:
                    logging.warning('Found packet with version number {val}'.format(
                        val=version_number))

            if int(checks_failed, 16) != 0:
                # a non-zero 'checks failed' means we discard this packet
                if not MUContentReader.quiet:
                    logging.warning('Found packet with checks failed {val}'.format(
                        val=checks_failed))

                self.checks_failed += 1
                continue

            if dfhf != 1:
                # I don't know how serious this is. It seems to happen more in older
                # data. We just throw a warning
                if not MUContentReader.quiet:
                    logging.warning('Found packet with data field header flag {val}'.format(
                        val=dfhf))

                self.dfhfz += 1

            if segmentation_flags != 3:
                if not MUContentReader.quiet:
                    logging.warning('Found packet with segmentation flags {val}'.format(
                        val=segmentation_flags))

            if spare != 0:
                if not MUContentReader.quiet:
                    logging.warning('Found packet with spare {val}'.format(
                        val=spare))

            if checksum != 0:
                if not MUContentReader.quiet:
                    logging.warning('Found packet with checksum {val}'.format(
                        val=checksum))

            yield {
                'payload': payload,
                'type': ccsds_type,
                'apid': apid,
                'sid': sid,
                'stream_id': stream_id,
                'checks_performed': checks_performed,
                'sequence_count': sequence_count,
                'packet_length': packet_length,
                'service_type': service_type,
                'subservice_type': subservice_type,
                'obt': obt,
            }