python类BaseHTTPRequestHandler()的实例源码

__init__.py 文件源码 项目:hangoutsbot 作者: das7pad 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start_listening(bot=None, loop=None, name="", port=8000, certfile=None,
                    webhook_receiver=BaseHTTPRequestHandler,
                    friendly_name="UNKNOWN"):
    """Run a blocking HTTP(S) server that dispatches to ``webhook_receiver``.

    The call blocks inside ``serve_forever()`` until the server stops, is
    interrupted with Ctrl-C, or fails to start. Start-up failures are logged
    rather than raised, and the listening socket is always released.

    Args:
        bot: optional object stored on the handler class as ``_bot`` so that
            request handlers can reach it.
        loop: optional asyncio event loop to install as the current loop of
            the calling thread before serving.
        name: host name or address to bind; ``""`` binds all interfaces.
        port: TCP port to bind.
        certfile: optional path to a PEM file; when given, the listening
            socket is wrapped in TLS.
        webhook_receiver: ``BaseHTTPRequestHandler`` subclass handling the
            requests.
        friendly_name: label used as a prefix in log messages.
    """
    # Imported locally so the block does not rely on a file-level import.
    import errno

    if loop:
        asyncio.set_event_loop(loop)

    if bot:
        # Set on the class: HTTPServer instantiates the handler per request.
        webhook_receiver._bot = bot

    # Stays None if the constructor fails, so cleanup can tell whether there
    # is anything to close.
    httpd = None
    try:
        httpd = HTTPServer((name, port), webhook_receiver)

        if certfile:
            # ssl.wrap_socket() was removed in Python 3.12; an explicit
            # server-side context is the supported equivalent.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(certfile)
            httpd.socket = context.wrap_socket(httpd.socket, server_side=True)

        bound_address = httpd.socket.getsockname()

        logger.info("%s : %s:%s...",
                    friendly_name, bound_address[0], bound_address[1])

        httpd.serve_forever()

    except ssl.SSLError:
        # Must precede OSError: SSLError is a subclass of it.
        logger.exception("%s : %s:%s, pem file is invalid/corrupt",
                         friendly_name, name, port)

    except OSError as err:
        # Symbolic errno names keep the check portable (EADDRINUSE is 98 on
        # Linux only).
        if err.errno == errno.ENOENT:
            message = ".pem file is missing/unavailable"
        elif err.errno == errno.EADDRINUSE:
            message = "address/port in use"
        else:
            message = str(err.strerror)

        logger.exception("%s : %s:%s, %s", friendly_name, name, port, message)

    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; cleanup runs below.
        pass

    finally:
        if httpd is not None:
            try:
                httpd.server_close()
            except OSError:
                # Best effort: the socket may already be closed or broken.
                pass
Server.py 文件源码 项目:SpotifyWeb 作者: DevInsideYou 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_redirect_response(send_oauth2_request, oauth2_url, redirect_port, handle, available_duration_for_login_in_seconds):
    """Start an OAuth2 login and wait for the single redirect request.

    A one-shot HTTP server is bound to ``redirect_port``, the OAuth2 request
    is sent, and the call blocks until one GET request has been served. If
    no request arrives within the allowed time, a timer sends a request to
    the server itself so the blocking call returns. The server socket is
    released even when the login request or the request handling fails.

    Args:
        send_oauth2_request: callable invoked with ``oauth2_url`` to start
            the login.
        oauth2_url: value passed to ``send_oauth2_request``.
        redirect_port: local TCP port the redirect is expected on.
        handle: callable invoked with the request path of the GET request
            that was received (``"/"`` for the self-sent wake-up request).
        available_duration_for_login_in_seconds: delay before the wake-up
            request is sent.
    """

    class SpotifyRedirectURIHandler(BaseHTTPRequestHandler):
        """Hands the request path to ``handle`` and replies with a page."""

        def do_GET(self):
            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()

            handle(self.path)

            # Context manager closes the file; explicit encoding matches the
            # UTF-8 bytes written to the client below.
            with open(self._html_location(), "r", encoding="utf8") as page:
                html = page.read()

            self.wfile.write(html.encode("utf8"))

        @staticmethod
        def _html_location():
            # Page location is resolved relative to this source file.
            return os.path.abspath(
                os.path.join(
                    os.path.dirname(__file__),
                    "../../resources/index.html"
                )
            )

    def send_http_request_to_self():
        # Wake-up call for the blocking handle_request(); runs in the timer
        # thread. Failure is deliberately ignored (best effort), but
        # SystemExit/KeyboardInterrupt are no longer swallowed.
        try:
            wake_up_url = "http://localhost:{}/".format(redirect_port)
            with urllib.request.urlopen(wake_up_url) as response:
                response.read()
        except Exception:
            pass

    server = HTTPServer(('', redirect_port), SpotifyRedirectURIHandler)
    try:
        send_oauth2_request(oauth2_url)

        circuit_breaker = threading.Timer(
            available_duration_for_login_in_seconds,
            send_http_request_to_self,
        )
        circuit_breaker.start()
        try:
            server.handle_request()  # blocks until exactly one request is served
        finally:
            # No wake-up request is needed once the wait is over.
            circuit_breaker.cancel()
    finally:
        server.server_close()


问题


面经


文章

微信
公众号

扫码关注公众号