1#!/usr/bin/env python3
  2
  3"""Access to the PROCESSES table."""
  4
  5import logging
  6from datetime import datetime
  7from enum import Enum
  8
  9from chart.db.connection import db_connect
 10from chart.db.exceptions import SQLError
 11from chart.project import settings
 12
 13logger = logging.getLogger()
 14
 15# Allowed values for the PROCESS table STATUS field:
 16# - status cannot be null
 17# - COMPLETED: Process finished for no global problems
 18# - FAILED: Dispatcher failed to properly run the process, or it messed up the results.xml
 19#           file or something similar
 20# - TERMINATED: Usually from a user signal
 21# - TIMEOUT: Timeout by the worker
 22# - RETRY: Not used much - /leo used to fill up reguarly so we used this. Could be used to
 23#          record a temporary problem in executing the jobs
 24ProcessStatus = Enum('ProcessStatus', 'COMPLETED FAILED RUNNING TERMINATED TIMEOUT RETRY')
 25
 26default_db_conn = db_connect('PROCESSES')
 27
 28
 29def find_processes(fields,
 30                   process_id=None,
 31                   status=None,
 32                   worker=None,
 33                   gen_time_lt=None,
 34                   execute_start_ge=None,
 35                   execute_stop_le=None,
 36                   ordering=None,
 37                   db_conn=default_db_conn):
 38    """Return a cursor of rows from the PROCESSES table."""
 39    bindvars = {}
 40    clauses = ['p.id=j.process_id']
 41
 42    if process_id is not None:
 43        clauses.append('p.id=:procid')
 44        bindvars['procid'] = process_id
 45
 46    if status is not None:
 47        clauses.append('p.status=:status')
 48        bindvars['status'] = status
 49
 50    if worker is not None:
 51        clauses.append('p.worker=:worker')
 52        bindvars['worker'] = worker
 53
 54    if gen_time_lt is not None:
 55        clauses.append('p.gen_time<:gen_time_lt')
 56        bindvars['gen_time_lt'] = gen_time_lt
 57
 58    if execute_start_ge is not None:
 59        clauses.append('p.execute_start>=:execute_start_ge')
 60        bindvars['execute_start_ge'] = execute_start_ge
 61
 62    if execute_stop_le is not None:
 63        clauses.append('p.execute_stop<=:execute_stop_le')
 64        bindvars['execute_stop_le'] = execute_stop_le
 65
 66    if ordering is not None:
 67        ordering_sql = ' ORDER BY {o}'.format(o=','.join(ordering))
 68
 69    else:
 70        ordering_sql = ''
 71
 72    if settings.DATABASE_PROJECT_ID is not None:
 73        clauses.append('j.project=:project')
 74        bindvars['project'] = settings.DATABASE_PROJECT_ID
 75
 76    # The join is only needed in the case we are using DATABASE_PROJECT_ID
 77    # so could be optimised away for projects that don't use it
 78    return db_conn.query(('SELECT DISTINCT {fields} '
 79                          'FROM PROCESSES p, JOBS j '
 80                          'WHERE {where}{order}').format(
 81        fields=','.join('p.{f}'.format(f=f) for f in fields),
 82        where=' AND '.join(clauses),
 83        order=ordering_sql),
 84                         **bindvars)
 85
 86
 87def find_single_process(fields,
 88                        process_id=None,
 89                        db_conn=default_db_conn):
 90    """Return a single row from the PROCESSES table."""
 91    return find_processes(fields=fields, process_id=process_id, db_conn=db_conn).fetchone()
 92
 93
 94def insert_process(worker,
 95                   exe_start=None,
 96                   status=ProcessStatus.RUNNING.name,
 97                   db_conn=default_db_conn,
 98                   commit=True):
 99    """Create a new entry in the PROCESSES table, returning it's process id."""
100    if exe_start is None:
101        exe_start = datetime.utcnow()
102
103    proc_id = db_conn.query_insert_with_return(
104        'INSERT INTO processes (worker,execute_start,status) '
105        'VALUES (:worker, :exestart, :status) ',
106        'id',
107        worker=worker,
108        exestart=exe_start,
109        status=status)
110
111    if commit:
112        db_conn.commit()
113
114    logging.debug('Created process {id} for {worker}'.format(id=proc_id, worker=worker))
115    return proc_id
116
117
118def update_process(process_id,
119                   set_working_dir=None,
120                   set_status=None,
121                   set_execute_stop=None,
122                   set_critical_count=None,
123                   set_error_count=None,
124                   set_warning_count=None,
125                   set_info_count=None,
126                   commit=True,
127                   db_conn=default_db_conn):
128    """Alter the PROCESSES table.
129
130    Args:
131        `process_id` (int): Process ID to change
132        `set_working_dir` (Path): New working_dir
133        `set_status` (ProcessStatus): New status
134        `set_execute_stop` (datetime): New execution stop time
135        `commit` (bool): Autocommit
136    """
137
138    wheres = []
139    updates = []
140    bindvars = {}
141
142    desc_affected = []
143    desc_changes = []
144
145    # search clauses
146    if process_id is not None:
147        wheres.append('ID=:process_id')
148        bindvars['process_id'] = process_id
149        desc_affected.append(process_id)
150
151    # change clauses
152    if set_status is not None:
153        updates.append('STATUS=:set_status')
154        bindvars['set_status'] = set_status.name
155        desc_changes.append('status={c}'.format(c=set_status.name))
156
157    if set_execute_stop is not None:
158        updates.append('EXECUTE_STOP=:set_execute_stop')
159        bindvars['set_execute_stop'] = set_execute_stop
160        desc_changes.append('execute_stop={c}'.format(c=set_execute_stop))
161
162    if set_working_dir is not None:
163        updates.append('WORKING_DIR=:set_working_dir')
164        bindvars['set_working_dir'] = str(set_working_dir)
165        desc_changes.append('working_dir={c}'.format(c=set_working_dir))
166
167    db_conn.query('UPDATE PROCESSES SET {updates} WHERE {where}'.format(
168        updates=','.join(updates), where=' AND '.join(wheres)),
169                  **bindvars)
170
171    logging.debug('Set {changes} of process {affected} with commit {commit}'.format(
172            changes=', '.join(desc_changes),
173            affected=', '.join(str(i) for i in desc_affected),
174            commit=commit))
175
176    if commit:
177        db_conn.commit()
178
179    # logging.debug('Updated process {id} to dir {dir}'.format(id=process_id, dir=set_working_dir))
180
181
182def delete_processes(gen_time_lt=None,
183                     commit=True,
184                     db_conn=default_db_conn):
185    """Delete from PROCESSES table."""
186    clauses = []
187    bindvars = {}
188
189    if gen_time_lt is not None:
190        clauses.append('gen_time<:gen_time_lt')
191        bindvars['gen_time_lt'] = gen_time_lt
192
193    cur = db_conn.query('DELETE FROM PROCESSES WHERE {where}'.format(where=','.join(clauses)),
194                  **bindvars)
195
196    if commit:
197        db_conn.commit()
198
199    return cur.rowcount