def test_requires_usergroup_acc(self):
    """
        Check that a handler wrapped by `requires_usergroup` is executed
        (and its return value passed through) when access is granted.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"):
        # Configure the instance produced by the patched User class.
        user_instance = user_cls.return_value
        # NOTE(review): `has_acces` (single 's') is kept exactly as written;
        # confirm it matches the attribute the decorator really calls.
        user_instance.has_acces.return_value = True

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        result = my_command_handler(Mock(spec=Bot), Update(1337))

        self.assertTrue(result)
# Example source code for the Python class Update()
def test_requires_usergroup_self(self):
    """
        Check that `requires_usergroup` also works on method-style handlers
        whose first positional argument is `self`.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"):
        # Configure the instance produced by the patched User class.
        user_instance = user_cls.return_value
        # NOTE(review): `has_acces` (single 's') is kept exactly as written;
        # confirm it matches the attribute the decorator really calls.
        user_instance.has_acces.return_value = True

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(self, bot, update):
            """Dummy command handler"""
            print(self, bot, update)
            return True

        # `None` stands in for the handler's `self` argument.
        result = my_command_handler(None, Mock(spec=Bot), Update(1337))

        self.assertTrue(result)
def test_assign_first_to(self):
    """
        Check that `assign_first_to` asks the user manager whether the group
        is empty and saves the user when it is.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        group_is_empty = manager_cls.return_value.group_is_empty
        group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)

        my_command_handler(Mock(spec=Bot), Update(1337))

        self.assertTrue(group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def test_assign_first_to_with_self(self):
    """
        Check that `assign_first_to` behaves the same for method-style
        handlers whose first positional argument is `self`.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        group_is_empty = manager_cls.return_value.group_is_empty
        group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(self, bot, update):
            """Dummy command handler"""
            print(self, bot, update)

        # `None` stands in for the handler's `self` argument.
        my_command_handler(None, Mock(spec=Bot), Update(1337))

        self.assertTrue(group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def admin_access():
    """Build a decorator restricting a handler method to the owner's admins.

    The decorated method must have the signature
    ``(self, bot, update, *args, **kwargs)``. At call time the wrapper reads
    ``self.admins``:

    * missing or ``None`` - a warning is logged and the handler runs anyway;
    * present - the handler runs only if ``update.effective_user.id`` is in
      it; otherwise the attempt is logged and ``None`` is returned.
    """
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            requester = update.effective_user
            allowed_ids = getattr(self, 'admins', None)

            if allowed_ids is not None and requester.id not in allowed_ids:
                self.logger.info(f"Unauthorized access denied for {requester}.")
                return None

            if allowed_ids is None:
                # No restriction configured: warn, but do not block the call.
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
def create_telegram_update(message_text):
    """Helper: wrap ``message_text`` in a ``telegram.Update`` so a test can
    simulate a message sent to the Telegram bot.

    The message is given id 0, a sender ``telegram.User(0, 'greenkey')``,
    the current local time as its date and a chat ``telegram.Chat(0, '')``.
    """
    from datetime import datetime

    sender = telegram.User(0, 'greenkey')
    sent_at = datetime.now()
    conversation = telegram.Chat(0, '')
    incoming = telegram.Message(
        message_id=0,
        from_user=sender,
        date=sent_at,
        chat=conversation,
        text=message_text,
    )
    return telegram.Update(update_id=0, message=incoming)
def check_update(self, update):
    """Decide whether ``update`` carries the command this handler serves.

    The update must be an ``Update`` with a message (or an edited message
    when ``self.allow_edited`` is set) that has text. The text must start
    with ``/``, its first word (before any ``@``) must equal
    ``self.command``, and the addressed bot name must match the bot's
    username case-insensitively. ``self.filters`` (``None``, a list of
    callables, or a single callable) must accept the message as well.

    Returns a falsy value when any of these checks fails.
    """
    if not isinstance(update, Update):
        return False
    if not (update.message or update.edited_message and self.allow_edited):
        return False

    message = update.message or update.edited_message
    if not message.text:
        return False

    # "/cmd@bot arg" -> ["cmd", "bot"]; the bot's own username is appended so
    # index 1 always exists, even when the command was sent without one.
    command = message.text[1:].split(' ')[0].split('@')
    command.append(message.bot.username)

    filters = self.filters
    if filters is None:
        res = True
    elif isinstance(filters, list):
        res = any(func(message) for func in filters)
    else:
        res = filters(message)

    return res and (message.text.startswith('/') and command[0] == self.command
                    and command[1].lower() == message.bot.username.lower())
def admin_access(admins_list=None):
    """Build a decorator restricting a handler method to a set of admins.

    :param admins_list: optional collection of allowed user ids. When it is
        ``None`` the wrapper falls back to the ``admins`` attribute of the
        object the decorated method is bound to.
    :return: a decorator for methods with the signature
        ``(self, bot, update, *args, **kwargs)``.

    Behaviour of the wrapped method:

    * no admin collection available (neither ``admins_list`` nor
      ``self.admins``) - a warning is logged and the handler runs anyway;
    * ``update.effective_user.id`` is in the collection - the handler runs;
    * otherwise the attempt is logged and ``None`` is returned.
    """
    def access(func):
        @wraps(func)
        # Annotations are quoted so that building the wrapper does not need
        # the telegram names to be resolvable at decoration time.
        def wrapped(self, bot: 'Bot', update: 'Update', *args, **kwargs):
            user = update.effective_user

            # Fix: an explicitly passed admins_list used to be ignored, which
            # left `admins` as None and let every user through.
            if admins_list is not None:
                admins = admins_list
            else:
                admins = getattr(self, 'admins', None)

            if admins is None:
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
                return func(self, bot, update, *args, **kwargs)

            if user.id not in admins:
                self.logger.info(f"Unauthorized access denied for {user}.")
                return None

            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
def command_accept_choice(self, bot: Bot, update: Update):
    """Handle the callback query sent when a user picks a channel.

    Removes the command handlers of the channel previously chosen in this
    chat (if any), records the channel named by ``query.data`` as the chat's
    current choice, adds its command handlers, edits the original message to
    confirm the choice and replies with the channel's help text in a
    Markdown code block.
    """
    query = update.callback_query
    # A callback update carries no message of its own, so borrow the one the
    # query is attached to.
    update.message = query.message
    chat_id = update.message.chat_id

    previous = self.chosen_channels.get(chat_id, None)
    if previous:
        previous.remove_commands_handlers(chat_id)

    selected: BaseChannel = self.channels[query.data]
    self.chosen_channels[chat_id] = selected
    selected.add_commands_handlers(chat_id)

    bot.edit_message_text(
        text=f'Chose {query.data} ({selected.channel_id}).',
        chat_id=query.message.chat_id,
        message_id=query.message.message_id,
    )

    usage_text = selected.help_text()
    update.message.reply_text('```\n' + usage_text + '\n```', parse_mode=ParseMode.MARKDOWN)
def distribute(self, bot: Bot, update: Update, args):
    """Route parsed command arguments to the matching sub-command method.

    ``args.help`` (when truthy) is sent back as code. Otherwise
    ``args.command`` selects ``add`` / ``remove`` (alias ``delete``), which
    receive ``args.tags`` plus the sub-parser's usage string, or ``show``.
    Any other command is logged and raises ``Exception``.
    """
    if args.help:
        self.send_code(update, args.help)
        return

    command = args.command
    if command == 'add':
        usage = self.subparsers['add'].format_usage()
        self.add(bot, update, args.tags, usage)
        return
    if command in ('remove', 'delete'):
        usage = self.subparsers['remove'].format_usage()
        self.remove(bot, update, args.tags, usage)
        return
    if command == 'show':
        self.show(bot, update)
        return

    self.logger.error('Bad args: ' + str(args))
    raise Exception  # will never get this far (hopefully)
def distribute(self, bot: Bot, update: Update, args):
    """Route parsed command arguments to the matching sub-command method.

    ``args.help`` (when truthy) is sent back as code. Otherwise
    ``args.command`` selects ``add`` / ``remove`` (alias ``delete``), which
    receive ``args.keys`` plus the sub-parser's usage string, or ``show``.
    Any other command is logged and raises ``Exception``.
    """
    if args.help:
        self.send_code(update, args.help)
        return

    command = args.command
    if command == 'add':
        usage = self.subparsers['add'].format_usage()
        self.add(bot, update, args.keys, usage)
        return
    if command in ('remove', 'delete'):
        usage = self.subparsers['remove'].format_usage()
        self.remove(bot, update, args.keys, usage)
        return
    if command == 'show':
        self.show(bot, update)
        return

    self.logger.error('Bad args: ' + str(args))
    raise Exception  # will never get this far (hopefully)
def add(self, bot: Bot, update: Update, args: List[str], usage: str):
    """Store one ``name -> score`` entry and show the resulting list.

    ``args`` must hold exactly two items, a name and a decimal score. On
    any other input the ``usage`` text is sent back and nothing is stored.
    """
    if len(args) == 2:
        name, score = args
        if score.isdecimal():
            self.store.add({name: int(score)})
            self.show(bot, update)
            return

    # Wrong argument count or a non-numeric score.
    self.send_code(update, usage)
# Source file: _002_water_meter_control.py
# Project: coolq-telegram-bot
# Author: jqqqqqqqqqq
# Project source
# File source
# Views: 21
# Favorites: 0
# Likes: 0
# Comments: 0
def add_keyword(bot: telegram.Bot,
                update: telegram.Update,
                args: list):
    """Append each keyword in ``args`` to the keyword filter list.

    Chats not listed in ``global_vars.admin_list['TG']`` are ignored. With
    no arguments a usage hint is sent. Keywords already in the list are
    reported and skipped; afterwards 'Done.' is sent and ``save_data()`` is
    called.
    """
    message = update.message
    if message.chat_id not in global_vars.admin_list['TG']:
        return

    if len(args) == 0:
        message.reply_text('Usage: /add_keyword keyword1 keyword2 ...')
        return

    known_keywords = global_vars.filter_list['keywords']
    for keyword in args:
        logger.debug('keyword: ' + keyword)
        if keyword not in known_keywords:
            known_keywords.append(keyword)
        else:
            message.reply_text('Keyword: "' + keyword + '" already in list')

    message.reply_text('Done.')
    save_data()
# Source file: _002_water_meter_control.py
# Project: coolq-telegram-bot
# Author: jqqqqqqqqqq
# Project source
# File source
# Views: 17
# Favorites: 0
# Likes: 0
# Comments: 0
def add_channel(bot: telegram.Bot,
                update: telegram.Update):
    """Conversation step that collects channels to add to the filter list.

    * Forwarded message from a channel: its chat id is added to
      ``global_vars.filter_list['channels']`` (and saved) unless already
      present; returns ``CHANNEL``.
    * Forwarded message from a non-channel chat: an error is sent and
      ``None`` is returned.
    * ``/cancel``: replies 'Done.' and returns ``ConversationHandler.END``.
    * Anything else: an error is sent; returns ``CHANNEL``.
    """
    message = update.message
    origin = message.forward_from_chat

    if not origin:
        if message.text == '/cancel':
            message.reply_text('Done.')
            return ConversationHandler.END
        message.reply_text(
            'Message type error. Please forward me a message from channel, or use /cancel to stop')
        return CHANNEL

    if origin.type != 'channel':  # it seems forward_from_chat can only be channels
        message.reply_text(
            'Message type error. Please forward me a message from channel, or use /cancel to stop')
        return

    channels = global_vars.filter_list['channels']
    if origin.id in channels:
        message.reply_text('Already in list. Send me another or use /cancel to stop')
    else:
        channels.append(origin.id)
        save_data()
        message.reply_text('Okay, please send me another, or use /cancel to stop')
    return CHANNEL
def audio_from_telegram(bot: telegram.Bot,
                        update: telegram.Update):
    """Forward a placeholder text for a Telegram audio message to QQ.

    Looks up the forward index for the chat, sends a single text entity via
    ``send_from_tg_to_qq`` and records the QQ / Telegram message id pair in
    ``global_vars.mdb``.
    """
    message: telegram.Message = update.message

    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    reply_entity = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'},
    }]
    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=reply_entity,
        tg_group_id=tg_group_id,
        tg_user=message.from_user,
        tg_forward_from=message,
        tg_reply_to=message.reply_to_message,
    )
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def location_from_telegram(bot: telegram.Bot,
                           update: telegram.Update):
    """Forward a Telegram location message to QQ as text.

    The text is a fixed prefix followed by whatever
    ``get_location_from_baidu(latitude, longitude)`` returns. The QQ /
    Telegram message id pair is then recorded in ``global_vars.mdb``.
    """
    message: telegram.Message = update.message

    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    position = message.location
    latitude = position.latitude
    longitude = position.longitude

    reply_entity = [{
        'type': 'text',
        'data': {'text': '????????' + get_location_from_baidu(latitude, longitude)},
    }]
    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=reply_entity,
        tg_group_id=tg_group_id,
        tg_user=message.from_user,
        tg_forward_from=message,
        tg_reply_to=message.reply_to_message,
    )
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[..., Any]:
    """
    Decorator to check if the message comes from the correct chat_id.

    The wrapped handler runs only when ``update.message.chat_id`` equals the
    configured ``_CONF['telegram']['chat_id']``. Messages from any other chat
    are logged and dropped (the wrapper returns ``None``). Exceptions raised
    by the handler are logged and swallowed, so the wrapper returns ``None``
    in that case too.

    :param command_handler: Telegram CommandHandler
    :return: decorated function
    """
    # Function-scope import keeps this block self-contained.
    from functools import wraps

    @wraps(command_handler)
    def wrapper(*args, **kwargs):
        # `update` may arrive by keyword or as the second positional argument.
        update = kwargs.get('update') or args[1]

        # Reject unauthorized messages
        chat_id = int(_CONF['telegram']['chat_id'])
        if int(update.message.chat_id) != chat_id:
            logger.info('Rejected unauthorized message from: %s', update.message.chat_id)
            # Fix: this used to return the `wrapper` function object itself.
            return None

        logger.info('Executing handler: %s for chat_id: %s', command_handler.__name__, chat_id)
        try:
            return command_handler(*args, **kwargs)
        except Exception:
            # Narrowed from BaseException so KeyboardInterrupt / SystemExit
            # still propagate and the process can be stopped.
            logger.exception('Exception occurred within Telegram module')
            return None

    return wrapper
