def post_message(self, channel_name):
# self.channels_history(channel_name)
os.system("echo '\u001b[1m\u001b[31m To mention a user write @ while chatting \u001b[0m'")
text = prompt("your message > ", completer=WordCompleter(users))
channel_id = self.find_channel_id(channel_name)
url = "https://slack.com/api/chat.postMessage?token={token}&channel={channel_id}&text={text}&as_user=true&link_names=1".format(
token=settings.token,
text=text,
channel_id=channel_id)
response = requests.get(url).json()
# TODO : retrieve message history and print to screen while chatting
if response["ok"]:
os.system("figlet 'Sent' | lolcat")
time.sleep(2)
os.system("clear")
else:
print "something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)"
python类prompt()的实例源码
def channels_invite(self, channel_name):
channel_id = self.find_channel_id(channel_name)
invites = prompt("send invites -> ", completer=WordCompleter(users),
style=DocumentStyle)
for i in invites.split(" "):
user_id = self.find_user_id(i.strip("@"))
url = "https://slack.com/api/channels.invite?token={token}&channel={channel_id}&user={user}".format(
token=settings.token,
channel_id=channel_id,
user=user_id)
response = requests.get(url).json()
if response["ok"]:
os.system("figlet 'Invited " + i + "' | lolcat")
time.sleep(2)
os.system("clear")
else:
print "something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)"
def channels_create(self):
os.system("clear")
fields = ["name", "purpose(OPTINAL)", "send invite"]
for i in fields:
if i == "name":
name = raw_input("\u001b[1m\u001b[31m channel name -> \u001b[0m")
elif i == "purpose(OPTINAL)":
purpose = raw_input("\u001b[1m\u001b[31m purpose of the channel(OPTINAL) : \u001b[0m")
else:
invites = prompt("send invites -> ", completer=WordCompleter(users),
style=DocumentStyle)
url = "https://slack.com/api/channels.create?token={token}&name={name}&purpose={purpose}".format(
token=settings.token,
name=name,
purpose=purpose)
response = requests.get(url).json()
# TODO : send channel and user_id to channels_invite
if response["ok"]:
os.system("figlet 'Created' | lolcat")
time.sleep(2)
os.system("clear")
def main():
"""
Start the Slack Client
"""
os.system("clear; figlet 'Slack Gitsin' | lolcat")
history = FileHistory(os.path.expanduser("~/.slackHistory"))
while True:
text = prompt("slack> ", history=history,
auto_suggest=AutoSuggestFromHistory(),
on_abort=AbortAction.RETRY,
style=DocumentStyle,
completer=Completer(fuzzy_match=False,
text_utils=TextUtils()),
complete_while_typing=Always(),
get_bottom_toolbar_tokens=get_bottom_toolbar_tokens,
key_bindings_registry=manager.registry,
accept_action=AcceptAction.RETURN_DOCUMENT
)
slack = Slack(text)
slack.run_command()
def config_settings(event):
""" opens the configuration """
global PROMPTING
telemetry.track_key('F1')
PROMPTING = True
config = azclishell.configuration.CONFIGURATION
answer = ""
questions = {
"Do you want command descriptions" : "command_description",
"Do you want parameter descriptions" : "param_description",
"Do you want examples" : "examples"
}
for question in questions:
while answer.lower() != 'y' and answer.lower() != 'n':
answer = prompt(u'\n%s (y/n): ' % question)
config.set_val('Layout', questions[question], format_response(answer))
answer = ""
PROMPTING = False
print("\nPlease restart shell for changes to take effect.\n\n")
event.cli.set_return_value(event.cli.current_buffer)
def _process_input(self):
input_ = prompt('> ', get_bottom_toolbar_tokens=self._get_toolbar,
style=self.style, history=self.history,
completer=self.completer, complete_while_typing=False,
key_bindings_registry=self.registry)
input_ = input_.split(' ')
cmd = input_[0]
args = input_[1:]
if cmd == '':
return
try:
args = self.parser.parse_args(input_)
result = getattr(self, 'cmd_{}'.format(cmd))(args)
if result:
print(result)
except SystemExit:
pass
def interactive(self, context=None):
"""Runs interactive command line chat between user and bot. Runs
indefinitely until EOF is entered to the prompt.
context -- optional initial context. Set to {} if omitted
"""
if context is None:
context = {}
# input/raw_input are not interchangeable between Python 2 and 3
try:
input_function = raw_input
except NameError:
input_function = input
history = InMemoryHistory()
while True:
try:
message = prompt(INTERACTIVE_PROMPT, history=history, mouse_support=True).rstrip()
except (KeyboardInterrupt, EOFError):
return
print(self.message(message, context))
def BootstrapBlockchain():
current_chain_dir = settings.LEVELDB_PATH
bootstrap_file = settings.BOOTSTRAP_FILE
if bootstrap_file is None:
print("no bootstrap file specified. Please update your configuration file.")
sys.exit(0)
print("This will overwrite any data currently in %s.\nType 'confirm' to continue" % current_chain_dir)
confirm = prompt("[confirm]> ", is_password=False)
if confirm == 'confirm':
return do_bootstrap()
print("bootstrap cancelled")
sys.exit(0)
def _migrate_legacy_app_data_if_just_upgraded_and_user_agrees():
if is_cli_base_dir_untouched() and legacy_base_dir_exists():
print('Application data from previous Indy version has been found')
answer = prompt('Do you want to migrate it? [Y/n] ')
if not answer or answer.upper().startswith('Y'):
try:
combined_migration.migrate()
# Invalidate config caches to pick up overridden config
# parameters from migrated application data
invalidate_config_caches()
print('Application data has been migrated')
except Exception as e:
print('Error occurred when trying to migrate'
' application data: {}'.format(e))
traceback.print_exc()
print('Application data has not been migrated')
else:
print('Application data was not migrated')
def prompt_yesno(msg: str, default: bool = None, meta_dict: t.Dict[str, str] = None) -> bool:
"""
Prompt for simple yes or no decision.
:param msg: asked question
:param default: default value
:param meta_dict: mapping 'yes' or 'no' to further explanations
:return: user input converted to bool
"""
valid_words = ["yes", "no", "y", "n"]
if default is not None:
msg += "[" + ("y" if default else "n") + "] "
valid_words.append("")
completer = WordCompleter(["yes", "no"], ignore_case=True, meta_dict=meta_dict)
text = prompt(msg, completer=completer, display_completions_in_columns=True,
validator=WordValidator(valid_words, ignore_case=True))
if text == "":
return default
return text.lower().startswith("y")
def default_prompt(msg: str, default: t.Optional = None, **kwargs):
"""
Wrapper around prompt that shows a nicer prompt with a default value that isn't editable.
Interpretes the empty string as "use default value".
:param msg: message
:param default: default value
:param kwargs: arguments passed directly to the prompt function
:return: user input
"""
msg = message(msg, default)
if default is not None and "validator" in kwargs:
vali = kwargs["validator"]
if isinstance(vali, TypeValidator):
vali.allow_empty = True
if isinstance(vali, WordValidator):
vali.allow_empty = True
res = prompt(msg, **kwargs)
if res == "" and default is not None:
return default
return res
def prompt_attributes_dict(default_description: str = None) -> t.Dict[str, str]:
"""
Prompts for the contents of the attributes dict.
:param default_description: default value for the description attribute
:return: attributes dict
"""
attributes = {}
descr_msg = "Give a description for the current block: "
if default_description is not None:
attributes["description"] = default_prompt(descr_msg, default_description,
completer=WordCompleter([default_description]))
else:
attributes["description"] = prompt(descr_msg, validator=NonEmptyValidator())
try:
while prompt_yesno("Do you want to set or add another attribute? ", default=False):
name = prompt("Attribute name: ", validator=NonEmptyValidator(),
completer=WordCompleter(sorted(list(attributes.keys())), meta_dict=attributes))
default = attributes[name] if name in attributes else ""
attributes[name] = prompt("Attribute value: ", default=default, validator=NonEmptyValidator())
except KeyboardInterrupt:
pass
return attributes
def prompt_rusage_exec_dict(run_dict: dict) -> dict:
"""
Prompt for the config of the rusage exec runner.
:param run_dict: run config dict (without the runner part)
:return: runner config
"""
runner_dict = {}
default_props = ", ".join(RusageExecRunner.misc_options["properties"].get_default())
class RusagePropertiesValidator(Validator):
def validate(self, document: Document):
vals = [elem.strip() for elem in document.text.split(",")]
ret = verbose_isinstance(vals, ValidRusagePropertyList())
if not ret:
raise ValidationError(message=str(ret), cursor_position=len(document.text))
props = prompt("Which properties should be obtained from getrusage(1)? ",
validator=RusagePropertiesValidator(), default=default_props,
completer=WordCompleter(sorted(list(set(get_av_rusage_properties().keys()))),
meta_dict=get_av_rusage_properties(), ignore_case=False, WORD=True))
runner_dict["properties"] = [prop.strip() for prop in props.split(",")]
return runner_dict
def prompt_time_exec_dict(run_dict: dict) -> dict:
"""
Prompt for the config of the time exec runner.
:param run_dict: run config dict (without the runner part)
:return: runner config
"""
runner_dict = {}
default_props = ", ".join(TimeExecRunner.misc_options["properties"].get_default())
class TimePropertiesValidator(Validator):
def validate(self, document: Document):
vals = [elem.strip() for elem in document.text.split(",")]
ret = verbose_isinstance(vals, ValidTimePropertyList())
if not ret:
raise ValidationError(message=str(ret), cursor_position=len(document.text))
props = prompt("Which properties should be obtained from gnu time? ",
validator=TimePropertiesValidator(), default=default_props,
completer=WordCompleter(sorted(list(set(get_av_time_properties().keys()))),
meta_dict=get_av_rusage_properties(), ignore_case=False, WORD=True))
runner_dict["properties"] = [prop.strip() for prop in props.split(",")]
return runner_dict
def read(self):
if self.multi_line:
self.multi_line = False
return prompt(u"", multiline=True, **self.prompt_args)
def get_prompt_tokens(_):
tokens = []
if self.tx is None:
tokens.append((Token.Prompt, "\n-> "))
else:
tokens.append((Token.Prompt, "\n-("))
tokens.append((Token.TxCounter, "{}".format(self.tx_counter)))
tokens.append((Token.Prompt, ")-> "))
return tokens
return prompt(get_prompt_tokens=get_prompt_tokens, **self.prompt_args)
def config_settings(event):
""" opens the configuration """
global PROMPTING
shell_telemetry.track_key('F1')
PROMPTING = True
config = azclishell.configuration.CONFIGURATION
answer = ""
questions = {
"Do you want command descriptions": "command_description",
"Do you want parameter descriptions": "param_description",
"Do you want examples": "examples"
}
for question in questions:
while answer.lower() != 'y' and answer.lower() != 'n':
answer = prompt(u'\n%s (y/n): ' % question)
config.set_val('Layout', questions[question], format_response(answer))
answer = ""
PROMPTING = False
print("\nPlease restart the interactive mode for changes to take effect.\n\n")
event.cli.set_return_value(event.cli.current_buffer)
def auto_complete_prompt(allowed_extensions, file_type):
os_walk = os.walk(".")
all_files = []
for cur_dir, child_dirs, files in os_walk:
if cur_dir == ".":
for file in files:
file_name, extension = os.path.splitext(file)
if (extension in allowed_extensions):
all_files.append(file)
break
auto_complete_files = WordCompleter(all_files, ignore_case=True)
video_file_name = prompt(
"Which {0} file you like to trim: ".format(file_type),
completer=auto_complete_files,
complete_while_typing=True)
return video_file_name
def trim_the_video():
allowed_extensions = [".mp4", ".mov", ".webm"]
video_file_name = auto_complete_prompt(allowed_extensions, "video")
start_time = prompt("Start time for the video: ")
end_time = prompt("End time for the video: ")
file_name, extension = os.path.splitext(video_file_name)
if extension not in allowed_extensions:
print("The file you specified isn't supported at the moment.")
print("Exiting now.")
exit()
command = (
"ffmpeg -i {0} -r 24 -ss {1} -to {2} trimmed-{0}.mp4"
).format(video_file_name, start_time, end_time, video_file_name)
args = shlex.split(command)
with Halo(text="Generating preface video...", spinner='earth'):
subprocess.run(args)
return "trimmed-{0}.mp4".format(video_file_name)
def confirm(prompt, yes=True):
"""Confirm with user input
:param prompt: Question or text that the user gets.
:type prompt: str
:param yes: If yes should be the default.
:type yes: bool
:returns: True if go ahead, False if stop
:rtype: bool
"""
import prompt_toolkit
result = prompt_toolkit.prompt(
prompt + ' (%s): ' % ('Y/n' if yes else 'y/N')
)
if yes:
return result not in ['N', 'n']
else:
return result not in ['Y', 'y']
def input(prompt, default=""):
"""Prompt user for input
:param prompt: Question or text that the user gets.
:type prompt: str
:param default: Default value to give if the user does not input anything
:type default: str
:returns: User input or default
:rtype: bool
"""
import prompt_toolkit
result = prompt_toolkit.prompt(
prompt + ' (%s): ' % (default)
)
return result if result else default
def tagger_repl(tagger, **kwargs):
mxlen = int(kwargs.get('mxlen', 100))
maxw = int(kwargs.get('maxw', 100))
#zeropad = int(kwargs.get('zeropad', 0))
prompt_name = kwargs.get('prompt', 'class> ')
history_file = kwargs.get('history_file', '.history')
history = FileHistory(history_file)
while True:
text = prompt(prompt_name, history=history)
text = text.strip()
if text == 'quit':
break
try:
tokens = text.split(' ')
best = tagger.predict_text(tokens, mxlen=mxlen, maxw=maxw)
print(best)
except Exception as e:
logging.exception('Error')
def test(self):
self._load_auth()
history = FileHistory('.history')
project_id = self._get_current_project_id()
bot = self._load_bot()
while True:
try:
line = prompt('BotHub> ', history=history)
if not line:
continue
event = make_event(line)
context = {}
bot.handle_message(event, context)
except (EOFError, KeyboardInterrupt):
break
except Exception:
traceback.print_exc()
def askYesNo(msg, default=None):
default = default and default.lower()
y = 'Y' if default == 'y' else 'y'
n = 'N' if default == 'n' else 'n'
prompt = msg + ' (%s/%s)? ' % (y, n)
value = None
while value is None:
value = input(prompt).lower()
if value == '' and default:
return default == 'y'
if value not in ('y', 'n', 'yes', 'no'):
value = None
return value in ('y', 'yes')
def prompt_path_if_none(arguments, key, key_desc, force_prompt=False, extension=""):
value = arguments[key] or jkv.get(key, None)
if not value or force_prompt:
confirm = False
while not confirm:
confirm = True
value = prompt(
message=key_desc+": ",
validator=NotEmptyValidator(error=key_desc+' required'),
completer=PathCompleter(
expanduser=True,
file_filter=has_extension(extension) ))
value = abspath(expanduser(value))
root, ext = splitext(value)
value = root + extension
print(value)
if not exists(value):
confirm = prompt_yn('Create {}? (Y/n): '.format(value))
jkv[key] = value
return value
def set_prompt_tokens(self, device_info: tuple) -> None:
"""
Set prompt tokens sourced from a command.device.device_info()
call.
:param device_info:
:return:
"""
device_name, system_name, model, system_version = device_info
self.prompt_tokens = [
(Token.Applicationname, device_name),
(Token.On, ' on '),
(Token.Devicetype, '(' + model + ': '),
(Token.Version, system_version + ') '),
(Token.Connection, '[' + state_connection.get_comms_type_string() + '] # '),
]
def get_prompt_tokens(self, _) -> list:
"""
Return prompt tokens to use in the cli app.
If none were set during the init of this class, it
is assumed that the connection failed.
:param _:
:return:
"""
if self.prompt_tokens:
return self.prompt_tokens
return [
(Token.Applicationname, 'unknown application'),
(Token.On, ''),
(Token.Devicetype, ''),
(Token.Version, ' '),
(Token.Connection, '[' + state_connection.get_comms_type_string() + '] # '),
]
def _get_object_option_value_message(option, set_=None):
"""Retuns a message for object_option_value prompt."""
if option.default is not None:
default_msg = option.default
if option.type_name == 'boolean':
if default_msg:
default_msg = 'Y/n'
else:
default_msg = 'y/N'
msg = '{} [{}]'.format(option.verbose_name.title(), default_msg)
else:
msg = '{}'.format(option.verbose_name.title())
set_msg = ''
if set_ is not None:
set_msg = ' (installation set {})'.format(set_)
msg = '{}{}: '.format(msg, set_msg)
return msg
# pylint: disable=too-many-arguments
def file_upload(self, channel_name):
os.system("clear")
channel_id = self.find_channel_id(channel_name)
fields = ["file", "content", "filename", "title", "initial_comment"]
for i in fields:
if i == "file":
os.system("echo 'opening the file dialog. wait...' ")
file = subprocess.check_output(['zenity', '--file-selection'])
os.system("echo '\u001b[1m\u001b[31m file : \u001b[0m'" + file + "'")
elif i == "content":
content = raw_input("\u001b[1m\u001b[31m content : \u001b[0m")
elif i == "filename":
filename = raw_input("\u001b[1m\u001b[31m filename : \u001b[0m")
elif i == "title":
title = raw_input("\u001b[1m\u001b[31m title : \u001b[0m")
else:
initial_comment = prompt("add comment : ", completer=WordCompleter(users),
style=DocumentStyle)
url = "https://slack.com/api/files.upload?token={token}&content={content}&filename={filename}&channels={channel_id}&title={title}&initial_comment={initial_comment}".format(
token=settings.token,
content=content,
filename=filename,
channel_id=channel_id,
title=title,
initial_comment=initial_comment)
response = requests.get(url).json()
if response["ok"]:
os.system("figlet 'Uploaded!' | lolcat")
time.sleep(2)
os.system("clear")
else:
print "something goes wrong :( (\u001b[1m\u001b[31m " + response["error"] + "\u001b[0m)"
def ask_user_for_telemetry():
""" asks the user for if we can collect telemetry """
answer = " "
while answer.lower() != 'yes' and answer.lower() != 'no':
answer = prompt(u'\nDo you agree to sending telemetry (yes/no)? Default answer is yes: ')
if answer == '':
answer = 'yes'
return answer
def login(url, username, password, update_apidoc, display_meta):
s = requests.Session()
r = s.post('{}/api/rest/auth/login'.format(url),
json={'username': username, 'password': password})
rest = r.json()
if rest['status'] == 'login.failed':
click.secho('Fail to login, wrong username or password!', fg='red')
return
headers = {item: rest[item]
for item in ('token', 'apiToken', 'apiLicenseToken')}
s.headers = headers
if update_apidoc:
update_completions(s, url)
click.echo('Syntax: <command> [params] [options]')
click.echo('Press `Ctrl+D` to exit')
history = InMemoryHistory()
history.append('scm apiname')
history.append('help apiname')
while True:
try:
text = prompt(get_prompt_tokens=get_prompt_tokens,
completer=SCMCompleter(TextUtils(display_meta)),
auto_suggest=AutoSuggestFromHistory(),
style=DocumentStyle,
history=history,
on_abort=AbortAction.RETRY)
except EOFError:
break
process_command(s, url, text)