def create_telegram_update(message_text):
    """Build a ``telegram.Update`` that simulates a message sent to the bot.

    Helper for tests: the update and the message both use id 0, the sender
    is ``telegram.User(0, 'greenkey')``, the chat is ``telegram.Chat(0, '')``
    and the message date is the current local time.

    :param message_text: text carried by the simulated message
    :return: a ``telegram.Update`` wrapping the simulated message
    """
    from datetime import datetime

    sender = telegram.User(0, 'greenkey')
    chat = telegram.Chat(0, '')
    simulated_message = telegram.Message(
        message_id=0,
        from_user=sender,
        date=datetime.now(),
        chat=chat,
        text=message_text,
    )
    return telegram.Update(update_id=0, message=simulated_message)
# Example source code for the Python class Message()
def get_reply_to(reply_to_message: telegram.Message, forward_index: int):
    """
    Build the "(reply to xxx)" prefix naming the author of the replied message.

    When the replied message was sent by this bot (its sender id equals
    ``global_vars.tg_bot_id``), the stored record for that message is looked
    up and the name is resolved from the QQ number kept at index 2 of the
    record instead of from the Telegram user.

    :param reply_to_message: the replied message (may be falsy)
    :param forward_index: forward index
    :return: the formatted prefix, or '' when there is no replied message,
             no sender, no stored record, or no QQ number in the record
    """
    sender = reply_to_message.from_user if reply_to_message else None
    if not sender:
        return ''

    display_name = get_full_user_name(sender)
    if sender.id == global_vars.tg_bot_id:
        record = global_vars.mdb.retrieve_message(reply_to_message.message_id,
                                                  forward_index)
        # An empty QQ number means the message was a bot command (tg side).
        if not record or not record[2]:
            return ''
        display_name = get_qq_name(record[2], forward_index)

    # NOTE(review): the '?' looks like a mis-encoded non-ASCII character;
    # confirm against the upstream source before changing it.
    return '(?' + display_name + ')'
def audio_from_telegram(bot: telegram.Bot,
                        update: telegram.Update):
    """
    Forward a placeholder text to QQ for a Telegram audio message.

    The audio itself is not transferred: a single text segment is passed to
    ``send_from_tg_to_qq`` and the returned QQ message id is stored together
    with the Telegram message id via ``global_vars.mdb.append_message``.

    :param bot: Telegram bot instance (not used here)
    :param update: update whose ``message`` is forwarded
    """
    tg_message = update.message
    tg_group_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    # NOTE(review): the '??' looks like mis-encoded non-ASCII text.
    placeholder = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]

    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=placeholder,
        tg_group_id=tg_group_id,
        tg_user=tg_message.from_user,
        tg_forward_from=tg_message,
        tg_reply_to=tg_message.reply_to_message,
    )
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def pic_link_on(forward_index: int,
                tg_group_id: int=None,
                tg_user: telegram.User=None,
                tg_message_id: int=None,
                tg_reply_to: telegram.Message=None,
                qq_group_id: int=None,
                qq_discuss_id: int=None,
                qq_user: int=None):
    """
    Enable image-link mode for one forward and send a notice about it.

    Nothing happens when ``JQ_MODE`` is truthy. ``tg_user``, ``tg_reply_to``
    and ``qq_user`` are accepted but not used here.

    :param forward_index: index into ``IMAGE_LINK_MODE`` to switch on
    :return: the result of ``send_both_side``, or None when ``JQ_MODE`` is set
    """
    if JQ_MODE:
        return None

    IMAGE_LINK_MODE[forward_index] = True
    # NOTE(review): the '?' run looks like mis-encoded non-ASCII text.
    return send_both_side(
        forward_index,
        'QQ ?????????',
        qq_group_id,
        qq_discuss_id,
        tg_group_id,
        tg_message_id,
    )