def _status_table(bot: Bot, update: Update) -> None:
    """
    Handler for /status table.
    Sends the open trades (id, pair, age, profit in percent) as an HTML
    ``<pre>`` table, or a short status line when the trader is not running
    or there are no open trades.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    def build_row(trade):
        # Profit is computed against the current bid price of the pair.
        current_rate = exchange.get_ticker(trade.pair)['bid']
        return [
            trade.id,
            trade.pair,
            shorten_date(arrow.get(trade.open_date).humanize(only_distance=True)),
            '{:.2f}'.format(100 * trade.calc_profit(current_rate)),
        ]

    # Fetch open trades
    open_trades = Trade.query.filter(Trade.is_open.is_(True)).all()

    if get_state() != State.RUNNING:
        send_msg('*Status:* `trader is not running`', bot=bot)
        return
    if not open_trades:
        send_msg('*Status:* `no active order`', bot=bot)
        return

    header = ['ID', 'Pair', 'Since', 'Profit']
    rows = [build_row(trade) for trade in open_trades]
    frame = DataFrame.from_records(rows, columns=header).set_index(header[0])

    table = tabulate(frame, headers='keys', tablefmt='simple')
    send_msg("<pre>{}</pre>".format(table), parse_mode=ParseMode.HTML)
def _balance(bot: Bot, update: Update) -> None:
    """
    Handler for /balance
    Sends one entry per currency that has a non-zero balance, available or
    pending amount, or a fixed notice when every balance is zero.
    """
    entry_template = """*Currency*: {Currency}
