def test_requires_usergroup_no_acc(self):
    """
    Check the requires_usergroup decorator when the patched user reports
    that it has no access.
    """
    with patch("ownbot.auth.User") as user_cls:
        # Every User instance created inside ownbot.auth denies access.
        denied_user = user_cls.return_value
        denied_user.has_access.return_value = False

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        result = my_command_handler(Mock(spec=Bot),
                                    self.__get_dummy_update())

        # The undecorated handler returns True; None is expected here.
        self.assertIsNone(result)
# Example source code for the Python class User()
def test_requires_usergroup_acc(self):
    """
    Test requires usergroup decorator if the user has access
    """
    with patch("ownbot.auth.User") as user_mock,\
            patch("test_auth.Update") as update_mock:
        user_mock = user_mock.return_value
        # Must be spelled ``has_access``: a mock silently accepts any
        # attribute name, so a misspelling would leave the real method
        # unconfigured and the test would pass only because a bare mock
        # return value is truthy.
        user_mock.has_access.return_value = True

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        bot_mock = Mock(spec=Bot)
        # ``Update`` is patched in test_auth while this block is active.
        update_mock = Update(1337)
        called = my_command_handler(bot_mock, update_mock)

        self.assertTrue(called)
def test_requires_usergroup_self(self):
    """
    Test requires usergroup decorator with self as first argument.
    """
    with patch("ownbot.auth.User") as user_mock,\
            patch("test_auth.Update") as update_mock:
        user_mock = user_mock.return_value
        # Must be spelled ``has_access``: a mock silently accepts any
        # attribute name, so a misspelling would leave the real method
        # unconfigured.
        user_mock.has_access.return_value = True

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(self, bot, update):
            """Dummy command handler"""
            print(self, bot, update)
            return True

        bot_mock = Mock(spec=Bot)
        # ``Update`` is patched in test_auth while this block is active.
        update_mock = Update(1337)
        # None stands in for the ``self`` of a method-style handler.
        called = my_command_handler(None, bot_mock, update_mock)

        self.assertTrue(called)
def test_assign_first_to_not_first(self):
    """
    Check the assign_first_to decorator when the group is reported as
    not empty: the user must not be saved.
    """
    with patch("ownbot.auth.User") as user_cls,\
            patch("test_auth.Update"),\
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        manager = manager_cls.return_value
        manager.group_is_empty.return_value = False

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)

        my_command_handler(Mock(spec=Bot), Update(1337))

        self.assertTrue(manager.group_is_empty.called)
        self.assertFalse(user_instance.save.called)
