def _create_keybindings_registry(self):
    """
    Build the key-binding registry used by the prompt.

    Starts from ``load_key_bindings_for_prompt()`` and adds one binding for
    every ``(keystroke, callback)`` pair yielded by ``self.keybindings``.

    :return: the registry with all bindings added
    """
    registry = load_key_bindings_for_prompt()

    def bind(keystroke, callback):
        # Registering inside a helper gives each handler its own
        # ``callback`` variable. Defining the handler directly in the loop
        # would make every handler look up the loop variable at call time,
        # so all keystrokes would trigger the callback of the last pair.
        @registry.add_binding(keystroke)
        def _(event):
            """
            We use ``run_in_terminal``, because that ensures that the prompt is
            hidden right before something gets printed and it's drawn again
            after it. (Otherwise this would destroy the output.)
            """
            event.cli.run_in_terminal(lambda: callback(None))

    for keystroke, callback in self.keybindings:
        bind(keystroke, callback)
    return registry
python类prompt()的实例源码
def interactive_loop(self):
    """
    Read commands from the prompt and dispatch them until the user quits.

    Each input line is stripped, lower-cased and split on whitespace; the
    first word selects the action and the remaining words are its
    parameters:

    * ``help``   -> ``self._show_help_info()``
    * ``show``   -> ``self._show_inventory()``
    * ``attach`` -> ``self._invoke_shell(int(<first parameter>))``
    * ``exit``   -> write a newline and exit the process with status 0

    Empty lines and unknown commands are ignored. ``EOFError`` or
    ``KeyboardInterrupt`` raised while prompting also exits with status 0.
    """
    while True:
        try:
            raw_line = prompt(
                get_prompt_tokens=self._get_prompt_tokens,
                style=self._get_prompt_style(),
                history=self._history,
                auto_suggest=AutoSuggestFromHistory(),
                completer=self._completer,
                complete_while_typing=True,
                validator=ReplValidator(),
            )
        except (EOFError, KeyboardInterrupt):
            sys.stdout.write('\n')
            sys.exit(0)

        words = raw_line.strip().lower().split()
        if not words:
            continue
        option, parameters = words[0], words[1:]

        if option == 'help':
            self._show_help_info()
        elif option == 'show':
            self._show_inventory()
        elif option == 'attach':
            # NOTE(review): ReplValidator may already reject a bad
            # ``attach`` line; guard anyway so that a missing or
            # non-numeric argument cannot crash the loop.
            try:
                serial = int(parameters[0])
            except (IndexError, ValueError):
                sys.stdout.write('usage: attach <serial>\n')
                continue
            self._invoke_shell(serial)
        elif option == 'exit':
            sys.stdout.write('\n')
            sys.exit(0)
        # Any other word is silently ignored.
def do_open(self, arguments):
    """
    Handle the ``open`` command; only ``open wallet <path>`` is supported.

    An already open wallet is closed first via ``self.do_close_wallet()``.
    The user is then asked for the password, the wallet is opened with
    ``UserWallet.Open`` and a ``task.LoopingCall`` on the wallet's
    ``ProcessBlocks`` is started with an argument of 1.

    :param arguments: command arguments, read through ``get_arg``
    """
    if self.Wallet:
        self.do_close_wallet()

    if get_arg(arguments) != 'wallet':
        print("please specify something to open")
        return

    wallet_path = get_arg(arguments, 1)
    if not wallet_path:
        print("Please specify a path")
        return
    if not os.path.exists(wallet_path):
        print("wallet file not found")
        return

    password = prompt("[Password]> ", is_password=True)
    try:
        self.Wallet = UserWallet.Open(wallet_path, password)
        self._walletdb_loop = task.LoopingCall(self.Wallet.ProcessBlocks)
        self._walletdb_loop.start(1)
        print("Opened wallet at %s" % wallet_path)
    except Exception as err:
        # Any failure while opening is reported, not propagated.
        print("could not open wallet: %s " % err)
