def tg_command(bot: telegram.Bot,
               update: telegram.Update):
    """Dispatch a ``!!``-prefixed Telegram message to the matching command.

    The edited message is used when the update carries one, otherwise the
    plain message. Text that does not start with ``!!`` is ignored.
    Telegram-only commands are tried first; the remaining commands (neither
    Telegram-only nor QQ-only) are tried only when ``get_forward_index`` does
    not return -1 for the chat.

    :param bot: bot instance handed over by the dispatcher (not used here)
    :param update: incoming update carrying the new or edited message
    :raises DispatcherHandlerStop: after a command handler has run, or when
        the chat has no forward index
    """
    message: telegram.Message = update.edited_message or update.message
    if not message.text.startswith('!!'):  # no command indicator
        return

    chat_id = message.chat_id
    replied = message.reply_to_message
    command_text = message.text[2:]  # drop the '!!' prefix

    def is_invoked(candidate) -> bool:
        # A command answers to both its long and its short spelling.
        return command_text == candidate.command or command_text == candidate.short_command

    # First pass: commands that do not need a forward index.
    for candidate in global_vars.command_list:
        if not candidate.tg_only:
            continue
        if is_invoked(candidate):
            candidate.handler(tg_group_id=chat_id,
                              tg_user=message.from_user,
                              tg_message_id=message.message_id,
                              tg_reply_to=replied)
            raise DispatcherHandlerStop()

    forward_index = get_forward_index(tg_group_id=chat_id)
    if forward_index == -1:
        raise DispatcherHandlerStop()

    # Second pass: commands that take the forward index as first argument.
    for candidate in global_vars.command_list:
        if candidate.tg_only or candidate.qq_only:
            continue
        if is_invoked(candidate):
            candidate.handler(forward_index,
                              tg_user=message.from_user,
                              tg_group_id=chat_id,
                              tg_message_id=message.message_id,
                              tg_reply_to=replied)
            raise DispatcherHandlerStop()
python类Message()的实例源码
def command_tg(tg_group_id: int,
               tg_user: telegram.User,
               tg_message_id: int,
               tg_reply_to: telegram.Message = None):
    """Reply to a message with a short description of the bot.

    :param tg_group_id: chat the reply is sent to
    :param tg_user: user who issued the command (not used here)
    :param tg_message_id: message the reply is attached to
    :param tg_reply_to: message the command replied to (not used here)
    """
    intro = ("I'm a relay bot between qq and tg.\n"
             'Please use "!!show commands" or "!!cmd" to show all commands.\n')
    global_vars.tg_bot.sendMessage(chat_id=tg_group_id,
                                   text=intro,
                                   reply_to_message_id=tg_message_id,
                                   parse_mode='HTML')
_1006_water_meter_filter.py 文件源码
项目:coolq-telegram-bot
作者: jqqqqqqqqqq
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def tg_water_meter(bot: telegram.Bot,
                   update: telegram.Update):
    """Block Telegram messages that match the channel or keyword filter lists.

    A message is blocked when it is forwarded from a channel whose id is in
    ``global_vars.filter_list['channels']``, or when its caption (or, failing
    that, its text) contains one of ``global_vars.filter_list['keywords']``.
    Blocking calls ``global_vars.drive_mode_on`` and stops further dispatching.

    :param bot: bot instance handed over by the dispatcher (not used here)
    :param update: update carrying either a new or an edited message
    :raises DispatcherHandlerStop: when the message is blocked
    """
    message: telegram.Message = update.message or update.edited_message

    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=int(tg_group_id))

    def block_message():
        # Shared by both filters: log, call drive_mode_on, stop dispatching.
        logger.debug("message is blocked")
        global_vars.drive_mode_on(forward_index,
                                  tg_user=message.from_user,
                                  tg_group_id=tg_group_id,
                                  tg_message_id=message.message_id)
        raise DispatcherHandlerStop()

    logger.debug("Water meter processing")
    source_chat = message.forward_from_chat
    if source_chat and source_chat.type == 'channel':
        logger.debug("message is forward from channel")
        # Use `message`, not `update.message`: the latter is None when the
        # update is an edited message, which used to raise AttributeError.
        if source_chat.id in global_vars.filter_list['channels']:
            block_message()

    # Caption wins over text; nothing to scan when both are empty.
    message_text = message.caption or message.text or ''
    if not message_text:
        return

    if any(keyword in message_text for keyword in global_vars.filter_list['keywords']):
        block_message()
