1#!/usr/bin/env python3
2
3"""Wrapper to extend the argparse class with some timeseries specific functionality."""
4
5import os
6import sys
7import argparse
8
9import chart
10from chart.common.path import Path
11
12# class boldArgumentDefaultsHelpFormatter(argparse.HelpFormatter):
13# """Help message formatter which adds default values to argument help.
14
15# Only the name of this class is considered a public API. All the methods
16# provided by the class are considered an implementation detail.
17# """
18
19# def _get_help_string(self, action):
20# from chart.common.terminal import TerminalController
21# tc = TerminalController()
22# help = action.help
23# if '%(default)' not in action.help:
24# if action.default is not argparse.SUPPRESS:
25# defaulting_nargs = [argparse.OPTIONAL, argparse.ZERO_OR_MORE]
26# if action.option_strings or action.nargs in defaulting_nargs:
27# help += ' (default: %(default)s)' + tc.NORMAL
28# return help
29
30
31class ArgumentParser(argparse.ArgumentParser):
32 """Replace system ArgumentParser with a few defaults changed."""
33
34 @staticmethod
35 def input_filename(filename):
36 """Accept only valid, existing filename but do not open it."""
37 path = Path(filename)
38
39 if not path.is_file():
40 if path.is_dir():
41 raise argparse.ArgumentError(None, 'Directory names are not allowed ({p})'.format(
42 p=path))
43
44 else:
45 raise argparse.ArgumentError(None, 'File {path} not found'.format(path=path))
46
47 return path
48
49 # Mode = enum('Mode', 'READ WRITE CREATE UPDATE')
50 # read - must already exist
51 # write - must be writeable. existing will probably be replaced. does not need to exist
52 # create - must be writeable but not already exist
53 # update - must be writeable, must already exist
54 # FileClass = enum('FileClass', 'FILE DIR FILE_OR_DIR')
55 # @staticmethod
56 # def path(mode, type):
57 # def imp():
58 # pass
59
60 # return imp
61
62 @staticmethod
63 def input_dirname(filename):
64 """Accept only valid, existing directory name."""
65 path = Path(filename)
66
67 if not path.is_dir():
68 if path.is_file():
69 raise argparse.ArgumentError(None, 'File names are not allowed')
70
71 else:
72 raise argparse.ArgumentError(None, 'File {path} not found'.format(path=path))
73
74 return path
75
76 @staticmethod
77 def input_filename_or_dirname(test):
78 """Accept either existing readable filename or existing readable directory name."""
79 path = Path(test)
80 if not path.exists():
81 raise argparse.ArgumentError(None, 'Cannot find {p}'.format(p=path))
82
83 return path
84
85 @staticmethod
86 def output_filename(filename):
87 """Accept only valid, writeable filename but do not open it."""
88 path = Path(filename)
89 if path.is_dir():
90 raise argparse.ArgumentError(None, '{path} is a directory'.format(path=path))
91
92 return Path(filename)
93
94 @staticmethod
95 def output_filename_or_dir(name):
96 """Accept only valid, writeable file or directory name but do not open it."""
97 return Path(name)
98
99 @staticmethod
100 def output_dir(name):
101 """Accept only valid, writeable file or directory name but do not open it."""
102 return Path(name)
103
104 @staticmethod
105 def create_dir(name):
106 """Accept only valid, writeable directory name. Create if not exists."""
107 result = Path(name)
108 if result.exists():
109 if result.is_file():
110 raise ValueError('{name} exists but is a file not a directory'.format(
111 name=name))
112
113 else:
114 # Create new directory and parents if needed
115 result.mkdir(parents=True)
116
117 return result
118
119 @staticmethod
120 def sid(test_sids):
121 """Accept only valid SID from settings.py.
122 Parameter could be a list or a string."""
123 from chart.project import SID
124 # from chart import project
125 # sid = project.sid(string=...)
126
127 if isinstance(test_sids, list):
128 return [SID(s) for s in test_sids]
129
130 else:
131 return SID(test_sids)
132
133 @staticmethod
134 def ts_table(table_name):
135 """Accept only strings which are timeseries (raw, view, derived) table names.
136 Return a TableInfo object."""
137 from chart.db.model.table import TableInfo
138 table_info = TableInfo(table_name)
139 if not table_info.is_timeseries:
140 raise ValueError(table_name + ' is not a timeseries table')
141
142 return table_info
143
144 @staticmethod
145 def cfid(oper_vali):
146 """For MSG all data is stamped as OPER or VALI."""
147
148 upper = oper_vali.upper()
149 if upper == 'OPER' or upper == 'VALI':
150 return upper
151
152 else:
153 raise ValueError('CFID must be "OPER" or "VALI"')
154
155 @staticmethod
156 def ogsid(oper_vali):
157 """For MSG all data is stamped as PRIM or BACK."""
158
159 upper = oper_vali.upper()
160 if upper == 'PRIM' or upper == 'BACK':
161 return upper
162
163 else:
164 raise ValueError('OGSID must be "PRIM" or "BACK"')
165
166 @staticmethod
167 def activity(activity):
168 """Accept only activity names and return an Activity object."""
169 from chart.backend.activity import Activity
170 res = Activity(activity)
171 return res
172
173 @staticmethod
174 def eventclass(eventclass):
175 """Accept only strings which are event class names
176 Returns an EventClass object."""
177
178 from chart.events.eventclass import EventClass
179 res = EventClass(eventclass)
180
181 return res
182
183 @staticmethod
184 def timedelta(instr):
185 """Parse timedelta with reasonable error handling."""
186 from chart.common.xml import xml_to_timedelta
187
188 try:
189 return xml_to_timedelta(instr.upper())
190 except ValueError:
191 _, exc_value, _ = sys.exc_info()
192 raise argparse.ArgumentError(None, exc_value)
193
194 @staticmethod
195 def datetime(instr):
196 """Parse datetime with reasonable error handling."""
197 from chart.common.xml import xml_to_datetime
198
199 try:
200 # print(instr, xml_to_datetime(instr))
201 return xml_to_datetime(instr)
202 except ValueError:
203 _, exc_value, _ = sys.exc_info()
204 raise argparse.ArgumentError(None, exc_value)
205
206 @staticmethod
207 def table(instr):
208 """Parse a table name and return a TableInfo object."""
209 from chart.db.model.table import TableInfo
210 return TableInfo(instr)
211
212 @staticmethod
213 def start_time(passthrough):
214 """Handled in the parse_args function. This function only exists as a marker."""
215 return passthrough
216
217 @staticmethod
218 def stop_time(passthrough):
219 """Handled in the parse_args function."""
220 return passthrough
221
222 @staticmethod
223 def period(passthrough):
224 """Handled in the parse_args function."""
225 return passthrough
226
227 def __init__(self, description=None):
228 from chart.project import settings
229 from chart.common import errors
230
231 # Install our exception handler
232 # (nothing do to with argument parsing, just a convenient place to call it)
233 errors.init_handler()
234
235 # pylint doesn't like super() here
236 # uper(argparse.ArgumentParser, self).__init__()
237 argparse.ArgumentParser.__init__(self)
238
239 if sys.argv[0].endswith('.py'):
240 tool = os.path.splitext(os.path.basename(sys.argv[0]))[0]
241
242 else:
243 tool = os.path.basename(sys.argv[0])
244
245 epilog = 'This program is part of {app}.'.format(app=settings.APPNAME)
246 argparse.ArgumentParser.__init__(self,
247 prog=tool,
248 description=description,
249 epilog=epilog,
250 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
251 # sneak in a special hidden option to log query text.
252 # Not sure if a good idea because there's no way for the user to see this -
253 # maybe we can add a --long-help option,
254 # or maybe it's better to rely on environment variables for debug options
255 # but make sure they can be clearly enumerated in settings tool
256 self.add_argument('--debug-show-queries',
257 action='store_true',
258 help=argparse.SUPPRESS)
259
260 def error(self, message):
261 """Keep pylint happy."""
262 argparse.ArgumentParser.error(self, message)
263
264 # (disable diff args)
265 def parse_args(self, args=None, namespace=None, no_log_init=False): # pylint: disable=W0221
266 """Call through to base class."""
267 res = super(ArgumentParser, self).parse_args(args, namespace)
268 if res.debug_show_queries:
269 # this doesn't work
270 os.environ['CHART_SHOW_QUERIES'] = '1'
271
272 return self._decode(res, no_log_init)
273
274 def parse_known_args(self, args=None, namespace=None):
275 """Call through to base class."""
276 res, sub_res = super(ArgumentParser, self).parse_known_args(args, namespace)
277 # return self._decode(res), sub_res
278 # warning: parse_args calls parse_known args
279 # _decode is not safe when called twice on the namespace
280 # as a temporary hack, clients who call parse_known_args can manually call _decode
281 # on the result.
282 # There's probably a better fix but only a couple of tools need it
283 return res, sub_res
284
285 def _decode(self, res, no_log_init=False):
286 """San the configured parameter options looking types of start, stop and sid
287 where found, transfer the actual user inputs to the input_* variables."""
288 from chart.common.traits import is_listlike
289
290 # text, user specified start value
291 input_start = None
292
293 # text user stop value
294 input_stop = None
295
296 # text, user scid value
297 input_sid = None
298
299 input_period = None
300
301 # destination variable for the start time
302 dest_start = None
303
304 # destination variable for the stop time
305 dest_stop = None
306
307 # scan the configured parameter options looking types of start, stop and sid
308 # where found, transfer the actual user inputs to the input_* variables
309 for a in self._actions:
310 if a.type == ArgumentParser.start_time: # is?
311 input_start = getattr(res, a.dest)
312 dest_start = a.dest
313
314 elif a.type == ArgumentParser.stop_time:
315 input_stop = getattr(res, a.dest)
316 dest_stop = a.dest
317
318 elif a.type == ArgumentParser.period:
319 input_period = getattr(res, a.dest)
320
321 # elif a.type == ArgumentParser.sid:
322 # input_sid = getattr(res, a.dest)
323
324 elif a.type == ArgumentParser.sid:
325 # sid can be a list of a single object
326 s = getattr(res, a.dest)
327 if s is None:
328 input_sid = None
329
330 elif is_listlike(s): # and len(s) > 1:
331 # raise NotImplementedError('Cannot decode multiple sids')
332 input_sid = [user_str for user_str in s]
333
334 elif s is not None:
335 input_sid = s
336
337 else:
338 input_sid = None
339
340 if input_period is not None:
341 from chart.common.texttime import decode_period
342 period = decode_period(input_period)
343 if period is None:
344 raise ValueError(input_period)
345
346 setattr(res, dest_start, period[0])
347 setattr(res, dest_stop, period[1])
348
349 else:
350 # now attempt to interpret what we have and repack it into the final namespace
351 # (the return value the user sees from parse_args())
352 if dest_start is not None or dest_stop is not None:
353 # avoid circular dependencies
354 from chart.common.texttime import texttime_to_timerange
355
356 # This could throw an exception, but we let it pass here since the user
357 # will soon see it
358
359 start_time, stop_time = texttime_to_timerange(
360 input_start,
361 input_stop,
362 input_sid[0] if is_listlike(input_sid) else input_sid)
363
364 if dest_start is not None:
365 setattr(res, dest_start, start_time)
366
367 if dest_stop is not None:
368 setattr(res, dest_stop, stop_time)
369
370 # Configure built in logging system
371 # (nothing do to with argument parsing, just a convenient place to call it)
372 if not no_log_init:
373 from chart.common.log import init_log
374 init_log()
375
376 return res