def do_create(self, arguments):
    """
    Handle the ``create`` command; only ``create wallet <path>`` is supported.

    Asks for the password twice, requires both entries to match and to be
    at least 10 characters long, creates the wallet with
    ``UserWallet.Create`` and prints its JSON form and public key. If
    creation fails, ``self.Wallet`` is reset to ``None`` and a wallet file
    left behind at ``path`` is removed.

    :param arguments: command arguments, read through ``get_arg``
    """
    if get_arg(arguments) != 'wallet':
        return

    wallet_path = get_arg(arguments, 1)
    if not wallet_path:
        print("Please specify a path")
        return
    if os.path.exists(wallet_path):
        print("File already exists")
        return

    first_entry = prompt("[Password 1]> ", is_password=True)
    second_entry = prompt("[Password 2]> ", is_password=True)
    if not (first_entry == second_entry and len(first_entry) >= 10):
        print("please provide matching passwords that are at least 10 characters long")
        return

    try:
        self.Wallet = UserWallet.Create(path=wallet_path, password=first_entry)
        default_contract = self.Wallet.GetDefaultContract()
        key_pair = self.Wallet.GetKey(default_contract.PublicKeyHash)
        print("Wallet %s " % json.dumps(self.Wallet.ToJson(), indent=4))
        print("pubkey %s " % key_pair.PublicKey.encode_point(True))
    except Exception as create_err:
        print("Exception creating wallet: %s " % create_err)
        self.Wallet = None
        # Best-effort cleanup of a partially written wallet file.
        if os.path.isfile(wallet_path):
            try:
                os.remove(wallet_path)
            except Exception as remove_err:
                print("Could not remove {}: {}".format(wallet_path, remove_err))
        return

    # Only reached when the wallet was created successfully.
    self._walletdb_loop = task.LoopingCall(self.Wallet.ProcessBlocks)
    self._walletdb_loop.start(1)
def test_invoke_contract(self, args):
    """
    Test-invoke a contract and, after password confirmation, invoke it.

    Runs ``TestInvokeContract`` with the open wallet and ``args``, prints
    a summary of the result, asks for the wallet password and then calls
    ``InvokeContract`` with the transaction and fee from the test run.

    :param args: arguments identifying the contract invocation
    """
    if not self.Wallet:
        print("please open a wallet")
        return
    if not args:
        print("please specify a contract to invoke")
        return

    tx, fee, results, num_ops = TestInvokeContract(self.Wallet, args)
    if tx is None or results is None:
        print("Error testing contract invoke")
        return

    print("\n-------------------------------------------------------------------------------------------------------------------------------------")
    print("Test invoke successful")
    print("Total operations: %s " % num_ops)
    print("Results %s " % list(map(str, results)))
    print("Invoke TX gas cost: %s " % (tx.Gas.value / Fixed8.D))
    print("Invoke TX Fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------------------------------------------------------------------------------\n")
    print("Enter your password to continue and invoke on the network\n")

    password = prompt("[password]> ", is_password=True)
    if not self.Wallet.ValidatePassword(password):
        print("Incorrect password")
        return

    InvokeContract(self.Wallet, tx, fee)
def load_smart_contract(self, args):
    """
    Load a contract, test its deployment and deploy it after confirmation.

    The contract is loaded with ``LoadContract(args[1:])`` and its details
    gathered with ``GatherContractDetails``. The resulting script is run
    through ``test_invoke``; on success a summary is printed, the wallet
    password is requested and ``InvokeContract`` is called with a fee of
    ``Fixed8.Zero()``.

    :param args: command arguments; the first element is skipped
    """
    if not self.Wallet:
        print("please open wallet")
        return

    function_code = LoadContract(args[1:])
    if not function_code:
        return

    contract_script = GatherContractDetails(function_code, self)
    if contract_script is None:
        return

    tx, fee, results, num_ops = test_invoke(contract_script, self.Wallet, [])
    if tx is None or results is None:
        print("test ivoke failed")
        print("tx is, results are %s %s " % (tx, results))
        return

    print("\n-------------------------------------------------------------------------------------------------------------------------------------")
    print("Test deploy invoke successful")
    print("Total operations executed: %s " % num_ops)
    print("Results %s " % list(map(str, results)))
    print("Deploy Invoke TX gas cost: %s " % (tx.Gas.value / Fixed8.D))
    print("Deploy Invoke TX Fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------------------------------------------------------------------------------\n")
    print("Enter your password to continue and deploy this contract")

    password = prompt("[password]> ", is_password=True)
    if not self.Wallet.ValidatePassword(password):
        print("Incorrect password")
        return

    InvokeContract(self.Wallet, tx, Fixed8.Zero())
