1#!/usr/bin/env python3
  2
  3"""Test module for the Implementation of APEX History file ingestion."""
  4
  5from datetime import datetime
  6
  7import pytest
  8
  9from chartepssg import project  # must come first
 10from chart.common.test_utils import compare
 11from chart.common.path import Path
 12#from chart.project import SID
 13#from chartepssg.alg.apex_common import TBL_APEX_LOG
 14#from chartepssg.alg.apex_common import TBL_APEX_INSTANCES
 15#from chartepssg.alg.hlch_ingester import HLCH_EXEC
 16#from chartepssg.alg import hlch_ingester
 17#from chartepssg.alg.pa_ingester import ProcedureArchiveParser
 18#from chartepssg.alg.pa_ingester import PAEvent
 19#from chartepssg.alg import pa_ingester
 20from chartepssg.alg.settings import scid_to_sid
 21
 22
 23# We use a standard test file for 2 tests
 24HLCH_FILENAME = Path('APEX/SGxx_LOG_OAS__BUC_PROC_HIST__G20220712083000Z_S20220712083000Z_E20220712090000Z.txt')
 25PA_FILENAME = Path('APEX/SGxx_LOG_OAS__BUC_PROC_HIST__G20220712083000Z_S20220712083000Z_E20220712090000Z.zip')
 26
 27
 28def test_hlch_fileattr():
 29    """Check we can decode a HLCH filename as per ICD."""
 30    from chartepssg.alg import hl_prochist_ingester
 31    sid, start, end = hl_prochist_ingester.fileattr(HLCH_FILENAME)
 32    # assert sid == scid_to_sid('SGA1')
 33    assert start == datetime(2022, 7, 12, 8, 30)
 34    assert end == datetime(2022, 7, 12, 9)
 35
 36
 37def test_pa_fileattr():
 38    """Check we can decode a PA filename as per ICD."""
 39    from chartepssg.alg import au_prochist_ingester
 40    sid, start, end = au_prochist_ingester.fileattr(PA_FILENAME)
 41    # assert sid == scid_to_sid('SGA1')
 42    assert start == datetime(2022, 7, 12, 8, 30)
 43    assert end == datetime(2022, 7, 12, 9)
 44
 45@pytest.mark.skip()
 46def test_ingestion(product, database):
 47    """Simple test to ingest APEX products."""
 48
 49    """If a writeable test database is available, ingest APEX History files."""
 50    pytest.skip('Test fails intermittently depending on existing data in APEX_INSTANCES')
 51    db = database(writeable=True)
 52    if not db:
 53        pytest.skip('No suitable test writeable database specified')
 54
 55    db_conn = db.connection.db_connect()
 56    tbl_instances = TBL_APEX_INSTANCES
 57    tbl_log = TBL_APEX_LOG
 58
 59    #
 60    # #
 61    # HLCH Ingestion
 62    # #
 63    #
 64    hlch_file = product(HLCH_FILENAME)
 65    ingester = HLCH_Ingester(hlch_file)
 66    ingester.ingest()
 67    start, stop = ingester.get_interval()
 68
 69    # check that we have read the correct number of events
 70    assert start == datetime(2019, 10, 23, 5, 36, 16, 525000)
 71    assert stop == datetime(2019, 10, 23, 5, 51, 24, 740000)
 72
 73    # query all instances that are procedures
 74    sql = ('select count(*) '
 75           'from {table} '
 76           'where sensing_time >= :start '
 77           'and sensing_time <= :stop '
 78           'and level = \'PROCEDURE\'').format(table=tbl_instances.name)
 79
 80    count_inst = db_conn.query(sql, start, stop).fetchone()[0]
 81    assert count_inst == 1, 'HLCH Instances Ingestion failed'
 82
 83    # query all log events that are procedures
 84    sql = ('select count(*) '
 85           'from {table} '
 86           'where sensing_time >= :start '
 87           'and sensing_time <= :stop '
 88           'and level = \'PROCEDURE\'').format(table=tbl_log.name)
 89
 90    count_log = db_conn.query(sql, start, stop).fetchone()[0]
 91    assert count_log == 15, 'HLCH Log Ingestion failed'
 92
 93    # check the FIRST procedure instance correctness
 94    expected_instance = {
 95        # 'SID': SAT_TO_SID.get('SGA1'),
 96        # 'INSTANCE': 'OPE_P2816',
 97        # 'EARLIEST_START_TIME': datetime(2019, 10, 23, 5, 36, 16, 525000),
 98        # 'LATEST_END_TIME': datetime(2019, 10, 23, 5, 36, 24, 737000),
 99        'DOMAIN': 'SGA1',
100        'EXECUTION_MODE': [['Automatic', '2019-10-23 05:36:16.525000']],
101        'START_TIME': datetime(2019, 10, 23, 5, 36, 16, 525000),
102        'END_TIME': datetime(2019, 10, 23, 5, 36, 24, 737000),
103        'IDENTIFIER': 'Send_17_01',
104        'VERSION': '6.0',
105        'SOURCE': 'CLIENT_A',
106        'PARENT': 'CLIENT_A',
107        'TITLE': 'Send TC(17,1)',
108        'STATE': 'Completed Verification in Error',
109        'INTERACTIONS': [
110            ['Pending', '2019-10-23 05:36:16.691000'],
111            ['Inactive', '2019-10-23 05:36:24.730000']
112        ],
113        'ARGUMENTS': {},
114        'VARIABLES': {},
115    }
116
117    actual_instance = ingester.proc_instances['OPE_P2816']
118
119    assert expected_instance == actual_instance
120
121    # check the LAST execution log
122    expected_log = HLCH_EXEC(
123            sid=SID('EPSSGA1'),
124            timestamp=datetime(2019, 10, 23, 5, 36, 24, 735000),
125            domain='SGA1',
126            instance='OPE_P2816',
127            attribute='STATE',
128            value='Completed Verification in Error',
129            counter=24
130    )
131
132    actual_log = ingester.proc_execution[-3]
133    assert expected_log == actual_log
134
135    #
136    # #
137    # PA Ingestion
138    # #
139    #
140    pa_file = product(PA_FILENAME)
141    ingester = ProcedureArchiveParser(pa_file)
142    ingester.ingest()
143    start, stop = ingester.get_interval()
144
145    assert start == datetime(2019, 10, 23, 5, 36, 16, 533000)
146    assert stop == datetime(2019, 10, 23, 5, 36, 24, 737000)
147
148    # query all instances that are NOT procedures
149    sql = ('select count(*) '
150           'from {table} '
151           'where sensing_time >= :start '
152           'and sensing_time <= :stop '
153           'and level <> \'PROCEDURE\'').format(table=tbl_instances.name)
154
155    count_inst = db_conn.query(sql, start, stop).fetchone()[0]
156    assert count_inst == 2, 'PA Instances Ingestion failed'
157
158    # query all log events that are NOT procedures
159    sql = ('select count(*) '
160           'from {table} '
161           'where sensing_time >= :start '
162           'and sensing_time <= :stop '
163           'and level <> \'PROCEDURE\'').format(table=tbl_log.name)
164
165    count_log = db_conn.query(sql, start, stop).fetchone()[0]
166    assert count_log == 13, 'PA Log Ingestion failed'
167
168    # check the LAST execution log
169    expected_step_log = PAEvent(
170            sid=SID('EPSSGA1'),
171            timestamp=datetime(2019, 10, 23, 5, 36, 16, 534000),
172            level='STEP',
173            domain='SGA1',
174            instance='OPE_P2816',
175            identifier='P.1',
176            attribute='STATE',
177            value='Executing Active',
178            counter=12
179    )
180    actual_step_log = ingester.pa_events[1]
181    # Check the step log event match
182    assert expected_step_log == actual_step_log
183
184    # check the FIRST procedure instance correctness
185    """
186    expected_step_inst = {
187        #'EVENT': expected_step_log,
188        #'START_TIME': datetime(2019, 10, 23, 5, 36, 16, 534000),
189        #'END_TIME': datetime(2019, 10, 23, 5, 36, 24, 737000),
190        # 'SOURCE': 'CLIENT_A',
191        # 'PARENT': 'CLIENT_A',
192        # 'PROCEDURE': 'Send_17_01',
193        'PROPERTIES': {
194            'Identifier': 'P.1', 'stepType': 'Send Command Step', 'Title': 'Send 17,1', 'Label': ''
195        },
196        'AUTHORISED': 'false',
197        '!AUTHORISED': 'false',
198        'TIME TAGGED': 'false',
199        'INSTANCE_IDENTIFIER': '0',
200        'COMMAND_COMPLETION_STATUS': 'RELEASE (UNSUCCESSFUL)',
201        'FAILURE_RECOVERY': 'COMPLETED',
202        'STATE': 'Aborted'
203    }
204
205    actual_step_inst = ingester.pa_objects['OPE_P2816']['P.1']
206    # check the step instance match
207    assert expected_step_inst == actual_step_inst
208    """
209
210
211"""
212# Use marks to declare this function as both a web related test, and a test that spawns a webserver
213# pytest has special flags to run only named marks, or exclude them
214@pytest.mark.web
215@pytest.mark.webserver
216def test_apex_api(product, webserver, database):
217    ""Ingest a HLCH product if not present, start a webserver and examine it.""
218    hlch_file = product(HLCH_FILENAME)
219    sid, start = fileattr(HLCH_FILENAME)
220    stop = start + timedelta(minutes=60)  # testfile has 10 rows, with 1 hour worth of data
221
222    db = database(writeable=True)
223    if not db:
224        # So, we don't have a writeable database.
225        # Therefore if we're running with a local webserver we'll skip the test because
226        # the user should be able to supply a development database.
227        # However, if it's not a local database, allow the test to procede because we could be
228        # running against an OPE/VAL/TST system running an existing webserver
229        if webserver['local']:
230            pytest.skip('No writeable database configured')
231
232    urlbase = webserver['base_url']
233    # print('urlbase', urlbase)
234
235    def urltime(dt):
236        return dt.strftime('%Y-%m-%dT%H:%M:%S')
237
238    url = urlbase + 'api2/ts?table=APEX_INSTANCES&'
239                    'field=INSTANCE&sid={sid}&start={start}&stop={stop}'.format(
240        sid=sid.short_name,
241        start=urltime(start),
242        stop=urltime(stop))
243
244    print('Requesting', url)
245    res = urllib.request.urlopen(url)
246
247    # print(dir(res))
248    # print('status', res.status)
249    # print('code', res.getcode())
250    # data = res.read()
251    # print('data', data, 'len', len(data))
252    lines = [l.decode().strip() for l in res.readlines()]
253    print('Got', len(lines), 'lines')
254    req_lines =  ['OPE_P2816']
255    compare(lines, req_lines)
256"""