def __init__(self, option):
if option.choices:
type_ = click.Choice(option.choices)
elif option.min is not None or option.max is not None:
type_ = click.IntRange(min=option.min, max=option.max)
else:
type_ = TYPES[option.type_name]
super().__init__(
param_decls=option.cli, help=option.help,
default=None, type=type_)
self.metadata = option.metadata
python类Choice()的实例源码
def test_string_with_choices_type(self):
class Option(StringOption):
metadata = 'name'
choices = ['one', 'two', 'three']
cli = ['--name']
click_option = ClickObjectOption(Option)
self.assertIsInstance(click_option.type, click.Choice)
self.assertEqual(click_option.type.choices, ['one', 'two', 'three'])
def _add_format_option(self, command, action):
"""
Add format option.
This option defines the format type of the command output. This
option is attached only on `ListCommand` and `RetrieveCommand`.
:param command: Command instance.
:param action: Action type, e.g. 'list', 'retrieve', etc.
"""
if action in self.READ_ACTIONS:
return click.option(
'--format', type=click.Choice(['json', 'table']),
default='json')(command)
def init():
"""Return top level command handler"""
@click.command()
@click.option('--cell', required=True,
envvar='TREADMILL_CELL',
callback=cli.handle_context_opt,
expose_value=False)
@click.option('--once', help='Run once.', is_flag=True, default=False)
@click.option('--interval', help='Wait interval between checks.',
default=_DEFAULT_INTERVAL)
@click.option('--threshold', help='Number of failed checks before reap.',
default=1)
@click.option('--proto', help='Endpoint protocol.', default='tcp',
type=click.Choice(['tcp', 'udp']))
@click.argument('pattern')
@click.argument('endpoint')
@click.argument('command', nargs=-1)
def reaper(once, interval, threshold, proto, pattern, endpoint, command):
"""Removes unhealthy instances of the app.
The health check script reads from STDIN and prints to STDOUT.
The input it list of instance host:port, similar to discovery.
Output - list of instances that did not pass health check.
For example, specifying awk '{print $1}' as COMMAND will remove all
instances.
"""
command = list(command)
failed = collections.Counter()
while True:
failed.update(_health_check(pattern, proto, endpoint, command))
for instance, count in failed.items():
_LOGGER.info('Failed: %s, count: %s', instance, count)
reaped = _reap([instance for instance, count in failed.items()
if count > threshold])
for instance in reaped:
del failed[instance]
if once:
break
time.sleep(utils.to_seconds(interval))
return reaper
def explain_group(parent):
"""Scheduler explain CLI group."""
def _print_frame(df):
"""Prints dataframe."""
if not df.empty:
pd.set_option('display.max_rows', None)
pd.set_option('float_format', lambda f: '%f' % f)
pd.set_option('expand_frame_repr', False)
print(df.to_string(index=False))
@parent.group()
def explain():
"""Explain scheduler internals"""
pass
@explain.command()
@click.option('--instance', help='Application instance')
@click.option('--partition', help='Cell partition', default='_default')
@cli.admin.ON_EXCEPTIONS
def queue(instance, partition):
"""Explain the application queue"""
cell_master = make_readonly_master()
frame = reports.explain_queue(cell_master.cell,
partition,
pattern=instance)
_print_frame(frame)
@explain.command()
@click.argument('instance')
@click.option('--mode', help='Tree traversal method',
type=click.Choice(reports.WALKS.keys()), default='default')
@cli.admin.ON_EXCEPTIONS
def placement(instance, mode):
"""Explain application placement"""
cell_master = make_readonly_master()
if instance not in cell_master.cell.apps:
cli.bad_exit('Instance not found.')
app = cell_master.cell.apps[instance]
if app.server:
cli.bad_exit('Instace already placed on %s' % app.server)
frame = reports.explain_placement(cell_master.cell, app, mode)
_print_frame(frame)
del queue
del placement
def init():
"""Top level command handler."""
@click.command()
@click.option('-r', '--register', required=False, default=False,
is_flag=True, help='Register as /nodeinfo in Zookeeper.')
@click.option('-p', '--port', required=False, default=0)
@click.option('-a', '--auth', type=click.Choice(['spnego']))
@click.option('-m', '--modules', help='API modules to load.',
required=False, type=cli.LIST)
@click.option('-t', '--title', help='API Doc Title',
default='Treadmill Nodeinfo REST API')
@click.option('-c', '--cors-origin', help='CORS origin REGEX')
def server(register, port, auth, modules, title, cors_origin):
"""Runs nodeinfo server."""
if port == 0:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', 0))
port = sock.getsockname()[1]
sock.close()
hostname = sysinfo.hostname()
hostport = '%s:%s' % (hostname, port)
if register:
zkclient = context.GLOBAL.zk.conn
zkclient.add_listener(zkutils.exit_on_lost)
appname = 'root.%s#%010d' % (hostname, os.getpid())
path = z.path.endpoint(appname, 'tcp', 'nodeinfo')
_LOGGER.info('register endpoint: %s %s', path, hostport)
zkutils.create(zkclient, path, hostport,
acl=[_SERVERS_ACL],
ephemeral=True)
_LOGGER.info('Starting nodeinfo server on port: %s', port)
utils.drop_privileges()
api_paths = []
if modules:
api_paths = api.init(modules, title.replace('_', ' '), cors_origin)
rest_server = rest.TcpRestServer(port, auth_type=auth,
protect=api_paths)
rest_server.run()
return server
def init():
"""App main."""
# pylint: disable=W0612
@click.command()
@click.option('-p', '--port', help='Port for TCP server')
@click.option('-s', '--socket', help='Socket for UDS server')
@click.option('-a', '--auth', type=click.Choice(['spnego']))
@click.option('-t', '--title', help='API Doc Title',
default='Treadmill REST API')
@click.option('-c', '--cors-origin', help='CORS origin REGEX',
required=True)
@click.option('--workers', help='Number of workers',
default=5)
@click.option('--interval', help='interval to refresh cgroups',
default=60)
def server(port, socket, auth, title, cors_origin, workers, interval):
"""Create pge server to provide authorize service."""
(base_api, cors) = api.base_api(title, cors_origin)
endpoint = cgroup_api.init(base_api, cors, interval=interval)
if not endpoint.startswith('/'):
endpoint = '/' + endpoint
if port:
rest_server = rest.TcpRestServer(
port, auth_type=auth,
protect=[endpoint],
workers=workers
)
# TODO: need to rename that - conflicts with import socket.
elif socket:
rest_server = rest.UdsRestServer(socket)
else:
click.echo('port or socket must be specified')
sys.exit(1)
try:
rest_server.run()
except sock.error as sock_err:
print(sock_err)
if sock_err.errno == errno.EADDRINUSE:
# TODO: hack, but please keep it for now, otherwise on the
# setup several master processes run on same server
# lookup api (listen on port 8080) is in tight loop.
time.sleep(5)
return server
def load_dump(collection_dump, collection_name):
"""
Restore an index from a dump file.
:param collection_dump: Path to a local gzipped dump to load.
:param collection_name: Name for the local index to restore the dump to. Optional; will be derived from the dump name, at your own risk. Note that the pipeline will add a "owa_" prefix string to the collection name, to ensure the proper mapping and settings are applied.
"""
available_dumps = glob(os.path.join(LOCAL_DUMPS_DIR, '*/*.gz'))
if not collection_dump:
choices = []
for i, dump in enumerate(available_dumps):
choices.append(unicode(i+1))
click.secho('{i}) {dump}'.format(i=i+1, dump=dump), fg='green')
dump_idx = click.prompt('Choose one of the dumps listed above',
type=click.Choice(choices))
collection_dump = available_dumps[int(dump_idx) - 1]
collection = os.path.abspath(collection_dump)
collection_id = '_'.join(collection.split('/')[-1].split('.')[0].split('_')[:2])
if not collection_name:
collection_name = collection_id.replace('owa_', '')
source_definition = {
'id': collection_id,
'extractor': 'ocd_backend.extractors.staticfile.StaticJSONDumpExtractor',
'transformer': 'ocd_backend.transformers.BaseTransformer',
'loader': 'ocd_backend.loaders.ElasticsearchLoader',
'item': 'ocd_backend.items.LocalDumpItem',
'dump_path': collection,
'index_name': collection_name
}
click.secho(str(source_definition), fg='yellow')
setup_pipeline(source_definition)
click.secho('Queued items from {}. Please make sure your Celery workers'
' are running, so the loaded items are processed.'.format(collection),
fg='green')
# Register commands explicitly with groups, so we can easily use the docstring
# wrapper
def option_performance(f):
"""Defines options for all aspects of performance tuning"""
_preset = {
# Fixed
'O1': {'autotune': True, 'dse': 'basic', 'dle': 'basic'},
'O2': {'autotune': True, 'dse': 'advanced', 'dle': 'advanced'},
'O3': {'autotune': True, 'dse': 'aggressive', 'dle': 'advanced'},
# Parametric
'dse': {'autotune': True,
'dse': ['basic', 'advanced', 'speculative', 'aggressive'],
'dle': 'advanced'},
'dle': {'autotune': True,
'dse': 'advanced',
'dle': ['basic', 'advanced']}
}
def from_preset(ctx, param, value):
"""Set all performance options according to bench-mode preset"""
ctx.params.update(_preset[value])
return value
def from_value(ctx, param, value):
"""Prefer preset values and warn for competing values."""
return ctx.params[param.name] or value
options = [
click.option('-bm', '--bench-mode', is_eager=True,
callback=from_preset, expose_value=False, default='O2',
type=click.Choice(['O1', 'O2', 'O3', 'dse', 'dle']),
help='Choose what to benchmark; ignored if execmode=run'),
click.option('--arch', default='unknown',
help='Architecture on which the simulation is/was run'),
click.option('--dse', callback=from_value,
type=click.Choice(['noop'] + configuration._accepted['dse']),
help='Devito symbolic engine (DSE) mode'),
click.option('--dle', callback=from_value,
type=click.Choice(['noop'] + configuration._accepted['dle']),
help='Devito loop engine (DLE) mode'),
click.option('-a', '--autotune', is_flag=True, callback=from_value,
help='Switch auto tuning on/off'),
]
for option in reversed(options):
f = option(f)
return f