1#!/usr/bin/env python3
2
3"""Test module for the Implementation of APEX History file ingestion."""
4
5from datetime import datetime
6
7import pytest
8
9from chartepssg import project # must come first
10from chart.common.test_utils import compare
11from chart.common.path import Path
12#from chart.project import SID
13#from chartepssg.alg.apex_common import TBL_APEX_LOG
14#from chartepssg.alg.apex_common import TBL_APEX_INSTANCES
15#from chartepssg.alg.hlch_ingester import HLCH_EXEC
16#from chartepssg.alg import hlch_ingester
17#from chartepssg.alg.pa_ingester import ProcedureArchiveParser
18#from chartepssg.alg.pa_ingester import PAEvent
19#from chartepssg.alg import pa_ingester
20from chartepssg.alg.settings import scid_to_sid
21
22
# Standard APEX test products shared by the tests in this module.
# The two product names are identical apart from the extension (.txt for HLCH, .zip for PA).
HLCH_FILENAME = Path('APEX/SGxx_LOG_OAS__BUC_PROC_HIST__G20220712083000Z_S20220712083000Z_E20220712090000Z.txt')
PA_FILENAME = Path('APEX/SGxx_LOG_OAS__BUC_PROC_HIST__G20220712083000Z_S20220712083000Z_E20220712090000Z.zip')
26
27
def test_hlch_fileattr():
    """Verify the start and end times decoded from a HLCH filename as per ICD."""
    from chartepssg.alg import hl_prochist_ingester as ingester

    # fileattr() is expected to yield exactly three values: SID, start time, end time
    decoded_sid, sensing_start, sensing_stop = ingester.fileattr(HLCH_FILENAME)

    # The SID check is currently disabled:
    # assert decoded_sid == scid_to_sid('SGA1')

    expected_start = datetime(2022, 7, 12, 8, 30)
    expected_stop = datetime(2022, 7, 12, 9)
    assert sensing_start == expected_start
    assert sensing_stop == expected_stop
35
36
def test_pa_fileattr():
    """Verify the start and end times decoded from a PA filename as per ICD."""
    from chartepssg.alg import au_prochist_ingester as ingester

    # fileattr() is expected to yield exactly three values: SID, start time, end time
    decoded_sid, sensing_start, sensing_stop = ingester.fileattr(PA_FILENAME)

    # The SID check is currently disabled:
    # assert decoded_sid == scid_to_sid('SGA1')

    expected_start = datetime(2022, 7, 12, 8, 30)
    expected_stop = datetime(2022, 7, 12, 9)
    assert sensing_start == expected_start
    assert sensing_stop == expected_stop
44
@pytest.mark.skip(
    reason='Test fails intermittently depending on existing data in APEX_INSTANCES')
