1#!/usr/bin/env python3
  2
  3"""Handlers for MU archive format and filename."""
  4
  5import re
  6import logging
  7import zipfile
  8import functools
  9import codecs
 10from datetime import datetime, timedelta
 11
 12import numpy as np
 13
 14from chart.common.path import Path
 15from chart.common.timerange import TimeRange
 16from chart.project import SID
 17from chart.sids.sid_msg import CFID
 18
# big-endian unsigned dtypes used to decode OBT coarse/fine fields from
# packet payload bytes (see MUContentReader.gen_packets)
np_uint32_be = np.dtype('>u4')
np_uint16_be = np.dtype('>u2')

# when ingesting fail if any correlated timestamps are more than SAFETY_MARGIN
# away from the nominal time range of the MU
SAFETY_MARGIN = timedelta(minutes=5)

# MUTYPE = Enum('MUTYPE', 'DP', 'TM')  # and full names
 27
 28
 29class BadMUFile(Exception):
 30    """Format error in MU archive."""
 31
 32    def __init__(self, msg=None, path=None):
 33        super(BadMUFile, self).__init__(msg)
 34        self.path = path
 35
 36
 37class MultipleSCID(BadMUFile):
 38    """Product contains data from multiple satellites."""
 39    pass
 40
 41
 42class BadMUContent(BadMUFile):
 43    """Attempt to open something as an MU Content which is not recognised."""
 44    pass
 45
 46
 47class NotMUFilename(BadMUFile):
 48    """Attempt for open a file whose name does not match the MU protocol."""
 49
 50    def __str__(self):
 51        return '{pth} is not an MU filename'.format(pth=self.path)
 52
 53
 54class NotAZipFile(BadMUFile):
 55    """Attempt to open an MU which cannot be opened by the zipfile library."""
 56
 57    def __str__(self):
 58        return 'not a Zip file'
 59
 60
 61class UnsafeTimerange(BadMUFile):
 62    """MU contains data outside the value returned by the MU.safe_timerange() function."""
 63
 64    pass
 65
 66
 67class MUReader:
 68    """Read MU products."""
 69
 70    def __init__(self, path, cfid, handle=None):
 71        """Open an MU either as a local file or from a memory buffer.
 72
 73        For netsat.py we must allow this to be created with a non-existent filename,
 74        then have handle assigned later on.
 75
 76        Args:
 77        - `path` (Path): Either a local filename to open, or a record of the original name of
 78            `handle`
 79        - `handle` (file): File handle
 80        """
 81        # mu ingester passes us a list of args
 82        if not isinstance(path, Path):
 83            path = Path(path)
 84
 85        assert cfid is not None and cfid in CFID
 86
 87        self.handle = handle
 88        self.path = path
 89
 90        matcher = re.compile(r'(?P<scid>[A-Z0-9]{4})'
 91                             r'(?P<dtype>[A-Z]{2})'
 92                             r'(?P<year>\d{4})'
 93                             r'(?P<doy>\d{3})'
 94                             r'_'
 95                             r'(?P<hour>\d{2}).(zip|ZIP)')
 96        match = matcher.match(path.name)
 97
 98        if match is None:
 99            raise NotMUFilename(path.name)