def test_authorized_only(default_conf, mocker):
    """Check that ``authorized_only`` runs the handler for a message from chat 0.

    ``freqtrade.rpc.telegram._CONF`` is patched with ``default_conf``;
    presumably that fixture authorises chat id 0 (not visible here).
    """
    mocker.patch.dict('freqtrade.rpc.telegram._CONF', default_conf)
    update = Update(randint(1, 100))
    update.message = Message(randint(1, 100), 0, datetime.utcnow(), Chat(0, 0))
    was_called = False

    @authorized_only
    def dummy_handler(*args, **kwargs) -> None:
        nonlocal was_called
        was_called = True

    dummy_handler(MagicMock(), update)
    assert was_called is True
def test_authorized_only_unauthorized(default_conf, mocker):
    """Check that ``authorized_only`` skips the handler for chat id 0xdeadbeef.

    ``freqtrade.rpc.telegram._CONF`` is patched with ``default_conf``;
    presumably that fixture does not authorise this chat id (not visible here).
    """
    mocker.patch.dict('freqtrade.rpc.telegram._CONF', default_conf)
    update = Update(randint(1, 100))
    update.message = Message(randint(1, 100), 0, datetime.utcnow(), Chat(0xdeadbeef, 0))
    was_called = False

    @authorized_only
    def dummy_handler(*args, **kwargs) -> None:
        nonlocal was_called
        was_called = True

    dummy_handler(MagicMock(), update)
    assert was_called is False
def test_authorized_only_exception(default_conf, mocker):
    """Call an ``authorized_only`` handler that raises.

    The test has no assertions: it passes only if the exception raised inside
    the handler does not propagate out of the decorated call.
    """
    mocker.patch.dict('freqtrade.rpc.telegram._CONF', default_conf)
    update = Update(randint(1, 100))
    message_id = randint(1, 100)
    update.message = Message(message_id, 0, datetime.utcnow(), Chat(0, 0))

    @authorized_only
    def dummy_handler(*args, **kwargs) -> None:
        raise Exception('test')

    dummy_handler(MagicMock(), update)
def update():
    """Return an ``Update`` with id 0 whose message is a minimal ``Message`` in chat 0."""
    message = Message(0, 0, datetime.utcnow(), Chat(0, 0))
    _update = Update(0)
    _update.message = message
    return _update
def handler_wrapper(func):
    """Decorate a handler method so it receives the stored ``User`` row.

    The wrapper looks up the ``User`` whose ``tg_id`` matches the sender of
    ``update.message``. A missing user is created and committed; an existing
    one has its name fields, ``last_active_at`` and ``is_active`` refreshed.
    The update and its message are attached to the user object before
    ``func(self, user, *args, **kwargs)`` is called. A ``Flow`` exception
    raised by ``func`` is swallowed, and the session is committed afterwards.

    :param func: handler taking ``(self, user, *args, **kwargs)``
    :return: wrapper taking ``(self, _, update, *args, **kwargs)``; it keeps
        ``func``'s name and docstring and returns None
    """
    import functools  # local import: the file's import block is not visible here

    @functools.wraps(func)  # keep the handler's __name__/__doc__ for logging and introspection
    def wrap(self, _, update, *args, **kwargs):
        assert isinstance(User.query, Query)
        assert isinstance(update.message, Message)
        tguser = update.message.from_user
        assert isinstance(tguser, TgUser)

        user = User.query.filter(User.tg_id == tguser.id).one_or_none()
        now = datetime.now()
        if not user:
            # First contact: store the sender and commit straight away.
            user = User(
                tg_id=tguser.id,
                first_name=tguser.first_name,
                last_name=tguser.last_name or '',
                username=tguser.username,
                created_at=now,
                last_active_at=now,
            )
            db.session.add(user)
            db.session.commit()
        else:
            # Known sender: refresh the profile fields from the message.
            user.first_name = tguser.first_name
            user.last_name = tguser.last_name or ''
            user.username = tguser.username
            user.last_active_at = now
            user.is_active = True

        # Make the raw update and message reachable from the user object.
        user.update = update
        user.message = update.message

        try:
            func(self, user, *args, **kwargs)
        except Flow:
            # Flow is deliberately ignored here; the commit below still runs.
            pass
        db.session.commit()

    return wrap
