1#!/usr/bin/env python3
2
3"""Accessors for the PRODUCTS system table."""
4
5import os
6from enum import Enum
7from datetime import datetime
8from typing import Iterable
9from typing import Union
10
11from chart.db.connection import db_connect
12from chart.common.traits import is_listlike
13from chart.common.traits import name_of_thing
14from chart.project import SID
15
16db_conn = db_connect('PRODUCTS')
17
18# Pass to `find_products` as the `fields` argument to retrieve rowcount
19COUNT = ('count(*)', )
20
21
22class ProductResult(Enum):
23 """Values for PRODUCTS.RESULT field."""
24
25 OK = 'ok'
26 FAILED = 'failed'
27 MISSING = 'missing'
28 IN_PROGRESS = 'in_progress'
29
30
31def find_products(fields:Iterable[str], # =('FILENAME', ),
32 activity:Union["Activity",str]=None,
33 product_id:int=None,
34 sid:SID=None,
35 filename:str=None,
36 sensing_start_ge:datetime=None,
37 sensing_start_lt:datetime=None,
38 ingestion_time_ge:datetime=None,
39 ingestion_time_lt:datetime=None,
40 ordering:Union[str,Iterable[str]]=None) -> "Cursor":
41 """Scan the PRODUCTS table."""
42 clauses = []
43 bindvars = {}
44
45 if ordering is None:
46 ordering = ''
47
48 else:
49 if is_listlike(ordering):
50 ordering = ' ORDER BY {o}'.format(o=','.join(ordering))
51
52 else:
53 ordering = ' ORDER BY {o}'.format(o=ordering)
54
55 if activity is not None:
56 clauses.append('ACTIVITY=:activity')
57 bindvars['activity'] = name_of_thing(activity)
58
59 if product_id is not None:
60 clauses.append('id=:product_id')
61 bindvars['product_id'] = product_id
62
63 if sid is not None:
64 clauses.append(sid.sql_sys_where_bind('PRODUCTS'))
65 bindvars.update(sid.bind_sys_where('PRODUCTS'))
66
67 if filename is not None:
68 clauses.append('filename=:filename')
69 bindvars['filename'] = filename.name
70
71 if sensing_start_ge is not None:
72 clauses.append('sensing_start>=:sensing_start_ge')
73 bindvars['sensing_start_ge'] = sensing_start_ge
74
75 if sensing_start_lt is not None:
76 clauses.append('sensing_start<:sensing_start_lt')
77 bindvars['sensing_start_lt'] = sensing_start_lt
78
79 if ingestion_time_ge is not None:
80 clauses.append('ingestion_time>=:ingestion_time_ge')
81 bindvars['ingestion_time_ge'] = ingestion_time_ge
82
83 if ingestion_time_lt is not None:
84 clauses.append('ingestion_time<:ingestion_time_lt')
85 bindvars['ingestion_time_lt'] = ingestion_time_lt
86
87 return db_conn.query('SELECT {fields} '
88 'FROM PRODUCTS '
89 'WHERE {where}'
90 '{order}'.format(
91 fields=','.join(fields),
92 where=' AND '.join(clauses),
93 order=ordering),
94 **bindvars)
95
96
97def count_products(**kwargs):
98 return find_products(fields=COUNT, **kwargs).fetchone()[0]
99
100
101def insert_product(activity,
102 filename,
103 result,
104 sensing_start=None,
105 sid=None,
106 notes=None,
107 mtime=None,
108 filesize=None,
109 commit=True):
110 """Insert or update an entry in the PRODUCTS table.
111
112 Args:
113 `activity` (Activity): Activity which processed the product
114 `filename` (Path): Name (without directory) of the product
115 `result` (Result): Ingestion status
116 `sensing_start` (datetime): Theoretical start time of product
117 `sid` (SID): Associated source-ID
118 `notes` (str): For failed ingestions, record the reason
119 `mtime` (datetime): File modification time
120 `filesie` (int): File size
121 `commit` (bool): Commit change or wait
122
123 When inserting a new product all fields are used.
124 When changing an existing entry only the status and notes are used,
125 although activity and filename must be passed to identify the product.
126
127 Raises:
128 IntegrityError.
129 """
130 project_is_eps = os.environ['CHART_SETTINGS_MODULE'] == 'charteps.project_settings'
131 if project_is_eps:
132 # we never got around to adding the NOTES field to PRODUCTS in chart-eps
133 has_notes = False
134
135 else:
136 has_notes = True
137
138 row_id = None
139 for row_id, in find_products(fields=('ID',), activity=activity, filename=filename, sid=sid):
140 pass
141
142 ingestion_time = datetime.utcnow()
143
144 # if count_products(activity=activity, filename=filename, sid=sid) > 0:
145 if row_id is not None:
146 # (assumes sid won't change)
147 db_conn.query(
148 'UPDATE PRODUCTS '
149 'SET result=:result, notes=:notes '
150 'WHERE activity=:activity AND filename=:filename',
151 result=result.name,
152 notes=notes,
153 activity=name_of_thing(activity),
154 filename=filename.name)
155
156 else:
157 sid_fields, sid_binds = SID.sql_sys_insert('PRODUCTS')
158 if has_notes:
159 row_id = db_conn.query_insert_with_return(
160 'INSERT INTO PRODUCTS (activity, filename, result, sensing_start, notes, ingestion_time, mtime, filesize{sidfields}) '
161 'VALUES (:activity, :filename, :result, :sensing_start, :notes, :ingestion_time, :mtime, :filesize{sidbinds})'.format(
162 sidfields=sid_fields, sidbinds=sid_binds),
163 'ID',
164 activity=name_of_thing(activity),
165 filename=filename.name,
166 result=result.name,
167 sensing_start=sensing_start,
168 notes=notes,
169 ingestion_time=ingestion_time,
170 mtime=mtime,
171 filesize=filesize,
172 **SID.bind_sys_insert('PRODUCTS', sid))
173
174 else:
175 db_conn.query(
176 'INSERT INTO PRODUCTS (activity, filename, result, sensing_start, ingestion_time{sidfields}) '
177 'VALUES (:activity, :filename, :result, :sensing_start, :ingestion_time{sidbinds})'.format(
178 sidfields=sid_fields, sidbinds=sid_binds),
179 activity=name_of_thing(activity),
180 filename=filename.name,
181 result=result.name,
182 sensing_start=sensing_start,
183 ingestion_time=ingestion_time,
184 **SID.bind_sys_insert('PRODUCTS', sid))
185
186 if commit:
187 db_conn.commit()
188
189 return row_id