def pic_link_off(forward_index: int,
                 tg_group_id: int=None,
                 tg_user: telegram.User=None,
                 tg_message_id: int=None,
                 tg_reply_to: telegram.Message=None,
                 qq_group_id: int=None,
                 qq_discuss_id: int=None,
                 qq_user: int=None):
    """
    Disable image-link mode for one forward and send a notice about it.

    Nothing happens when ``JQ_MODE`` is truthy. ``tg_user``, ``tg_reply_to``
    and ``qq_user`` are accepted but not used here.

    :param forward_index: index into ``IMAGE_LINK_MODE`` to switch off
    :return: the result of ``send_both_side``, or None when ``JQ_MODE`` is set
    """
    if JQ_MODE:
        return None

    IMAGE_LINK_MODE[forward_index] = False
    # NOTE(review): the '?' run looks like mis-encoded non-ASCII text.
    return send_both_side(
        forward_index,
        'QQ ?????????',
        qq_group_id,
        qq_discuss_id,
        tg_group_id,
        tg_message_id,
    )
def recall(tg_group_id: int,
           tg_user: telegram.User,
           tg_message_id: int,
           tg_reply_to: telegram.Message):
    """
    Recall the message referred to by ``tg_reply_to`` and report the outcome.

    The outcome text is sent to the Telegram group as a reply to the message
    identified by ``tg_message_id``. Nothing is done when the group has no
    forward (``get_forward_index`` returns -1).

    :param tg_group_id: telegram group id
    :param tg_user: sender of the command (not used here)
    :param tg_message_id: id of the message the outcome replies to
    :param tg_reply_to: the message to recall, passed to ``recall_message``
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    result = recall_message(forward_index, tg_reply_to)

    if result == -1:
        text = 'Please refer to a message.'
    elif result == -2:
        text = 'Message not recallable.'
    elif result == -3:
        # A stray trailing comma here used to turn this text into a 1-tuple,
        # so a tuple instead of a string was handed to sendMessage.
        text = 'Recalling messages from other QQ users is not supported.'
    elif result == -4:
        text = 'Message sent more than two minutes ago. Recalling failed.'
    else:
        text = 'Message recalled.'

    global_vars.tg_bot.sendMessage(chat_id=tg_group_id,
                                   text=text,
                                   reply_to_message_id=tg_message_id)
def drive_mode_on(forward_index: int,
                  tg_group_id: int=None,
                  tg_user: telegram.User=None,
                  tg_message_id: int=None,
                  tg_reply_to: telegram.Message=None,
                  qq_group_id: int=None,
                  qq_discuss_id: int=None,
                  qq_user: int=None):
    """
    Set ``global_vars.DRIVE_MODE[forward_index]`` to True and send a notice.

    ``tg_user``, ``tg_reply_to`` and ``qq_user`` are accepted but not used here.

    :param forward_index: forward whose drive mode is switched on
    :return: the result of ``send_both_side``
    """
    global_vars.DRIVE_MODE[forward_index] = True
    return send_both_side(
        forward_index,
        'Status changed: 451',
        qq_group_id,
        qq_discuss_id,
        tg_group_id,
        tg_message_id,
    )
def drive_mode_off(forward_index: int,
                   tg_group_id: int=None,
                   tg_user: telegram.User=None,
                   tg_message_id: int=None,
                   tg_reply_to: telegram.Message=None,
                   qq_group_id: int=None,
                   qq_discuss_id: int=None,
                   qq_user: int=None):
    """
    Set ``global_vars.DRIVE_MODE[forward_index]`` to False and send a notice.

    ``tg_user``, ``tg_reply_to`` and ``qq_user`` are accepted but not used here.

    :param forward_index: forward whose drive mode is switched off
    :return: the result of ``send_both_side``
    """
    global_vars.DRIVE_MODE[forward_index] = False
    return send_both_side(
        forward_index,
        'Status changed: 200',
        qq_group_id,
        qq_discuss_id,
        tg_group_id,
        tg_message_id,
    )
def dice(tg_group_id: int,
         tg_user: telegram.User,
         tg_message_id: int,
         tg_reply_to: telegram.Message = None):
    """
    Send a "threw a dice" text followed by a dice segment to the QQ side.

    Nothing is sent when the group has no forward (``get_forward_index``
    returns -1). ``tg_message_id`` and ``tg_reply_to`` are not used here.

    :param tg_group_id: telegram group id
    :param tg_user: telegram user the message is sent on behalf of
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    segments = [
        {'data': {'text': 'threw a dice'},
         'type': 'text'},
        {'data': {'type': '1'},
         'type': 'dice'},
    ]
    send_from_tg_to_qq(forward_index,
                       segments,
                       tg_group_id=tg_group_id,
                       tg_user=tg_user)