def __init__(self, id, text, chat_id, reply_id=None):
    """Build a group message from a fixed sender, optionally as a reply.

    :param id: message id passed on as ``message_id``
    :param text: message text
    :param chat_id: id of the group chat the message belongs to
    :param reply_id: when truthy, id of a message (from a second fixed user)
        that this message replies to
    """
    group_chat = telegram.Chat(id=chat_id, type="group")
    sender = telegram.User(
        id=1, first_name="He", last_name="Man", username="heman")
    replied_user = telegram.User(
        id=2, first_name="She", last_name="Ra", username="shera")

    if reply_id:
        replied_message = telegram.Message(reply_id, replied_user, None, group_chat)
    else:
        # Pass the falsy value through unchanged.
        replied_message = reply_id

    super().__init__(
        message_id=id,
        date=datetime.now(),
        text=text,
        chat=group_chat,
        from_user=sender,
        reply_to_message=replied_message,
    )
def _reply_error(bot, update, errmsg):
    """
    Reply to the message in ``update`` with an error description.

    Args:
        bot: Telegram Bot instance used to send the reply
        update: Update whose message is replied to
        errmsg: Error text to send

    Returns:
        telegram.Message: Message sent (the result of ``bot.send_message``)
    """
    source = update.message
    return bot.send_message(source.chat.id, errmsg, reply_to_message_id=source.message_id)
def link_chat_exec(self, bot, tg_chat_id, tg_msg_id, callback_uid):
    """
    Carry out the action chosen for a chat in a link dialogue.
    Triggered by callback message with status `Flags.EXEC_LINK`.

    The stored state for the dialogue message is dropped before the message
    text is edited to report the outcome.

    Args:
        bot: Telegram Bot instance
        tg_chat_id: Chat ID
        tg_msg_id: Message ID triggered the callback
        callback_uid: Callback message; either `Flags.CANCEL_PROCESS` or
            two whitespace-separated tokens: a command and a chat index
    """
    storage_key = "%s.%s" % (tg_chat_id, tg_msg_id)

    def forget_dialogue():
        self.msg_status.pop(storage_key, None)
        self.msg_storage.pop(storage_key, None)

    def show(text):
        return bot.editMessageText(text=text, chat_id=tg_chat_id, message_id=tg_msg_id)

    if callback_uid == Flags.CANCEL_PROCESS:
        forget_dialogue()
        return show("Cancelled.")

    cmd, chat_lid = callback_uid.split()
    chat = self.msg_storage[storage_key]['chats'][int(chat_lid)]
    chat_uid = "%s.%s" % (chat['channel_id'], chat['chat_uid'])

    # Show "alias (name)" unless the alias merely repeats the name.
    if chat['chat_name'] == chat['chat_alias']:
        label = chat['chat_name']
    else:
        label = "%s (%s)" % (chat['chat_alias'], chat['chat_name'])
    if chat['channel_name']:
        label = "'%s' from '%s %s'" % (label, chat['channel_emoji'], chat['channel_name'])
    else:
        label = "'%s'" % label

    forget_dialogue()

    if cmd == "unlink":
        db.remove_chat_assoc(slave_uid=chat_uid)
        return show("Chat %s is restored." % label)
    if cmd == "mute":
        db.remove_chat_assoc(slave_uid=chat_uid)
        db.add_chat_assoc(slave_uid=chat_uid, master_uid=self.MUTE_CHAT_ID, multiple_slave=True)
        return show("Chat %s is now muted." % label)

    # Unknown command: report it; nothing is returned in this case.
    show("Command '%s' (%s) is not recognised, please try again" % (cmd, callback_uid))
