python类ProxyHandler()的实例源码

jd.py 文件源码 项目:jd-crawler 作者: qiaofei32 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_proxy(self, proxy_list):
        ava_list = []
        test_url = "http://www.baidu.com/"
        for host, port in proxy_list:
            ret = False
            host_port = "%s:%s" % (host, port)
            proxy = {
                "http": "http://%s" %(host_port),
                "https": "https://%s" %(host_port),
            }
            proxy_handler = urllib2.ProxyHandler(proxy)
            opener = urllib2.build_opener(proxy_handler)
            try:
                conn = opener.open(test_url, timeout=2.5)
                data = conn.read()
                if "??" in data:
                    ret = True
                    ava_list.append((host, port))
            except Exception as e:
                # print e
                ret = False
            print "checking proxy: %s ---> %s" % (host_port, str(ret))
        return ava_list
cacheapi.py 文件源码 项目:YouPBX 作者: JoneXiong 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def cache_resource(self, url):
        if self.proxy_url is not None:
            proxy = urllib2.ProxyHandler({'http': self.proxy_url})
            opener = urllib2.build_opener(proxy)
            urllib2.install_opener(opener)
        request = urllib2.Request(url)
        user_agent = 'Mozilla/5.0 (X11; Linux i686) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.35 Safari/535.1'
        request.add_header('User-Agent', user_agent)
        handler = urllib2.urlopen(request, timeout=self.http_timeout)
        try:
            resource_type = MIME_TYPES[handler.headers.get('Content-Type')]
            if not resource_type:
                raise UnsupportedResourceFormat("Resource format not found")
        except KeyError:
            raise UnsupportedResourceFormat("Resource format not supported")
        etag = handler.headers.get('ETag')
        last_modified = handler.headers.get('Last-Modified')
        resource_key = self.get_resource_key(url)
        stream = handler.read()
        self.update_resource_params(resource_key, resource_type, etag, last_modified, stream)
        return stream, resource_type
httphandler.py 文件源码 项目:lightbulb-framework 作者: lightbulb-framework 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, configuration):
        self.setup(configuration)
        self.echo = None
        if "ECHO" in configuration:
            self.echo = configuration['ECHO']
        if self.proxy_scheme is not None and self.proxy_host is not None and \
                        self.proxy_port is not None:
            credentials = ""
            if self.proxy_username is not None and self.proxy_password is not None:
                credentials = self.proxy_username + ":" + self.proxy_password + "@"
            proxyDict = {
                self.proxy_scheme: self.proxy_scheme + "://" + credentials +
                                                    self.proxy_host + ":" + self.proxy_port
            }

            proxy = urllib2.ProxyHandler(proxyDict)

            if credentials != '':
                auth = urllib2.HTTPBasicAuthHandler()
                opener = urllib2.build_opener(proxy, auth, urllib2.HTTPHandler)
            else:
                opener = urllib2.build_opener(proxy)
            urllib2.install_opener(opener)
raml2doc-met.py 文件源码 项目:raml2doc 作者: openconnectivityfoundation 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def do_GET(self):
        filename = self.path.split('/')[-1]
        print "ProxyHandler: url:", self.path, " localfile:", filename
        if os.path.exists(filename):
            print "ProxyHandler: local file found:",filename
            self.copyfile(open(filename), self.wfile)
        else:
            filenamejson = filename + ".json"
            print "ProxyHandler: local file NOT found:", filename, " trying: ", filenamejson
            if os.path.exists(filenamejson):
                print "ProxyHandler: local file found:",filenamejson
                self.copyfile(open(filenamejson), self.wfile)
            else:
                print "ProxyHandler: trying url:",self.path
                proxy_handler = urllib2.ProxyHandler({})
                opener = urllib2.build_opener(proxy_handler)
                try:
                    req = urllib2.Request(self.path)
                    self.copyfile(opener.open(req), self.wfile)
                except:
                    print "ProxyHandler: file not found:", self.path
setup.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def wait_xxnet_exit():

    def http_request(url, method="GET"):
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        try:
            req = opener.open(url)
            return req
        except Exception as e:
            #logging.exception("web_control http_request:%s fail:%s", url, e)
            return False

    for i in range(20):
        host_port = config.get(["modules", "launcher", "control_port"], 8085)
        req_url = "http://127.0.0.1:{port}/quit".format(port=host_port)
        if http_request(req_url) == False:
            return True
        time.sleep(1)
    return False
