def test_start(self):
def start(bot, update):
update.message.reply_text('Hi!')
self.updater.dispatcher.add_handler(CommandHandler("start", start))
self.updater.start_polling()
# Here you can see how we would handle having our own user and chat
user = self.ug.get_user(first_name="Test", last_name="The Bot")
chat = self.cg.get_chat(user=user)
update = self.mg.get_message(user=user, chat=chat, text="/start")
self.bot.insertUpdate(update)
self.assertEqual(len(self.bot.sent_messages), 1)
sent = self.bot.sent_messages[0]
self.assertEqual(sent['method'], "sendMessage")
self.assertEqual(sent['text'], "Hi!")
self.updater.stop()
python类text()的实例源码
def test_echo(self):
def echo(bot, update):
update.message.reply_text(update.message.text)
self.updater.dispatcher.add_handler(MessageHandler(Filters.text, echo))
self.updater.start_polling()
update = self.mg.get_message(text="first message")
update2 = self.mg.get_message(text="second message")
self.bot.insertUpdate(update)
self.bot.insertUpdate(update2)
self.assertEqual(len(self.bot.sent_messages), 2)
sent = self.bot.sent_messages
self.assertEqual(sent[0]['method'], "sendMessage")
self.assertEqual(sent[0]['text'], "first message")
self.assertEqual(sent[1]['text'], "second message")
self.updater.stop()
def filef(bot,update):
# get file id
file = bot.getFile(update.message.document.file_id)
# create a randon temp file name
tf = tempfile.mkstemp()
# download file
file.download(tf[1])
# read file content
try:
tfco = open(tf[1],"r")
tfc = tfco.read()
tfco.close()
except Exception as e:
# if it is not text file
bot.sendMessage(chat_id=update.message.chat_id,reply_to_message_id=update.message.message_id,text=" ?? Please send me a text file")
# get user name for author
try:
author = update.message.from_user.first_name + " " + update.message.from_user.last_name
except Exception:
author = "Anonymous user"
# call paste function
url = paste(tfc , author)
# replay the url to user
bot.sendMessage(chat_id=update.message.chat_id,reply_to_message_id=update.message.message_id,text=url)
def start_consensus(bot, update):
"""Cleans/initializes data structures (runs new reasoning)
:param bot:
:param update: telegranm.ext.Update
:return:
"""
# TODO(scezar): right now requested format is /start_consensus int h
global message_stack
message_stack = []
operated_message = update.message.text
new_meeting_len = ''
for letter in operated_message:
if letter.isdigit():
new_meeting_len += letter
elif new_meeting_len:
global meeting_length
meeting_length = int(new_meeting_len)
return
def intent_extractor(bot, update):
"""
:param bot:
:param update:
:return: Parses the intent and calls the appropriate callback
"""
intent = intent_parser.extract_intent(update.message.text)
global state
feedback, give_reply, state = gf.give_feedback(intent,state)
if give_reply:
bot.sendMessage(update.message.chat_id, text=feedback)
if state.value < States.FINALIZING.value:
process_callback[intent](bot,update)
else:
finalize_schedule(bot,update)
def help(self, bot, update):
icon = emojize(":information_source: ", use_aliases=True)
text = icon + " The following commands are available:\n"
commands = [["/new", "Create new alarm"],
["/list", "List alarms, enable/disable and remove alarms"],
["/stop", "Stop all alarms"],
["/timezone", "Set the timezone (only works if sudo requires no password)"],
["/test", "Play an alarm to test"],
["/time", "Print time and timezone on device"],
["/help", "Get this message"]
]
for command in commands:
text += command[0] + " " + command[1] + "\n"
bot.send_message(chat_id=update.message.chat_id, text=text)
def start_command(self, bot, update, args):
user_id = update.message.from_user.id
chat= str(update.message.chat_id)
if str(user_id) in allowed_users.values() and chat != chats['group']:
self.store.new_draft(user_id)
bot.sendMessage(update.message.chat_id,parse_mode='Markdown',
text="Crearem una publicació per a compartir.\n\n\u0031\u20E3 El primer que heu de fer és enviar-me el *nom de la publicació*.\n\nSi no voleu continuar amb el procés, envieu /cancel.",
reply_markup=ReplyKeyboardHide())
else:
f_name = update.message.from_user.first_name
chat= str(update.message.chat_id)
if not function['production'] and chat != chats['group']:
bot.sendMessage(update.message.chat_id,
parse_mode='Markdown',
text= "Robot destinat a proves internes de Softcatalà. Si cerqueu el bot públic de Softcatalà el trobareu a @SoftcatalaBot.")
elif function['production'] and chat != chats['group']:
bot.sendMessage(update.message.chat_id,
text= str(f_name) + ", no teniu permisos per utilitzar aquesta ordre. Les ordres que teniu disponibles ara mateix són: /baixa /android /ios /tdesktop i /help.")
else:
bot.sendMessage(update.message.chat_id,
text= "No es permet crear publicacions des del grup.")
def help_command_callback(self, bot, update):
self.init_chat(update.message)
event_name = "help"
entities = update.message.parse_entities(types=[MessageEntity.BOT_COMMAND])
for entity_value in entities.values():
event_name = entity_value.replace("/", "").replace("@{}".format(self.bot_username), "")
break
self.log_and_botan_track(event_name, update.message)
chat_id = update.message.chat_id
chat_type = update.message.chat.type
reply_to_message_id = update.message.message_id
flood = self.chat_storage[str(chat_id)]["settings"]["flood"]
if chat_type != Chat.PRIVATE and flood == "no":
self.rant_and_cleanup(bot, chat_id, self.RANT_TEXT_PUBLIC, reply_to_message_id=reply_to_message_id)
else:
bot.send_message(chat_id=chat_id, text=self.HELP_TEXT,
parse_mode='Markdown', disable_web_page_preview=True)
def get_settings_inline_keyboard(self, chat_id):
mode = self.chat_storage[str(chat_id)]["settings"]["mode"]
flood = self.chat_storage[str(chat_id)]["settings"]["flood"]
emoji_yes = "?"
emoji_no = "?"
button_dl = InlineKeyboardButton(text=" ".join([emoji_yes if mode == "dl" else emoji_no, "Download"]),
callback_data=" ".join(["settings", "dl"]))
button_link = InlineKeyboardButton(text=" ".join([emoji_yes if mode == "link" else emoji_no, "Links"]),
callback_data=" ".join(["settings", "link"]))
button_ask = InlineKeyboardButton(text=" ".join([emoji_yes if mode == "ask" else emoji_no, "Ask"]),
callback_data=" ".join(["settings", "ask"]))
button_flood = InlineKeyboardButton(text=" ".join([emoji_yes if flood == "yes" else emoji_no, "Flood"]),
callback_data=" ".join(["settings", "flood"]))
button_close = InlineKeyboardButton(text=" ".join([emoji_no, "Close settings"]),
callback_data=" ".join(["settings", "close"]))
inline_keyboard = InlineKeyboardMarkup([[button_dl, button_link, button_ask], [button_flood, button_close]])
return inline_keyboard
def help_me(bot, update):
response = '?????? ????????? ??????:\n' \
'/start - ???????? ????????????????? ????\n' \
'/help - ??????? ?? ????????????? ????\n\n' \
'??? ?????????? ?????? ??????? ??????? ?? ?????? "???????? ???????", ? ????? ???????:\n' \
'{???? ???????} {????? ???????} {???????? ???????}\n' \
'????????:\n' \
'01.01.2017 00:00 ?????????? ?????? ? ????? ?????\n' \
'< ??? >\n' \
'01.01.17 00:00 ?????????? ?????? ? ????? ?????\n\n' \
'?????, ?????????????? ??????????? ?????? ?????\n' \
'????? ???????? ???? ? ??????? ????? ??????? ???? ? ????? ' \
'??????????? ?? ?????? ?? ??????? ? ????????? ???? < #{?????????} >\n' \
'????????:\n' \
'08.02.2017 09:00 01.02.2017 20:00 #?????? ???? ???????? ??????????\n\n' \
'??? ???????? ??????? ??????? ?? ?????? "??????? ???????", ? ????? ??????? ??? ????????\n' \
'??? ????????? ??????? ????????? ?????????????\n' \
'??? ????? ???????? ????? ?????! ??????????? ? ???!'
bot.sendMessage(chat_id=update.message.chat_id, text=response, reply_markup=keyboard)
logging.info('Command \'help\' invoked by chat id [{0}]'.format(update.message.chat_id))
# ????????? ??????
def telegram_command_handle(updater):
"""
????????? ?????? ?? ???? Telegram
:param updater:
:return:
"""
dispatcher = updater.dispatcher
start_handler = CommandHandler('start', start)
dispatcher.add_handler(start_handler)
help_me_handler = CommandHandler('help', help_me)
dispatcher.add_handler(help_me_handler)
echo_handler = MessageHandler(Filters.text, echo)
dispatcher.add_handler(echo_handler)
def register_text_handler(self, callback, allow_edited=False):
"""Registers text message handler
Args:
callback(function): callable object to execute
allow_edited(Optional[bool]): pass edited messages
"""
@utils.log(logger, print_ret=False)
def process_update(bot, update):
lang = utils.get_lang(self._storage, update.effective_user)
callback(update.effective_message, lang)
self._dispatcher.add_handler(MessageHandler(Filters.text, process_update,
edited_updates=allow_edited))
def set_bot(self, bot):
""" Sets the main bot, the bot must be an instance of
`eddie.bot.Bot`.
This method is in charge of registering all the bot's commands and
default message handlers.
The commands will be handled by
`TelegramEndpoint.default_command_handler`, all the other messages
will be handled by `TelegramEndpoint.default_message_handler`.
"""
self._bot = bot
self._telegram.dispatcher.add_handler(
MessageHandler(
Filters.text,
self.default_message_handler
)
)
for command in self._bot.command_names:
self._telegram.dispatcher.add_handler(
CommandHandler(
command,
self.default_command_handler
)
)
def default_message_handler(self, bot, update):
""" This is the method that will be called for every new message that
is not a command. It will ask the bot how to reply to the user.
The input parameters (`bot` and `update`) are default parameters
used by telegram.
"""
in_message = update.message.text
update.message.reply_text(self._bot.default_response(in_message))
def default_command_handler(self, bot, update):
""" All the commands will pass through this method. It will use the
bot's command to get the output for the user.
The input parameters (`bot` and `update`) are default parameters
used by telegram.
"""
command = update.message.text[1:]
command_handler = self._bot.__getattribute__(command)
update.message.reply_text(command_handler())
def _keyword_detect(self, text):
keywords = (
'???',
'???',
'???',
'????',
'???', # ??
'???',
'???',
'???',
'????',
)
time_query_keywords = (
'??',
'??',
'????',
'??',
'??',
'??',
)
eye_catching_keywords = (
'??',
'??',
'????',
)
for keyword in keywords:
if keyword in text:
return True
if u'??' in text:
for k in time_query_keywords:
if k in text:
return True
if u'??' in text:
for k in eye_catching_keywords:
if k in text:
return True
return False
def search_youtube(text):
url = 'https://www.youtube.com'
while True:
r = requests.get(url + '/results', params={'search_query': text})
soup = BeautifulSoup(r.content, 'html.parser')
tag = soup.find('a', {'rel': 'spf-prefetch'})
title, video_url = tag.text, url + tag['href']
if 'googleads' not in video_url:
break
return title, video_url
def music(bot, update):
title, video_url = search_youtube(update.message.text)
music_dict = download(title, video_url)
update.message.reply_audio(**music_dict)
os.remove(title + '.mp3')
def main():
u = Updater('YOUR-TOKEN')
dp = u.dispatcher
dp.add_handler(CommandHandler("start", start))
dp.add_handler(MessageHandler(Filters.text, music))
u.start_polling()
u.idle()
def _register_commands(self, _dispatcher, plugins):
# keyboard answer handlers
_dispatcher.add_handler(MessageHandler(Filters.text, self.handle_message))
_dispatcher.add_handler(CallbackQueryHandler(self.handle_callback_query))
# public commands
_dispatcher.add_handler(CommandHandler('start', self.start_command))
_dispatcher.add_handler(CommandHandler('showqueue', self.show_queue_command))
_dispatcher.add_handler(CommandHandler('currentsong', self.current_song_command))
_dispatcher.add_handler(CommandHandler('cancel', self.cancel_command))
_dispatcher.add_handler(CommandHandler('login', self.login_command))
_dispatcher.add_handler(CommandHandler('subscribe', self.subscribe))
_dispatcher.add_handler(CommandHandler('unsubscribe', self.unsubscribe))
# gmusic_password protected commands
_dispatcher.add_handler(CommandHandler('next', self.next_command))
_dispatcher.add_handler(CommandHandler('play', self.play_command))
_dispatcher.add_handler(CommandHandler('pause', self.pause_command))
_dispatcher.add_handler(CommandHandler('skip', self.skip_command))
_dispatcher.add_handler(CommandHandler('movesong', self.move_song_command))
# admin commands
_dispatcher.add_handler(CommandHandler('admin', self.admin_command))
_dispatcher.add_handler(CommandHandler('clearqueue', self.clear_queue_command))
_dispatcher.add_handler(CommandHandler('reset', self.reset_command))
_dispatcher.add_handler(CommandHandler('exit', self.exit_command))
_dispatcher.add_handler(CommandHandler('ip', self.ip_command))
_dispatcher.add_handler(CommandHandler('togglepassword', self.toggle_password_command))
_dispatcher.add_handler(CommandHandler('setpassword', self.set_password_command))
_dispatcher.add_handler(CommandHandler('banuser', self.ban_user_command))
_dispatcher.add_handler(CommandHandler('setquality', self.set_quality_command))
_dispatcher.add_handler(CommandHandler('stationremove', self.station_remove_command))
_dispatcher.add_handler(CommandHandler('stationreload', self.station_reload_command))
for plugin in plugins:
_dispatcher.add_handler(
CommandHandler(plugin.get_label(), lambda bot, update: plugin.run_command(self, bot, update)))
_dispatcher.add_handler(InlineQueryHandler(self.get_inline_query_handler()))
_dispatcher.add_handler(ChosenInlineResultHandler(self.queue_command))