def test_assign_first_to_with_self(self):
    """
    Check the assign_first_to decorator on a handler whose first
    argument is self, with the group reported as empty: the user must
    be saved.
    """
    with patch("ownbot.auth.User") as user_cls,\
            patch("test_auth.Update"),\
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        manager = manager_cls.return_value
        manager.group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(self, bot, update):
            """Dummy command handler"""
            print(self, bot, update)

        # None stands in for the ``self`` of a method-style handler.
        my_command_handler(None, Mock(spec=Bot), Update(1337))

        self.assertTrue(manager.group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def create_telegram_update(message_text):
    """Helper function: build an "Update" that simulates a message sent to
    the Telegram bot.

    :param message_text: text carried by the simulated message
    :return: a ``telegram.Update`` (update_id 0) wrapping that message
    """
    from datetime import datetime

    sender = telegram.User(0, 'greenkey')
    sent_at = datetime.now()
    conversation = telegram.Chat(0, '')
    simulated = telegram.Message(message_id=0,
                                 from_user=sender,
                                 date=sent_at,
                                 chat=conversation,
                                 text=message_text)
    return telegram.Update(update_id=0, message=simulated)
def pic_link_on(forward_index: int,
                tg_group_id: int=None,
                tg_user: telegram.User=None,
                tg_message_id: int=None,
                tg_reply_to: telegram.Message=None,
                qq_group_id: int=None,
                qq_discuss_id: int=None,
                qq_user: int=None):
    """
    Set ``IMAGE_LINK_MODE[forward_index]`` to True and pass a notice to
    ``send_both_side``.

    Does nothing and returns None when ``JQ_MODE`` is truthy.

    :param forward_index: index into ``IMAGE_LINK_MODE``
    :return: whatever ``send_both_side`` returns, or None in JQ mode
    """
    if not JQ_MODE:
        IMAGE_LINK_MODE[forward_index] = True
        return send_both_side(forward_index,
                              'QQ ?????????',
                              qq_group_id,
                              qq_discuss_id,
                              tg_group_id,
                              tg_message_id)
    return None
def pic_link_off(forward_index: int,
                 tg_group_id: int=None,
                 tg_user: telegram.User=None,
                 tg_message_id: int=None,
                 tg_reply_to: telegram.Message=None,
                 qq_group_id: int=None,
                 qq_discuss_id: int=None,
                 qq_user: int=None):
    """
    Set ``IMAGE_LINK_MODE[forward_index]`` to False and pass a notice to
    ``send_both_side``.

    Does nothing and returns None when ``JQ_MODE`` is truthy.

    :param forward_index: index into ``IMAGE_LINK_MODE``
    :return: whatever ``send_both_side`` returns, or None in JQ mode
    """
    if not JQ_MODE:
        IMAGE_LINK_MODE[forward_index] = False
        return send_both_side(forward_index,
                              'QQ ?????????',
                              qq_group_id,
                              qq_discuss_id,
                              tg_group_id,
                              tg_message_id)
    return None
def recall(tg_group_id: int,
           tg_user: telegram.User,
           tg_message_id: int,
           tg_reply_to: telegram.Message):
    """
    Try to recall the message referred to by ``tg_reply_to`` and report
    the outcome in the Telegram group.

    :param tg_group_id: telegram group id; also the chat the report is sent to
    :param tg_user: telegram user issuing the command (unused here)
    :param tg_message_id: id of the command message; the report replies to it
    :param tg_reply_to: message passed to ``recall_message``
    :return: None; returns early when no forward index is found
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    # Negative results from recall_message select a failure notice.
    result = recall_message(forward_index, tg_reply_to)
    if result == -1:
        text = 'Please refer to a message.'
    elif result == -2:
        text = 'Message not recallable.'
    elif result == -3:
        # No trailing comma here: it would turn the text into a tuple.
        text = 'Recalling messages from other QQ users is not supported.'
    elif result == -4:
        text = 'Message sent more than two minutes ago. Recalling failed.'
    else:
        text = 'Message recalled.'

    global_vars.tg_bot.sendMessage(chat_id=tg_group_id,
                                   text=text,
                                   reply_to_message_id=tg_message_id)
def drive_mode_on(forward_index: int,
                  tg_group_id: int=None,
                  tg_user: telegram.User=None,
                  tg_message_id: int=None,
                  tg_reply_to: telegram.Message=None,
                  qq_group_id: int=None,
                  qq_discuss_id: int=None,
                  qq_user: int=None):
    """
    Set ``global_vars.DRIVE_MODE[forward_index]`` to True and pass the
    status notice to ``send_both_side``.

    :param forward_index: index into ``global_vars.DRIVE_MODE``
    :return: whatever ``send_both_side`` returns
    """
    global_vars.DRIVE_MODE[forward_index] = True
    return send_both_side(forward_index,
                          'Status changed: 451',
                          qq_group_id,
                          qq_discuss_id,
                          tg_group_id,
                          tg_message_id)
def drive_mode_off(forward_index: int,
                   tg_group_id: int=None,
                   tg_user: telegram.User=None,
                   tg_message_id: int=None,
                   tg_reply_to: telegram.Message=None,
                   qq_group_id: int=None,
                   qq_discuss_id: int=None,
                   qq_user: int=None):
    """
    Set ``global_vars.DRIVE_MODE[forward_index]`` to False and pass the
    status notice to ``send_both_side``.

    :param forward_index: index into ``global_vars.DRIVE_MODE``
    :return: whatever ``send_both_side`` returns
    """
    global_vars.DRIVE_MODE[forward_index] = False
    return send_both_side(forward_index,
                          'Status changed: 200',
                          qq_group_id,
                          qq_discuss_id,
                          tg_group_id,
                          tg_message_id)
def dice(tg_group_id: int,
         tg_user: telegram.User,
         tg_message_id: int,
         tg_reply_to: telegram.Message = None):
    """
    Forward a "threw a dice" text segment followed by a dice segment via
    ``send_from_tg_to_qq``.

    :param tg_group_id: telegram group id used to look up the forward index
    :param tg_user: telegram user passed on as the sender
    :param tg_message_id: unused here
    :param tg_reply_to: unused here
    :return: None; returns early when no forward index is found
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    segments = [
        {'data': {'text': 'threw a dice'}, 'type': 'text'},
        {'data': {'type': '1'}, 'type': 'dice'},
    ]
    send_from_tg_to_qq(forward_index,
                       segments,
                       tg_group_id=tg_group_id,
                       tg_user=tg_user)
def rps(tg_group_id: int,
        tg_user: telegram.User,
        tg_message_id: int,
        tg_reply_to: telegram.Message):
    """
    Forward a "played rock–paper–scissors" text segment followed by an
    rps segment via ``send_from_tg_to_qq``.

    :param tg_group_id: telegram group id used to look up the forward index
    :param tg_user: telegram user passed on as the sender
    :param tg_message_id: unused here
    :param tg_reply_to: unused here
    :return: None; returns early when no forward index is found
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    segments = [
        {'data': {'text': 'played rock–paper–scissors'}, 'type': 'text'},
        {'data': {'type': '1'}, 'type': 'rps'},
    ]
    send_from_tg_to_qq(forward_index,
                       segments,
                       tg_group_id=tg_group_id,
                       tg_user=tg_user)
def update_namelist(forward_index: int,
                    tg_group_id: int=None,
                    tg_user: telegram.User=None,
                    tg_message_id: int=None,
                    tg_reply_to: telegram.Message=None,
                    qq_group_id: int=None,
                    qq_discuss_id: int=None,
                    qq_user: int=None):
    """
    Re-fetch the member list of the QQ group configured for
    ``forward_index``, store it in ``global_vars.group_members`` and pass
    a notice to ``send_both_side``.

    :param forward_index: index into ``FORWARD_LIST`` and ``group_members``
    :return: whatever ``send_both_side`` returns
    """
    qq_group = FORWARD_LIST[forward_index]['QQ']
    members = global_vars.qq_bot.get_group_member_list(group_id=qq_group)
    global_vars.group_members[forward_index] = members

    return send_both_side(forward_index,
                          'QQ????????',
                          qq_group_id,
                          qq_discuss_id,
                          tg_group_id,
                          tg_message_id)
def show_red_pack(forward_index: int,
                  tg_group_id: int=None,
                  tg_user: telegram.User=None,
                  tg_message_id: int=None,
                  tg_reply_to: telegram.Message=None,
                  qq_group_id: int=None,
                  qq_discuss_id: int=None,
                  qq_user: int=None):
    """
    Pass a fixed notice containing an Alipay QR link to ``send_both_side``.

    :param forward_index: forward index handed to ``send_both_side``
    :return: whatever ``send_both_side`` returns
    """
    notice = '????????????????????? https://qr.alipay.com/c1x00417d1veog0b99yea4e'
    return send_both_side(forward_index,
                          notice,
                          qq_group_id,
                          qq_discuss_id,
                          tg_group_id,
                          tg_message_id)
def __get_dummy_update():
    """Build an Update (id 1) wrapping one message from user 1337 in chat 1."""
    sender, conversation = User(1337, "@foouser"), Chat(1, None)
    dummy_message = Message(1, sender, datetime.now(), conversation)
    return Update(1, message=dummy_message)
def __get_dummy_update():
    """Return an Update (id 1) holding a single message from "@foouser"."""
    author = User(1337, "@foouser")
    room = Chat(1, None)
    return Update(1, message=Message(1, author, datetime.now(), room))
def __init__(self, text):
    """Initialise the parent with fixed ids, a private chat, a non-bot
    sender named 'Test' and the given ``text``."""
    private_chat = telegram.Chat(id=1, type='private')
    sender = telegram.User(id=1, first_name='Test', is_bot=False)
    super().__init__(chat=private_chat,
                     message_id=1,
                     from_user=sender,
                     date=1,
                     text=text)
def get_full_user_name(user: telegram.User):
    """
    Join a user's first and last name with a single space.

    :param user: the user to read ``first_name`` / ``last_name`` from
    :return: '' for a falsy user, the first name alone when the last name
        is empty, otherwise "first last"
    """
    if not user:
        return ''
    if user.last_name:
        return user.first_name + ' ' + user.last_name
    return user.first_name
def group_request_callback(bot: telegram.Bot,
                           update: telegram.Update):
    """
    Handle a callback query that accepts or declines a pending group
    request.

    The callback data is the request token; a leading ``'!!'`` marks a
    decline. Tokens not present in ``global_vars.group_requests`` are
    ignored. Otherwise the decision is sent to the QQ bot, every message
    id recorded for the request is edited to append the deciding user's
    name and the verdict, and the request is removed.

    :param bot: telegram bot used to edit the notification messages
    :param update: update carrying the callback query
    :return: None
    """
    query: telegram.CallbackQuery = update.callback_query
    user: telegram.User = query.from_user
    chat_id = user.id
    token = query.data
    user_name = get_full_user_name(user)

    approve = not token.startswith('!!')
    if not approve:
        token = token[2:]  # strip the decline marker to get the real flag

    if token not in global_vars.group_requests:
        return
    request = global_vars.group_requests[token]

    global_vars.qq_bot.set_group_add_request(flag=token,
                                             type=request['type'],
                                             approve=approve)

    verdict = 'accepted' if approve else 'declined'
    for message_id in request['message_id_list']:
        # Both outcomes use the same snake_case bot method.
        bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=query.message.text + '\n' + user_name + verdict)

    del global_vars.group_requests[token]
def command_tg(tg_group_id: int,
               tg_user: telegram.User,
               tg_message_id: int,
               tg_reply_to: telegram.Message = None):
    """
    Reply in the Telegram group with a short description of the bot and
    a hint on how to list its commands.

    :param tg_group_id: chat the reply is sent to
    :param tg_user: unused here
    :param tg_message_id: message the reply is attached to
    :param tg_reply_to: unused here
    """
    intro = ("I'm a relay bot between qq and tg.\n"
             'Please use "!!show commands" or "!!cmd" to show all commands.\n')
    global_vars.tg_bot.sendMessage(chat_id=tg_group_id,
                                   text=intro,
                                   reply_to_message_id=tg_message_id,
                                   parse_mode='HTML')
def setUp(self):
    """Create a GameManager plus three group chats and three users
    (ids 0-2) as fixtures."""
    self.gm = GameManager()
    self.chat0, self.chat1, self.chat2 = (
        Chat(index, 'group') for index in range(3))
    self.user0, self.user1, self.user2 = (
        User(index, 'user%d' % index) for index in range(3))
def test_getMe(self):
    """getMe() on the mock bot returns a User named "@MockBot"."""
    me = self.mockbot.getMe()

    self.assertIsInstance(me, User)
    self.assertEqual(me.name, "@MockBot")
def test_standard(self):
    """An inline-query update has the expected types, a supplied bot is
    kept by the generator, and a non-bot argument is rejected."""
    update = self.iqg.get_inline_query()
    inline_query = update.inline_query

    self.assertIsInstance(update, Update)
    self.assertIsInstance(inline_query, InlineQuery)
    self.assertIsInstance(inline_query.from_user, User)

    custom_bot = Mockbot(username="testbot")
    generator = InlineQueryGenerator(bot=custom_bot)
    self.assertEqual(custom_bot.username, generator.bot.username)

    with self.assertRaises(BadBotException):
        InlineQueryGenerator(bot="bot")
def test_chosen_inline_result(self):
    """A chosen-inline-result update has the expected types and result
    id, and calling the generator without an id raises AttributeError."""
    u = self.iqc.get_chosen_inline_result("testid")

    self.assertIsInstance(u, Update)
    self.assertIsInstance(u.chosen_inline_result, ChosenInlineResult)
    self.assertIsInstance(u.chosen_inline_result.from_user, User)
    self.assertEqual(u.chosen_inline_result.result_id, "testid")

    # assertRaisesRegex replaces the deprecated assertRaisesRegexp alias,
    # which no longer exists on Python 3.12+.
    with self.assertRaisesRegex(AttributeError, "chosen_inline_result"):
        self.iqc.get_chosen_inline_result()
def get_user(self, first_name=None, last_name=None, username=None,
             id=None):
    """
    Build a telegram.User, filling in any missing name parts.

    A missing first or last name is picked at random from
    ``self.FIRST_NAMES`` / ``self.LAST_NAMES``; a missing username is
    first_name + last_name; a missing id comes from ``self.gen_id()``.

    Args:
        first_name (Optional[str]): First name for the returned user.
        last_name (Optional[str]): Last name for the returned user.
        username (Optional[str]): Username for the returned user.
        id (Optional[int]): Id for the returned user.

    Returns:
        telegram.User: A telegram user object
    """
    first_name = first_name or random.choice(self.FIRST_NAMES)
    last_name = last_name or random.choice(self.LAST_NAMES)
    username = username or (first_name + last_name)
    user_id = id or self.gen_id()
    return User(user_id,
                first_name,
                last_name=last_name,
                username=username)
def handler_wrapper(func):
    """
    Decorate a handler method so it receives the database ``User`` for
    the sender of the incoming Telegram message.

    The wrapper looks the sender up by ``tg_id``. An unknown sender is
    created and committed; a known one has its profile fields refreshed.
    The update and its message are attached to the user object, then
    ``func(self, user, *args, **kwargs)`` is called and the session is
    committed. A ``Flow`` exception raised by ``func`` is swallowed on
    purpose. The wrapper itself returns None.

    :param func: handler taking ``(self, user, *args, **kwargs)``
    :return: wrapper taking ``(self, _, update, *args, **kwargs)``
    """
    import functools

    # wraps() keeps the handler's __name__/__doc__ on the wrapper.
    @functools.wraps(func)
    def wrap(self, _, update, *args, **kwargs):
        assert isinstance(User.query, Query)
        assert isinstance(update.message, Message)
        tguser = update.message.from_user
        assert isinstance(tguser, TgUser)

        user = User.query.filter(User.tg_id == tguser.id).one_or_none()
        now = datetime.now()
        if not user:
            # First contact: create the record and persist it right away.
            user = User(
                tg_id=tguser.id,
                first_name=tguser.first_name,
                last_name=tguser.last_name or '',
                username=tguser.username,
                created_at=now,
                last_active_at=now,
            )
            db.session.add(user)
            db.session.commit()
        else:
            # Known sender: refresh the profile fields.
            user.first_name = tguser.first_name
            user.last_name = tguser.last_name or ''
            user.username = tguser.username
            user.last_active_at = now
            user.is_active = True

        user.update = update
        user.message = update.message

        try:
            func(self, user, *args, **kwargs)
        except Flow:
            # Deliberately ignored; presumably a control-flow signal.
            pass
        db.session.commit()

    return wrap
def __init__(self, id, text, chat_id, reply_id=None):
    """
    Initialise the parent with a group-chat message from a fixed sender.

    :param id: message id
    :param text: message text
    :param chat_id: id of the group chat the message belongs to
    :param reply_id: when truthy, a reply-to message with this id from a
        second fixed user is attached; otherwise the value is passed
        through unchanged
    """
    from_user = telegram.User(
        id=1, first_name="He", last_name="Man", username="heman")
    reply_to_user = telegram.User(
        id=2, first_name="She", last_name="Ra", username="shera")
    chat = telegram.Chat(id=chat_id, type="group")

    if reply_id:
        reply_to_message = telegram.Message(
            reply_id, reply_to_user, None, chat)
    else:
        reply_to_message = reply_id

    super().__init__(
        message_id=id,
        date=datetime.now(),
        text=text,
        chat=chat,
        from_user=from_user,
        reply_to_message=reply_to_message,
    )
def setUp(self):
    """Create the bot fixture, webhook URLs and sample messages while the
    external Telegram, Kik and Messenger calls are patched out."""
    # NOTE(review): ``callable=`` is not a patch() option; it is forwarded
    # to the created mock as a plain attribute. ``new_callable=`` was
    # probably intended - confirm before changing.
    with mock.patch("telegram.bot.Bot.set_webhook", callable=mock.MagicMock()),\
            mock.patch("kik.api.KikApi.set_configuration", callable=mock.MagicMock()),\
            mock.patch("messengerbot.MessengerClient.subscribe_app", callable=mock.MagicMock()),\
            mock.patch("telegram.bot.Bot.get_me", callable=mock.MagicMock()) as mock_get_me:
        mock_get_me.return_value = User(username=u'Microbot_test_bot',
                                        first_name=u'Microbot_test',
                                        id=204840063)
        self.bot = factories.BotFactory()

        telegram_hook = {'hook_id': self.bot.telegram_bot.hook_id}
        self.telegram_webhook_url = reverse('permabots:telegrambot', kwargs=telegram_hook)
        kik_hook = {'hook_id': self.bot.kik_bot.hook_id}
        self.kik_webhook_url = reverse('permabots:kikbot', kwargs=kik_hook)
        messenger_hook = {'hook_id': self.bot.messenger_bot.hook_id}
        self.messenger_webhook_url = reverse('permabots:messengerbot', kwargs=messenger_hook)

        self.telegram_update = factories.TelegramUpdateLibFactory()
        self._create_kik_api_message()
        self._create_messenger_api_message()
        self.kwargs = {'content_type': 'application/json', }
def send_from_tg_to_qq(forward_index: int,
                       message: list,
                       tg_group_id: int,
                       tg_user: telegram.User=None,
                       tg_forward_from: telegram.Message=None,
                       tg_reply_to: telegram.Message=None,
                       edited: bool=False,
                       auto_escape: bool=True) -> int:
    """
    Send a message from telegram to qq.

    When a sender name is available, a text segment with the sender,
    reply, forward and edit information is inserted at the front of
    ``message`` (the list is modified in place). The message then goes to
    the QQ group configured for ``forward_index`` or, failing that, to the
    configured discuss.

    :param forward_index: forward group index
    :param message: message in cq-http-api like format
    :param tg_group_id: telegram group id
    :param tg_user: telegram user who sent this message
    :param tg_forward_from: who the message is forwarded from
    :param tg_reply_to: who the message is replied to
    :param edited: whether the message was edited
    :param auto_escape: pass False if the message contains coolq code
    :return: qq message id, or None when neither target is configured
    """
    logger.debug(f"tg -> qq: {message!s}")

    sender_name = get_full_user_name(tg_user)
    forward_from = get_forward_from(tg_forward_from)
    reply_to = get_reply_to(tg_reply_to, forward_index)
    edit_mark = ' E ' if edited else ''  # marks an edited message

    header = sender_name + reply_to + forward_from + edit_mark + ': '
    if sender_name:
        # Put the attribution segment at the beginning of the message.
        message.insert(0, {'type': 'text', 'data': {'text': header}})

    target = FORWARD_LIST[forward_index]
    if target.get('QQ'):
        sent = global_vars.qq_bot.send_group_msg(group_id=target['QQ'],
                                                 message=message,
                                                 auto_escape=auto_escape)
        return sent['message_id']
    if target.get('DISCUSS'):
        sent = global_vars.qq_bot.send_discuss_msg(discuss_id=target['DISCUSS'],
                                                   message=message,
                                                   auto_escape=auto_escape)
        return sent['message_id']
    return None