def token_send_from(wallet, args, prompt_passwd=True):
    """
    Transfer tokens on behalf of another address, within its allowance.

    :param wallet: wallet used to build and sign the transaction
    :param args: exactly four items: token symbol, from address,
                 to address and amount
    :param prompt_passwd: when true, print a summary and ask for the
                          wallet password before invoking
    :return: the result of ``InvokeContract`` on success, ``False`` otherwise
    """
    if len(args) != 4:
        print("please provide a token symbol, from address, to address, and amount")
        return False

    token = get_asset_id(wallet, args[0])
    send_from = args[1]
    send_to = args[2]
    amount = amount_from_string(token, args[3])

    allowance = token_get_allowance(wallet, args[:-1], verbose=False)
    if not (allowance and allowance >= amount):
        print("Requestest transfer from is greater than allowance")
        return False

    tx, fee, results = token.TransferFrom(wallet, send_from, send_to, amount)
    test_ok = (tx is not None and results is not None and len(results) > 0
               and results[0].GetBigInteger() == 1)
    if not test_ok:
        # The allowance was sufficient, so report the failed transfer
        # instead of blaming the allowance; same wording as
        # ``do_token_transfer``.
        print("could not transfer tokens")
        return False

    if prompt_passwd:
        print("\n-----------------------------------------------------------")
        print("Transfer of %s %s from %s to %s" % (
            string_from_amount(token, amount), token.symbol, send_from, send_to))
        print("Transfer fee: %s " % (fee.value / Fixed8.D))
        print("-------------------------------------------------------------\n")

        passwd = prompt("[Password]> ", is_password=True)
        if not wallet.ValidatePassword(passwd):
            print("incorrect password")
            return False

    return InvokeContract(wallet, tx, fee)
def token_approve_allowance(wallet, args, prompt_passwd=True):
    """
    Approve an allowance that lets one address spend tokens of another.

    :param wallet: wallet used to build and sign the transaction
    :param args: exactly four items: token symbol, from address,
                 to address and amount
    :param prompt_passwd: when true, ask for the wallet password before
                          invoking
    :return: the result of ``InvokeContract`` on success, ``False`` otherwise
    """
    if len(args) != 4:
        print("please provide a token symbol, from address, to address, and amount")
        return False

    token = get_asset_id(wallet, args[0])
    approve_from = args[1]
    approve_to = args[2]
    amount = amount_from_string(token, args[3])

    tx, fee, results = token.Approve(wallet, approve_from, approve_to, amount)

    if tx is not None and results is not None and len(results) > 0:
        if results[0].GetBigInteger() == 1:
            print("\n-----------------------------------------------------------")
            print("Approve allowance of %s %s from %s to %s" % (string_from_amount(token, amount), token.symbol, approve_from, approve_to))
            print("Transfer fee: %s " % (fee.value / Fixed8.D))
            print("-------------------------------------------------------------\n")

            if prompt_passwd:
                passwd = prompt("[Password]> ", is_password=True)
                if not wallet.ValidatePassword(passwd):
                    print("incorrect password")
                    # Return False like every other failure path here and
                    # in the sibling token functions (was a bare return).
                    return False

            return InvokeContract(wallet, tx, fee)

    print("could not transfer tokens")
    return False
def token_mint(wallet, args, prompt_passwd=True):
    """
    Mint tokens to an address, attaching the given assets.

    :param wallet: wallet used to build and sign the transaction
    :param args: token symbol, address to mint to, then one or more
                 asset attachments
    :param prompt_passwd: when true, ask for the wallet password before
                          invoking
    :return: the result of ``InvokeWithTokenVerificationScript`` on
             success, ``False`` otherwise
    :raises Exception: if fewer than three arguments are given
    """
    # Validate the argument count before indexing, so that short input
    # yields this message rather than an IndexError.
    if len(args) < 3:
        raise Exception("please specify assets to attach")

    token = get_asset_id(wallet, args[0])
    mint_to_addr = args[1]
    asset_attachments = args[2:]

    tx, fee, results = token.Mint(wallet, mint_to_addr, asset_attachments)

    # NOTE(review): what Mint returns on failure is not visible here;
    # guard against missing results instead of crashing on results[0].
    if not results:
        print("Could not register addresses: %s " % str(results))
        return False

    if results[0].GetBigInteger() > 0:
        print("\n-----------------------------------------------------------")
        print("[%s] Will mint tokens to address: %s " % (token.symbol, mint_to_addr))
        print("Fee: %s " % (fee.value / Fixed8.D))
        print("-------------------------------------------------------------\n")

        if prompt_passwd:
            passwd = prompt("[Password]> ", is_password=True)
            if not wallet.ValidatePassword(passwd):
                print("incorrect password")
                # Consistent with the other failure path (was a bare return).
                return False

        return InvokeWithTokenVerificationScript(wallet, tx, token, fee)

    print("Could not register addresses: %s " % str(results[0]))
    return False
