def ask_for_configs():
remote_path = click.prompt("Remote path", default="~/remote/path")
ssh_host = click.prompt("SSH host", default="ssh_host")
ssh_user = click.prompt("SSH username or enter '-' to skip",
default="ssh_user")
ignores = click.prompt("Files or folders to ignore "
"(separated by space)", default=" ")
if ssh_user == "-":
ssh_user = None
if ignores.strip():
ignores = ignores.split(" ")
else:
ignores = []
return psync.generate_config(ssh_user=ssh_user,
ssh_host=ssh_host,
remote_path=remote_path,
ignores=ignores)
python类prompt()的实例源码
def delete(ctx, service_name, dry_run):
"""
Delete the service SERVICE_NAME from AWS.
"""
service = Service(yml=Config(filename=ctx.obj['CONFIG_FILE'], env_file=ctx.obj['ENV_FILE']).get_service(service_name))
print()
click.secho('Deleting service "{}":'.format(service.serviceName), fg="white")
click.secho(' Service info:', fg="green")
print_service_info(service)
click.secho(' Task Definition info:', fg="green")
print_task_definition(service.active_task_definition)
print()
if not dry_run:
click.echo("If you really want to do this, answer \"{}\" to the question below.\n".format(service.serviceName))
value = click.prompt("What service do you want to delete? ")
if value == service.serviceName:
service.scale(0)
print(" Waiting for our existing containers to die ...")
service.wait_until_stable()
print(" All containers dead.")
service.delete()
print(" Deleted service {} from cluster {}.".format(service.serviceName, service.clusterName))
else:
click.echo("\nNot deleting service \"{}\"".format(service.serviceName))
def cluster_ssh(ctx, service_name):
"""
SSH to the specified EC2 system in the ECS cluster running SERVICE_NAME.
"""
service = Service(
yml=Config(filename=ctx.obj['CONFIG_FILE'], env_file=ctx.obj['ENV_FILE']).get_service(service_name))
ips = service.get_host_ips()
for index, ip in enumerate(ips):
click.echo("Instance {}: {}".format(index+1, ip))
instance = click.prompt("Which instance to ssh to?", type=int)
if instance > len(ips):
click.echo("That is not a valid instance.")
return
instance_ip = ips[instance-1]
service.cluster_ssh(instance_ip)
def select_stream(graylog_api, stream):
is_admin = graylog_api.user["permissions"] == ["*"] or 'Admin' in graylog_api.user["roles"]
stream = stream if stream is not None else graylog_api.default_stream
if not stream:
streams = graylog_api.streams()["streams"]
click.echo("Please select a stream to query:")
for i, st in enumerate(streams):
stream_id = st["id"].encode(utils.UTF8)
message = "{}: Stream '{}' (id: {})".format(i, st["title"].encode(utils.UTF8), stream_id)
click.echo(message)
if is_admin:
message = "{}: Stream '{}'".format(len(streams), 'All Streams')
click.echo(message)
stream_index = click.prompt("Enter stream number:", type=int, default=0)
if stream_index < len(streams):
stream = streams[stream_index]["id"]
elif stream_index >= len(streams) and not is_admin:
CliInterface.select_stream(graylog_api, stream)
if stream and stream != '*':
return "streams:{}".format(stream)
def pair(config):
"""Pair to your account.
This is still useless, but will be needed for
certain remore commands on the vehicle.
"""
api = api_from_config(config)
if not api.request_pairing():
raise click.ClickException("Unable to request pairing")
click.echo(
"A Pairing code has been requested to your phone."
"Please enter it below!")
code = click.prompt('Registration Code', type=int)
if api.confirm_pairing(code):
raise click.ClickException("Error pairing client")
click.echo(click.style("Pairing sucessfull!", fg='green'))
def delete_monitor(ctx, monitor, confirm):
if not confirm:
confirm = click.prompt('''
! WARNING: Destructive Action
! This command will destroy the monitor: {monitor}
! To proceed, type "{monitor}" or
re-run this command with --confirm={monitor}
'''.format(monitor=monitor), prompt_suffix='> ')
if confirm.strip() != monitor:
print('abort')
sys.exit(1)
with Spinner('Deleting monitor {}: '.format(monitor), remove_message=False):
newrelic.delete_monitor(ctx.obj['ACCOUNT'], monitor)
print(click.style(u'OK', fg='green', bold=True))
def get_vm_options():
""" Get user-selected config options for the 21 VM.
"""
logger.info(click.style("Configure 21 virtual machine.", fg=TITLE_COLOR))
logger.info("Press return to accept defaults.")
default_disk = Two1MachineVirtual.DEFAULT_VDISK_SIZE
default_memory = Two1MachineVirtual.DEFAULT_VM_MEMORY
default_port = Two1MachineVirtual.DEFAULT_SERVICE_PORT
disk_size = click.prompt(" Virtual disk size in MB (default = %s)" % default_disk,
type=int, default=default_disk, show_default=False)
vm_memory = click.prompt(" Virtual machine memory in MB (default = %s)" % default_memory,
type=int, default=default_memory, show_default=False)
server_port = click.prompt(" Port for micropayments server (default = %s)" % default_port,
type=int, default=default_port, show_default=False)
return VmConfiguration(disk_size=disk_size,
vm_memory=vm_memory,
server_port=server_port)
def get_server_port():
""" Get user-selected server port.
This is used for native docker engine on AWS.
"""
logger.info(click.style("Configure 21 micropayments server:", fg=TITLE_COLOR))
logger.info("Press return to accept default.")
default_port = Two1Machine.DEFAULT_SERVICE_PORT
server_port = click.prompt(" Port for micropayments server (default = %s)" % default_port,
type=int, default=default_port, show_default=False)
return server_port
def allow(ctx, foreign_account, permission, weight, threshold, account):
""" Add a key/account to an account's permission
"""
if not foreign_account:
from musebase.account import PasswordKey
pwd = click.prompt(
"Password for Key Derivation",
hide_input=True,
confirmation_prompt=True
)
foreign_account = format(
PasswordKey(account, pwd, permission).get_public(),
"MUSE"
)
pprint(ctx.muse.allow(
foreign_account,
weight=weight,
account=account,
permission=permission,
threshold=threshold
))
def unlockWallet(f):
@click.pass_context
def new_func(ctx, *args, **kwargs):
if not ctx.obj.get("unsigned", False):
if ctx.muse.wallet.created():
if "UNLOCK" in os.environ:
pwd = os.environ["UNLOCK"]
else:
pwd = click.prompt("Current Wallet Passphrase", hide_input=True)
ctx.muse.wallet.unlock(pwd)
else:
click.echo("No wallet installed yet. Creating ...")
pwd = click.prompt("Wallet Encryption Passphrase", hide_input=True, confirmation_prompt=True)
ctx.muse.wallet.create(pwd)
return ctx.invoke(f, *args, **kwargs)
return update_wrapper(new_func, f)
def init(ovh, force):
"""Generate the configuration file."""
if config_file_exists():
if not force:
ovh.error('A configuration file already exists '
'(use --force to erase it).')
ovh.exit()
else:
ovh.warning('The configuration file will be erased.\n')
# Display the welcome message
ovh.echo(WELCOME_MESSAGE)
# According to the choice, we launch the good def
choice = click.prompt('Your choice', default=1, value_proc=check_choice)
ovh.echo('')
launch_setup_by_choice(ovh, choice)
def launch_setup_2(ctx):
"""Choice 1 : user has the AK and AS tokens. We generate for him a link to
validate the CK token."""
endpoint = click.prompt('Endpoint', default='ovh-eu',
value_proc=check_endpoint)
application_key = click.prompt('Application key')
application_secret = click.prompt('Application secret')
ctx.echo('')
validation = get_ck_validation(endpoint, application_key,
application_secret)
ctx.info("Please visit the following link to authenticate you and "
"validate the token :")
ctx.info(validation['validationUrl'])
click.pause()
create_config_file(endpoint, application_key,
application_secret, validation['consumerKey'])
ctx.success('Configuration file created.')
def extract(html_response, ssl_verification_enabled, session):
"""
:param response: raw http response
:param html_response: html result of parsing http response
:return:
"""
roles_page_url = _action_url_on_validation_success(html_response)
vip_security_code = click.prompt(text='Enter your Symantec VIP Access code', type=str)
click.echo('Going for aws roles')
return _retrieve_roles_page(
roles_page_url,
_context(html_response),
session,
ssl_verification_enabled,
vip_security_code,
)
def config_(action, **kw):
"""
Configure your launcher and game preferences.
The 'action' argument is optional, and can be 'reset' or 'wizard'.
If it's left blank, only given options will be set.
Else, given options will be set AFTER the corresponding action is executed.
"""
if action == 'reset':
ok = click.prompt('Are you sure you want to reset you settings?', confirmation_prompt=True)
if ok:
config.reset()
elif action == 'wizard':
_wizard()
for k, v in kw.items():
if v is None:
break
if isinstance(v, str) and v[0] == '=':
v = v[1:]
config[k.replace('-', '_')] = v
config.save()
def authenticate_account(get_auth_url=False, code=None, for_business=False):
if for_business:
error(translator['od_pref.authenticate_account.for_business_unsupported'])
return
authenticator = od_auth.OneDriveAuthenticator()
click.echo(translator['od_pref.authenticate_account.permission_note'])
if code is None:
click.echo(translator['od_pref.authenticate_account.paste_url_note'])
click.echo('\n' + click.style(authenticator.get_auth_url(), underline=True) + '\n')
if get_auth_url:
return
click.echo(translator['od_pref.authenticate_account.paste_url_instruction'].format(
redirect_url=click.style(authenticator.APP_REDIRECT_URL, bold=True)))
url = click.prompt(translator['od_pref.authenticate_account.paste_url_prompt'], type=str)
code = extract_qs_param(url, 'code')
if code is None:
error(translator['od_pref.authenticate_account.error.code_not_found_in_url'])
return
try:
authenticator.authenticate(code)
success(translator['od_pref.authenticate_account.success.authorized'])
save_account(authenticator)
except Exception as e:
error(translator['od_pref.authenticate_account.error.authorization'].format(error_message=str(e)))
def connect():
while True:
click.secho("??????")
click.secho("-" * 20)
for k,v in SERVERS.items():
click.secho("[%d] :%s (%s)" % (k, v[0], v[1]))
click.secho("-" * 20)
num = click.prompt("????? ", type=int, default=1)
if num not in SERVERS:
click.echo("????")
continue
ip,port = SERVERS[num][1].split(":")
c = api.connect(ip, int(port))
if not c:
raise Exception("????")
else:
break
def run_function(df, value):
click.secho("??????? " + str(value) + " : " + FUNCTION_LIST[value][0])
click.secho("-" * 20)
click.secho(FUNCTION_LIST[value][1])
params_str = click.prompt("????? ", type=str, default=FUNCTION_LIST[value][3])
params = [p.strip() for p in params_str.split(",")]
click.secho("-" * 20)
try:
result = FUNCTION_LIST[value][2](params)
if df:
result = api.to_df(result)
click.secho(str(result), bold=True)
return result
else:
pprint.pprint(result)
return result
except Exception as e:
import traceback
print('-' * 60)
traceback.print_exc(file=sys.stdout)
print('-' * 60)
click.secho("??????????? " + str(e), fg='red')
def gen_config_file(real_trade_dll_name):
se("????????..")
random_uuid = uuid.uuid1().hex
enc_key = random_uuid[:16]
enc_iv = random_uuid[16:]
se("???enc_key = [{}] , enc_iv = [{}]".format(enc_key, enc_iv))
bind_ip = click.prompt('??????ip??', default="127.0.0.1")
bind_port = click.prompt('?????????', default="19820")
config_file_content = """bind={}
port={}
trade_dll_path={}
transport_enc_key={}
transport_enc_iv={}
""".format(bind_ip, bind_port, real_trade_dll_name, enc_key, enc_iv)
return config_file_content, bind_ip, bind_port, enc_key, enc_iv
def build_id2word(self, fname=None, save_to=None):
# read words.csv file
if not fname:
fname = self.words_fname or click.prompt('words file')
fname = self.__dest(fname)
assert os.path.isfile(fname), 'No such file: %s' % fname
if save_to:
self.id2word_fname = self.__dest(save_to)
else:
self.id2word_fname = LdaUtils.change_ext(fname, 'id2word')
# if there is no id2word file or the user wants to rebuild, build .id2word
if not os.path.isfile(self.id2word_fname) or click.confirm('There alread is id2word. Do you want to rebuild?'):
print 'start building id2word'
start = time()
id2word = corpora.Dictionary(LdaUtils.filter_words(LdaUtils.iter_csv(fname, -1).split()))
id2word.save(self.id2word_fname) # save
print 'building id2word takes: %s' % LdaUtils.human_readable_time(time() - start)
self.id2word = corpora.Dictionary.load(self.id2word_fname)
return self.id2word
def build_corpus(self, fname=None, save_to=None):
# read sentences file
if not fname:
fname = click.prompt('sentences file')
fname = self.__dest(fname)
assert os.path.isfile(fname), 'No such file: %s' % fname
if save_to:
self.corpus_fname = self.__dest(save_to)
else:
self.corpus_fname = LdaUtils.change_ext(fname, 'corpus')
# if there is no corpus file or the user wants to rebuild, build .corpus
if not os.path.isfile(self.corpus_fname) or click.confirm('There already is corpus. Do you want to rebuild?'):
print 'start building corpus'
start = time()
corpora.MmCorpus.serialize(self.corpus_fname, self.__iter_doc2bow(LdaUtils.iter_csv(fname, -1).split())) # save
print 'building corpus takes: %s' % LdaUtils.human_readable_time(time() - start)
self.corpus = corpora.MmCorpus(self.corpus_fname)
return self.corpus
def build_model(self, fname=None, save_to=None):
id2word = self.id2word or self.build_id2word()
corpus = self.corpus or self.build_corpus()
# read model.lda file
if not fname:
fname = click.prompt('model file name', type=str, default='model.lda')
fname = self.__dest(fname)
# if there is no model file or the user wants to rebuild, build .model
if not os.path.isfile(fname) or click.confirm('There already is %s. Do you want to re run lda?' % fname):
num_procs = click.prompt('Number of processes to launch',
type=int,
default=multiprocessing.cpu_count())
num_epochs = click.prompt('Number of epochs to run', type=int, default=20)
num_topics = click.prompt('Number of topics', type=int, default=100)
print 'start building model'
start = time()
model = LdaMulticore(corpus, id2word=id2word, num_topics=num_topics, workers=num_procs, passes=num_epochs)
model.save(fname) #save
print 'building model takes: %s' % LdaUtils.human_readable_time(time() - start)
self.model = LdaMulticore.load(fname)
return self.model
def assign(self, datafile=None, outputfile=None):
if not datafile:
datafile = click.prompt('Data file',
type=str,
default='sentences_all.csv')
datafile = self.__dest(datafile)
self.datafile = datafile
if not outputfile:
datafilename, ext = os.path.splitext(datafile)
default_outputfile = datafilename+'_result'+ext
outputfile = click.prompt('output file',
type=str,
default=default_outputfile)
assert os.path.isfile(datafile), 'No such file: %s' % datafile
print 'start assiging'
start = time()
with open(datafile) as fi, open(outputfile, 'w') as fo:
csv_reader = csv.reader(fi, delimiter=',')
csv_writer = csv.writer(fo, delimiter=',')
for row in csv_reader:
out_row = row[:2] # post_id and sentence_seq
filtered_words = LdaUtils.filter_words(row[-1].split(' '))
out_row.append(' '.join(map(str, self.query_tag(filtered_words))))
csv_writer.writerow(out_row)
print 'assigning takes: %s' % LdaUtils.human_readable_time(time() - start)
def rm(ctx, pass_name, recursive, force):
"""Remove the password names `pass-name` from the password store.
This command is alternatively named `remove` or `delete`. If
`--recursive` or `-r` is specified, delete pass-name recursively
if it is a directory. If `--force` or `-f` is specified, do not
interactively prompt before removal.
"""
try:
ctx.obj.remove_path(pass_name, recursive, force)
except StoreNotInitialisedError:
click.echo(MSG_STORE_NOT_INITIALISED_ERROR)
return 1
except FileNotFoundError:
click.echo('{0} is not in the password store.'.format(pass_name))
return 1
except PermissionError:
click.echo(MSG_PERMISSION_ERROR)
return 1
def _github_ask_credentials():
click.echo('''
We need your GitHub credentials to create an access token to be stored in
AerisCloud. Your credentials won't be stored anywhere.
The access token will allow AerisCloud to access your private repositories
as well as those owned by any organization you might be a member of.
You can revoke this access token at any time by going to this URL:
%s
''' % (click.style('https://github.com/settings/applications', bold=True)))
user = None
pwd = None
while not user:
user = click.prompt('Github username')
while not pwd:
pwd = click.prompt('Github password', hide_input=True)
return user, pwd
def login():
# Import in here for performance reasons
import webbrowser
# TODO: use Oauth and a local webserver: https://community.auth0.com/questions/6501/authenticating-an-installed-cli-with-oidc-and-a-th
url = api.app_url + '/profile'
# TODO: google cloud SDK check_browser.py
launched = webbrowser.open_new_tab(url)
if launched:
click.echo(
'Opening [{0}] in a new tab in your default browser.'.format(url))
else:
click.echo("You can find your API keys here: {0}".format(url))
key = click.prompt("{warning} Paste an API key from your profile".format(
warning=click.style("Not authenticated!", fg="red")),
value_proc=lambda x: x.strip())
if key:
# TODO: get the username here...
# username = api.viewer().get('entity', 'models')
write_netrc(api.api_url, "user", key)
def choose_license(licenses, author, year):
click.echo("Found the following matching licenses:")
click.echo(
green(
"\n".join(
[
'{index}: {name}'.format(index=index + 1, name=lic.name)
for index, lic in enumerate(licenses)
]
)
)
)
choice = click.prompt(
"Please choose which one you'd like to add",
default=1,
type=click.IntRange(1, len(licenses))
)
return licenses[choice - 1]
def login(ctx, username, api_key):
"""Logs the user in by asking for username and api_key
"""
if username is None:
username = click.prompt('Username (netID)')
click.echo()
if api_key is None:
click.echo('Please get your API key from ' +
click.style(_api_key_url, underline=True))
api_key = click.prompt('API key')
click.echo()
click.echo('Checking your credentials...', nl=False)
client = ApiClient(api_server_url=settings.API_SERVER_URL,
username=username, api_key=api_key)
try:
client.test_api_key()
except ApiClientAuthenticationError:
click.secho('invalid', bg='red', fg='black')
click.echo('Please try again.')
ctx.exit(code=exit_codes.OTHER_FAILURE)
else:
click.secho('OK', bg='green', fg='black')
user = User(username=username, api_key=api_key)
save_user(user)
def prompt_choices(choices):
"""Displays a prompt for the given choices
:param list choices: the choices for the user to choose from
:rtype: int
:returns: the index of the chosen choice
"""
assert len(choices) > 1
for i in range(len(choices)):
click.echo('{number}) {choice}'.format(
number=i + 1,
choice=choices[i]
))
value = click.prompt('1-{}'.format(len(choices)), type=int) - 1
if value < 0 or value >= len(choices):
raise click.ClickException('Invalid choice.')
return value
def get_gh_api():
if not os.environ.get("OSXSTRAP_GITHUB_USERNAME"):
username = click.prompt('Please enter your Github username')
common.set_dotenv_key("OSXSTRAP_GITHUB_USERNAME", username)
else:
username = os.environ.get("OSXSTRAP_GITHUB_USERNAME")
if not os.environ.get("OSXSTRAP_GITHUB_API_TOKEN"):
token = click.prompt('Please create a Github access token by going to https://github.com/settings/tokens/new?scopes=gist&description=osxstrap+gist+cli and enter it here')
common.set_dotenv_key("OSXSTRAP_GITHUB_API_TOKEN", token)
else:
token = os.environ.get("OSXSTRAP_GITHUB_API_TOKEN")
gh = login(token=token)
return gh
files = {
config_filename : {
'content': 'What... is the air-speed velocity of an unladen swallow?'
}
}
gist = gh.create_gist('Answer this to cross the bridge', files, public=False)
# gist == <Gist [gist-id]>
print(gist.html_url)
system_username = common.run('whoami', capture=True)
def allow(ctx, foreign_account, permission, weight, threshold, account):
""" Add a key/account to an account's permission
"""
if not foreign_account:
from bitsharesbase.account import PasswordKey
pwd = click.prompt(
"Password for Key Derivation",
hide_input=True,
confirmation_prompt=True
)
foreign_account = format(
PasswordKey(account, pwd, permission).get_public(),
"BTS"
)
pprint(ctx.bitshares.allow(
foreign_account,
weight=weight,
account=account,
permission=permission,
threshold=threshold
))