def check_proxy(self, proxy_list):
ava_list = []
test_url = "http://www.baidu.com/"
for host, port in proxy_list:
ret = False
host_port = "%s:%s" % (host, port)
proxy = {
"http": "http://%s" %(host_port),
"https": "https://%s" %(host_port),
}
proxy_handler = urllib2.ProxyHandler(proxy)
opener = urllib2.build_opener(proxy_handler)
try:
conn = opener.open(test_url, timeout=2.5)
data = conn.read()
if "??" in data:
ret = True
ava_list.append((host, port))
except Exception as e:
# print e
ret = False
print "checking proxy: %s ---> %s" % (host_port, str(ret))
return ava_list
python类ProxyHandler()的实例源码
def cache_resource(self, url):
if self.proxy_url is not None:
proxy = urllib2.ProxyHandler({'http': self.proxy_url})
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)
request = urllib2.Request(url)
user_agent = 'Mozilla/5.0 (X11; Linux i686) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.35 Safari/535.1'
request.add_header('User-Agent', user_agent)
handler = urllib2.urlopen(request, timeout=self.http_timeout)
try:
resource_type = MIME_TYPES[handler.headers.get('Content-Type')]
if not resource_type:
raise UnsupportedResourceFormat("Resource format not found")
except KeyError:
raise UnsupportedResourceFormat("Resource format not supported")
etag = handler.headers.get('ETag')
last_modified = handler.headers.get('Last-Modified')
resource_key = self.get_resource_key(url)
stream = handler.read()
self.update_resource_params(resource_key, resource_type, etag, last_modified, stream)
return stream, resource_type
def __init__(self, configuration):
self.setup(configuration)
self.echo = None
if "ECHO" in configuration:
self.echo = configuration['ECHO']
if self.proxy_scheme is not None and self.proxy_host is not None and \
self.proxy_port is not None:
credentials = ""
if self.proxy_username is not None and self.proxy_password is not None:
credentials = self.proxy_username + ":" + self.proxy_password + "@"
proxyDict = {
self.proxy_scheme: self.proxy_scheme + "://" + credentials +
self.proxy_host + ":" + self.proxy_port
}
proxy = urllib2.ProxyHandler(proxyDict)
if credentials != '':
auth = urllib2.HTTPBasicAuthHandler()
opener = urllib2.build_opener(proxy, auth, urllib2.HTTPHandler)
else:
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)
def do_GET(self):
filename = self.path.split('/')[-1]
print "ProxyHandler: url:", self.path, " localfile:", filename
if os.path.exists(filename):
print "ProxyHandler: local file found:",filename
self.copyfile(open(filename), self.wfile)
else:
filenamejson = filename + ".json"
print "ProxyHandler: local file NOT found:", filename, " trying: ", filenamejson
if os.path.exists(filenamejson):
print "ProxyHandler: local file found:",filenamejson
self.copyfile(open(filenamejson), self.wfile)
else:
print "ProxyHandler: trying url:",self.path
proxy_handler = urllib2.ProxyHandler({})
opener = urllib2.build_opener(proxy_handler)
try:
req = urllib2.Request(self.path)
self.copyfile(opener.open(req), self.wfile)
except:
print "ProxyHandler: file not found:", self.path
def wait_xxnet_exit():
def http_request(url, method="GET"):
proxy_handler = urllib2.ProxyHandler({})
opener = urllib2.build_opener(proxy_handler)
try:
req = opener.open(url)
return req
except Exception as e:
#logging.exception("web_control http_request:%s fail:%s", url, e)
return False
for i in range(20):
host_port = config.get(["modules", "launcher", "control_port"], 8085)
req_url = "http://127.0.0.1:{port}/quit".format(port=host_port)
if http_request(req_url) == False:
return True
time.sleep(1)
return False
def _init_urllib(self, secure, debuglevel=0):
cj = cookielib.CookieJar()
no_proxy_support = urllib2.ProxyHandler({})
cookie_handler = urllib2.HTTPCookieProcessor(cj)
ctx = None
if not secure:
self._logger.info('[WARNING] Skip certificate verification.')
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
https_handler = urllib2.HTTPSHandler(debuglevel=debuglevel, context=ctx)
opener = urllib2.build_opener(no_proxy_support,
cookie_handler,
https_handler,
MultipartPostHandler.MultipartPostHandler)
opener.addheaders = [('User-agent', API_USER_AGENT)]
urllib2.install_opener(opener)
def setUp(self):
super(ProxyAuthTests, self).setUp()
self.digest_auth_handler = DigestAuthHandler()
self.digest_auth_handler.set_users({self.USER: self.PASSWD})
self.digest_auth_handler.set_realm(self.REALM)
# With Digest Authentication
def create_fake_proxy_handler(*args, **kwargs):
return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
self.server.start()
self.server.ready.wait()
proxy_url = "http://127.0.0.1:%d" % self.server.port
handler = urllib2.ProxyHandler({"http" : proxy_url})
self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
def test_proxy(self):
o = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
o.add_handler(ph)
meth_spec = [
[("http_open", "return response")]
]
handlers = add_ordered_mock_handlers(o, meth_spec)
req = Request("http://acme.example.com/")
self.assertEqual(req.get_host(), "acme.example.com")
r = o.open(req)
self.assertEqual(req.get_host(), "proxy.example.com:3128")
self.assertEqual([(handlers[0], "http_open")],
[tup[0:2] for tup in o.calls])
def test_proxy_https_proxy_authorization(self):
o = OpenerDirector()
ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
o.add_handler(ph)
https_handler = MockHTTPSHandler()
o.add_handler(https_handler)
req = Request("https://www.example.com/")
req.add_header("Proxy-Authorization","FooBar")
req.add_header("User-Agent","Grail")
self.assertEqual(req.get_host(), "www.example.com")
self.assertIsNone(req._tunnel_host)
r = o.open(req)
# Verify Proxy-Authorization gets tunneled to request.
# httpsconn req_headers do not have the Proxy-Authorization header but
# the req will have.
self.assertNotIn(("Proxy-Authorization","FooBar"),
https_handler.httpconn.req_headers)
self.assertIn(("User-Agent","Grail"),
https_handler.httpconn.req_headers)
self.assertIsNotNone(req._tunnel_host)
self.assertEqual(req.get_host(), "proxy.example.com:3128")
self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
def test_proxy_basic_auth(self):
opener = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
opener.add_handler(ph)
password_manager = MockPasswordManager()
auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
realm = "ACME Networks"
http_handler = MockHTTPHandler(
407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
opener.add_handler(auth_handler)
opener.add_handler(http_handler)
self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
realm, http_handler, password_manager,
"http://acme.example.com:3128/protected",
"proxy.example.com:3128",
)
def test_proxy(self):
o = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
o.add_handler(ph)
meth_spec = [
[("http_open", "return response")]
]
handlers = add_ordered_mock_handlers(o, meth_spec)
req = Request("http://acme.example.com/")
self.assertEqual(req.get_host(), "acme.example.com")
r = o.open(req)
self.assertEqual(req.get_host(), "proxy.example.com:3128")
self.assertEqual([(handlers[0], "http_open")],
[tup[0:2] for tup in o.calls])
def test_proxy_https_proxy_authorization(self):
o = OpenerDirector()
ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
o.add_handler(ph)
https_handler = MockHTTPSHandler()
o.add_handler(https_handler)
req = Request("https://www.example.com/")
req.add_header("Proxy-Authorization","FooBar")
req.add_header("User-Agent","Grail")
self.assertEqual(req.get_host(), "www.example.com")
self.assertIsNone(req._tunnel_host)
r = o.open(req)
# Verify Proxy-Authorization gets tunneled to request.
# httpsconn req_headers do not have the Proxy-Authorization header but
# the req will have.
self.assertNotIn(("Proxy-Authorization","FooBar"),
https_handler.httpconn.req_headers)
self.assertIn(("User-Agent","Grail"),
https_handler.httpconn.req_headers)
self.assertIsNotNone(req._tunnel_host)
self.assertEqual(req.get_host(), "proxy.example.com:3128")
self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
def test_proxy_basic_auth(self):
opener = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
opener.add_handler(ph)
password_manager = MockPasswordManager()
auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
realm = "ACME Networks"
http_handler = MockHTTPHandler(
407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
opener.add_handler(auth_handler)
opener.add_handler(http_handler)
self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
realm, http_handler, password_manager,
"http://acme.example.com:3128/protected",
"proxy.example.com:3128",
)
def proxy_open(self, req, proxy, type):
# This block is copied wholesale from Python2.6 urllib2.
# It is idempotent, so the superclass method call executes as normal
# if invoked.
orig_type = req.get_type()
proxy_type, user, password, hostport = self._parse_proxy(proxy)
if proxy_type is None:
proxy_type = orig_type
if user and password:
user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
creds = base64.b64encode(user_pass).strip()
# Later calls overwrite earlier calls for the same header
req.add_header("Proxy-authorization", "Basic " + creds)
hostport = urllib2.unquote(hostport)
req.set_proxy(hostport, proxy_type)
# This condition is the change
if orig_type == "https":
return None
return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
def proxy_open(self, req, proxy, type):
# This block is copied wholesale from Python2.6 urllib2.
# It is idempotent, so the superclass method call executes as normal
# if invoked.
orig_type = req.get_type()
proxy_type, user, password, hostport = self._parse_proxy(proxy)
if proxy_type is None:
proxy_type = orig_type
if user and password:
user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
creds = base64.b64encode(user_pass).strip()
# Later calls overwrite earlier calls for the same header
req.add_header("Proxy-authorization", "Basic " + creds)
hostport = urllib2.unquote(hostport)
req.set_proxy(hostport, proxy_type)
# This condition is the change
if orig_type == "https":
return None
return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
def proxy_open(self, req, proxy, type):
# This block is copied wholesale from Python2.6 urllib2.
# It is idempotent, so the superclass method call executes as normal
# if invoked.
orig_type = req.get_type()
proxy_type, user, password, hostport = self._parse_proxy(proxy)
if proxy_type is None:
proxy_type = orig_type
if user and password:
user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
creds = base64.b64encode(user_pass).strip()
# Later calls overwrite earlier calls for the same header
req.add_header("Proxy-authorization", "Basic " + creds)
hostport = urllib2.unquote(hostport)
req.set_proxy(hostport, proxy_type)
# This condition is the change
if orig_type == "https":
return None
return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
def proxy_identify(proxy, url):
cookie = cookielib.LWPCookieJar()
handler = urllib2.HTTPCookieProcessor(cookie)
proxy_support = urllib2.ProxyHandler({'http': proxy})
opener = urllib2.build_opener(proxy_support, handler)
try:
response = opener.open(url, timeout=3)
if response.code == 200:
c = ''
for item in cookie:
c += item.name+'='+item.value+';'
print c
IpProxy.sogou_cookie.append(c)
return True
except Exception, error:
print error
return False
def checker():
while True:
if proxyq.empty() is not True:
proxy = "http://{}".format( proxyq.get() )
url = "http://icanhazip.com"
proxy_handler = urllib2.ProxyHandler( { "http" : proxy } )
opener = urllib2.build_opener( proxy_handler )
urllib2.install_opener( opener )
printq.put( "[>] Trying {}".format( proxy ) )
try:
response = urllib2.urlopen( url, timeout=3 ).readlines()
for line in response:
if line.rstrip( "\n" ) in proxy:
printq.put( "[+] Working proxy: {}".format( proxy ) )
with open( "working.txt", "a" ) as log:
log.write( "{}\n".format( proxy ) )
log.close()
except Exception as ERROR:
printq.put( "[!] Bad proxy: {}".format( proxy ) )
proxyq.task_done()
def init_options(proxy=None, cookie=None, ua=None, referer=None):
globals()["_headers"] = dict(filter(lambda _: _[1], ((COOKIE, cookie), (UA, ua or NAME), (REFERER, referer))))
urllib2.install_opener(urllib2.build_opener(urllib2.ProxyHandler({'http': proxy})) if proxy else None)
# if __name__ == "__main__":
# print "%s #v%s\n by: %s\n" % (NAME, VERSION, AUTHOR)
# parser = optparse.OptionParser(version=VERSION)
# parser.add_option("-u", "--url", dest="url", help="Target URL (e.g. \"http://www.target.com/page.php?id=1\")")
# parser.add_option("--data", dest="data", help="POST data (e.g. \"query=test\")")
# parser.add_option("--cookie", dest="cookie", help="HTTP Cookie header value")
# parser.add_option("--user-agent", dest="ua", help="HTTP User-Agent header value")
# parser.add_option("--referer", dest="referer", help="HTTP Referer header value")
# parser.add_option("--proxy", dest="proxy", help="HTTP proxy address (e.g. \"http://127.0.0.1:8080\")")
# options, _ = parser.parse_args()
# if options.url:
# init_options(options.proxy, options.cookie, options.ua, options.referer)
# result = scan_page(options.url if options.url.startswith("http") else "http://%s" % options.url, options.data)
# print "\nscan results: %s vulnerabilities found" % ("possible" if result else "no")
# else:
# parser.print_help()
def setUp(self):
mechanize._testcase.TestCase.setUp(self)
self.test_uri = urljoin(self.uri, "test_fixtures")
self.server = self.get_cached_fixture("server")
if self.no_proxies:
old_opener_m = mechanize._opener._opener
old_opener_u = urllib2._opener
mechanize.install_opener(mechanize.build_opener(
mechanize.ProxyHandler(proxies={})))
urllib2.install_opener(urllib2.build_opener(
urllib2.ProxyHandler(proxies={})))
def revert_install():
mechanize.install_opener(old_opener_m)
urllib2.install_opener(old_opener_u)
self.add_teardown(revert_install)