1#!/usr/bin/env python3
2
3"""Access to the PROCESSES table."""
4
5import logging
6from datetime import datetime
7from enum import Enum
8
9from chart.db.connection import db_connect
10from chart.db.exceptions import SQLError
11from chart.project import settings
12
13logger = logging.getLogger()
14
15# Allowed values for the PROCESS table STATUS field:
16# - status cannot be null
17# - COMPLETED: Process finished for no global problems
18# - FAILED: Dispatcher failed to properly run the process, or it messed up the results.xml
19# file or something similar
20# - TERMINATED: Usually from a user signal
21# - TIMEOUT: Timeout by the worker
22# - RETRY: Not used much - /leo used to fill up reguarly so we used this. Could be used to
23# record a temporary problem in executing the jobs
24ProcessStatus = Enum('ProcessStatus', 'COMPLETED FAILED RUNNING TERMINATED TIMEOUT RETRY')
25
26default_db_conn = db_connect('PROCESSES')
27
28
29def find_processes(fields,
30 process_id=None,
31 status=None,
32 worker=None,
33 gen_time_lt=None,
34 execute_start_ge=None,
35 execute_stop_le=None,
36 ordering=None,
37 db_conn=default_db_conn):
38 """Return a cursor of rows from the PROCESSES table."""
39 bindvars = {}
40 clauses = ['p.id=j.process_id']
41
42 if process_id is not None:
43 clauses.append('p.id=:procid')
44 bindvars['procid'] = process_id
45
46 if status is not None:
47 clauses.append('p.status=:status')
48 bindvars['status'] = status
49
50 if worker is not None:
51 clauses.append('p.worker=:worker')
52 bindvars['worker'] = worker
53
54 if gen_time_lt is not None:
55 clauses.append('p.gen_time<:gen_time_lt')
56 bindvars['gen_time_lt'] = gen_time_lt
57
58 if execute_start_ge is not None:
59 clauses.append('p.execute_start>=:execute_start_ge')
60 bindvars['execute_start_ge'] = execute_start_ge
61
62 if execute_stop_le is not None:
63 clauses.append('p.execute_stop<=:execute_stop_le')
64 bindvars['execute_stop_le'] = execute_stop_le
65
66 if ordering is not None:
67 ordering_sql = ' ORDER BY {o}'.format(o=','.join(ordering))
68
69 else:
70 ordering_sql = ''
71
72 if settings.DATABASE_PROJECT_ID is not None:
73 clauses.append('j.project=:project')
74 bindvars['project'] = settings.DATABASE_PROJECT_ID
75
76 # The join is only needed in the case we are using DATABASE_PROJECT_ID
77 # so could be optimised away for projects that don't use it
78 return db_conn.query(('SELECT DISTINCT {fields} '
79 'FROM PROCESSES p, JOBS j '
80 'WHERE {where}{order}').format(
81 fields=','.join('p.{f}'.format(f=f) for f in fields),
82 where=' AND '.join(clauses),
83 order=ordering_sql),
84 **bindvars)
85
86
87def find_single_process(fields,
88 process_id=None,
89 db_conn=default_db_conn):
90 """Return a single row from the PROCESSES table."""
91 return find_processes(fields=fields, process_id=process_id, db_conn=db_conn).fetchone()
92
93
94def insert_process(worker,
95 exe_start=None,
96 status=ProcessStatus.RUNNING.name,
97 db_conn=default_db_conn,
98 commit=True):
99 """Create a new entry in the PROCESSES table, returning it's process id."""
100 if exe_start is None:
101 exe_start = datetime.utcnow()
102
103 proc_id = db_conn.query_insert_with_return(
104 'INSERT INTO processes (worker,execute_start,status) '
105 'VALUES (:worker, :exestart, :status) ',
106 'id',
107 worker=worker,
108 exestart=exe_start,
109 status=status)
110
111 if commit:
112 db_conn.commit()
113
114 logging.debug('Created process {id} for {worker}'.format(id=proc_id, worker=worker))
115 return proc_id
116
117
118def update_process(process_id,
119 set_working_dir=None,
120 set_status=None,
121 set_execute_stop=None,
122 set_critical_count=None,
123 set_error_count=None,
124 set_warning_count=None,
125 set_info_count=None,
126 commit=True,
127 db_conn=default_db_conn):
128 """Alter the PROCESSES table.
129
130 Args:
131 `process_id` (int): Process ID to change
132 `set_working_dir` (Path): New working_dir
133 `set_status` (ProcessStatus): New status
134 `set_execute_stop` (datetime): New execution stop time
135 `commit` (bool): Autocommit
136 """
137
138 wheres = []
139 updates = []
140 bindvars = {}
141
142 desc_affected = []
143 desc_changes = []
144
145 # search clauses
146 if process_id is not None:
147 wheres.append('ID=:process_id')
148 bindvars['process_id'] = process_id
149 desc_affected.append(process_id)
150
151 # change clauses
152 if set_status is not None:
153 updates.append('STATUS=:set_status')
154 bindvars['set_status'] = set_status.name
155 desc_changes.append('status={c}'.format(c=set_status.name))
156
157 if set_execute_stop is not None:
158 updates.append('EXECUTE_STOP=:set_execute_stop')
159 bindvars['set_execute_stop'] = set_execute_stop
160 desc_changes.append('execute_stop={c}'.format(c=set_execute_stop))
161
162 if set_working_dir is not None:
163 updates.append('WORKING_DIR=:set_working_dir')
164 bindvars['set_working_dir'] = str(set_working_dir)
165 desc_changes.append('working_dir={c}'.format(c=set_working_dir))
166
167 db_conn.query('UPDATE PROCESSES SET {updates} WHERE {where}'.format(
168 updates=','.join(updates), where=' AND '.join(wheres)),
169 **bindvars)
170
171 logging.debug('Set {changes} of process {affected} with commit {commit}'.format(
172 changes=', '.join(desc_changes),
173 affected=', '.join(str(i) for i in desc_affected),
174 commit=commit))
175
176 if commit:
177 db_conn.commit()
178
179 # logging.debug('Updated process {id} to dir {dir}'.format(id=process_id, dir=set_working_dir))
180
181
182def delete_processes(gen_time_lt=None,
183 commit=True,
184 db_conn=default_db_conn):
185 """Delete from PROCESSES table."""
186 clauses = []
187 bindvars = {}
188
189 if gen_time_lt is not None:
190 clauses.append('gen_time<:gen_time_lt')
191 bindvars['gen_time_lt'] = gen_time_lt
192
193 cur = db_conn.query('DELETE FROM PROCESSES WHERE {where}'.format(where=','.join(clauses)),
194 **bindvars)
195
196 if commit:
197 db_conn.commit()
198
199 return cur.rowcount