def command_exec(self, bot, chat_id, message_id, callback):
    """
    Run a command from a command message.
    Triggered by callback message with status `Flags.COMMAND_PENDING`.

    The stored entry for the message holds the keys ``'channel'``, ``'text'``
    and ``'commands'``. ``callback`` selects one entry of ``'commands'``; the
    named callable of the channel's slave is invoked and its output is
    appended to the stored text. The stored state is dropped before the
    message is edited, whether the command ran or the input was rejected.

    Args:
        bot: Telegram Bot instance
        chat_id: Chat ID
        message_id: Message ID triggered the callback
        callback: Callback message, expected to be a decimal index into the
            stored command list

    Returns:
        The result of ``bot.editMessageText``.

    Raises:
        KeyError: if ``callback`` is decimal but no state is stored for the message.
    """
    storage_key = "%s.%s" % (chat_id, message_id)

    def finish(text):
        # Drop the pending state, then show the final text.
        self.msg_status.pop(storage_key, None)
        self.msg_storage.pop(storage_key, None)
        return bot.editMessageText(text=text, chat_id=chat_id, message_id=message_id)

    if not callback.isdecimal():
        return finish("Invalid parameter: %s. (CE01)" % callback)

    stored = self.msg_storage[storage_key]
    commands = stored['commands']
    index = int(callback)
    # Bound the index by the command list itself; the previous check used the
    # size of the storage dict, which is unrelated to the number of commands.
    if not 0 <= index < len(commands):
        return finish("Index out of bound: %s. (CE02)" % callback)

    command = commands[index]
    slave = self.slaves[stored['channel']]
    output = getattr(slave, command['callable'])(*command['args'], **command['kwargs'])
    return finish(stored['text'] + "\n------\n" + output)
def msg(self, bot, update):
    """
    Process, wrap and deliver messages from user.

    A message is handed to ``process_telegram_message`` unless it is not a
    reply and comes either from a private chat or from a group associated
    with more than one slave chat. In that case the user is told that no
    recipient was given and, when candidates exist, offered a list to pick from.

    Args:
        bot: Telegram Bot instance
        update: Message update
    """
    message = update.message
    self.logger.debug("----\nMsg from tg user:\n%s", message.to_dict())

    private_chat = message.chat.id == message.from_user.id
    multi_slaves = False
    if not private_chat:  # from group
        linked = db.get_chat_assoc(master_uid="%s.%s" % (self.channel_id, message.chat.id))
        multi_slaves = len(linked) > 1

    is_reply = bool(getattr(message, "reply_to_message", None))
    if is_reply or not (private_chat or multi_slaves):
        return self.process_telegram_message(bot, update)

    no_recipient = "Error: No recipient specified.\nPlease reply to a previous message."
    candidates = db.get_recent_slave_chats(message.chat.id) or \
        db.get_chat_assoc(master_uid="%s.%s" % (self.channel_id, message.chat.id))[:5]
    if not candidates:
        message.reply_text(no_recipient, quote=True)
        return

    # Send the plain error first: its message id is needed as the storage key
    # before the recipient keyboard can be attached.
    tg_err_msg = message.reply_text(no_recipient, quote=True)
    storage_id = "%s.%s" % (message.chat.id, tg_err_msg.message_id)
    legends, buttons = self.slave_chats_pagination(storage_id, 0, fchats=candidates)
    tg_err_msg.edit_text("Error: No recipient specified.\n"
                         "Please reply to a previous message, "
                         "or choose a recipient:\n\nLegend:\n" + "\n".join(legends),
                         reply_markup=telegram.InlineKeyboardMarkup(buttons))
    self.msg_status[storage_id] = Flags.SUGGEST_RECIPIENT
    self.msg_storage[storage_id]["update"] = update.to_dict()
def auto_spin(bot: Bot, job: Job):
    """Run the spin for the chat in ``job.context`` unless it already has a result today.

    A synthetic ``Update`` (id 0, message in the job's chat) is built so that
    ``do_the_spin`` can be called as if a user had triggered it.
    """
    from telegram import Message, Chat

    target_chat = Chat(job.context, '')
    synthetic_update = Update(0, message=Message(0, None, 0, target_chat))
    if core.results_today.get(job.context) is not None:
        return
    do_the_spin(bot, synthetic_update)
