python类response()的实例源码

general_commands.py 文件源码 项目:Basic-discord-bot 作者: TheWizoid 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def eight_ball():
    """Return one Magic 8-Ball style answer chosen at random."""
    answers = (
        "It is certain",
        "It is decidedly so",
        "Without a doubt",
        "Yes, definitely",
        "You may rely on it",
        "As I see it, yes",
        "Most likely",
        "Yes",
        "Signs point to yes",
        "Don't count on it",
        "My reply is no",
        "My sources say no",
        "Very doubtful",
    )
    # randint is inclusive on both ends, so the last valid index is len - 1.
    last_index = len(answers) - 1
    return answers[randint(0, last_index)]
WaApi.py 文件源码 项目:prms-door 作者: prmakerspace 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def authenticate_with_apikey(self, api_key, scope=None):
        """Authenticate with an API key and store the token on the instance.

        The parsed token is saved as ``self._token`` (read later by
        execute_request) and stamped with the local time it was retrieved.

        api_key -- secret api key from account settings
        scope -- optional scope of authentication request. If None, the
                 value "auto" is sent (per the original author this selects
                 the full list of API scopes).
        """
        scope = "auto" if scope is None else scope
        encoded_data = urllib.parse.urlencode({
            "grant_type": "client_credentials",
            "scope": scope,
        }).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        # The header name must be hyphenated; "ContentType" is not a header
        # any server recognises.
        request.add_header("Content-Type", "application/x-www-form-urlencoded")
        credentials = base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode()
        request.add_header("Authorization", 'Basic ' + credentials)
        # Close the HTTP response as soon as it has been parsed.
        with urllib.request.urlopen(request) as response:
            token = WaApiClient._parse_response(response)
        token.retrieved_at = datetime.datetime.now()
        self._token = token
WaApi.py 文件源码 项目:prms-door 作者: prmakerspace 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def authenticate_with_contact_credentials(self, username, password, scope=None):
        """Authenticate with contact credentials and store the token on the instance.

        The request is authorised with HTTP Basic credentials built from
        ``self.client_id`` and ``self.client_secret``. The parsed token is
        saved as ``self._token`` (read later by execute_request) and stamped
        with the local time it was retrieved.

        username -- typically a contact email
        password -- contact password
        scope -- optional scope of authentication request. If None, the
                 value "auto" is sent (per the original author this selects
                 the full list of API scopes).
        """
        scope = "auto" if scope is None else scope
        encoded_data = urllib.parse.urlencode({
            "grant_type": "password",
            "username": username,
            "password": password,
            "scope": scope,
        }).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        # The header name must be hyphenated; "ContentType" is not a header
        # any server recognises.
        request.add_header("Content-Type", "application/x-www-form-urlencoded")
        auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
        request.add_header("Authorization", 'Basic ' + auth_header)
        # Close the HTTP response as soon as it has been parsed.
        with urllib.request.urlopen(request) as response:
            token = WaApiClient._parse_response(response)
        token.retrieved_at = datetime.datetime.now()
        self._token = token
test_imports_urllib.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_urllib(self):
        """
        Check that importing ``urllib.response`` inside
        ``standard_library.hooks()`` leaves ``urllib.__file__`` pointing at
        the same file it did before the hooks were active.
        """
        from future import standard_library
        import urllib
        path_before_hooks = urllib.__file__
        with standard_library.hooks():
            import urllib.response
        path_after_hooks = urllib.__file__
        self.assertEqual(path_before_hooks, path_after_hooks)
test_standard_library.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_urllib_imports_moves(self):
        """
        Import ``future.moves.urllib`` and each of its submodules.

        The final assertion always holds, so the test fails only if one of
        the imports raises.
        """
        import future.moves.urllib
        import future.moves.urllib.parse
        import future.moves.urllib.request
        import future.moves.urllib.robotparser
        import future.moves.urllib.error
        import future.moves.urllib.response
        # Reaching this line means every import above succeeded.
        self.assertTrue(True)