def test_ingestion(product, database):
    """Simple test to ingest APEX products.

    If a writeable test database is available, ingest APEX History files: first the
    HLCH product, then the PA product. After each ingestion the interval reported by the
    ingester is checked, rows are counted in the instances and log tables over that
    interval, and selected in-memory records held by the ingester are compared against
    expected values.

    The test is skipped unconditionally through the marker above; the skip reason is
    carried by the marker so that it shows up in the test report.

    Args:
        product: fixture called with a product filename; its result is handed to the
            ingester constructors.
        database: fixture called with `writeable=True`; a falsy result means no writeable
            test database is configured and the test skips itself.

    NOTE(review): TBL_APEX_INSTANCES, TBL_APEX_LOG, HLCH_Ingester, HLCH_EXEC, SID,
    ProcedureArchiveParser and PAEvent are not imported in the visible part of this module
    (the related imports at the top of the file are commented out). They must be brought
    back into scope before the skip marker is lifted, otherwise the body raises NameError.
    """
    db = database(writeable=True)
    if not db:
        pytest.skip('No suitable test writeable database specified')

    db_conn = db.connection.db_connect()
    tbl_instances = TBL_APEX_INSTANCES
    tbl_log = TBL_APEX_LOG

    #
    # #
    # HLCH Ingestion
    # #
    #
    hlch_file = product(HLCH_FILENAME)
    ingester = HLCH_Ingester(hlch_file)
    ingester.ingest()
    start, stop = ingester.get_interval()

    # check the time interval reported by the ingester
    assert start == datetime(2019, 10, 23, 5, 36, 16, 525000)
    assert stop == datetime(2019, 10, 23, 5, 51, 24, 740000)

    # query all instances that are procedures
    sql = ('select count(*) '
           'from {table} '
           'where sensing_time >= :start '
           'and sensing_time <= :stop '
           'and level = \'PROCEDURE\'').format(table=tbl_instances.name)

    count_inst = db_conn.query(sql, start, stop).fetchone()[0]
    assert count_inst == 1, 'HLCH Instances Ingestion failed'

    # query all log events that are procedures
    sql = ('select count(*) '
           'from {table} '
           'where sensing_time >= :start '
           'and sensing_time <= :stop '
           'and level = \'PROCEDURE\'').format(table=tbl_log.name)

    count_log = db_conn.query(sql, start, stop).fetchone()[0]
    assert count_log == 15, 'HLCH Log Ingestion failed'

    # check the correctness of procedure instance OPE_P2816
    expected_instance = {
        # 'SID': SAT_TO_SID.get('SGA1'),
        # 'INSTANCE': 'OPE_P2816',
        # 'EARLIEST_START_TIME': datetime(2019, 10, 23, 5, 36, 16, 525000),
        # 'LATEST_END_TIME': datetime(2019, 10, 23, 5, 36, 24, 737000),
        'DOMAIN': 'SGA1',
        'EXECUTION_MODE': [['Automatic', '2019-10-23 05:36:16.525000']],
        'START_TIME': datetime(2019, 10, 23, 5, 36, 16, 525000),
        'END_TIME': datetime(2019, 10, 23, 5, 36, 24, 737000),
        'IDENTIFIER': 'Send_17_01',
        'VERSION': '6.0',
        'SOURCE': 'CLIENT_A',
        'PARENT': 'CLIENT_A',
        'TITLE': 'Send TC(17,1)',
        'STATE': 'Completed Verification in Error',
        'INTERACTIONS': [
            ['Pending', '2019-10-23 05:36:16.691000'],
            ['Inactive', '2019-10-23 05:36:24.730000']
        ],
        'ARGUMENTS': {},
        'VARIABLES': {},
    }

    actual_instance = ingester.proc_instances['OPE_P2816']

    assert expected_instance == actual_instance

    # check the third-from-last execution log entry
    expected_log = HLCH_EXEC(
        sid=SID('EPSSGA1'),
        timestamp=datetime(2019, 10, 23, 5, 36, 24, 735000),
        domain='SGA1',
        instance='OPE_P2816',
        attribute='STATE',
        value='Completed Verification in Error',
        counter=24
    )

    actual_log = ingester.proc_execution[-3]
    assert expected_log == actual_log

    #
    # #
    # PA Ingestion
    # #
    #
    pa_file = product(PA_FILENAME)
    ingester = ProcedureArchiveParser(pa_file)
    ingester.ingest()
    start, stop = ingester.get_interval()

    # check the time interval reported by the PA ingester
    assert start == datetime(2019, 10, 23, 5, 36, 16, 533000)
    assert stop == datetime(2019, 10, 23, 5, 36, 24, 737000)

    # query all instances that are NOT procedures
    sql = ('select count(*) '
           'from {table} '
           'where sensing_time >= :start '
           'and sensing_time <= :stop '
           'and level <> \'PROCEDURE\'').format(table=tbl_instances.name)

    count_inst = db_conn.query(sql, start, stop).fetchone()[0]
    assert count_inst == 2, 'PA Instances Ingestion failed'

    # query all log events that are NOT procedures
    sql = ('select count(*) '
           'from {table} '
           'where sensing_time >= :start '
           'and sensing_time <= :stop '
           'and level <> \'PROCEDURE\'').format(table=tbl_log.name)

    count_log = db_conn.query(sql, start, stop).fetchone()[0]
    assert count_log == 13, 'PA Log Ingestion failed'

    # check the second PA event (index 1), a STEP level state change
    expected_step_log = PAEvent(
        sid=SID('EPSSGA1'),
        timestamp=datetime(2019, 10, 23, 5, 36, 16, 534000),
        level='STEP',
        domain='SGA1',
        instance='OPE_P2816',
        identifier='P.1',
        attribute='STATE',
        value='Executing Active',
        counter=12
    )
    actual_step_log = ingester.pa_events[1]
    # Check the step log event match
    assert expected_step_log == actual_step_log

    # The step instance check below is disabled: it is kept inside a string literal,
    # which is evaluated as a no-op expression.
    """
    expected_step_inst = {
        #'EVENT': expected_step_log,
        #'START_TIME': datetime(2019, 10, 23, 5, 36, 16, 534000),
        #'END_TIME': datetime(2019, 10, 23, 5, 36, 24, 737000),
        # 'SOURCE': 'CLIENT_A',
        # 'PARENT': 'CLIENT_A',
        # 'PROCEDURE': 'Send_17_01',
        'PROPERTIES': {
            'Identifier': 'P.1', 'stepType': 'Send Command Step', 'Title': 'Send 17,1', 'Label': ''
        },
        'AUTHORISED': 'false',
        '!AUTHORISED': 'false',
        'TIME TAGGED': 'false',
        'INSTANCE_IDENTIFIER': '0',
        'COMMAND_COMPLETION_STATUS': 'RELEASE (UNSUCCESSFUL)',
        'FAILURE_RECOVERY': 'COMPLETED',
        'STATE': 'Aborted'
    }

    actual_step_inst = ingester.pa_objects['OPE_P2816']['P.1']
    # check the step instance match
    assert expected_step_inst == actual_step_inst
    """