cas.py 文件源码 项目:etunexus_api 作者: etusolution 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _init_urllib(self, secure, debuglevel=0):
        cj = cookielib.CookieJar()
        no_proxy_support = urllib2.ProxyHandler({})
        cookie_handler = urllib2.HTTPCookieProcessor(cj)
        ctx = None
        if not secure:
            self._logger.info('[WARNING] Skip certificate verification.')
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        https_handler = urllib2.HTTPSHandler(debuglevel=debuglevel, context=ctx)
        opener = urllib2.build_opener(no_proxy_support,
                                      cookie_handler,
                                      https_handler,
                                      MultipartPostHandler.MultipartPostHandler)
        opener.addheaders = [('User-agent', API_USER_AGENT)]
        urllib2.install_opener(opener)
test_urllib2_localnet.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setUp(self):
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
test_urllib2.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_proxy(self):
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        o.add_handler(ph)
        meth_spec = [
            [("http_open", "return response")]
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://acme.example.com/")
        self.assertEqual(req.get_host(), "acme.example.com")
        r = o.open(req)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")

        self.assertEqual([(handlers[0], "http_open")],
                         [tup[0:2] for tup in o.calls])
test_urllib2.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_proxy_https_proxy_authorization(self):
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
        o.add_handler(ph)
        https_handler = MockHTTPSHandler()
        o.add_handler(https_handler)
        req = Request("https://www.example.com/")
        req.add_header("Proxy-Authorization","FooBar")
        req.add_header("User-Agent","Grail")
        self.assertEqual(req.get_host(), "www.example.com")
        self.assertIsNone(req._tunnel_host)
        r = o.open(req)
        # Verify Proxy-Authorization gets tunneled to request.
        # httpsconn req_headers do not have the Proxy-Authorization header but
        # the req will have.
        self.assertNotIn(("Proxy-Authorization","FooBar"),
                         https_handler.httpconn.req_headers)
        self.assertIn(("User-Agent","Grail"),
                      https_handler.httpconn.req_headers)
        self.assertIsNotNone(req._tunnel_host)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")
        self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
test_urllib2.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_proxy_basic_auth(self):
        opener = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        opener.add_handler(ph)
        password_manager = MockPasswordManager()
        auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        http_handler = MockHTTPHandler(
            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(auth_handler)
        opener.add_handler(http_handler)
        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com:3128/protected",
                              "proxy.example.com:3128",
                              )
test_urllib2.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_proxy(self):
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        o.add_handler(ph)
        meth_spec = [
            [("http_open", "return response")]
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://acme.example.com/")
        self.assertEqual(req.get_host(), "acme.example.com")
        r = o.open(req)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")

        self.assertEqual([(handlers[0], "http_open")],
                         [tup[0:2] for tup in o.calls])
test_urllib2.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_proxy_https_proxy_authorization(self):
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
        o.add_handler(ph)
        https_handler = MockHTTPSHandler()
        o.add_handler(https_handler)
        req = Request("https://www.example.com/")
        req.add_header("Proxy-Authorization","FooBar")
        req.add_header("User-Agent","Grail")
        self.assertEqual(req.get_host(), "www.example.com")
        self.assertIsNone(req._tunnel_host)
        r = o.open(req)
        # Verify Proxy-Authorization gets tunneled to request.
        # httpsconn req_headers do not have the Proxy-Authorization header but
        # the req will have.
        self.assertNotIn(("Proxy-Authorization","FooBar"),
                         https_handler.httpconn.req_headers)
        self.assertIn(("User-Agent","Grail"),
                      https_handler.httpconn.req_headers)
        self.assertIsNotNone(req._tunnel_host)
        self.assertEqual(req.get_host(), "proxy.example.com:3128")
        self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
test_urllib2.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_proxy_basic_auth(self):
        opener = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
        opener.add_handler(ph)
        password_manager = MockPasswordManager()
        auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
        realm = "ACME Networks"
        http_handler = MockHTTPHandler(
            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
        opener.add_handler(auth_handler)
        opener.add_handler(http_handler)
        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                              realm, http_handler, password_manager,
                              "http://acme.example.com:3128/protected",
                              "proxy.example.com:3128",
                              )
__init__.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def proxy_open(self, req, proxy, type):
    # This block is copied wholesale from Python2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    if proxy_type is None:
      proxy_type = orig_type
    if user and password:
      user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
      creds = base64.b64encode(user_pass).strip()
      # Later calls overwrite earlier calls for the same header
      req.add_header("Proxy-authorization", "Basic " + creds)
    hostport = urllib2.unquote(hostport)
    req.set_proxy(hostport, proxy_type)
    # This condition is the change
    if orig_type == "https":
      return None

    return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
__init__.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def proxy_open(self, req, proxy, type):
    # This block is copied wholesale from Python2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    if proxy_type is None:
      proxy_type = orig_type
    if user and password:
      user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
      creds = base64.b64encode(user_pass).strip()
      # Later calls overwrite earlier calls for the same header
      req.add_header("Proxy-authorization", "Basic " + creds)
    hostport = urllib2.unquote(hostport)
    req.set_proxy(hostport, proxy_type)
    # This condition is the change
    if orig_type == "https":
      return None

    return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
__init__.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def proxy_open(self, req, proxy, type):
    # This block is copied wholesale from Python2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    if proxy_type is None:
      proxy_type = orig_type
    if user and password:
      user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
      creds = base64.b64encode(user_pass).strip()
      # Later calls overwrite earlier calls for the same header
      req.add_header("Proxy-authorization", "Basic " + creds)
    hostport = urllib2.unquote(hostport)
    req.set_proxy(hostport, proxy_type)
    # This condition is the change
    if orig_type == "https":
      return None

    return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
cookie_util.py 文件源码 项目:sogouWechart 作者: duanbj 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def proxy_identify(proxy, url):
        cookie = cookielib.LWPCookieJar()
        handler = urllib2.HTTPCookieProcessor(cookie)
        proxy_support = urllib2.ProxyHandler({'http': proxy})
        opener = urllib2.build_opener(proxy_support, handler)
        try:
            response = opener.open(url, timeout=3)
            if response.code == 200:
                c = ''
                for item in cookie:
                    c += item.name+'='+item.value+';'
                print c
                IpProxy.sogou_cookie.append(c)
                return True
        except Exception, error:
            print error
            return False
httproxychecker.py 文件源码 项目:tools 作者: okabe 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def checker():
    while True:
        if proxyq.empty() is not True:
            proxy = "http://{}".format( proxyq.get() )
            url = "http://icanhazip.com"
            proxy_handler = urllib2.ProxyHandler( { "http" : proxy } )
            opener = urllib2.build_opener( proxy_handler )
            urllib2.install_opener( opener )
            printq.put( "[>] Trying {}".format( proxy ) )
            try:
                response = urllib2.urlopen( url, timeout=3 ).readlines()
                for line in response:
                    if line.rstrip( "\n" ) in proxy:
                        printq.put( "[+] Working proxy: {}".format( proxy ) )
                        with open( "working.txt", "a" ) as log:
                            log.write( "{}\n".format( proxy ) )
                        log.close()
            except Exception as ERROR:
                printq.put( "[!] Bad proxy: {}".format( proxy ) )
            proxyq.task_done()
sqlScan.py 文件源码 项目:00scanner 作者: xiaoqin00 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def init_options(proxy=None, cookie=None, ua=None, referer=None):
    globals()["_headers"] = dict(filter(lambda _: _[1], ((COOKIE, cookie), (UA, ua or NAME), (REFERER, referer))))
    urllib2.install_opener(urllib2.build_opener(urllib2.ProxyHandler({'http': proxy})) if proxy else None)

# if __name__ == "__main__":
#     print "%s #v%s\n by: %s\n" % (NAME, VERSION, AUTHOR)
#     parser = optparse.OptionParser(version=VERSION)
#     parser.add_option("-u", "--url", dest="url", help="Target URL (e.g. \"http://www.target.com/page.php?id=1\")")
#     parser.add_option("--data", dest="data", help="POST data (e.g. \"query=test\")")
#     parser.add_option("--cookie", dest="cookie", help="HTTP Cookie header value")
#     parser.add_option("--user-agent", dest="ua", help="HTTP User-Agent header value")
#     parser.add_option("--referer", dest="referer", help="HTTP Referer header value")
#     parser.add_option("--proxy", dest="proxy", help="HTTP proxy address (e.g. \"http://127.0.0.1:8080\")")
#     options, _ = parser.parse_args()
#     if options.url:
#         init_options(options.proxy, options.cookie, options.ua, options.referer)
#         result = scan_page(options.url if options.url.startswith("http") else "http://%s" % options.url, options.data)
#         print "\nscan results: %s vulnerabilities found" % ("possible" if result else "no")
#     else:
#         parser.print_help()
test_functional.py 文件源码 项目:mechanize 作者: python-mechanize 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setUp(self):
        mechanize._testcase.TestCase.setUp(self)
        self.test_uri = urljoin(self.uri, "test_fixtures")
        self.server = self.get_cached_fixture("server")
        if self.no_proxies:
            old_opener_m = mechanize._opener._opener
            old_opener_u = urllib2._opener
            mechanize.install_opener(mechanize.build_opener(
                mechanize.ProxyHandler(proxies={})))
            urllib2.install_opener(urllib2.build_opener(
                urllib2.ProxyHandler(proxies={})))

            def revert_install():
                mechanize.install_opener(old_opener_m)
                urllib2.install_opener(old_opener_u)
            self.add_teardown(revert_install)


问题


面经


文章

微信
公众号

扫码关注公众号