test_standard_library.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_urllib_imports_install_aliases(self):
        """
        Import ``urllib`` and its submodules after ``install_aliases()``.

        Everything runs inside ``standard_library.suspend_hooks()``. The
        final assertion always holds, so the test fails only if
        ``install_aliases()`` or one of the imports raises.
        """
        with standard_library.suspend_hooks():
            standard_library.install_aliases()
            import urllib
            import urllib.parse
            import urllib.request
            import urllib.robotparser
            import urllib.error
            import urllib.response
            # Reaching this line means every import above succeeded.
            self.assertTrue(True)
test_standard_library.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_urllib_imports_cm(self):
        """
        Import ``urllib`` and its submodules inside the
        ``standard_library.hooks()`` context manager.

        The final assertion always holds, so the test fails only if one of
        the imports raises.
        """
        with standard_library.hooks():
            import urllib
            import urllib.parse
            import urllib.request
            import urllib.robotparser
            import urllib.error
            import urllib.response
        # Reaching this line means every import above succeeded.
        self.assertTrue(True)
test_standard_library.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_urllib_imports_install_hooks(self):
        """
        Import ``urllib`` and its submodules after calling
        ``remove_hooks()`` followed by ``install_hooks()``.

        The final assertion always holds, so the test fails only if one of
        the calls or imports raises.
        """
        standard_library.remove_hooks()
        standard_library.install_hooks()
        import urllib
        import urllib.parse
        import urllib.request
        import urllib.robotparser
        import urllib.error
        import urllib.response
        # Reaching this line means every import above succeeded.
        self.assertTrue(True)
WaApi.py 文件源码 项目:prms-door 作者: prmakerspace 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def execute_request(self, api_url, api_request_object=None, method=None):
        """
        Perform an API request and return the parsed response.

        The return value is whatever ``WaApiClient._parse_response`` produces
        (described by the original author as an ApiObject or a list of
        ApiObjects).

        api_url -- absolute or relative api resource url; a url that does not
                   start with "http" is appended to ``self.api_endpoint``
        api_request_object -- any json serializable object to send to API
        method -- HTTP method of api request. Default: GET if api_request_object is None else POST

        Raises ApiException when no token has been stored yet, or when the
        server answers HTTP 400 (the raw response body is the exception
        argument). Any other HTTPError propagates unchanged.
        """
        if self._token is None:
            raise ApiException("Access token is not obtained. "
                               "Call authenticate_with_apikey or authenticate_with_contact_credentials first.")

        if not api_url.startswith("http"):
            api_url = self.api_endpoint + api_url

        if method is None:
            method = "GET" if api_request_object is None else "POST"

        request = urllib.request.Request(api_url, method=method)
        if api_request_object is not None:
            request.data = json.dumps(api_request_object, cls=_ApiObjectEncoder).encode()

        request.add_header("Content-Type", "application/json")
        request.add_header("Accept", "application/json")
        request.add_header("Authorization", "Bearer " + self._get_access_token())

        try:
            # Close the HTTP response as soon as it has been parsed.
            with urllib.request.urlopen(request) as response:
                return WaApiClient._parse_response(response)
        except urllib.error.HTTPError as httpErr:
            if httpErr.code == 400:
                # Surface the server's explanation of the bad request while
                # keeping the original HTTP error as the explicit cause.
                raise ApiException(httpErr.read()) from httpErr
            raise
WaApi.py 文件源码 项目:prms-door 作者: prmakerspace 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _refresh_auth_token(self):
        """Exchange the stored refresh token for a new token.

        Sends ``self._token.refresh_token`` to ``self.auth_endpoint`` with
        HTTP Basic credentials built from ``self.client_id`` and
        ``self.client_secret``, then replaces ``self._token`` with the parsed
        response, stamped with the local time it was retrieved.
        """
        encoded_data = urllib.parse.urlencode({
            "grant_type": "refresh_token",
            "refresh_token": self._token.refresh_token,
        }).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        # The header name must be hyphenated; "ContentType" is not a header
        # any server recognises.
        request.add_header("Content-Type", "application/x-www-form-urlencoded")
        auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
        request.add_header("Authorization", 'Basic ' + auth_header)
        # Close the HTTP response as soon as it has been parsed.
        with urllib.request.urlopen(request) as response:
            token = WaApiClient._parse_response(response)
        token.retrieved_at = datetime.datetime.now()
        self._token = token