def rps(tg_group_id: int,
        tg_user: telegram.User,
        tg_message_id: int,
        tg_reply_to: telegram.Message):
    """
    Send a rock-paper-scissors text followed by an rps segment to the QQ side.

    Nothing is sent when the group has no forward (``get_forward_index``
    returns -1). ``tg_message_id`` and ``tg_reply_to`` are not used here.

    :param tg_group_id: telegram group id
    :param tg_user: telegram user the message is sent on behalf of
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    segments = [
        {'data': {'text': 'played rock–paper–scissors'},
         'type': 'text'},
        {'data': {'type': '1'},
         'type': 'rps'},
    ]
    send_from_tg_to_qq(forward_index,
                       segments,
                       tg_group_id=tg_group_id,
                       tg_user=tg_user)
def update_namelist(forward_index: int,
                    tg_group_id: int=None,
                    tg_user: telegram.User=None,
                    tg_message_id: int=None,
                    tg_reply_to: telegram.Message=None,
                    qq_group_id: int=None,
                    qq_discuss_id: int=None,
                    qq_user: int=None):
    """
    Refresh the cached QQ group member list of one forward and send a notice.

    The list is fetched with ``global_vars.qq_bot.get_group_member_list`` for
    the group stored under ``FORWARD_LIST[forward_index]['QQ']`` and saved in
    ``global_vars.group_members[forward_index]``. ``tg_user``, ``tg_reply_to``
    and ``qq_user`` are accepted but not used here.

    :param forward_index: forward whose member list is refreshed
    :return: the result of ``send_both_side``
    """
    qq_group = FORWARD_LIST[forward_index]['QQ']
    members = global_vars.qq_bot.get_group_member_list(group_id=qq_group)
    global_vars.group_members[forward_index] = members

    # NOTE(review): the '?' run looks like mis-encoded non-ASCII text.
    return send_both_side(
        forward_index,
        'QQ????????',
        qq_group_id,
        qq_discuss_id,
        tg_group_id,
        tg_message_id,
    )
def show_red_pack(forward_index: int,
                  tg_group_id: int=None,
                  tg_user: telegram.User=None,
                  tg_message_id: int=None,
                  tg_reply_to: telegram.Message=None,
                  qq_group_id: int=None,
                  qq_discuss_id: int=None,
                  qq_user: int=None):
    """
    Send a fixed text containing an Alipay QR link through ``send_both_side``.

    ``tg_user``, ``tg_reply_to`` and ``qq_user`` are accepted but not used here.

    :param forward_index: forward the text is sent to
    :return: the result of ``send_both_side``
    """
    # NOTE(review): the '?' run looks like mis-encoded non-ASCII text.
    return send_both_side(
        forward_index,
        '????????????????????? https://qr.alipay.com/c1x00417d1veog0b99yea4e',
        qq_group_id,
        qq_discuss_id,
        tg_group_id,
        tg_message_id,
    )
def link_chat_show_list(self, bot, update, args=None):
    """
    Show the list of available chats for linking.
    Triggered by `/link`.

    When the command is sent outside a private chat (sender id differs from
    chat id) and that chat already has associations, the list is generated
    for that chat and restricted to those associations. In every other case
    the list is generated for the sender's own id.

    Args:
        bot: Telegram Bot instance
        update: Message update
        args: Optional words joined with spaces and used as the list filter
    """
    message = update.message
    keyword = " ".join(args or [])
    sender_id = message.from_user.id
    chat_id = message.chat_id

    if sender_id != chat_id:
        links = db.get_chat_assoc(master_uid="%s.%s" % (self.channel_id, chat_id))
        if links:
            return self.link_chat_gen_list(bot, chat_id, filter=keyword, chats=links)

    self.link_chat_gen_list(bot, sender_id, filter=keyword)
def unlink_all(self, bot, update):
    """
    Unlink all chats linked to the telegram group.
    Triggered by `/unlink_all`.

    In a private chat (chat id equals sender id) only a usage hint is sent.
    Every branch replies to the triggering message and returns the result of
    ``bot.send_message``.

    Args:
        bot: Telegram Bot instance
        update: Message update
    """
    message = update.message
    chat_id = message.chat.id

    if chat_id == message.from_user.id:
        usage = "Send `/unlink_all` to a group to unlink all remote chats from it."
        return bot.send_message(chat_id, usage,
                                parse_mode=telegram.ParseMode.MARKDOWN,
                                reply_to_message_id=message.message_id)

    master_uid = "%s.%s" % (self.channel_id, chat_id)
    assocs = db.get_chat_assoc(master_uid=master_uid)
    if len(assocs) == 0:
        return bot.send_message(chat_id, "No chat is linked to the group.",
                                reply_to_message_id=message.message_id)

    db.remove_chat_assoc(master_uid=master_uid)
    summary = "All chats has been unlinked from this group. (%s)" % len(assocs)
    return bot.send_message(chat_id, summary,
                            reply_to_message_id=message.message_id)
def extra_help(self, bot, update):
    """
    Show list of extra functions and their usage.
    Triggered by `/extra`.

    Slave channels are listed in sorted key order; each extra function is
    shown as ``/<channel index>_<function key>`` followed by its name and its
    description, with ``{function_name}`` in the description replaced by
    that command. The message is sent with HTML parse mode.

    Args:
        bot: Telegram Bot instance
        update: Message update
    """
    parts = ["List of slave channel features:"]
    for n, slave_id in enumerate(sorted(self.slaves)):
        channel = self.slaves[slave_id]
        parts.append("\n\n<b>%s %s</b>" % (channel.channel_emoji, channel.channel_name))
        xfns = channel.get_extra_functions()
        if xfns:
            for key in xfns:
                fn_name = "/%s_%s" % (n, key)
                parts.append("\n\n%s <b>(%s)</b>\n%s" % (
                    fn_name, xfns[key].name, xfns[key].desc.format(function_name=fn_name)))
        else:
            # Start on a new line; previously this text was glued directly
            # onto the closing </b> of the channel header.
            parts.append("\nNo command found.")
    bot.send_message(update.message.chat.id, "".join(parts), parse_mode="HTML")
def extra_call(self, bot, update, groupdict=None):
    """
    Call an extra function from slave channel.

    A "Please wait..." message is sent first and then edited to contain the
    function's result. The function receives everything after the first
    space of the message text ('' when there is none).

    Args:
        bot: Telegram Bot instance
        update: Message update
        groupdict: Parameters offered by the message; ``'id'`` is the index
            of the slave channel in sorted key order and ``'command'`` is the
            key of the extra function.

    Returns:
        The result of ``self._reply_error`` when the channel id or the
        command is invalid, otherwise None.
    """
    # A missing groupdict, a non-numeric id and a negative id are all
    # rejected; a negative id would otherwise index from the end of the list.
    try:
        slave_index = int(groupdict['id'])
    except (TypeError, KeyError, ValueError):
        slave_index = -1
    if not 0 <= slave_index < len(self.slaves):
        return self._reply_error(bot, update, "Invalid slave channel ID. (XC01)")

    ch = self.slaves[sorted(self.slaves)[slave_index]]
    # A channel without extra functions may return a falsy value.
    fns = ch.get_extra_functions() or {}
    command = groupdict['command']
    if command not in fns:
        return self._reply_error(bot, update, "Command not found in selected channel. (XC02)")

    fn = fns[command]
    header = "%s %s: %s\n-------\n" % (ch.channel_emoji, ch.channel_name, fn.name)
    msg = bot.send_message(update.message.chat.id, header + "Please wait...")
    result = fn(" ".join(update.message.text.split(' ', 1)[1:]))
    bot.editMessageText(text=header + result, chat_id=update.message.chat.id, message_id=msg.message_id)
def poll(self):
    """
    Message polling process.

    Starts Telegram polling, then takes messages from ``self.queue`` and
    hands each one to ``self.process_msg`` on a new thread. A ``None`` item
    ends the loop, after which the bot is stopped.

    When an error occurs while handling an item, the bot is stopped and
    Telegram polling is started again inside the same loop. The restart used
    to be a recursive ``self.poll()`` call, which added a stack frame per
    error and, once the inner call had shut down, left the outer loop
    blocked on ``queue.get()`` forever.
    """
    self.polling_from_tg()
    while True:
        try:
            m = self.queue.get()
            if m is None:
                break
            self.logger.info("Got message from queue\nType: %s\nText: %s\n----", m.type, m.text)
            threading.Thread(target=self.process_msg, args=(m,)).start()
            # task_done is marked once the worker thread has been started,
            # not when the message has actually been processed.
            self.queue.task_done()
            self.logger.info("Msg sent to TG, task_done marked.")
        except Exception as e:
            self.logger.error("Error occurred during message polling")
            self.logger.error(repr(e))
            self.bot.stop()
            self.polling_from_tg()
    self.logger.debug("Gracefully stopping %s (%s).", self.channel_name, self.channel_id)
    self.bot.stop()
    self.logger.debug("%s (%s) gracefully stopped.", self.channel_name, self.channel_id)
def __get_dummy_update():
    """Return a dummy ``Update`` (id 1) that wraps one dummy ``Message``.

    The message has id 1, is dated with the current local time, and is built
    from ``User(1337, "@foouser")`` and ``Chat(1, None)``.
    """
    sender = User(1337, "@foouser")
    dummy_message = Message(1, sender, datetime.now(), Chat(1, None))
    return Update(1, message=dummy_message)
def __get_dummy_update():
    """Return a dummy ``Update`` (id 1) that wraps one dummy ``Message``.

    The message has id 1, is dated with the current local time, and is built
    from ``User(1337, "@foouser")`` and ``Chat(1, None)``.
    """
    sender = User(1337, "@foouser")
    dummy_message = Message(1, sender, datetime.now(), Chat(1, None))
    return Update(1, message=dummy_message)
def test_telegram_default_response(mocker):
    """Check that the Telegram endpoint replies with the bot's default response.

    ``Updater`` and ``MessageHandler`` in ``eddie.endpoints.telegram`` are
    patched, so the handler registered last is taken from the recorded
    ``MessageHandler`` calls and invoked directly with a simulated update.
    """
    mocker.patch('eddie.endpoints.telegram.Updater')
    handler_cls_mock = mocker.patch('eddie.endpoints.telegram.MessageHandler')
    reply_text_mock = mocker.patch('telegram.Message.reply_text')

    class MyBot(Bot):
        """Bot whose default response is the input reversed and upper-cased."""

        def default_response(self, in_message):
            return in_message[::-1].upper()

    bot = MyBot()
    bot.add_endpoint(TelegramEndpoint(token='123:ABC'))
    bot.run()

    # Positional arguments of every MessageHandler(...) call: (filter, handler).
    registered = [positional for positional, _ in handler_cls_mock.call_args_list]
    assert len(registered) > 0
    generic_handler = [callback for _, callback in registered][-1]

    text = 'this is the message'
    generic_handler(bot, create_telegram_update(text))
    reply_text_mock.assert_called_with(bot.default_response(text))

    bot.stop()
def filter(self, message: Message):
    """Return whether the message's sender id is in ``self.active_users``."""
    sender_id = message.from_user.id
    return sender_id in self.active_users
