def cmd_set_generic(bot: telegram.Bot, update: telegram.Update, field_name: str,
cmd_name: str, example: str):
if generic_checks(bot, update, cmd_name, example, 2):
# Extract value
text = update.message.text
cmd, tag, value = text.split(" ", 2)
# Apply edit and
edit_poll(tag, field_name, value)
# send "success" feedback message
new_p = get_poll(tag)
msg = text_edit_successful.format(p=new_p)
cid = update.message.chat_id
bot.sendMessage(cid, msg)
# Check if complete, and suggest to activate
if check_if_complete(tag):
msg2 = text_activate_suggestion.format(t=tag)
bot.sendMessage(cid, msg2, parse_mode="Markdown")
else:
# A generic check failed.
pass
return
python类Bot()的实例源码
def main():
global update_id
# Telegram Bot Authorization Token
f = open("../token.txt", "r")
token = f.readline()
f.close()
bot = telegram.Bot(token=token)
# get the first pending update_id, this is so we can skip over it in case
# we get an "Unauthorized" exception.
try:
update_id = bot.getUpdates()[0].update_id
except IndexError:
update_id = None
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
while True:
try:
echo(bot)
except NetworkError:
sleep(1)
except Unauthorized:
# The user has removed or blocked the bot.
update_id += 1
def execute(self, chat_id, bot: Bot, update: Update):
handled = False
if update.message.text in self._buildings_dict:
handled = True
building_identifier = self._buildings_dict[update.message.text]
classrooms = self.get_classroom_source().get_classrooms_in_building(building_identifier)
keyboard_button = []
counter = 0
row = -1
for classroom in classrooms:
if counter == 0:
keyboard_button.append([])
row += 1
keyboard_button[row].append(KeyboardButton("/"+classroom.get_name().lower()))
counter += 1
if counter == 3:
counter = 0
keyboard_button.append(["/buildings"])
reply_keyboard = ReplyKeyboardMarkup(keyboard_button)
bot.send_message(chat_id=chat_id, text="Available classrooms", reply_markup=reply_keyboard)
return handled
def help_about(bot: telegram.Bot, update: telegram.Update):
bot.send_message(
chat_id=update.message.chat_id,
text=strings.HELP_MESSAGE,
parse_mode='HTML',
disable_web_page_preview=True,
reply_markup=strings.HELP_MESSAGE_KEYBOARD
)
bot_types.Botan.track(
uid=update.message.chat_id,
message=update.message.to_dict(),
name='help.about'
)
logging.info('Command help: about.', extra={
'telegram': {
'update': update.to_dict(),
'chat_id': update.message.chat_id,
'message_id': update.message.message_id,
},
'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
})
def settings_message(bot: telegram.Bot, update: telegram.Update):
chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id,
admin_name=update.message.from_user.first_name)
if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id:
send_settings_message(bot, chat_settings)
bot_types.Botan.track(
uid=update.message.chat_id,
message=update.message.to_dict(),
name='settings.main'
)
logging.info('Command settings: main.', extra={
'telegram': {
'update': update.to_dict(),
'chat_id': update.message.chat_id,
'message_id': update.message.message_id,
},
'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
})
def send_settings_message(bot: telegram.Bot, chat_settings: bot_types.ChatSettings):
bot.send_message(
chat_id=chat_settings.id,
text=strings.SETTINGS_MESSAGE % (
chat_settings.admin_name,
str(chat_settings.mode),
'???????' if chat_settings.quiet else '????????',
'?????? ????????????? ????' if chat_settings.admin_only else '??? ??????',
chat_settings.yandex_key if chat_settings.yandex_key else '?? ??????????',
str(chat_settings.voice),
str(chat_settings.emotion),
str(chat_settings.speed),
'???????????' if chat_settings.as_audio else '????????? ?????????',
'????????? ?????????' if chat_settings.as_audio else '???????????',
'?????????' if chat_settings.quiet else '????????',
'?????' if chat_settings.admin_only else '?????? ????????????? ????'
),
parse_mode='HTML'
)
# endregion
# region settings voice
def settings_voice_message(bot: telegram.Bot, update: telegram.Update):
chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id,
admin_name=update.message.from_user.first_name)
if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id:
send_settings_voice_message(bot, update.message.chat_id)
bot_types.Botan.track(
uid=update.message.chat_id,
message=update.message.to_dict(),
name='settings.voice.get'
)
logging.info('Command settings: voice.', extra={
'telegram': {
'update': update.to_dict(),
'chat_id': update.message.chat_id,
'message_id': update.message.message_id,
},
'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
})
def send_settings_voice_message_callback(bot: telegram.Bot, voice: bot_types.Voice,
callback_query_id: str, quiet: bool):
if quiet:
bot.answer_callback_query(
callback_query_id=callback_query_id
)
else:
bot.answer_callback_query(
callback_query_id=callback_query_id,
text=strings.NEW_VOICE_MESSAGE % str(voice)
)
# endregion
# region settings emotion
def settings_emotion_message(bot: telegram.Bot, update: telegram.Update):
chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id,
admin_name=update.message.from_user.first_name)
if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id:
send_settings_emotion_message(bot, update.message.chat_id)
bot_types.Botan.track(
uid=update.message.chat_id,
message=update.message.to_dict(),
name='settings.emotions.get'
)
logging.info('Command settings: emotion.', extra={
'telegram': {
'update': update.to_dict(),
'chat_id': update.message.chat_id,
'message_id': update.message.message_id,
},
'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
})
def send_settings_emotion_message_callback(bot: telegram.Bot, emotion: bot_types.Emotion,
callback_query_id: str, quiet: bool):
if quiet:
bot.answer_callback_query(
callback_query_id=callback_query_id
)
else:
bot.answer_callback_query(
callback_query_id=callback_query_id,
text=strings.NEW_EMOTION_MESSAGE % str(emotion)
)
# endregion
# region settings mode
def send_settings_mode_message_callback(bot: telegram.Bot, mode: bot_types.Mode,
callback_query_id: str, quiet: bool):
if quiet:
bot.answer_callback_query(
callback_query_id=callback_query_id
)
else:
bot.answer_callback_query(
callback_query_id=callback_query_id,
text=strings.NEW_MODE_MESSAGE % str(mode)
)
# endregion
# region settings audio
def set_auto(bot: Bot, update: Update):
"""
Handles /auto command
:param bot:
:param update:
:return:
"""
global data
chat_id = update.message.chat_id
if chat_id in data.conversations:
data.conversations[chat_id].auto = not data.conversations[chat_id].auto
if data.conversations[chat_id].auto:
message = "Automatic mode enabled."
else:
message = "Automatic mode disabled."
send_custom_message(bot=bot, chat_id=chat_id, message=message)
else:
send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
def send_matches(bot: Bot, update: Update):
"""
Send list of matches (pictures) to private chat
:param bot:
:param update:
:return:
"""
global data
chat_id = update.message.chat_id
sender_id = update.message.from_user.id
if chat_id in data.conversations:
try:
conversation = data.conversations[chat_id]
matches = conversation.get_matches()
id = 0
# TODO cache the photo urls for some time
for match in matches:
photo = match.user.get_photos(width='172')[0]
try:
is_msg_sent = send_private_photo(bot=bot, caption=match.user.name + " ID=" + str(id), url=photo,
user_id=sender_id)
if not is_msg_sent:
notify_start_private_chat(bot=bot,
chat_id=chat_id,
incoming_message=update.message)
break
id += 1
except error.BadRequest:
send_custom_message(bot=bot, chat_id=sender_id, message="One match could not be loaded: %s" % photo)
time.sleep(0.1)
except AttributeError as e:
message = "An error happened."
send_custom_message(bot=bot, chat_id=sender_id, message=message)
else:
send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
def unlink(bot: Bot, update: Update):
"""
Handles /unlink command
:param bot:
:param update:
:return:
"""
global data
sender = update.message.from_user.id
chat_id = update.message.chat_id
if chat_id in data.conversations:
if sender == data.conversations[chat_id].owner:
del data.conversations[chat_id]
send_message(bot, chat_id, name="account_unlinked")
else:
send_error(bot, chat_id=chat_id, name="command_not_allowed")
else:
send_error(bot, chat_id=chat_id, name="account_not_setup")
def test_admin_help(self):
"""
Test admin help command
"""
bot = Mock(spec=Bot)
update = self.__get_dummy_update()
AdminCommands._AdminCommands__admin_help( # pylint: disable=no-member, protected-access
bot, update)
self.assertTrue(bot.sendMessage.called)
def test_get_users_no_config(self):
"""
Test get users command if no users are registered
"""
bot = Mock(spec=Bot)
update = self.__get_dummy_update()
with patch("ownbot.admincommands.UserManager") as usrmgr_mock:
usrmgr_mock.return_value.config = {}
AdminCommands._AdminCommands__get_users( # pylint: disable=no-member, protected-access
bot, update)
self.assertTrue(bot.sendMessage.called)
def test_get_users(self):
"""
Test get users command
"""
bot = Mock(spec=Bot)
update = self.__get_dummy_update()
with patch("ownbot.admincommands.UserManager") as usrmgr_mock:
config = {"foogroup": {"users": [{"id": 1337,
"username": "@foouser"}],
"unverified": ["@baruser"]}}
usrmgr_mock.return_value.config = config
AdminCommands._AdminCommands__get_users( # pylint: disable=no-member, protected-access
bot, update)
self.assertTrue(bot.sendMessage.called)
def test_adduser_no_args(self):
"""
Test adduser command if the wrong number of args is passed
"""
bot = Mock(spec=Bot)
update = self.__get_dummy_update()
AdminCommands._AdminCommands__add_user( # pylint: disable=no-member, protected-access
bot, update, [])
bot.sendMessage.assert_called_with(
chat_id=1, text="Usage: adduser <user> <group>")
def test_adduser_usr_already_in_grp(self):
"""
Test adduser command if user is already in group
"""
bot = Mock(spec=Bot)
update = self.__get_dummy_update()
with patch("ownbot.admincommands.UserManager") as usrmgr_mock:
usrmgr_mock.return_value.add_user.return_value = False
AdminCommands._AdminCommands__add_user( # pylint: disable=no-member, protected-access
bot, update, ["@foouser", "foogroup"])
bot.sendMessage.assert_called_with(
chat_id=1,
text="The user '@foouser' is already in the group 'foogroup'!")
def test_rmuser_no_args(self):
"""
Test rmuser command if the wrong number of args is passed
"""
bot = Mock(spec=Bot)
update = self.__get_dummy_update()
AdminCommands._AdminCommands__rm_user( # pylint: disable=no-member, protected-access
bot, update, [])
bot.sendMessage.assert_called_with(chat_id=1,
text="Usage: rmuser <user> <group>")