test_standard_library.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_future_moves(self):
        """
        Ensure everything is available from the future.moves interface that we
        claim and expect. (Issue #104).

        The test contains no assertions: it passes if every import below
        succeeds and fails if any of them raises.
        """
        from future.moves.collections import Counter, OrderedDict   # backported to Py2.6
        from future.moves.collections import UserDict, UserList, UserString

        from future.moves import configparser
        from future.moves import copyreg

        from future.moves.itertools import filterfalse, zip_longest

        from future.moves import html
        import future.moves.html.entities
        import future.moves.html.parser

        from future.moves import http
        import future.moves.http.client
        import future.moves.http.cookies
        import future.moves.http.cookiejar
        import future.moves.http.server

        from future.moves import queue

        from future.moves import socketserver

        from future.moves.subprocess import check_output              # even on Py2.6
        from future.moves.subprocess import getoutput, getstatusoutput

        from future.moves.sys import intern

        from future.moves import urllib
        import future.moves.urllib.error
        import future.moves.urllib.parse
        import future.moves.urllib.request
        import future.moves.urllib.response
        import future.moves.urllib.robotparser

        # winreg is only checked when the interpreter provides _winreg;
        # when it does not, the ImportError is ignored and the check is skipped.
        try:
            # Is _winreg available on Py2? If so, ensure future.moves._winreg is available too:
            import _winreg
        except ImportError:
            pass
        else:
            from future.moves import winreg

        from future.moves import xmlrpc
        import future.moves.xmlrpc.client
        import future.moves.xmlrpc.server

        from future.moves import _dummy_thread
        from future.moves import _markupbase
        from future.moves import _thread
chat_logging.py 文件源码 项目:Basic-discord-bot 作者: TheWizoid 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def logging_consent(message):
    """Append *message* to its server's chat log when logging is enabled.

    All files live under a directory named after ``message.server``,
    relative to the current working directory:

    * ``<server>/<server>_logging_chat.txt`` -- consent flag. The message is
      logged only when the file's content is exactly ``"True"``. If the file
      cannot be read, the directory is created if needed and the file is
      (re)written with ``"True"``.
    * ``<server>/<server>_chatlog.txt`` -- the log; one line per message in
      the form ``[timestamp]author:content``.
    * ``<server>/emoji_amount.txt`` -- a pickled dict, reset to ``{}`` when
      it is missing or cannot be unpickled.

    Side effects: when the message has at least one attachment, the URL of
    the first one is appended to ``message.content``; the logged message is
    also printed to stdout.

    message -- object exposing ``server``, ``author.name``, ``content``,
               ``attachments`` (sequence of mappings with a ``"url"`` key)
               and ``timestamp``.
    """
    server_dir = "{}".format(message.server)
    consent_path = "{0}/{0}_logging_chat.txt".format(message.server)

    try:
        with open(consent_path, "r", encoding="utf-8") as consent_file:
            logging_chat = consent_file.read()
    except (OSError, UnicodeError):
        # First message seen for this server (or an unreadable flag file):
        # create the server directory and enable logging by default.
        try:
            os.mkdir(server_dir)
        except FileExistsError:
            pass
        with open(consent_path, "w", encoding="utf-8") as consent_file:
            consent_file.write("True")
        logging_chat = "True"

    if logging_chat != "True":
        return

    # Record the first attachment's URL as part of the message text. The
    # attachment itself is not downloaded: the bytes were never used, and
    # fetching them blocked the caller on a network round trip.
    try:
        attachment_url = str(message.attachments[0]["url"])
    except (IndexError, KeyError, TypeError):
        attachment_url = None
    if attachment_url is not None:
        message.content += " " + attachment_url

    print("[{}]{}:{}".format(message.server, message.author.name, message.content))

    # Make sure the per-server emoji counter file exists and holds a valid
    # pickle. NOTE(security): pickle must only ever be used on files this
    # bot wrote itself; never point this at untrusted data.
    # TODO: emoji counting itself is not implemented yet; the earlier
    # experiment meant to count code points U+1F1E6..U+1F93F per message.
    emoji_path = "{}/emoji_amount.txt".format(str(message.server))
    try:
        with open(emoji_path, "rb") as emoji_file:
            pickle.load(emoji_file)
    except Exception:
        # Unpickling can raise many unrelated exception types, so any
        # failure is treated as "file missing or corrupt" and reset.
        with open(emoji_path, "wb") as emoji_file:
            pickle.dump({}, emoji_file)

    chatlog_path = "{0}/{0}_chatlog.txt".format(message.server)
    with open(chatlog_path, "a", encoding="utf-8") as chatlog:
        # str(timestamp)[0:19] keeps "YYYY-MM-DD HH:MM:SS" and drops any
        # fractional seconds.
        chatlog.write("[" + str(message.timestamp)[0:19] + "]" + message.author.name + ":" + message.content + "\n")