def test_telegram_command(mocker):
    """ Test that the endpoint class correctly registers the commands callback
    and that the Telegram bot uses them to reply to messages.

    ``Updater`` and ``CommandHandler`` are patched in
    ``eddie.endpoints.telegram``; the handlers are recovered from the
    positional arguments of the ``CommandHandler`` calls.
    """
    mocker.patch('eddie.endpoints.telegram.Updater')
    command_handler_mock = mocker.patch(
        'eddie.endpoints.telegram.CommandHandler')
    reply_text_mock = mocker.patch('telegram.Message.reply_text')

    class MyBot(Bot):
        "Reversing bot"

        def default_response(self, in_message):
            return in_message[::-1]

        @command
        def start(self):
            "start command"
            return 'start command has been called'

        @command
        def other(self):
            "other command"
            return 'other command has been called'

    bot = MyBot()
    bot.add_endpoint(TelegramEndpoint(token='123:ABC'))
    bot.run()

    # Each CommandHandler call is expected to carry (name, handler) positionally.
    registered = {
        name: handler
        for (name, handler), _kwargs in command_handler_mock.call_args_list
    }
    assert 'start' in registered
    assert 'other' in registered

    for name in ('start', 'other'):
        registered[name](bot, create_telegram_update('/' + name))
        reply_text_mock.assert_called_with(getattr(bot, name)())

    bot.stop()
def send_from_tg_to_qq(forward_index: int,
                       message: list,
                       tg_group_id: int,
                       tg_user: telegram.User=None,
                       tg_forward_from: telegram.Message=None,
                       tg_reply_to: telegram.Message=None,
                       edited: bool=False,
                       auto_escape: bool=True) -> int:
    """
    Send a message from telegram to qq.

    When a sender name is available, a text segment with the sender, reply,
    forward and edit information is inserted at the front of ``message``
    (the list is modified in place).

    :param forward_index: forward group index
    :param message: message in cq-http-api like format
    :param tg_group_id: telegram group id (not used in this function)
    :param tg_user: telegram user who send this message
    :param tg_forward_from: who the message is forwarded from
    :param tg_reply_to: who the message is replied to
    :param edited: the status of edition
    :param auto_escape: if contain coolq code, pass False
    :return: qq message id; None when the forward entry has neither a 'QQ'
        nor a 'DISCUSS' target
    """
    logger.debug("tg -> qq: " + str(message))

    sender_name = get_full_user_name(tg_user)
    forward_from = get_forward_from(tg_forward_from)
    reply_to = get_reply_to(tg_reply_to, forward_index)
    edit_mark = ' E ' if edited else ''  # marker added for edited messages

    message_attribute = sender_name + reply_to + forward_from + edit_mark + ': '
    if sender_name:  # insert extra info at beginning
        header_segment = {'type': 'text', 'data': {'text': message_attribute}}
        message.insert(0, header_segment)

    target = FORWARD_LIST[forward_index]
    qq_group = target.get('QQ')
    if qq_group:
        sent = global_vars.qq_bot.send_group_msg(group_id=qq_group,
                                                 message=message,
                                                 auto_escape=auto_escape)
        return sent['message_id']

    discuss = target.get('DISCUSS')
    if discuss:
        sent = global_vars.qq_bot.send_discuss_msg(discuss_id=discuss,
                                                   message=message,
                                                   auto_escape=auto_escape)
        return sent['message_id']
def send_from_qq_to_tg(forward_index: int,
                       message: list,
                       qq_group_id: int = 0,
                       qq_discuss_id: int = 0,
                       qq_user: int=None) -> list:
    """
    Send a message from qq to telegram.

    The message is split by ``divide_qq_message``. Parts with an image are
    sent as a document (gif) or photo (anything else) with the text as
    caption; other parts are sent as HTML text. When there is more than one
    part, each carries an ``(i/n)`` counter.

    :param forward_index: forward group index
    :param message: message in cq-http-api like format
    :param qq_group_id: which group this message came from (not used in this function)
    :param qq_discuss_id: which discuss this message came from (not used in this function)
    :param qq_user: which user sent this message
    :return: list of the telegram message ids that were sent, in order
    """
    logger.debug('qq -> tg: ' + str(message))

    message_list = divide_qq_message(forward_index, message)
    message_count = len(message_list)

    telegram_message_id_list = []
    for position, message_part in enumerate(message_list, start=1):
        if message_count == 1:
            message_index_attribute = ''
        else:
            message_index_attribute = '(' + str(position) + '/' + str(message_count) + ')'

        if message_part.get('image'):
            filename = message_part['image']
            cq_get_pic_url(filename)
            cq_download_pic(filename)

            full_msg = get_qq_name(qq_user, forward_index) + ': ' + message_index_attribute
            if message_part.get('text'):
                full_msg += message_part['text']

            # Close the picture once the upload call returns; it was
            # previously left open, leaking one file handle per image.
            with open(os.path.join(CQ_IMAGE_ROOT, filename), 'rb') as pic:
                if filename.lower().endswith('gif'):  # gif pictures send as document
                    _msg: telegram.Message = global_vars.tg_bot.sendDocument(
                        FORWARD_LIST[forward_index]['TG'], pic, caption=full_msg)
                else:  # jpg/png pictures send as photo
                    _msg: telegram.Message = global_vars.tg_bot.sendPhoto(
                        FORWARD_LIST[forward_index]['TG'], pic, caption=full_msg)
        else:
            # only first message could be pure text
            if qq_user:
                full_msg_bold = '<b>' + get_qq_name(qq_user, forward_index) + '</b>: ' + \
                                message_index_attribute + \
                                message_list[0]['text']
            else:
                full_msg_bold = message_index_attribute + message_list[0]['text']
            _msg: telegram.Message = global_vars.tg_bot.sendMessage(
                FORWARD_LIST[forward_index]['TG'], full_msg_bold, parse_mode='HTML')

        telegram_message_id_list.append(_msg.message_id)
    return telegram_message_id_list