*Available*: {Available}
*Balance*: {Balance}
*Pending*: {Pending}
"""
    non_zero = [
        entry for entry in exchange.get_balances()
        if entry['Balance'] or entry['Available'] or entry['Pending']
    ]

    if non_zero:
        output = ''.join(entry_template.format(**entry) for entry in non_zero)
    else:
        output = '`All balances are zero.`'

    send_msg(output)
def _count(bot: Bot, update: Update) -> None:
    """
    Handler for /count.
    Sends a table with the number of open trades next to the configured
    ``max_open_trades``; sends a notice instead when the trader is not
    running.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    if get_state() != State.RUNNING:
        send_msg('`trader is not running`', bot=bot)
        return

    open_trades = Trade.query.filter(Trade.is_open.is_(True)).all()
    counts = {
        'current': [len(open_trades)],
        'max': [_CONF['max_open_trades']],
    }
    table = tabulate(counts, headers=['current', 'max'], tablefmt='simple')

    message = "<pre>{}</pre>".format(table)
    logger.debug(message)
    send_msg(message, parse_mode=ParseMode.HTML)
def _help(bot: Bot, update: Update) -> None:
    """
    Handler for /help.
    Sends the static list of bot commands.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    command_overview = """
*/start:* `Starts the trader`
*/stop:* `Stops the trader`
*/status [table]:* `Lists all open trades`
*table :* `will display trades in a table`
*/profit:* `Lists cumulative profit from all finished trades`
*/forcesell <trade_id>|all:* `Instantly sells the given trade or all trades, regardless of profit`
*/performance:* `Show performance of each finished trade grouped by pair`
*/count:* `Show number of trades running compared to allowed number of trades`
*/balance:* `Show account balance per currency`
*/help:* `This help message`
*/version:* `Show version`
"""
    send_msg(command_overview, bot=bot)
async def helpme(bot, thing, arguments):
    """Visualizza il messaggio di aiuto di un comando.
