def eight_ball():
response = [
"It is certain",
"It is decidedly so",
"Without a doubt",
"Yes, definitely",
"You may rely on it",
"As I see it, yes",
"Most likely",
"Yes",
"Signs point to yes",
"Don't count on it",
"My reply is no",
"My sources say no",
"Very doubtful"]
index = randint(0,12)
return response[index]
python类response()的实例源码
def authenticate_with_apikey(self, api_key, scope=None):
"""perform authentication by api key and store result for execute_request method
api_key -- secret api key from account settings
scope -- optional scope of authentication request. If None full list of API scopes will be used.
"""
scope = "auto" if scope is None else scope
data = {
"grant_type": "client_credentials",
"scope": scope
}
encoded_data = urllib.parse.urlencode(data).encode()
request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
request.add_header("ContentType", "application/x-www-form-urlencoded")
request.add_header("Authorization", 'Basic ' + base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode())
response = urllib.request.urlopen(request)
self._token = WaApiClient._parse_response(response)
self._token.retrieved_at = datetime.datetime.now()
def authenticate_with_contact_credentials(self, username, password, scope=None):
"""perform authentication by contact credentials and store result for execute_request method
username -- typically a contact email
password -- contact password
scope -- optional scope of authentication request. If None full list of API scopes will be used.
"""
scope = "auto" if scope is None else scope
data = {
"grant_type": "password",
"username": username,
"password": password,
"scope": scope
}
encoded_data = urllib.parse.urlencode(data).encode()
request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
request.add_header("ContentType", "application/x-www-form-urlencoded")
auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
request.add_header("Authorization", 'Basic ' + auth_header)
response = urllib.request.urlopen(request)
self._token = WaApiClient._parse_response(response)
self._token.retrieved_at = datetime.datetime.now()
def test_urllib(self):
"""
Tests that urllib isn't changed from under our feet. (This might not
even be a problem?)
"""
from future import standard_library
import urllib
orig_file = urllib.__file__
with standard_library.hooks():
import urllib.response
self.assertEqual(orig_file, urllib.__file__)
def test_urllib_imports_moves(self):
import future.moves.urllib
import future.moves.urllib.parse
import future.moves.urllib.request
import future.moves.urllib.robotparser
import future.moves.urllib.error
import future.moves.urllib.response
self.assertTrue(True)
def test_urllib_imports_install_aliases(self):
with standard_library.suspend_hooks():
standard_library.install_aliases()
import urllib
import urllib.parse
import urllib.request
import urllib.robotparser
import urllib.error
import urllib.response
self.assertTrue(True)
def test_urllib_imports_cm(self):
with standard_library.hooks():
import urllib
import urllib.parse
import urllib.request
import urllib.robotparser
import urllib.error
import urllib.response
self.assertTrue(True)
def test_urllib_imports_install_hooks(self):
standard_library.remove_hooks()
standard_library.install_hooks()
import urllib
import urllib.parse
import urllib.request
import urllib.robotparser
import urllib.error
import urllib.response
self.assertTrue(True)
def execute_request(self, api_url, api_request_object=None, method=None):
"""
perform api request and return result as an instance of ApiObject or list of ApiObjects
api_url -- absolute or relative api resource url
api_request_object -- any json serializable object to send to API
method -- HTTP method of api request. Default: GET if api_request_object is None else POST
"""
if self._token is None:
raise ApiException("Access token is not abtained. "
"Call authenticate_with_apikey or authenticate_with_contact_credentials first.")
if not api_url.startswith("http"):
api_url = self.api_endpoint + api_url
if method is None:
if api_request_object is None:
method = "GET"
else:
method = "POST"
request = urllib.request.Request(api_url, method=method)
if api_request_object is not None:
request.data = json.dumps(api_request_object, cls=_ApiObjectEncoder).encode()
request.add_header("Content-Type", "application/json")
request.add_header("Accept", "application/json")
request.add_header("Authorization", "Bearer " + self._get_access_token())
try:
response = urllib.request.urlopen(request)
return WaApiClient._parse_response(response)
except urllib.error.HTTPError as httpErr:
if httpErr.code == 400:
raise ApiException(httpErr.read())
else:
raise
def _refresh_auth_token(self):
data = {
"grant_type": "refresh_token",
"refresh_token": self._token.refresh_token
}
encoded_data = urllib.parse.urlencode(data).encode()
request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
request.add_header("ContentType", "application/x-www-form-urlencoded")
auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
request.add_header("Authorization", 'Basic ' + auth_header)
response = urllib.request.urlopen(request)
self._token = WaApiClient._parse_response(response)
self._token.retrieved_at = datetime.datetime.now()
def test_future_moves(self):
"""
Ensure everything is available from the future.moves interface that we
claim and expect. (Issue #104).
"""
from future.moves.collections import Counter, OrderedDict # backported to Py2.6
from future.moves.collections import UserDict, UserList, UserString
from future.moves import configparser
from future.moves import copyreg
from future.moves.itertools import filterfalse, zip_longest
from future.moves import html
import future.moves.html.entities
import future.moves.html.parser
from future.moves import http
import future.moves.http.client
import future.moves.http.cookies
import future.moves.http.cookiejar
import future.moves.http.server
from future.moves import queue
from future.moves import socketserver
from future.moves.subprocess import check_output # even on Py2.6
from future.moves.subprocess import getoutput, getstatusoutput
from future.moves.sys import intern
from future.moves import urllib
import future.moves.urllib.error
import future.moves.urllib.parse
import future.moves.urllib.request
import future.moves.urllib.response
import future.moves.urllib.robotparser
try:
# Is _winreg available on Py2? If so, ensure future.moves._winreg is available too:
import _winreg
except ImportError:
pass
else:
from future.moves import winreg
from future.moves import xmlrpc
import future.moves.xmlrpc.client
import future.moves.xmlrpc.server
from future.moves import _dummy_thread
from future.moves import _markupbase
from future.moves import _thread
def logging_consent(message):
try:
logging_consent = open("{0}/{0}_logging_chat.txt".format(message.server),"r",)
for i in logging_consent:
if i == "":
raise FileNotFoundError
except:
try:
os.mkdir("{}".format(message.server))
except:
pass
logging_consent = open("{0}/{0}_logging_chat.txt".format(message.server),"w",encoding="utf-8")
logging_consent.write("True")
logging_consent.close()
logging_consent = open("{0}/{0}_logging_chat.txt".format(message.server),"r",)
logging_chat = logging_consent.read()
logging_consent.close()
if logging_chat == "True":
chatlog = open("{0}/{0}_chatlog.txt".format(message.server),"a",encoding="utf-8")
try:
message.content += (" " + str(message.attachments[0]["url"]))
url = str(message.attachments[0]["url"])
file_name = url.split('/')[-1]
response = urllib.request.urlopen("{}".format(url))
#Discord doesn't allow me to download files?
response_temp = response.read()
except:
pass
print("[{}]{}:{}".format(message.server,message.author.name,message.content))
try:
start = int("1f1e6", 16)
end = int("1f93f", 16)
emoji_dict = pickle.load(open("{}/emoji_amount.txt".format(str(message.server)),"rb"))
except:
pickle.dump({},open("{}/emoji_amount.txt".format(str(message.server)),"wb"))
emoji_dict = pickle.load(open("{}/emoji_amount.txt".format(str(message.server)),"rb"))
for i in range(start,end):
emoji_dict[i] = 0
"""
unicode_message_content = str.encode(message.content,"utf-8")
print(unicode_message_content)
if bytes(128515) in unicode_message_content:
print("PogChamp")
print(bytes(128515))
print(bytes("64",encoding="utf-8"))
for i in range(start,end):#still doesn't work
temp = int(str.lower(hex(i)),16)
if bytes(temp) in unicode_message_content:
emoji_dict[temp] += 1
print(emoji_dict)
#print(emoji_dict["0x1f604"])
pickle.dump(emoji_dict,open("{}/emoji_amount.txt".format(str(message.server)),"wb"))
"""
chatlog.write("[" +str(message.timestamp)[0:19]+ "]"+ message.author.name + ":" + message.content + "\n")
chatlog.close()
def play_sound_tag(server_id, voice_channel_id, sound_tag_name, user_id):
"""Plays the sound from the given sound tag if it is available."""
try:
if servermanager.is_muted(server_id, voice_channel_id):
raise bot_exception(EXCEPT_TYPE, "The bot is muted in this voice channel")
except KeyError:
raise bot_exception(EXCEPT_TYPE, "You are not in a voice channel (are you perhaps on a different server?)")
sound_tag_data = get_sound_tag_data(server_id, sound_tag_name)
check_sound_tag_access(server_id, sound_tag_data, user_id, need_owner=False)
update_sound_tag(server_id, sound_tag_name, increment_hits=True) # Increment hits
from jshbot.botmanager import client
from jshbot.botmanager import voice_player
global timeout_goal
channel = botmanager.get_voice_channel(server_id, voice_channel_id)
if client.voice == None or client.voice.channel != channel or not client.voice.is_connected(): # Connect to channel
if client.voice: # Disconnect from old channel
await client.voice.disconnect()
client.voice = await client.join_voice_channel(channel)
voice_player.server_id = server_id
if voice_player.player is not None and voice_player.player.is_playing(): # Stop if playing
voice_player.player.stop()
if sound_tag_data['type'] == 'YouTube':
# To prevent playlist downloads
# 'noplaylist': True
voice_player.player = await client.voice.create_ytdl_player(sound_tag_data['url'])
else: # Direct download (or stream? Not sure)
try:
# One day, I will figure out how to stream this crap. But today is not that day.
#response = urllib.request.urlopen(sound_tag_data['url'])
#voice_player.player = client.voice.create_ffmpeg_player(response, use_avconv=True)
urllib.request.urlretrieve (sound_tag_data['url'], '{}/tempsound'.format(configmanager.data_directory))
voice_player.player = client.voice.create_ffmpeg_player('{}/tempsound'.format(configmanager.data_directory))
except:
raise bot_exception(EXCEPT_TYPE, "An error occurred when downloading the sound file")
voice_player.player.start()
timeout = configmanager.config['voice_timeout']
if timeout >= 0:
timeout_reset_lock.acquire()
timeout_goal = time.time() + ((timeout*60) if timeout > 0 else (sound_tag_data['length']+1))
timeout_reset_lock.release()
await timeout_disconnect()