100
101        self.sid = SID(match.group('scid'), cfid=cfid, ogsid=None)
102        self.dtype = match.group('dtype')
103        self.year = match.group('year')
104        self.doy = match.group('doy')
105        self.hour = match.group('hour')
106        self.sensing_start = datetime(int(self.year), 1, 1, int(self.hour)) +\
107            timedelta(days=int(self.doy) - 1)
108
109        # cache the zip object
110        self._archive = None
111
112        # bad and wrong - file may not exist locally
113        # if path.size() == 0:
114            # raise IOError('Zero length file')
115
116    def timerange(self):
117        """Extract the nominal time range for this MU.
118        This is used by the --replace option to know what time range to blank,
119        and to compute the safe timerange to sanity check obtutc correlations.
120        This is not accurate as there is a crossover period between 2008-03-27
121        and 2008-04-14 where both 1- and 4-hour MUs were generated. As assume the
122        newer format is always used where available.
123        """
124        # matcher = re.compile(r'# Parameter (?P<key>\d): (?P<value>\d+)$')
125        # if self.dtype == 'TM':
126        #     params = []
127        #     for line in self.archive.read('0$nUlL$3$25$2044$1.txt').split('\n'):
128        #         # Look for 3 lines like:
129        #         # Parameter 1: 3
130        #         # Parameter 2: 1207562400
131        #         # Parameter 3: 1207566000
132        #         # we use these unix times to get the product duration
133        #         line = line.strip()
134        #         match = matcher.match(line)
135        #         if match is not None:
136        #             params.append((int(match.group('key')), int(match.group('value'))))
137        #             if len(params) == 3:
138        #                 break
139
140        #     if params[0][0] == 1 and params[1][0] == 2 and params[2][0] == 3:
141        #         # duration = int(params[2][1]) - int(params[1][1])
142        #         # print('duration ' + str(duration))
143        #         return TimeRange(datetime.utcfromtimestamp(params[1][1]),
144        #                          datetime.utcfromtimestamp(params[2][1]))
145
146        # crossovers:
147        # first msg1 1hour MSG1DP2008087_14.ZIP
148        # last msg1 4hour MSG1DP2008105_09.zip
149        # first msg2 1hour MSG2DP2008087_14.ZIP
150        # last msg2 4hour MSG2DP2008105_09.zip
151
152        if self.sensing_start >= datetime(2008, 3, 27, 14):
153            # new MUs are always 1 hour
154            duration = timedelta(hours=1)
155
156        else:
157            # old MUs are always 4 hours
158            duration = timedelta(hours=5)
159
160        return TimeRange(
161            self.sensing_start,
162            self.sensing_start + duration)
163
164    def safe_timerange(self, safety_timedelta=None):
165        """Compute the timerange for which the MU can realistically contain data.
166        If after applying OBT/UTC correlation we end up with a result outside this range we
167        can reject the data or the MU."""
168        res = self.timerange()
169
170        if safety_timedelta is None:
171            res.start -= SAFETY_MARGIN
172            res.stop += SAFETY_MARGIN
173
174        else:
175            res.start -= safety_timedelta
176            res.stop += safety_timedelta
177
178        return res
179
180    def replace_dtype(self, dtype):
181        """Alter the datatype of an MUFilename. Returns the new name."""
182        return Path(self.path).parent.child(self.path.name[:4] + dtype + self.path.name[6:])
183
184    @property
185    def archive(self):
186        """Return a zipfile object of us."""
187        if self._archive is None:
188            try:
189                if self.handle is None:
190                    if not self.path.exists():
191                        raise IOError('Input file {path} does not exist'.format(path=self.path))
192
193                    self._archive = zipfile.ZipFile(str(self.path), 'r')
194
195                else:
196                    self._archive = zipfile.ZipFile(self.handle)
197
198            except zipfile.BadZipfile:
199                raise NotAZipFile(self.path)
200
201        return self._archive
202
203    def gen_contents(self,
204                     apid=None,
205                     service_type=None,
206                     subservice_type=None,
207                     structure_id=None,
208                     field=None):
209        """Extract the individual files from the MU product.
210        Sample content filenames:
211
212        Derived Parameter MU:
213         M5603Z$nUlL.txt (field=M5603Z, sin=nUlL)
214         TTAG15$nUlL.txt (field=TTAG15, sin=nUlL)
215
216        Telemetry MU:
217         0$nUlL$3$25$2044$19.txt (sin=nUlL, service_type=3, subservice_type=25, apid=2044,
218                                   structure_id=19)
219
220        Args:
221            `field` (list of str):
222            `apid` (int):
223            `service_type` (int):
224            `subservice_type` (int):
225            `structure_id` (int):
226
227        Returns:
228            Yields a series of MUContentReader objects.
229        """
230        # logging.debug('gen_contents scan')
231        for name in self.archive.namelist():
232            # path = Path(name)
233            # try:
234                # meta = meta_mu_content(path)
235            # except NotMUContentFilename:
236                # logging.debug('    unmatching ' + path)
237                # continue
238
239            # logging.debug('matched ' + path)
240
241            # logging.debug('apid {a} {b}'.format(a=apid, b=mu_path.apid))
242            # logging.debug('service type {a} {b}'.format(a=service_type, b=mu_path.service_type))
243            # logging.debug('subservice type {a} {b}'.format(a=subservice_type,
244            # b=mu_path.subservice_type))
245            # logging.debug('structure id type {a} {b}'.format(a=structure_id,
246            # b=mu_path.structure_id))
247            # logging.debug('field type {a} {b}'.format(a=field, b=mu_path.field))
248
249            content = MUContentReader(Path(name),
250                                      functools.partial(self.archive.read, name),
251                                      mu=self)
252
253            if (apid is None or apid == content.apid) and \
254                    (service_type is None or service_type == content.service_type) and \
255                    (subservice_type is None or subservice_type == content.subservice_type) and \
256                    (structure_id is None or structure_id == content.structure_id) and \
257                    (field is None or content.field in field):
258                yield content
259
260            # if (apid is None or apid == mu_path.apid) and \
261                    # (service_type is None or service_type == mu_path.service_type) and \
262                    # (subservice_type is None or subservice_type == mu_path.subservice_type) and \
263                    # (structure_id is None or structure_id == mu_path.structure_id) and \
264                    # (field is None or mu_path.field in field):
265                # logging.debug('    yielding content ' + mu_path)
266                # yield MUContentReader(mu_path, self.zip.read(name), parent=self)
267
268            # else:
269                # logging.debug('    not yielding ' + mu_path + ' field ' + mu_path.field)
270
271
class MUContentReader:
    """Extract packet payloads from a text file taken from a MU archive."""

    # Suppress immediate logging of various warnings:
    # - non-zero 'checks failed'
    # - data field header flag equals zero
    quiet = False

    def __init__(self, path, source, mu=None):
        """Args:
        - `path` (Path): Name of the file (sans dir)
        - `source` (object): Callable, string or handle to get the data from
        - `mu` (MUReader): Product this came from

        Raises:
        - `BadMUContent`: if `path` cannot be decoded as an MU content name
        """
        self.path = path
        self.source = source
        self.sid = None
        self.mu = mu
        # keep a count of how many packets had non-zero checks failed values
        self.checks_failed = None
        # count how many data field header flag 0 errors we found
        self.dfhfz = None

        parts = path.stem.split('$')

        if len(parts) == 6:
            # telemetry content, e.g. 0$nUlL$3$25$2044$12.txt
            self.sin = parts[1]
            self.service_type = int(parts[2])
            self.subservice_type = int(parts[3])
            self.apid = int(parts[4])
            self.structure_id = int(parts[5])
            self.field = None

        elif len(parts) in (2, 3):
            # derived parameter content, e.g. M5603Z$nUlL.txt
            self.field = parts[0]
            self.sin = None
            self.service_type = None
            self.subservice_type = None
            self.apid = None
            self.structure_id = None

        else:
            # str() because `path` is a Path object; plain `+` concatenation
            # raised TypeError here, masking the intended exception
            raise BadMUContent('Cannot decode name ' + str(path))

    def __str__(self):
        if self.sin is not None:
            # fixed: previously read self.strucuture_id (typo) and raised
            # AttributeError for telemetry contents
            return 'MUContent({sin},{stype},{sstype},{apid},{structid})'.format(
                sin=self.sin, stype=self.service_type, sstype=self.subservice_type, apid=self.apid,
                structid=self.structure_id)

        else:
            return 'MUContent({field})'.format(field=self.field)

    def gen_lines(self):
        """Yield non-comment non-empty lines.

        If `source` is callable it is invoked once and the utf-8 decoded
        result cached back into `source`.
        """
        if callable(self.source):
            self.source = self.source().decode('utf-8')

        for line in self.source.splitlines():
            line = line.strip()
            if len(line) == 0 or line[0] == '#':
                continue

            yield line

    def gen_packets(self):
        """Yield dictionaries containing all the interpreted bits of each packet.

        Validates that all packets have the same SCID. Packets with a non-zero
        'checks failed' value are skipped (counted in `self.checks_failed`);
        packets with a data field header flag of zero are kept but counted in
        `self.dfhfz`.

        Raises:
        - `BadMUFile`: on an illegal '****' scid
        - `MultipleSCID`: if the content mixes satellites
        - `BadMUContent`: on a truncated payload
        """
        self.checks_failed = 0
        self.dfhfz = 0
        for line in self.gen_lines():
            split = line.split()

            # each packet line has exactly 10 whitespace-separated fields
            if len(split) != 10:
                logging.warning('Skipping line {line}'.format(
                        line=line))
                continue

            scid = split[2]
            if scid == '****':
                raise BadMUFile('Found illegal scid {scid}'.format(scid=scid))

            sid = SID(scid, cfid=self.mu.sid.cfid)

            stream_id = split[3]
            contents = split[6]  # hex-encoded packet bytes
            size = split[7]
            checks_failed = split[8]
            checks_performed = split[9]

            if len(contents) % 2 != 0:
                logging.warning('Found contents with odd length {len}'.format(
                        len=len(contents) % 2))
                continue

            if len(contents) != int(size) * 2:
                logging.warning('Found contents with actual length of '
                                '{act} but expected length {exp}'.format(
                        act=len(contents), exp=size))
                continue

            if self.sid is None:
                self.sid = sid

            else:
                if self.sid.scid != sid.scid:
                    raise MultipleSCID(
                        'Found multiple SCIDs ({scid1}, {scid2})'.format(
                            scid1=sid.scid, scid2=self.sid.scid))

            if stream_id == 'AU':
                # RADAUX packets use a 38 byte header (so not pos 84)
                # np.frombuffer replaces the removed np.fromstring; .copy()
                # keeps the array writable as fromstring's result was
                payload = np.frombuffer(codecs.decode(contents[48:], 'hex'),
                                        dtype=np.uint8).copy()

                # This is an OBT time relative to packet start:
                # coarse seconds assembled big-endian from bytes 8-11
                # (int() casts avoid uint8 overflow during the shifts)
                obt_coarse = int(payload[8])
                obt_coarse = int(obt_coarse) * 256
                obt_coarse += payload[9]
                obt_coarse = int(obt_coarse) * 256
                obt_coarse += payload[10]
                obt_coarse = int(obt_coarse) * 256
                obt_coarse += payload[11]
                # fine time in units of 1/65536s from bytes 12-13
                obt_fine = payload[12]
                obt_fine = int(obt_fine) * 256
                obt_fine += payload[13]
                obt = obt_coarse + (obt_fine / 65536)

                yield {'stream_id': stream_id,
                       'payload': payload,
                       'rel_time': obt,
                       }
                continue

            else:
                # other packets use a 42 byte header
                payload = np.frombuffer(codecs.decode(contents[84:], 'hex'),
                                        dtype=np.uint8).copy()

            if len(payload) < 8:
                raise BadMUContent('Abandoning MU content {path} as size is only {size}'.format(
                    path=self.path, size=len(payload)))

            # unpack the CCSDS-style packet primary header fields
            version_number = payload[0] >> 5
            ccsds_type = (payload[0] >> 4) & 1
            dfhf = (payload[0] >> 3) & 1  # Data Field Header Flag
            apid = ((payload[0] & 7) << 8) | payload[1]
            segmentation_flags = payload[2] >> 6
            sequence_count = ((payload[2] & 63) << 8) + payload[3]
            packet_length = (payload[4] << 8) | payload[5]
            spare = payload[6] >> 7
            checksum = (payload[6] >> 6) & 1
            service_type = payload[6] & 63
            subservice_type = payload[7]

            if len(payload) < 14:
                # str() because self.path may be a Path object; plain `+`
                # concatenation raised TypeError here
                raise BadMUContent('Found line less than 14 chars in ' + str(self.path))

            # OBT coarse gives time in seconds; OBT fine has units of 1/65536s
            obt_coarse = payload[8:12].view(np_uint32_be)[0]
            obt_fine = payload[12:14].view(np_uint16_be)[0]
            obt = float(obt_coarse + (obt_fine / 0x10000))

            if version_number != 4:
                if not MUContentReader.quiet:
                    logging.warning('Found packet with version number {val}'.format(
                        val=version_number))

            if int(checks_failed, 16) != 0:
                # a non-zero 'checks failed' means we discard this packet
                if not MUContentReader.quiet:
                    logging.warning('Found packet with checks failed {val}'.format(
                        val=checks_failed))

                self.checks_failed += 1
                continue

            if dfhf != 1:
                # I don't know how serious this is. It seems to happen more in older
                # data. We just throw a warning
                if not MUContentReader.quiet:
                    logging.warning('Found packet with data field header flag {val}'.format(
                        val=dfhf))

                self.dfhfz += 1

            if segmentation_flags != 3:
                if not MUContentReader.quiet:
                    logging.warning('Found packet with segmentation flags {val}'.format(
                        val=segmentation_flags))

            if spare != 0:
                if not MUContentReader.quiet:
                    logging.warning('Found packet with spare {val}'.format(
                        val=spare))

            if checksum != 0:
                if not MUContentReader.quiet:
                    logging.warning('Found packet with checksum {val}'.format(
                        val=checksum))

            yield {
                'payload': payload,
                'type': ccsds_type,
                'apid': apid,
                'sid': sid,
                'stream_id': stream_id,
                'checks_performed': checks_performed,
                'sequence_count': sequence_count,
                'packet_length': packet_length,
                'service_type': service_type,
                'subservice_type': subservice_type,
                'obt': obt,
                }