def post_title(self, post: Post) -> Message:
    """Send the header formatted from ``post`` to ``self.channel_id``.

    The text comes from ``self.format_header`` and is sent with web page
    previews disabled and ``self.timeout`` as the timeout.

    :param post: post whose header is sent
    :return: the result of ``self.bot.send_message``
    """
    header = self.format_header(post)
    send_options = {
        'chat_id': self.channel_id,
        'text': header,
        'disable_web_page_preview': True,
        'timeout': self.timeout,
    }
    return self.bot.send_message(**send_options)
def post_image(self, image: Image, is_album) -> Message:
    """Send one image to ``self.channel_id`` without a notification.

    Images whose ``animated`` attribute is truthy are sent with
    ``send_video``, all others with ``send_photo``. The caption comes from
    ``self.format_caption`` when ``is_album`` is truthy and is '' otherwise.

    :param image: image to send; its ``url`` is passed as the media
    :param is_album: whether a caption should be generated
    :return: the result of the send call
    """
    caption = self.format_caption(image) if is_album else ''
    common = dict(
        caption=caption,
        chat_id=self.channel_id,
        disable_notification=True,
        timeout=self.timeout,
    )
    if not image.animated:
        return self.bot.send_photo(photo=image.url, **common)
    return self.bot.send_video(video=image.url, **common)