Sintassi: `{symbol}helpme [comando]`"""
    # NOTE(review): this function passes itself to display_help(), so the
    # docstring above presumably doubles as the user-facing help text
    # (`{symbol}` looks like a format placeholder). It is therefore kept
    # verbatim and untranslated. In English: "Show the help message of a
    # command. Syntax: `{symbol}helpme [command]`".
    #
    # Fix: declared `async def` - the body awaits, which is a SyntaxError in
    # a plain `def`.
    #
    # bot:       forwarded unchanged to the helper coroutines.
    # thing:     the incoming event, a telegram.Update or a discord Message.
    # arguments: sequence of command arguments; exactly one is expected.

    # Set status to typing
    await status_typing(bot, thing)

    # Without exactly one argument, show the help message for this command.
    if len(arguments) != 1:
        await display_help(bot, thing, helpme)
        return

    requested = arguments[0]

    # Check the list of telegram commands if the message was sent from Telegram
    if isinstance(thing, telegram.Update):
        if requested in b.commands:
            await display_help(bot, thing, b.commands[requested])
        else:
            await answer(bot, thing, "? Il comando specificato non esiste.")

    # Check the list of discord commands if the message was sent from Discord
    if isinstance(thing, extradiscord.discord.Message):
        if requested in d.commands:
            await display_help(bot, thing, d.commands[requested])
        else:
            await answer(bot, thing, "? Il comando specificato non esiste.")
async def job_updatelol(singletimeout=1, alltimeout=1800):
    """Background job: refresh every stored LoL account in an endless loop.

    Fixes: declared `async def` (the body awaits, which is a SyntaxError in
    a plain `def`), and the per-pass database session is now closed instead
    of being left open on every pass.

    :param singletimeout: seconds to sleep after each account.
    :param alltimeout: seconds to sleep between full passes.
    :return: never returns normally.
    """
    await d.client.wait_until_ready()
    while True:
        # Open a new database session for this pass
        session = database.Session()
        try:
            # Query all the LoL accounts
            users = session.query(database.LoL).all()
            # Update all the users' stats
            for user in users:
                updates = await user.update_data()
                if updates:
                    # Send some info to Discord
                    await d.client.send_message(
                        d.client.get_channel(247426005537390592),
                        "Account aggiornato!",
                        embed=user.generate_discord_embed())
                await asyncio.sleep(singletimeout)
            session.commit()
        finally:
            # NOTE(review): assumes database.Session is an SQLAlchemy
            # session (it exposes query/commit) - confirm close() exists.
            session.close()
        await asyncio.sleep(alltimeout)
def cmd_list_tokens(bot: telegram.Bot, update: telegram.Update):
    """Send the numbered list of tokens stored for the poll tag in the message.

    Does nothing unless ``generic_checks`` passes for ``listtokens``. The
    tag is everything after the first space of the message text.
    """
    cid = update.message.chat_id
    if not generic_checks(bot, update, "listtokens", "", 1):
        return

    # Extract value
    _, tag = update.message.text.split(" ", 1)

    tokens = tokens_db.search(Query().poll_tag == tag)

    header = "There have been %i tokens generated for your poll:\n" % len(tokens)
    listing = "\n".join(
        "%i: %s" % (position, entry['token'])
        for position, entry in enumerate(tokens, start=1))
    bot.sendMessage(cid, header + listing)
def cmd_vote(bot: telegram.Bot, update: telegram.Update):
    """Start the voting conversation by offering the user's polls.

    * Outside a private chat: sends a notice and returns
      ``ConversationHandler.END``.
    * ``get_polls`` returns nothing: sends a notice and returns ``None``.
    * Otherwise: sends a one-time keyboard with one "tag: title" button per
      row and returns ``state_vote_1``.
    """
    cid = update.message.chat_id
    uid = update.message.from_user.id
    polls = get_polls(uid, bot)

    if update.message.chat.type != "private":
        bot.sendMessage(cid, text_private_chat_only)
        return ConversationHandler.END

    if len(polls) == 0:
        bot.sendMessage(cid, "You aren't eligible to vote in any polls.")
        return None

    # One single-button row per poll, so each button gets its own line.
    keyboard_rows = [[poll["tag"] + ": " + poll["title"]] for poll in polls]
    keyboard = ReplyKeyboardMarkup(keyboard_rows, one_time_keyboard=True)
    bot.sendMessage(cid,
                    "Click the button for the poll you would like to vote in.",
                    reply_markup=keyboard)
    return state_vote_1
def cmd_set_generic(bot: telegram.Bot, update: telegram.Update, field_name: str,
                    cmd_name: str, example: str):
    """Shared implementation of the "set <field>" poll commands.

    Expects a message of the form ``<cmd> <tag> <value>``. When
    ``generic_checks`` passes, ``field_name`` of the tagged poll is set to
    the value, a confirmation is sent, and an activation hint follows if
    ``check_if_complete`` reports the poll as complete.

    :param field_name: poll field to edit.
    :param cmd_name: command name handed to ``generic_checks``.
    :param example: example text handed to ``generic_checks``.
    """
    if not generic_checks(bot, update, cmd_name, example, 2):
        # A generic check failed.
        return

    # Extract value
    _, tag, value = update.message.text.split(" ", 2)

    # Apply the edit and send the "success" feedback message
    edit_poll(tag, field_name, value)
    updated_poll = get_poll(tag)
    cid = update.message.chat_id
    bot.sendMessage(cid, text_edit_successful.format(p=updated_poll))

    # Check if complete, and suggest to activate
    if check_if_complete(tag):
        bot.sendMessage(cid, text_activate_suggestion.format(t=tag), parse_mode="Markdown")
    return
def execute(self, chat_id, bot: Bot, update: Update):
    """Reply with a keyboard of the classrooms in the named building.

    When the message text is a key of ``self._buildings_dict``, a reply
    keyboard is sent with one ``/<classroom name>`` button per classroom
    (three per row) and a final ``/buildings`` row.

    :return: ``True`` if the text named a known building, else ``False``.
    """
    label = update.message.text
    if label not in self._buildings_dict:
        return False

    building_identifier = self._buildings_dict[label]
    classrooms = self.get_classroom_source().get_classrooms_in_building(building_identifier)

    buttons = [KeyboardButton("/" + classroom.get_name().lower()) for classroom in classrooms]
    # Lay the buttons out three per row.
    rows = [buttons[start:start + 3] for start in range(0, len(buttons), 3)]
    rows.append(["/buildings"])

    reply_keyboard = ReplyKeyboardMarkup(rows)
    bot.send_message(chat_id=chat_id, text="Available classrooms", reply_markup=reply_keyboard)
    return True
def help_about(bot: telegram.Bot, update: telegram.Update):
    """Send the HTML help message with its keyboard to the chat, then
    report the event through ``Botan.track`` and the log."""
    message = update.message
    chat_id = message.chat_id

    bot.send_message(
        chat_id=chat_id,
        text=strings.HELP_MESSAGE,
        parse_mode='HTML',
        disable_web_page_preview=True,
        reply_markup=strings.HELP_MESSAGE_KEYBOARD,
    )

    bot_types.Botan.track(uid=chat_id, message=message.to_dict(), name='help.about')

    telegram_context = {
        'update': update.to_dict(),
        'chat_id': chat_id,
        'message_id': message.message_id,
    }
    # The log record id is an MD5 over the chat id and message id.
    record_id = extentions.TextHelper.get_md5(str(chat_id) + str(message.message_id))
    logging.info('Command help: about.', extra={
        'telegram': telegram_context,
        'id': record_id,
    })
# Handler for the main settings command.
# Loads the chat's settings via ChatSettings.from_db (passing the sender's id
# and first name as admin_id / admin_name), sends the settings message when
# the chat is not admin-only or the sender is the stored admin, and reports
# the event through Botan.track and the log.
# NOTE(review): the indentation of this snippet was lost, so it cannot be told
# from here whether the tracking/logging calls belong inside the permission
# `if` or run unconditionally - confirm against the original file.
def settings_message(bot: telegram.Bot, update: telegram.Update):
chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id,
admin_name=update.message.from_user.first_name)
# Permission gate: unrestricted chat, or the sender is the chat's admin.
if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id:
send_settings_message(bot, chat_settings)
bot_types.Botan.track(
uid=update.message.chat_id,
message=update.message.to_dict(),
name='settings.main'
)
# The log record id is an MD5 over the chat id and message id.
logging.info('Command settings: main.', extra={
'telegram': {
'update': update.to_dict(),
'chat_id': update.message.chat_id,
'message_id': update.message.message_id,
},
'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
})
# Handler for the voice settings command.
# Loads the chat's settings via ChatSettings.from_db (passing the sender's id
# and first name as admin_id / admin_name), sends the voice settings message
# when the chat is not admin-only or the sender is the stored admin, and
# reports the event through Botan.track and the log.
# NOTE(review): the indentation of this snippet was lost, so it cannot be told
# from here whether the tracking/logging calls belong inside the permission
# `if` or run unconditionally - confirm against the original file.
def settings_voice_message(bot: telegram.Bot, update: telegram.Update):
chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id,
admin_name=update.message.from_user.first_name)
# Permission gate: unrestricted chat, or the sender is the chat's admin.
if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id:
send_settings_voice_message(bot, update.message.chat_id)
bot_types.Botan.track(
uid=update.message.chat_id,
message=update.message.to_dict(),
name='settings.voice.get'
)
# The log record id is an MD5 over the chat id and message id.
logging.info('Command settings: voice.', extra={
'telegram': {
'update': update.to_dict(),
'chat_id': update.message.chat_id,
'message_id': update.message.message_id,
},
'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
})