def do_token_transfer(token, wallet, from_address, to_address, amount, prompt_passwd=True):
    """
    Transfer ``amount`` of ``token`` from one address to another.

    :param token: token object providing ``Transfer`` and ``symbol``
    :param wallet: wallet used to build and sign the transaction
    :param from_address: sending address; must not be ``None``
    :param to_address: receiving address
    :param amount: amount to transfer
    :param prompt_passwd: when true, ask for the wallet password before
                          invoking
    :return: the result of ``InvokeContract`` on success, ``False`` otherwise
    """
    if from_address is None:
        print("Please specify --from-addr={addr} to send NEP5 tokens")
        return False

    tx, fee, results = token.Transfer(wallet, from_address, to_address, amount)

    test_ok = (
        tx is not None
        and results is not None
        and len(results) > 0
        and results[0].GetBigInteger() == 1
    )
    if not test_ok:
        print("could not transfer tokens")
        return False

    print("\n-----------------------------------------------------------")
    print("Will transfer %s %s from %s to %s" % (string_from_amount(token, amount), token.symbol, from_address, to_address))
    print("Transfer fee: %s " % (fee.value / Fixed8.D))
    print("-------------------------------------------------------------\n")

    if prompt_passwd:
        password = prompt("[Password]> ", is_password=True)
        if not wallet.ValidatePassword(password):
            print("incorrect password")
            return False

    return InvokeContract(wallet, tx, fee)
def main(host, port, username, password, askpass, tls, version, database):
    """
    Entry point: print the version or start the REPL against a server.

    :param host: passed to ``Config``
    :param port: passed to ``Config``
    :param username: passed to ``Config``
    :param password: passed to ``Config`` unless ``askpass`` is set
    :param askpass: when true, ask for the password interactively instead
    :param tls: passed to ``Config``
    :param version: when true, only print ``get_version()`` and return 0
    :param database: passed to ``Config``
    :return: 0 after printing the version, otherwise the REPL's ``run()`` result
    """
    if version:
        print(get_version())
        return 0

    secret = prompt('Enter password: ', is_password=True) if askpass else password

    config = Config(host, port, username, secret, tls, database)
    server = couchdb.Server(config.url)
    return repl.Repl(server, config).run()
def _getPromptUsage():
    """Return a one-element list holding the usage string of ``prompt``."""
    usage_line = "prompt <principal name>"
    return [usage_line]
def getActiveEnv(self):
    """
    Return the environment part of the current prompt.

    :return: second element of the pair returned by
             ``PlenumCli.getPromptAndEnv(self.name, self.currPromptText)``
    """
    # The first element of the pair is not needed here.
    _, env = PlenumCli.getPromptAndEnv(self.name, self.currPromptText)
    return env
def prompt_bash(msg: str, allow_empty: bool) -> str:
    """
    Prompts for bash shell code.

    :param msg: shown message
    :param allow_empty: allow an empty string?
    :return: user input
    """
    from pygments.lexers.shell import BashLexer
    validator = None if allow_empty else NonEmptyValidator()
    # The validator has to be handed to prompt(); otherwise
    # ``allow_empty=False`` has no effect and empty input is accepted.
    return prompt(msg, lexer=PygmentsLexer(BashLexer), completer=SystemCompleter(),
                  validator=validator)
def prompt_python(msg: str, get_globals: t.Callable[[str], t.Any], get_locals: t.Callable[[str], t.Any]) -> str:
    """
    Ask the user for python code using a multiline, mouse-enabled prompt.

    :param msg: shown message
    :param get_globals: function that returns the global variables
    :param get_locals: function that returns the local variables
    :return: user input
    """
    from ptpython.completer import PythonCompleter
    from pygments.lexers.python import Python3Lexer

    prompt_options = {"multiline": True, "mouse_support": True}
    # Completion is driven by the supplied globals/locals providers.
    prompt_options["completer"] = PythonCompleter(get_globals, get_locals)
    prompt_options["lexer"] = PygmentsLexer(Python3Lexer)
    return prompt(msg, **prompt_options)