def download(self):
    """Fetch ``self.url`` and write the response body to ``self.path``.

    When the target path already exists, a notice is printed and nothing is
    downloaded. ``self.requests_kwargs`` is passed on to ``requests.get``.
    """
    target = self.path
    if os.path.exists(target):
        print("[FileDownloader]", "Exists", target)
        return

    # NOTE(review): the HTTP status is not checked, so the body of an error
    # response would be written to the file as well.
    response = requests.get(self.url, **self.requests_kwargs)
    with open(target, 'wb') as out:
        out.write(response.content)
# Message attribution functions
# Below this line are the user-name processing functions used by mybot.py
def get_forward_from(message: telegram.Message):
    """
    Build the "(forwarded from xxx)" prefix naming the original sender.

    When the message was forwarded from this bot itself (``forward_from.id``
    equals ``global_vars.tg_bot_id``), the name is taken from the part of the
    caption (or, without a caption, the text) that precedes the first ':'.
    It is '' when there is no ':'.

    :param message: the forwarded message (may be falsy)
    :return: the formatted prefix, or '' when there is no message or the
             message has no ``forward_from``
    """
    if not message or not message.forward_from:
        return ''

    origin = message.forward_from
    if origin.id != global_vars.tg_bot_id:
        result = get_full_user_name(origin)
    else:
        message_text = message.caption or message.text or ''
        name, colon, _ = message_text.partition(':')
        # With a ':' the message came from qq; without one it is a
        # self-generated command text, etc.
        result = name if colon else ''

    # NOTE(review): the '?' looks like a mis-encoded non-ASCII character;
    # confirm against the upstream source before changing it.
    return '(?' + result + ')'