209
210
211"""
212# Use marks to declare this function as both a web related test, and a test that spawns a webserver
213# pytest has special flags to run only named marks, or exclude them
214@pytest.mark.web
215@pytest.mark.webserver
216def test_apex_api(product, webserver, database):
217 ""Ingest a HLCH product if not present, start a webserver and examine it.""
218 hlch_file = product(HLCH_FILENAME)
219 sid, start = fileattr(HLCH_FILENAME)
220 stop = start + timedelta(minutes=60) # testfile has 10 rows, with 1 hour worth of data
221
222 db = database(writeable=True)
223 if not db:
224 # So, we don't have a writeable database.
225 # Therefore if we're running with a local webserver we'll skip the test because
226 # the user should be able to supply a development database.
 # However, if it's not a local database, allow the test to proceed because we could be
228 # running against an OPE/VAL/TST system running an existing webserver
229 if webserver['local']:
230 pytest.skip('No writeable database configured')
231
232 urlbase = webserver['base_url']
233 # print('urlbase', urlbase)
234
235 def urltime(dt):
236 return dt.strftime('%Y-%m-%dT%H:%M:%S')
237
238 url = urlbase + 'api2/ts?table=APEX_INSTANCES&'
239 'field=INSTANCE&sid={sid}&start={start}&stop={stop}'.format(
240 sid=sid.short_name,
241 start=urltime(start),
242 stop=urltime(stop))
243
244 print('Requesting', url)
245 res = urllib.request.urlopen(url)
246
247 # print(dir(res))
248 # print('status', res.status)
249 # print('code', res.getcode())
250 # data = res.read()
251 # print('data', data, 'len', len(data))
252 lines = [l.decode().strip() for l in res.readlines()]
253 print('Got', len(lines), 'lines')
254 req_lines = ['OPE_P2816']
255 compare(lines, req_lines)
256"""