def prompt_perf_stat_exec_dict(run_dict: dict) -> dict:
    """
    Ask for the settings of the perf stat exec runner.

    :param run_dict: run config dict (without the runner part); not read here
    :return: runner config with the keys ``repeat`` and ``properties``
    """
    misc_options = PerfStatExecRunner.misc_options
    config = {}

    repeat_default = misc_options["repeat"].get_default()
    repeat_answer = default_prompt("How many times should perf stat itself repeat the measurement? ",
                                   default=repeat_default, validator=TypeValidator(PositiveInt()))
    config["repeat"] = int(repeat_answer)

    props_default = ", ".join(misc_options["properties"].get_default())

    class PerfStatPropertiesValidator(Validator):
        """Accepts a comma separated property list if perf stat accepts it."""

        def validate(self, document: Document):
            # Run perf stat on /bin/echo with the entered events; a
            # positive exit status marks the input as invalid.
            events = ",".join(part.strip() for part in document.text.split(","))
            cmd = "perf stat -x ';' -e {props} -- /bin/echo".format(props=events)
            proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=subprocess.DEVNULL,
                                    stderr=subprocess.PIPE, universal_newlines=True)
            _, err = proc.communicate()
            if proc.returncode > 0:
                # Only the first line of stderr is shown to the user.
                first_line = str(err).partition("\n")[0].strip()
                raise ValidationError(message=first_line, cursor_position=len(document.text))

    known_props = sorted(set(get_av_perf_stat_properties()))
    answer = prompt("Which properties should perf stat measure? ",
                    validator=PerfStatPropertiesValidator(), default=props_default,
                    completer=WordCompleter(known_props, ignore_case=False, WORD=True))
    config["properties"] = [part.strip() for part in answer.split(",")]
    return config
def prompt_spec_exec_dict(run_dict: dict) -> dict:
    """
    Ask for the settings of the spec exec runner.

    :param run_dict: run config dict (without the runner part); not read here
    :return: runner config with the keys ``file``, ``base_path``,
             ``path_regexp`` and ``code``
    """
    def get(sub_path: str = ""):  # just a mock
        """
        Get the value of the property with the given path.

        :param sub_path: given path relative to the base path
        :return: value of the property
        """

    config = {}
    config["file"] = default_prompt("SPEC like result file to use: ",
                                    validator=TypeValidator(FileName()),
                                    completer=PathCompleter())
    config["base_path"] = prompt("Property base path: ")
    config["path_regexp"] = prompt("Regexp matching the property path for each measured property: ",
                                   validator=NonEmptyValidator())

    # Explain the multiline python prompt before showing it.
    for hint in (
        "The python code is entered via a multiline input. ",
        "Press [Meta+Enter] or [Esc] followed by [Enter] to accept input.",
        "You can click with the mouse in order to select text.",
        "Use the get(sub_path: str) -> str function to obtain a properties value.",
    ):
        print(hint)

    # The mock ``get`` is offered as the only local name for completion.
    config["code"] = prompt_python("The python is executed for each measured property: \n",
                                   lambda: {}, lambda: {"get": get})
    return config
def __init__(self):
    """Initialise ``prompt``, ``pargs`` and ``items`` to None and ``show_links`` to False."""
    self.prompt = self.pargs = self.items = None
    self.show_links = False
def shell(self, items, pargs):
    """
    Run the interactive ``we-get`` prompt until a command ends it.

    Stores ``items`` and ``pargs`` on the instance, shows the items and
    then keeps reading commands. A line is split into a command and an
    argument string unless ``prompt_is_single_command`` accepts it whole.
    The loop stops when ``prompt_parse_command`` returns a false value.

    :param items: mapping whose keys are offered for completion
    :param pargs: stored on ``self.pargs``
    """
    self.pargs = pargs
    self.items = items
    # When the fetching message ends need to add \n after \r.
    stdout.write('\n')
    self.prompt_show_items()
    history = InMemoryHistory()

    while True:
        line = prompt(
            u'we-get > ',
            history=history,
            auto_suggest=AutoSuggestFromHistory(),
            completer=WGCompleter(list(self.items.keys())),
            style=we_get_prompt_style
        )
        if self.prompt_no_command(line):
            continue

        if self.prompt_is_single_command(line):
            command, args = line, None
        else:
            words = line.split()
            command = words[0]
            args = ' '.join(words[1:])

        if not self.prompt_verify_command(command, args):
            continue
        if not self.prompt_parse_command(command, args):
            break
def ask_user_for_telemetry():
    """
    Ask the user whether telemetry may be collected.

    Keeps asking until the answer is ``yes`` or ``no`` in any letter case;
    an empty answer counts as ``yes``.

    :return: the accepted answer as typed, or ``'yes'`` for an empty answer
    """
    while True:
        answer = prompt(u'\nDo you agree to sending telemetry (yes/no)? Default answer is yes: ')
        if answer == '':
            answer = 'yes'
        if answer.lower() in ('yes', 'no'):
            return answer