def photo_from_telegram(bot: telegram.Bot,
                        update: telegram.Update):
    """Forward a Telegram photo message (new or edited) to QQ.

    For an edited message ``recall_message`` is called first. In ``JQ_MODE``
    the photo is sent as an image segment, followed by the caption when there
    is one; otherwise a single text segment with the picture URL (and the
    caption, if any) is sent. The resulting QQ message id is stored through
    ``global_vars.mdb.append_message``.

    :param bot: bot instance handed over by the dispatcher (not used here)
    :param update: update carrying either a new or an edited photo message
    """
    edited = not update.message
    message: telegram.Message = update.message or update.edited_message

    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if edited:
        recall_message(forward_index, message)

    # Last entry of the photo list; presumably the largest size - TODO confirm.
    file_id = message.photo[-1].file_id
    pic_url = tg_get_pic_url(file_id, 'jpg')
    caption = message.caption

    if JQ_MODE:
        reply_entity = [{'type': 'image', 'data': {'file': file_id + '.jpg'}}]
        if caption:
            reply_entity.append({'type': 'text', 'data': {'text': caption}})
    else:
        link_text = '[ ??, ?????' + pic_url + ' ]'
        if caption:
            link_text = link_text + caption
        reply_entity = [{'type': 'text', 'data': {'text': link_text}}]

    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=reply_entity,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message,
                                       edited=edited)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def feed(self, message: telegram.Message) -> bool:
    """Offer a telegram message to the digest.

    The message is stored when its chat is configured and it either carries a
    hashtag or replies to a message that already has a tag.

    Returns:
        bool: True if the message was added to the digest, False otherwise
    """
    # Only chats with a ConfigChat entry are digested.
    if not self.db.exists(ConfigChat, chat_id=message.chat_id):
        return False

    text_tag = extract_hashtag(message.text)
    # Without a tag and without a reply there is nothing to attach to.
    if not text_tag and not message.reply_to_message:
        return False

    # Reuse the stored sender, or prepare a new one.
    sender = message.from_user
    hashuser = self.db.get(HashUser, id=sender.id)
    if not hashuser:
        hashuser = HashUser(
            id=sender.id,
            friendly_name=self.make_friendly_name(sender),
            username=sender.username,
        )

    if text_tag:
        reply_id = None
        tag_id = self.db.generate_tag_id(text_tag)
        if self.db.exists(HashTag, id=tag_id):
            # Known tag: remember this spelling as one more shape.
            tag = self.db.get(HashTag, id=tag_id)
            tag.shapes.add(text_tag)
        else:
            tag = HashTag(id=tag_id, shapes={text_tag})
    else:
        # No tag in the text: inherit the tag of the replied-to message.
        reply_id = message.reply_to_message.message_id
        tag = self.db.get_message_tag(reply_id)
        if not tag:
            return False

    hashmessage = HashMessage(
        id=message.message_id,
        date=message.date,
        text=message.text,
        chat_id=message.chat_id,
        reply_to=reply_id,
    )
    hashmessage.tag = tag
    hashmessage.user = hashuser

    self.db.insert(hashmessage)
    return True