def start_bot():
my_bot = Updater(settings.TELEGRAM_API_KEY)
dp = my_bot.dispatcher
dp.add_handler(CommandHandler("start", reply_to_start_command))
conv_handler = ConversationHandler(
entry_points=[RegexHandler('^(????????? ??????)$', start_anketa, pass_user_data=True)],
states={
"name": [MessageHandler(Filters.text, get_name, pass_user_data=True)],
"attitude": [RegexHandler('^(1|2|3|4|5)$', attitude, pass_user_data=True)],
"understanding": [RegexHandler('^(1|2|3|4|5)$', understanding, pass_user_data=True)],
"comment": [MessageHandler(Filters.text, comment, pass_user_data=True),
CommandHandler('skip', skip_comment, pass_user_data=True)],
},
fallbacks=[MessageHandler(Filters.text, dontknow, pass_user_data=True)]
)
dp.add_handler(conv_handler)
my_bot.start_polling()
my_bot.idle()
python类text()的实例源码
def register_command(self, commands, callback, allow_edited=False):
"""Registers commands handler
Args:
commands(list|tuple): list of commands to register
callback(function): callable object to execute
allow_edited(Optional[bool]): pass edited messages
Raises:
ValueError: if one of commands in ``commands`` was already registered
"""
for command in commands:
self._register_command(command)
@utils.log(logger, print_ret=False)
def process_update(bot, update):
lang = utils.get_lang(self._storage, update.effective_user)
callback(update.effective_message,
update.effective_message.text.split(' ')[1:], lang)
self._dispatcher.add_handler(CommandHandler(commands, process_update,
allow_edited=allow_edited))
def _bad_keyword_detect(self, text):
keywords = (
'??',
'??',
'??'
)
bad_words = (
'??',
'??',
'??', # ??
'??',
'??', # ??
)
for i in keywords:
if i in text:
for j in bad_words:
if j in text:
return True
return False
def filter(self, message):
# First filter by weekday: we don't work on weekends
date = TZ.normalize(message.date.replace(tzinfo=pytz.utc))
if not is_workday(date):
return False
# Then filter by time: we work in range of [9.am, 18.pm]
if date.hour < START or date.hour >= END:
return False
# Then filter by message text
text = message.text
if self._bad_keyword_detect(text) or self._keyword_detect(text):
return True
return False
def workday_end_time(bot, update):
bot.sendChatAction(
chat_id=update.message.chat_id, action=ChatAction.TYPING
)
text_templates = (
'{}?????????????',
'?????{}',
'??{}????',
'??????{}',
'???{}????',
'????{}',
'??????: {}',
)
now = TZ.normalize(update.message.date.replace(tzinfo=pytz.utc))
end_time = TZ.localize(datetime(now.year, now.month, now.day, hour=18))
duration = end_time - now
hour = duration.seconds // 3600
minute = (duration.seconds % 3600) // 60
time_remaining = ' {} ??'.format(hour) if hour else ''
time_remaining += ' {} ??'.format(minute)
text = random.choice(text_templates).format(time_remaining)
update.message.reply_text(text, quote=True)
def send_callback_keyboard(self, bot, chat_id, text, buttons, action=None):
keyboard = []
for button in buttons:
keyboard.append([button])
keyboard.append([InlineKeyboardButton("cancel", callback_data="null")])
markup = InlineKeyboardMarkup(keyboard)
try:
message_id = self._sent_keyboard_message_ids[chat_id]
message = bot.edit_message_text(chat_id=chat_id, message_id=message_id, text=text, reply_markup=markup)
except KeyError:
message = bot.send_message(chat_id=chat_id, text=text, reply_markup=markup)
self._sent_keyboard_message_ids[chat_id] = message.message_id
if action:
self._sent_keyboard[chat_id] = action
def admin_command(self, bot, update):
chat_id = update.message.chat_id
admin_chat_id = config.get_state(_admin_key)
if not admin_chat_id:
config.save_state(_admin_key, chat_id)
config.save_secrets()
text = "You're admin now!"
elif chat_id == admin_chat_id:
text = "You already are admin!"
else:
bot.send_message(text="There can be only one!", chat_id=chat_id)
return
text += "\nHidden comamnds:\n/" + "\n/".join(admin_commands)
bot.send_message(text=text, chat_id=chat_id)
# Password protected commands
def set_password_command(self, bot, update):
chat_id = update.message.chat_id
if not config.get_state(_password_enabled_key):
bot.send_message(chat_id=chat_id, text="Please enable password protection with /togglepassword first")
return
split = update.message.text.split(" ")
if len(split) < 2:
bot.send_message(chat_id=chat_id, text="Usage: /setpassword [password]")
return
password = split[1].strip()
if not password:
bot.send_message(chat_id=chat_id, text="Password can't be empty")
return
global _password
_password = password
_clients.add(User(user=update.message.from_user))
bot.send_message(chat_id=chat_id, text="Successfully changed password")
def ban_user_command(self, bot, update):
chat_id = update.message.chat_id
if not config.get_state(_password_enabled_key):
bot.send_message(chat_id=chat_id, text="Password is disabled")
return
if not _clients:
bot.send_message(chat_id=chat_id, text="No clients logged in")
return
text = "Who should be banned?"
keyboard_items = list(
map(lambda client: InlineKeyboardButton(text=client.name, callback_data=str(client.user_id)), _clients))
@decorators.callback_keyboard_answer_handler
def _action(chat_id, data):
global _clients
global _password
_clients = set(filter(lambda client: str(client.user_id) != data, _clients))
_password = None
return "Banned user. Please set a new password."
self.send_callback_keyboard(bot, chat_id, text, keyboard_items, _action)
def echo(bot, update):
global max_
if update.message.from_user.username == "andremesquita96":
user_message = update.message.text
if randint(1, max_) == 3:
if '?' in user_message:
bot.sendMessage(update.message.chat_id, text='{}, {}'.format(
'Responde', choice(automatic_swearwords)))
else:
bot.sendMessage(update.message.chat_id, text='{}, {}'.format(
user_message, choice(automatic_swearwords)))
if max_ < 6:
max_ += 1
else:
max_ = 3
def get_short_url(long_url: str):
"""
generate short url using Sina Weibo api
:param long_url: the original url
:return: short url
"""
# change long url to `t.cn` short url
sina_api_prefix = 'http://api.t.sina.com.cn/short_url/shorten.json?source=3271760578&url_long='
try:
r = requests.get(sina_api_prefix + long_url)
obj = json.loads(r.text)
short_url = obj[0]['url_short']
return short_url
# when error occurs, return origin long url
except:
traceback.print_exc()
return long_url
def audio_from_telegram(bot: telegram.Bot,
update: telegram.Update):
message: telegram.Message = update.message
tg_group_id = message.chat_id # telegram group id
forward_index = get_forward_index(tg_group_id=tg_group_id)
reply_entity = list()
reply_entity.append({
'type': 'text',
'data': {'text': '[ ?? ]'}
})
qq_message_id = send_from_tg_to_qq(forward_index,
message=reply_entity,
tg_group_id=tg_group_id,
tg_user=message.from_user,
tg_forward_from=message,
tg_reply_to=message.reply_to_message)
global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def get_location_from_baidu(latitude: Union[float, str],
longitude: Union[float, str]):
params = (
('callback', 'renderReverse'),
('location', str(latitude) + ',' + str(longitude)),
('output', 'json'),
('pois', '1'),
('ak', BAIDU_API),
)
result = requests.get('http://api.map.baidu.com/geocoder/v2/',
params=params)
result = result.text.replace('renderReverse&&renderReverse(', '')[:-1]
result_json = json.loads(result)
if result_json['status'] == 0:
return result_json['result']['formatted_address']
else:
return 'Baidu API returned an error code: ' + str(result_json['status'])
def location_from_telegram(bot: telegram.Bot,
update: telegram.Update):
message: telegram.Message = update.message
tg_group_id = message.chat_id # telegram group id
forward_index = get_forward_index(tg_group_id=tg_group_id)
latitude = message.location.latitude
longitude = message.location.longitude
reply_entity = list()
reply_entity.append({
'type': 'text',
'data': {'text': '????????' + get_location_from_baidu(latitude, longitude)}
})
qq_message_id = send_from_tg_to_qq(forward_index,
message=reply_entity,
tg_group_id=tg_group_id,
tg_user=message.from_user,
tg_forward_from=message,
tg_reply_to=message.reply_to_message)
global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def main():
# Create the EventHandler and pass it your bot's token.
updater = Updater(api_key, workers=64)
# Get the dispatcher to register handlers
dp = updater.dispatcher
# on noncommand i.e message - return not found
dp.add_handler(MessageHandler(Filters.command, maintenance))
dp.add_handler(MessageHandler(Filters.text, maintenance))
# log all errors
dp.add_error_handler(error)
# Start the Bot
#updater.start_polling()
updater.start_webhook(listen='127.0.0.1', port=int(listen_port), url_path=api_key)
updater.bot.setWebhook('https://{0}/{1}'.format(domain, api_key))
# Run the bot until the you presses Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
def broadcast(bot, update):
info_log(update)
bot = Bot(api_key)
# list users from MySQL
accounts_list = mysql_select_accounts_balances()
# some users are bugged & stop broadcast - they deleted chat with bot. So we blacklist them
BLACK_LIST = mysql_select_blacklist()
for account in accounts_list:
# if not in blacklist and has balance
if ((account[0] not in BLACK_LIST) and (int(account[1]) > 0)):
mysql_set_blacklist(account[0])
print(account[0])
push_simple(bot, account[0], update.message.text.replace('/broadcast ', ''))
sleep(0.2)
mysql_delete_blacklist(account[0]) # if someone deleted chat, broadcast will fail and he will remain in blacklist
# bootstrap
def price_text(bot, update):
user_id = update.message.from_user.id
chat_id = update.message.chat_id
lang_id = mysql_select_language(user_id)
price = mysql_select_price()
last_price_merc = ('%.8f' % (float(price[0][0]) / (10 ** 8)))
ask_price_merc = ('%.8f' % (float(price[0][3]) / (10 ** 8)))
bid_price_merc = ('%.8f' % (float(price[0][4]) / (10 ** 8)))
last_price_grail = ('%.8f' % (float(price[1][0]) / (10 ** 8)))
ask_price_grail = ('%.8f' % (float(price[1][3]) / (10 ** 8)))
bid_price_grail = ('%.8f' % (float(price[1][4]) / (10 ** 8)))
last_price_flip = ('%.8f' % (float(price[2][0]) / (10 ** 8)))
ask_price_flip = ('%.8f' % (float(price[2][3]) / (10 ** 8)))
bid_price_flip = ('%.8f' % (float(price[2][4]) / (10 ** 8)))
high_price = ('%.8f' % (max(float(price[0][1]), float(price[1][1]), float(price[0][0])) / (10 ** 8)))
low_price = ('%.8f' % (min(float(price[0][2]), float(price[1][2])) / (10 ** 8)))
volume = int(price[0][5]) + int(price[1][5]) + int(price[2][5])
volume_btc = ('%.2f' % ((float(price[0][6]) + float(price[1][6])) / (10 ** 8)))
text = lang_text('price', lang_id).format(last_price_merc, ask_price_merc, bid_price_merc, last_price_grail, ask_price_grail, bid_price_grail, high_price, low_price, "{:,}".format(volume), volume_btc, last_price_flip, ask_price_flip, bid_price_flip)
lang_keyboard(lang_id, bot, chat_id, text)
sleep(1)
message_markdown(bot, chat_id, lang_text('price_options', lang_id))
def main():
logger.info("Loading handlers for telegram bot")
# Default dispatcher (this is related to the first bot in settings.TELEGRAM_BOT_TOKENS)
dp = DjangoTelegramBot.dispatcher
# To get Dispatcher related to a specific bot
# dp = DjangoTelegramBot.getDispatcher('BOT_n_token') #get by bot token
# dp = DjangoTelegramBot.getDispatcher('BOT_n_username') #get by bot username
# on different commands - answer in Telegram
dp.add_handler(CommandHandler("start", start))
dp.add_handler(CommandHandler("help", help))
dp.add_handler(CommandHandler("startgroup", startgroup))
dp.add_handler(CommandHandler("me", me))
dp.add_handler(CommandHandler("chat", chat))
dp.add_handler(MessageHandler(Filters.forwarded , forwarded))
# on noncommand i.e message - echo the message on Telegram
dp.add_handler(MessageHandler(Filters.text, echo))
# log all errors
dp.add_error_handler(error)
def start(bot, update, args):
message = update.message
user = message.from_user
res = False
link = "http://www.teamspeak.com/invite/%s/" % TS_HOST
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton(_('Connect TS'), url=link)]])
if args:
res = utils.validate_invitation_token(token=args[0], user_id=user.id, name=user.first_name)
if res:
text = _("Welcome %s you're now activated, using /who or /ts you can check who's in teamspeak.") % \
user.first_name
elif utils.is_allow(user.id):
text = _("Hello %s, using /who or /ts you can check who's in teamspeak.") % user.first_name
else:
reply_markup = None
text = _("Welcome, ask admin to generate an invitation link by /generate")
bot.sendMessage(message.chat_id, text, reply_to_message_id=message.message_id, reply_markup=reply_markup)
def button(self, bot, update):
query = update.callback_query
bot.editMessageText(text="Capturing image of topic: %s" % query.data,
chat_id=query.message.chat_id,
message_id=query.message.message_id)
img_file_path = self.get_image(image_topic=query.data)
bot.editMessageText(text="Uploading captured image of topic: %s" % query.data,
chat_id=query.message.chat_id,
message_id=query.message.message_id)
bot.send_photo(chat_id=query.message.chat_id,
photo=open(img_file_path, 'rb'),
caption="This is what I see on topic " +
query.data)
# update.message.reply_photo(photo=open(img_file_path, 'rb'),
# caption="This is what I see on topic " +
# query.data)
def __init__(self):
token = rospy.get_param('/telegram/token', None)
if token is None:
rospy.logerr("No token found in /telegram/token param server.")
exit(0)
else:
rospy.loginfo("Got telegram bot token from param server.")
# Set CvBridge
self.bridge = CvBridge()
# Create the EventHandler and pass it your bot's token.
updater = Updater(token)
# Get the dispatcher to register handlers
dp = updater.dispatcher
# on noncommand i.e message - echo the message on Telegram
dp.add_handler(MessageHandler(Filters.text, self.pub_received))
# log all errors
dp.add_error_handler(self.error)
# Start the Bot
updater.start_polling()
def pub_received(self, bot, update):
rospy.loginfo("Received: " + str(update))
valid_necessary_words = ['what do you see',
'picture',
'camera']
found_word = False
for v in valid_necessary_words:
if v in update.message.text.lower():
img_file_path = self.get_image()
update.message.reply_photo(photo=open(img_file_path, 'rb'),
caption="This is what I see!")
found_word = True
break
if not found_word:
update.message.reply_text("Try any of: " +
str(valid_necessary_words))
def main(token):
# Create the EventHandler and pass it your bot's token.
updater = Updater(token)
# Get the dispatcher to register handlers
dp = updater.dispatcher
# on different commands - answer in Telegram
dp.add_handler(CommandHandler("start", start))
dp.add_handler(CommandHandler("help", help))
# on noncommand i.e message - echo the message on Telegram
dp.add_handler(MessageHandler(Filters.text, echo))
# log all errors
dp.add_error_handler(error)
# Start the Bot
updater.start_polling()
# Run the bot until the you presses Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
def __init__(self):
self.base_pub = rospy.Publisher("/base_controller/command", Twist,
queue_size=1)
token = rospy.get_param('/telegram/token', None)
# Create the Updater and pass it your bot's token.
updater = Updater(token)
# Add command and error handlers
updater.dispatcher.add_handler(CommandHandler('start', self.start))
updater.dispatcher.add_handler(CommandHandler('help', self.help))
updater.dispatcher.add_handler(MessageHandler(Filters.text, self.echo))
updater.dispatcher.add_error_handler(self.error)
# Start the Bot
updater.start_polling()
def __init__(self):
self.history = {}
self.updater = Updater(TOKEN)
self.name = str(self).split(' ')[-1][:-1]
self.dp = self.updater.dispatcher
self.dp.add_handler(CommandHandler("start", start))
self.dp.add_handler(CommandHandler("help", help))
self.dp.add_handler(MessageHandler([Filters.text], echo))
self.dp.add_error_handler(error)
self.stories = StoriesHandler()
logger.info('I\'m alive!')
def echo(bot, update):
text = update.message.text
md = mess2dict(update.message)
try:
sender_fname = md['from']['first_name']
sender_lname = md['from']['last_name']
except:
sender_fname = str(md['from']['id'])
sender_lname = ""
logger.info("{} {} says: {}".format(sender_fname, sender_lname, text))
sender_id = str(md['from']['id'])
msg_id = str(md["message_id"])
if text:
ai.history[sender_id]['context'].append(text)
rep = ai.history[sender_id]["state_tracker"].get_reply(text)
ai.history[sender_id]['replies'].append(rep)
logger.info('bot replies: {}'.format(rep))
bot_send_message(bot, update, rep)
def tag(bot, update, chat_id=None):
if chat_id == None:
chat_id = update.message.chat_id
else:
pass
try:
tags = update.message.text.split(' ', 1)[1]
except Exception as e:
try:
tags = lastcmd[update.callback_query.message.chat_id].split(' ', 1)[1]
except:
bot.sendMessage(chat_id, "Please, use `/tag <tag>` instead.", parse_mode=ParseMode.MARKDOWN)
pages = "30"
try:
lastcmd[update.message.chat_id]=update.message.text
except Exception as e:
pass
noparser(bot, update, tags=tags, pages=pages, chat_id=chat_id)
if update.message != None:
botanio(bot, update, message=update.message, uid=chat_id, event_name="/tag")
else:
botanio(bot, update, message = None, uid=chat_id, event_name="/tag")
def text(bot, update):
if update.message.text == "Anime": #Keyboard handler
anime(bot, update, chat_id=update.message.chat_id)
elif update.message.text == "Hentai (18+)":
hentai(bot, update, chat_id=update.message.chat_id)
elif update.message.text == "Uncensored (18+)":
uncensored(bot, update, chat_id=update.message.chat_id)
elif update.message.text == "Yuri (18+)":
yuri(bot, update, chat_id=update.message.chat_id)
elif update.message.text == "Loli (18+)":
loli(bot, update, chat_id=update.message.chat_id)
elif update.message.text == "Ecchi":
ecchi(bot, update, chat_id=update.message.chat_id)
elif update.message.text == "Neko":
neko(bot, update, chat_id=update.message.chat_id)
elif update.message.text == "Wallpaper":
wallpaper(bot, update, chat_id=update.message.chat_id)
else:
pass
def handle_message(self, bot, update):
print("Received", update.message)
chat_id = update.message.chat_id
if update.message.text == "/start":
# ???? ???????? ??????? /start, ???????? ??? ? ?????? -- ???
# ????? ??????? ????????? ???????? ??????, ???? ??? ????
self.handlers.pop(chat_id, None)
if chat_id in self.handlers:
# ???? ?????? ??? ?????, ?? ???? ???????????? .send(), ?????
# ???????? ? ????????? ????? ????????????
try:
answer = self.handlers[chat_id].send(update.message)
except StopIteration:
# ???? ??? ???? ????????? ?????????? -- ??? ??????, ???????? ??????? ? ??????
del self.handlers[chat_id]
# (???????? ?????????, ???? ????? ????? ??????, ??? ???????????? ? ???? ???????)
return self.handle_message(bot, update)
else:
# ?????? ?????? ??????????. defaultdict ???????? ????? ????????? ??? ?????
# ??????, ? ?? ?????? ????? ??????? ?????? ????????? ? ??????? .next()
# (.send() ??????????? ?????? ????? ??????? yield)
answer = next(self.handlers[chat_id])
# ?????????? ?????????? ????? ????????????
print("Answer: %r" % answer)
self._send_answer(bot, chat_id, answer)
def test_help(self):
# this tests the help handler. So first insert the handler
def help(bot, update):
update.message.reply_text('Help!')
# Then register the handler with he updater's dispatcher and start polling
self.updater.dispatcher.add_handler(CommandHandler("help", help))
self.updater.start_polling()
# We want to simulate a message. Since we don't care wich user sends it we let the MessageGenerator
# create random ones
update = self.mg.get_message(text="/help")
# We insert the update with the bot so the updater can retrieve it.
self.bot.insertUpdate(update)
# sent_messages is the list with calls to the bot's outbound actions. Since we hope the message we inserted
# only triggered one sendMessage action it's length should be 1.
self.assertEqual(len(self.bot.sent_messages), 1)
sent = self.bot.sent_messages[0]
self.assertEqual(sent['method'], "sendMessage")
self.assertEqual(sent['text'], "Help!")
# Always stop the updater at the end of a testcase so it won't hang.
self.updater.stop()