soundtagmanager.py 文件源码 项目:JshBot-legacy 作者: jkchen2 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def play_sound_tag(server_id, voice_channel_id, sound_tag_name, user_id):
    """Plays the sound from the given sound tag if it is available.

    Must be awaited: the body awaits the voice client, so it has to be a
    coroutine function (``await`` inside a plain ``def`` is a SyntaxError).

    Steps: check mute state and tag access, increment the tag's hit
    counter, join the requested voice channel if the client is not already
    connected to it, stop any sound that is playing, start the new sound,
    and (when ``voice_timeout`` is non-negative) set ``timeout_goal`` and
    await ``timeout_disconnect()``.

    Raises bot_exception when the bot is muted in the channel, when the
    mute lookup raises KeyError (reported as the user not being in a voice
    channel), or when a non-YouTube sound cannot be downloaded or opened.
    """
    global timeout_goal

    try:
        if servermanager.is_muted(server_id, voice_channel_id):
            raise bot_exception(EXCEPT_TYPE, "The bot is muted in this voice channel")
    except KeyError:
        raise bot_exception(EXCEPT_TYPE, "You are not in a voice channel (are you perhaps on a different server?)")

    sound_tag_data = get_sound_tag_data(server_id, sound_tag_name)
    check_sound_tag_access(server_id, sound_tag_data, user_id, need_owner=False)
    update_sound_tag(server_id, sound_tag_name, increment_hits=True)  # Increment hits

    from jshbot.botmanager import client
    from jshbot.botmanager import voice_player

    channel = botmanager.get_voice_channel(server_id, voice_channel_id)
    # Connect to the channel unless we are already connected to it
    if client.voice is None or client.voice.channel != channel or not client.voice.is_connected():
        if client.voice:  # Disconnect from old channel
            await client.voice.disconnect()
        client.voice = await client.join_voice_channel(channel)
        voice_player.server_id = server_id

    if voice_player.player is not None and voice_player.player.is_playing():  # Stop if playing
        voice_player.player.stop()

    if sound_tag_data['type'] == 'YouTube':
        # To prevent playlist downloads
        # 'noplaylist': True
        voice_player.player = await client.voice.create_ytdl_player(sound_tag_data['url'])
    else:
        # Direct download to a temporary file; streaming the response
        # straight into the player has not been worked out yet.
        temp_sound_path = '{}/tempsound'.format(configmanager.data_directory)
        try:
            urllib.request.urlretrieve(sound_tag_data['url'], temp_sound_path)
            voice_player.player = client.voice.create_ffmpeg_player(temp_sound_path)
        except Exception as error:
            raise bot_exception(EXCEPT_TYPE, "An error occurred when downloading the sound file") from error
    voice_player.player.start()

    timeout = configmanager.config['voice_timeout']
    if timeout >= 0:
        timeout_reset_lock.acquire()
        try:
            # A positive timeout is in minutes; zero means "disconnect one
            # second after the sound's own length".
            timeout_goal = time.time() + ((timeout*60) if timeout > 0 else (sound_tag_data['length']+1))
        finally:
            # Release even if the tag data has no 'length' entry.
            timeout_reset_lock.release()
        await timeout_disconnect()


问题


面经


文章

微信
公众号

扫码关注公众号