def download_from_url(url):
    proxy = env_server.get_proxy()
    if proxy['enabled']:
        server = proxy['server'].replace('http://', '')
        proxy_dict = {
            'http': 'http://{login}:{pass}@{0}'.format(server, **proxy)
        }
        proxy_handler = urllib2.ProxyHandler(proxy_dict)
        auth = urllib2.HTTPBasicAuthHandler()
        opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler)
        urllib2.install_opener(opener)

    run_thread = tc.ServerThread(env_inst.ui_main)
    run_thread.kwargs = dict(url=url, timeout=1)
    run_thread.routine = urllib2.urlopen
    run_thread.run()

    result_thread = tc.treat_result(run_thread, silent=True)

    if result_thread.isFailed():
        return False
    else:
        return result_thread.result
def download_vcpython27(self):
    """
    Download vcpython27 since some Windows 7 boxes have it and some don't.
    :return: None
    """
    self._prepare_for_download()
    logger.info('Beginning download of vcpython27... this may take a few minutes...')
    with open(os.path.join(DOWNLOADS_DIR, 'vcpython27.msi'), 'wb') as f:
        if self.PROXY is not None:
            opener = urllib2.build_opener(
                urllib2.HTTPHandler(),
                urllib2.HTTPSHandler(),
                urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
            )
            urllib2.install_opener(opener)
        f.write(urllib2.urlopen(self.VCPYTHON27_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT).read())
    logger.debug('Download of vcpython27 complete')
def download_python(self):
    """
    Download Python.
    :return: None
    """
    self._prepare_for_download()
    logger.info('Beginning download of python')
    with open(os.path.join(DOWNLOADS_DIR, 'python-installer.msi'), 'wb') as f:
        if self.PROXY is not None:
            opener = urllib2.build_opener(
                urllib2.HTTPHandler(),
                urllib2.HTTPSHandler(),
                urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
            )
            urllib2.install_opener(opener)
        f.write(urllib2.urlopen(self.PYTHON_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT).read())
    logger.debug('Download of python complete')
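Both download helpers above buffer the entire MSI in memory through a single read() before writing it out. A minimal sketch of a chunked alternative, assuming only that urllib2 is available (download_to and its parameters are illustrative, not part of the original code):

import urllib2

def download_to(url, path, timeout=60, chunk_size=64 * 1024):
    # hedged sketch: stream the response in fixed-size chunks
    # instead of one big read()
    resp = urllib2.urlopen(url, timeout=timeout)
    with open(path, 'wb') as f:
        while True:
            data = resp.read(chunk_size)
            if not data:
                break
            f.write(data)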
def _update_opener(self):
    '''
    Builds and installs a new opener to be used by all future calls to
    :func:`urllib2.urlopen`.
    '''
    if self._http_debug:
        http = urllib2.HTTPHandler(debuglevel=1)
    else:
        http = urllib2.HTTPHandler()

    if self._proxy:
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
                                      urllib2.ProxyHandler({'http': self._proxy}),
                                      urllib2.HTTPBasicAuthHandler(),
                                      http)
    else:
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
                                      urllib2.HTTPBasicAuthHandler(),
                                      http)
    urllib2.install_opener(opener)
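Note that an HTTPBasicAuthHandler built with no arguments starts with an empty password manager, so it cannot answer a 401 until credentials are registered. A minimal sketch of wiring that in (the URL and credentials are placeholders):

import urllib2

# hedged sketch: register credentials so the basic-auth handler can respond
passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, 'http://example.com/', 'user', 'secret')  # placeholders
opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(passman),
                              urllib2.HTTPHandler())
urllib2.install_opener(opener)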
def check_gn_proxy(proxy, protocal_type='HTTP'):
    url = 'http://icanhazip.com'
    proxy_handler = urllib2.ProxyHandler({
        'http': 'http://' + proxy,
        'https': 'https://' + proxy,
    })
    if protocal_type == 'HTTPS':
        url = 'https://icanhazip.com'

    opener = urllib2.build_opener(proxy_handler, urllib2.HTTPHandler)
    try:
        response = opener.open(url, timeout=3)
        res_ip = response.read().strip()
        return response.code == 200 and res_ip == proxy.split(':')[0]
    except Exception:
        return False
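A minimal usage sketch for check_gn_proxy; the candidate list below is hypothetical:

candidates = ['203.0.113.10:8080', '198.51.100.7:3128']  # placeholder host:port strings
alive = [p for p in candidates if check_gn_proxy(p)]
print 'working proxies:', alive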
def error_handler(url):
    global HANDLE_ERRORS
    orig = HANDLE_ERRORS
    keepalive_handler = HTTPHandler()
    opener = urllib2.build_opener(keepalive_handler)
    urllib2.install_opener(opener)
    pos = {0: 'off', 1: 'on'}
    for i in (0, 1):
        print " fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
        HANDLE_ERRORS = i
        try:
            fo = urllib2.urlopen(url)
            fo.read()
            fo.close()
            try:
                status, reason = fo.status, fo.reason
            except AttributeError:
                status, reason = None, None
        except IOError as e:
            print " EXCEPTION: %s" % e
            raise
        else:
            print " status = %s, reason = %s" % (status, reason)
    HANDLE_ERRORS = orig
    hosts = keepalive_handler.open_connections()
    print "open connections:", hosts
    keepalive_handler.close_all()
def comp(N, url):
    print ' making %i connections to:\n %s' % (N, url)

    sys.stdout.write(' first using the normal urllib handlers')
    # first use normal opener
    opener = urllib2.build_opener()
    urllib2.install_opener(opener)
    t1 = fetch(N, url)
    print ' TIME: %.3f s' % t1

    sys.stdout.write(' now using the keepalive handler ')
    # now install the keepalive handler and try again
    opener = urllib2.build_opener(HTTPHandler())
    urllib2.install_opener(opener)
    t2 = fetch(N, url)
    print ' TIME: %.3f s' % t2
    print ' improvement factor: %.2f' % (t1 / t2, )
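comp (and error_handler above) appear to come from a keepalive-handler benchmark and rely on a fetch helper that the snippets do not include. A minimal sketch, under the assumption that it simply times N sequential downloads:

import time
import urllib2

def fetch(N, url, delay=0):
    # hypothetical reconstruction: time N sequential fetches of url
    start = time.time()
    for i in range(N):
        if delay and i > 0:
            time.sleep(delay)
        fo = urllib2.urlopen(url)
        fo.read()
        fo.close()
    return time.time() - start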
def getUnRedirectUrl(url, timeout=10):
    debug_handler = urllib2.HTTPHandler(debuglevel=0)
    opener = urllib2.build_opener(debug_handler, RedirctHandler)
    html = None
    response = None
    error_info = None  # initialized so the fallback return cannot raise NameError
    try:
        response = opener.open(url, timeout=timeout)
        html = response.read()
    except urllib2.URLError as e:
        if hasattr(e, 'headers'):
            error_info = e.headers
        elif hasattr(e, 'reason'):
            error_info = e.reason
    finally:
        if response:
            response.close()
    if html:
        return html
    else:
        return error_info
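getUnRedirectUrl above and GetLocation below both depend on a RedirctHandler class (the misspelling is the original identifier) and a RedirectException that are not part of these snippets. A plausible minimal sketch, assuming the handler's job is to surface redirect targets instead of following them:

import urllib2

class RedirectException(Exception):
    # hypothetical reconstruction: carries the redirect target
    def __init__(self, location):
        Exception.__init__(self, location)
        self.location = location

class RedirctHandler(urllib2.HTTPRedirectHandler):
    # raise instead of following, so callers can read the Location header
    def http_error_301(self, req, fp, code, msg, headers):
        raise RedirectException(headers.getheader('Location'))

    http_error_302 = http_error_303 = http_error_307 = http_error_301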
def __init__(self, configuration):
    self.setup(configuration)
    self.echo = None
    if "ECHO" in configuration:
        self.echo = configuration['ECHO']

    if self.proxy_scheme is not None and self.proxy_host is not None and \
            self.proxy_port is not None:
        credentials = ""
        if self.proxy_username is not None and self.proxy_password is not None:
            credentials = self.proxy_username + ":" + self.proxy_password + "@"
        proxyDict = {
            self.proxy_scheme: self.proxy_scheme + "://" + credentials +
            self.proxy_host + ":" + self.proxy_port
        }
        proxy = urllib2.ProxyHandler(proxyDict)
        if credentials != '':
            auth = urllib2.HTTPBasicAuthHandler()
            opener = urllib2.build_opener(proxy, auth, urllib2.HTTPHandler)
        else:
            opener = urllib2.build_opener(proxy)
        urllib2.install_opener(opener)
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret):
    self.access_token_key = access_token_key
    self.access_token_secret = access_token_secret
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret

    _debug = 0

    self.oauth_token = oauth.Token(key=self.access_token_key, secret=self.access_token_secret)
    self.oauth_consumer = oauth.Consumer(key=self.consumer_key, secret=self.consumer_secret)
    self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

    self.http_handler = urllib2.HTTPHandler(debuglevel=_debug)
    self.https_handler = urllib2.HTTPSHandler(debuglevel=_debug)
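A sketch of how these objects are typically combined with the python-oauth2 library to produce a signed request URL (_sign_request and its arguments are illustrative, not part of the original class):

def _sign_request(self, url, parameters=None):
    # hedged sketch: build and sign an OAuth 1.0a request, return the signed URL
    req = oauth.Request.from_consumer_and_token(
        self.oauth_consumer,
        token=self.oauth_token,
        http_method='GET',
        http_url=url,
        parameters=parameters or {})
    req.sign_request(self.signature_method_hmac_sha1,
                     self.oauth_consumer, self.oauth_token)
    return req.to_url()  # ready for urllib2.urlopen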
def send_common_request(url, is_post, cookie, para=''):
    """
    Send a request to the target web server and return the response body.
    :url: target URL
    :is_post: request method flag (2 means POST)
    :cookie: cookie string to send with the request
    """
    headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:42.0) Gecko/20100101 Firefox/42.0',
               'Cookie': cookie
               }
    # dns cache
    # socket.getaddrinfo = new_getaddrinfo
    try:
        encoding_support = ContentEncodingProcessor()
        opener = urllib2.build_opener(encoding_support, urllib2.HTTPHandler)
        urllib2.install_opener(opener)
        if is_post == 2:  # post
            # url, query = url.split('?', 1)
            return urllib2.urlopen(urllib2.Request(url, para, headers=headers)).read()
        else:
            return urllib2.urlopen(urllib2.Request('?'.join([url, para]), headers=headers)).read()
    except Exception:
        return ''
def send_signal(event, response_status, reason, response_data=None):
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': str(reason or 'ReasonCanNotBeNone'),
            'PhysicalResourceId': event.get('PhysicalResourceId', event['LogicalResourceId']),
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data or {}
        },
        sort_keys=True,
    )
    logging.debug(response_body)

    opener = build_opener(HTTPHandler)
    request = Request(event['ResponseURL'], data=response_body)
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', len(response_body))
    request.get_method = lambda: 'PUT'
    opener.open(request)
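send_signal implements the CloudFormation custom-resource callback: it PUTs a JSON status document to the presigned ResponseURL. A sketch of how it is typically driven from an AWS Lambda handler ('SUCCESS' and 'FAILED' are the literal status strings CloudFormation expects; do_work is a placeholder):

def handler(event, context):
    # hedged sketch of a custom-resource entry point
    try:
        do_work(event)  # hypothetical resource logic
        send_signal(event, 'SUCCESS', 'resource created')
    except Exception as e:
        send_signal(event, 'FAILED', str(e))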
def GetLocation(url, timeout=25):
    debug_handler = urllib2.HTTPHandler()
    opener = urllib2.build_opener(debug_handler, RedirctHandler)
    location = None
    error_info = None  # initialized so the fallback return cannot raise NameError
    try:
        opener.open(url, timeout=timeout)
    except urllib2.URLError as e:
        if hasattr(e, 'code'):
            error_info = e.code
        elif hasattr(e, 'reason'):
            error_info = e.reason
    except RedirectException as e:
        location = e.location
    if location:
        return location, None
    else:
        return False, error_info
def __init__(self, timeout=10, threads=None, stacksize=32768 * 16, loginfunc=None):
    # proxy_support = urllib2.ProxyHandler({'http': 'http://localhost:3128'})
    cookie_support = urllib2.HTTPCookieProcessor(cookielib.CookieJar())
    encoding_support = ContentEncodingProcessor()
    # self.opener = urllib2.build_opener(cookie_support, encoding_support, proxy_support, urllib2.HTTPHandler)
    self.opener = urllib2.build_opener(cookie_support, encoding_support, urllib2.HTTPHandler)
    self.req = urllib2.Request('http://www.hsbc.com')
    socket.setdefaulttimeout(timeout)
    self.q_req = Queue()
    self.q_ans = Queue()
    self.lock = Lock()
    self.running = 0
    if loginfunc:
        self.opener = loginfunc(self.opener)
    if threads:
        self.threads = threads
        stack_size(stacksize)
        for i in range(threads):
            t = Thread(target=self.threadget)
            t.setDaemon(True)
            t.start()
def __init__(self):
    # proxy settings
    self.proxy_url = proxyList[3]
    self.proxy = urllib2.ProxyHandler({"http": self.proxy_url})
    # target site and request headers
    self.hostURL = 'http://book.douban.com/tag/'
    self.headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.47 (KHTML, like Gecko)'
                      ' Chrome/48.1.2524.116 Safari/537.36',
        'Referer': 'http://book.douban.com/',
        'Host': 'book.douban.com',
        'Upgrade-Insecure-Requests': '1',
        'Connection': 'keep-alive'
    }
    # build the cookie-aware opener
    self.cookie = cookielib.LWPCookieJar()
    self.cookieHandler = urllib2.HTTPCookieProcessor(self.cookie)
    self.opener = urllib2.build_opener(self.cookieHandler, self.proxy, urllib2.HTTPHandler)
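A hedged sketch of how this opener and the headers dict might be combined per request (fetch_tag is illustrative, not part of the original class; per-request headers belong on the Request object, since build_opener does not take them):

def fetch_tag(self, tag):
    # hypothetical helper: attach headers per request, then open
    req = urllib2.Request(self.hostURL + tag, headers=self.headers)
    return self.opener.open(req).read()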
# check whether a proxy IP is usable
def Check(ip):
    try:
        log.step_normal('checking proxy ip: [%s]' % ip)
        proxy_support = urllib2.ProxyHandler({'http': 'http://' + ip})
        opener = urllib2.build_opener(proxy_support, urllib2.HTTPHandler)
        urllib2.install_opener(opener)
        request = urllib2.Request('http://www.baidu.com')
        request.add_header("cookie", env.COOKIE)
        request.add_header("User-Agent", getUA())
        content = urllib2.urlopen(request, timeout=4).read()
        if len(content) >= 1000:
            log.step_normal('add proxy [%s]' % ip)
            return ip
        else:
            log.step_normal('response too short, discarding proxy IP [%s]' % ip)
            praserJsonFile()
    except (URLError, HTTPError) as e:
        log.step_normal('proxy ip failed: [%s]' % ip)
        praserJsonFile()
def get_local_ip(ip):
    try:
        proxy_support = urllib2.ProxyHandler({'http': 'http://' + ip})
        opener = urllib2.build_opener(proxy_support, urllib2.HTTPHandler)
        urllib2.install_opener(opener)
        request = urllib2.Request('http://ip.chinaz.com/getip.aspx')
        # request.add_header("cookie", env.COOKIE)
        request.add_header("User-Agent", getUA())
        fp = urllib2.urlopen(request)
        mybytes = fp.read()
        # note that Python 3 reads the response as bytes rather than str,
        # so convert it with decode()
        mystr = mybytes.decode('utf-8')
        fp.close()
        ip = mystr.find("ip")
        add = mystr.find("address")
        ip = mystr[ip + 4:add - 2]
        address = mystr[add + 9:-2]
        return [ip, address]
    except (HTTPError, URLError, Exception) as e:
        log.step_warning('failed to look up the proxy ip ---> %s' % e)
        return [ip, 'address']
def httpConnection(url, proxy):
    # TODO: enable NTLM authentication
    if proxy.auth == "ntlm":
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, proxy.url, proxy.user, proxy.password)
        auth = HTTPNtlmAuthHandler.HTTPNtlmAuthHandler(passman)
    else:
        passman = urllib2.HTTPPasswordMgr()
        passman.add_password(None, proxy.url, proxy.user, proxy.password)
        auth = urllib2.HTTPBasicAuthHandler(passman)

    if proxy.url:
        # pass the handler (not the URL string) to build_opener, and avoid
        # shadowing the proxy argument
        proxy_handler = urllib2.ProxyHandler({'http': proxy.url})
        opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler)
        urllib2.install_opener(opener)

    return urllib2.urlopen(url)
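httpConnection expects a proxy object exposing url, user, password, and auth attributes; a minimal stand-in for experimentation (ProxyConfig and its values are hypothetical):

class ProxyConfig(object):
    # hypothetical stand-in for the proxy object this function expects
    def __init__(self, url, user, password, auth='basic'):
        self.url = url
        self.user = user
        self.password = password
        self.auth = auth

html = httpConnection('http://example.com/',
                      ProxyConfig('http://10.0.0.1:3128', 'user', 'secret')).read()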
def check(proxy):
    import urllib2
    url = "http://connect.rom.miui.com/generate_204"
    proxy_handler = urllib2.ProxyHandler({'http': "http://" + proxy})
    opener = urllib2.build_opener(proxy_handler, urllib2.HTTPHandler)
    try:
        response = opener.open(url, timeout=1)
        return response.code == 204 and response.url == url
    except Exception:
        return False
def get_file(self, url, quality):
    self.cookieJar = cookielib.LWPCookieJar()
    self.opener = urllib2.build_opener(
        urllib2.HTTPCookieProcessor(self.cookieJar),
        urllib2.HTTPRedirectHandler(),
        urllib2.HTTPHandler(debuglevel=0))
    self.opener.addheaders = [('User-agent', "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36")]
    forms = {"youtubeURL": url,
             'quality': quality}
    data = urllib.urlencode(forms)
    req = urllib2.Request('http://www.convertmemp3.com/', data)
    res = self.opener.open(req)
    self.convhtml = res.read()
class ProxyHTTPHandler(urllib2.HTTPHandler):
    # enclosing class inferred from the method bodies; the name is an assumption
    def __init__(self, proxy=None, debuglevel=0):
        self.proxy = proxy
        urllib2.HTTPHandler.__init__(self, debuglevel)

    def do_open(self, http_class, req):
        if self.proxy is not None:
            req.set_proxy(self.proxy, "http")
        return urllib2.HTTPHandler.do_open(self, ProxyHTTPConnection, req)
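ProxyHTTPConnection is not included in the snippet; assuming it is an httplib.HTTPConnection variant that tunnels through the proxy, usage would look roughly like this (the proxy address is a placeholder):

opener = urllib2.build_opener(ProxyHTTPHandler(proxy='127.0.0.1:8080'))
response = opener.open('http://example.com/')
print response.getcode()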