1#!/usr/bin/env python3
2
3"""Generic implementation for selecting the next job(s) for a worker."""
4
5from datetime import datetime
6from typing import Optional
7
8from chart.backend.jobs import find_jobs
9from chart.backend.job import job_retrieve_fields
10from chart.backend.job import Job
11from chart.db.func import ANY
12from chart.backend.job import JobStatus
13
def select_next_job(category,
                    now=None,
                    only_activities=None,
                    exclude_activities=None,
                    only_ingestion=False,
                    exclude_ingestion=False,
                    only_filenames=None,
                    db_conn=None,
                    lock=False) -> Optional[Job]:
    """Pick the next job to process for `category`.
    Look for ingestions first, then per-orbit algorithms, then others.
    For some data types the order of ingestion counts, so we order all ingestions
    by filename.
    If activities is a non-empty list of Activity objects, restrict answer to just those activities.

    Note, all these parameters can be safely ignored by alternate implementations of this function.
    They are only needed if you want to run multiple `worker`s with each one handling different
    types of jobs.

    We use the seemingly-unnecessary only/exclude_ingestion parameters because it makes the
    implementation of this function much easier and also makes it easier to set up special
    ingestion-only workers and processing-only workers.

    Args:
        category (str): Category to process.
        now (datetime): Assume the current time is `now`. Defaults to the current UTC time
            at call time. (A previous version used `datetime.utcnow()` as the literal
            default, which Python evaluates only once at import time — a long-running
            worker would then be stuck comparing against its start-up time.)
        only_activities (List[str]): Only consider activities in this list.
        exclude_activities (List[str]): Ignore these activities.
        only_ingestion (bool): Only look at ingestion jobs.
        exclude_ingestion (bool): Ignore all ingestion activities.
        only_filenames (str): Not implemented. This could be used to construct worker processes
            specific to certain file types.
        db_conn: Database connection passed through to `find_jobs`.
        lock (bool): Passed through to `find_jobs`; presumably row-locks the selected
            job against concurrent workers — confirm against `find_jobs`.

    Returns:
        Optional[Job]: The next job to be executed, or None if no pending job matches.
    """
    # Resolve the call-time default here so each call sees the real current time.
    if now is None:
        now = datetime.utcnow()

    if only_ingestion:
        # Scan for only ingestion jobs (filename is non-null), applying the filename
        # wildcard if specified. Note this branch intentionally ignores `now` and the
        # activity filters, matching the original behaviour.
        result = find_jobs(fields=job_retrieve_fields(),
                           category=category,
                           status=JobStatus.PENDING,
                           filename_ne=None,
                           filename_like=only_filenames,
                           limit=1,
                           db_conn=db_conn,
                           lock=lock).fetchone()
        return None if result is None else Job(row=result)

    # General scan: ingestion jobs are excluded by forcing filename to be null when
    # `exclude_ingestion` is set; ANY matches jobs with or without a filename.
    result = find_jobs(fields=job_retrieve_fields(),
                       category=category,
                       status=JobStatus.PENDING,
                       filename=None if exclude_ingestion else ANY,
                       earliest_execution_time_null_le=now,
                       activity=only_activities,
                       activity_ne=exclude_activities,
                       limit=1,
                       db_conn=db_conn,
                       lock=lock).fetchone()
    return None if result is None else Job(row=result)
80
81
def find_similar_jobs(job,  # (unused arg) pylint: disable=W0613
                      count,  # pylint: disable=W0613
                      now,  # pylint: disable=W0613
                      only_filenames=None):  # pylint: disable=W0613
    """Return jobs similar to the primary `job` that could be run as one batch.

    A single call to the algorithm may handle up to `count` jobs in total.
    Report activities are never batched, so they always yield an empty result.

    This generic implementation deliberately never batches: only CHART-EPS runs
    enough jobs for batching to matter, so for now every caller gets no
    companion jobs back.
    """
    return []