def video_from_telegram(bot: telegram.Bot,
                        update: telegram.Update):
    """
    Forward a placeholder text to QQ for a Telegram video message.

    ``update.message`` is used when present, otherwise
    ``update.edited_message`` is used and the forward is flagged as edited.
    The returned QQ message id is stored together with the Telegram message
    id via ``global_vars.mdb.append_message``.

    :param bot: Telegram bot instance (not used here)
    :param update: update carrying the new or edited message
    """
    edited = not update.message
    tg_message = update.edited_message if edited else update.message

    tg_group_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    # NOTE(review): the '??' looks like mis-encoded non-ASCII text.
    placeholder = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]

    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=placeholder,
        tg_group_id=tg_group_id,
        tg_user=tg_message.from_user,
        tg_forward_from=tg_message,
        tg_reply_to=tg_message.reply_to_message,
        edited=edited,
    )
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def document_from_telegram(bot: telegram.Bot,
                           update: telegram.Update):
    """
    Forward a placeholder text to QQ for a Telegram document message.

    ``update.message`` is used when present, otherwise
    ``update.edited_message`` is used and the forward is flagged as edited.
    The returned QQ message id is stored together with the Telegram message
    id via ``global_vars.mdb.append_message``.

    :param bot: Telegram bot instance (not used here)
    :param update: update carrying the new or edited message
    """
    edited = not update.message
    tg_message = update.edited_message if edited else update.message

    tg_group_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    # NOTE(review): the '??' looks like mis-encoded non-ASCII text.
    placeholder = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]

    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=placeholder,
        tg_group_id=tg_group_id,
        tg_user=tg_message.from_user,
        tg_forward_from=tg_message,
        tg_reply_to=tg_message.reply_to_message,
        edited=edited,
    )
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def sticker_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """
    Forward a Telegram sticker to QQ.

    What is forwarded depends on the mode:

    * ``JQ_MODE`` truthy: ``tg_get_pic_url`` is called for the sticker and an
      image segment referring to ``<file_id>.png`` is sent;
    * ``IMAGE_LINK_MODE[forward_index]`` truthy: a text with the sticker's
      emoji and the url returned by ``tg_get_pic_url`` is sent;
    * otherwise: a text with the sticker's emoji only is sent.

    The returned QQ message id is stored together with the Telegram message
    id via ``global_vars.mdb.append_message``.

    :param bot: Telegram bot instance (not used here)
    :param update: update whose ``message`` carries the sticker
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    reply_entity = list()
    file_id = update.message.sticker.file_id

    # The emoji of a sticker is optional in the Telegram API; concatenating
    # None into the text used to raise TypeError, so it is left out instead.
    emoji = message.sticker.emoji
    sticker_label = emoji + ' sticker' if emoji else 'sticker'

    if JQ_MODE:
        tg_get_pic_url(file_id, 'png')
        reply_entity.append({
            'type': 'image',
            'data': {'file': file_id + '.png'}
        })
    elif IMAGE_LINK_MODE[forward_index]:
        pic_url = tg_get_pic_url(file_id, 'png')
        # NOTE(review): the '?' run looks like mis-encoded non-ASCII text.
        reply_entity.append({
            'type': 'text',
            'data': {'text': '[ ' + sticker_label + ', ?????' + pic_url + ' ]'}
        })
    else:
        reply_entity.append({
            'type': 'text',
            'data': {'text': '[ ' + sticker_label + ' ]'}
        })

    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=reply_entity,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def text_from_telegram(bot: telegram.Bot,
                       update: telegram.Update):
    """
    Forward a Telegram text message to QQ.

    ``update.message`` is used when present, otherwise
    ``update.edited_message`` is used; for an edited message
    ``recall_message`` is called first. Messages whose text starts with '//'
    are not forwarded. The returned QQ message id is stored together with
    the Telegram message id via ``global_vars.mdb.append_message``.

    :param bot: Telegram bot instance (not used here)
    :param update: update carrying the new or edited message
    """
    edited = not update.message
    tg_message = update.edited_message if edited else update.message

    tg_group_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    if edited:
        recall_message(forward_index, tg_message)

    if tg_message.text.startswith('//'):
        return

    text_segments = [{
        'type': 'text',
        'data': {'text': tg_message.text}
    }]
    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=text_segments,
        tg_group_id=tg_group_id,
        tg_user=tg_message.from_user,
        tg_forward_from=tg_message,
        tg_reply_to=tg_message.reply_to_message,
        edited=edited,
    )
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def show_tg_group_id(tg_group_id: int,
                     tg_user: telegram.User,
                     tg_message_id: int,
                     tg_reply_to: telegram.Message):
    """
    Send the Telegram group id to the group itself.

    ``tg_user``, ``tg_message_id`` and ``tg_reply_to`` are not used here.

    :param tg_group_id: telegram group id, both the content and the target
    """
    global_vars.tg_bot.sendMessage(
        chat_id=tg_group_id,
        text='Telegram group id is: ' + str(tg_group_id),
    )