def save_cookie(self, text, cookie_file=CONF.cookie_file):
    """Follow the login redirect found in ``text`` and save the session cookies.

    A fresh ``LWPCookieJar`` is installed as the global ``urllib2`` opener,
    the URL inside the ``location.replace('...')`` call embedded in ``text``
    is requested, and the ``feedBackUrlCallBack(...)`` JSON payload of that
    response is inspected.

    :param text: body of the login response. On Python 3 it is decoded as
        GBK when it is still bytes.
    :param cookie_file: path the cookie jar is written to on success.
    :return: ``1`` if the feedback JSON has a truthy ``result`` and the
        cookies were saved, ``0`` otherwise (including on any error).
    """
    cookie_jar2 = cookielib.LWPCookieJar()
    cookie_support2 = urllib2.HTTPCookieProcessor(cookie_jar2)
    opener2 = urllib2.build_opener(cookie_support2, urllib2.HTTPHandler)
    urllib2.install_opener(opener2)

    if six.PY3 and isinstance(text, bytes):
        text = text.decode('gbk')

    # Raw strings keep the regex escapes from being read as (invalid)
    # string escapes.
    redirect_re = re.compile(r"location\.replace\('(.*?)'\)")
    feedback_re = re.compile(r'feedBackUrlCallBack\((.*)\)', re.MULTILINE)

    try:
        # Search login redirection URL
        redirect = redirect_re.search(text)
        if redirect is None:
            return 0
        data = urllib2.urlopen(redirect.group(1)).read()
        if six.PY3 and isinstance(data, bytes):
            # A str pattern cannot be matched against bytes on Python 3.
            # NOTE(review): assumes the feedback page uses the same GBK
            # encoding as the login response - confirm.
            data = data.decode('gbk', 'replace')

        # Verify login feedback, check whether result is TRUE
        feedback = feedback_re.search(data)
        if feedback is None:
            return 0
        feedback_json = json.loads(feedback.group(1))
        if not feedback_json['result']:
            return 0
        cookie_jar2.save(cookie_file,
                         ignore_discard=True,
                         ignore_expires=True)
        return 1
    except Exception:
        # Best-effort login: any failure is reported as "not logged in",
        # but KeyboardInterrupt/SystemExit are no longer swallowed.
        return 0
python类build_opener()的实例源码
def login(self, username, pwd, cookie_file):
    """Log in with a user name, password and cookie file.

    (1) If the cookie file exists, try to load cookies from it and install
        them as the global ``urllib2`` opener;
    (2) if the file is missing or cannot be loaded, fall back to
        ``self.do_login``.

    :return: ``1`` when cookies were loaded, otherwise whatever
        ``self.do_login(username, pwd, cookie_file)`` returns.
    """
    # No cookie file: go straight to a real login.
    if not os.path.exists(cookie_file):
        return self.do_login(username, pwd, cookie_file)

    try:
        jar = cookielib.LWPCookieJar(cookie_file)
        jar.load(ignore_discard=True, ignore_expires=True)
    except cookielib.LoadError:
        print('Loading cookies error')
        return self.do_login(username, pwd, cookie_file)

    # Install the loaded cookies for urllib2.
    cookie_handler = urllib2.HTTPCookieProcessor(jar)
    urllib2.install_opener(
        urllib2.build_opener(cookie_handler, urllib2.HTTPHandler))
    print('Loading cookies success')
    return 1
def request(self, host, handler, request_body, verbose=0):
    """Send an XML-RPC request and return the parsed response.

    The URI is built from ``self._scheme``, ``host`` and ``handler``. When a
    password manager is configured, the credentials are registered for that
    URI first.

    :raises InterfaceError: when opening the request raises ``URLError``,
        ``HTTPError`` or ``BadStatusLine``.
    """
    uri = '{scheme}://{host}{handler}'.format(
        scheme=self._scheme, host=host, handler=handler)

    if self._passmgr:
        self._passmgr.add_password(None, uri, self._username, self._password)
    if self.verbose:
        _LOGGER.debug("FabricTransport: {0}".format(uri))

    opener = urllib2.build_opener(*self._handlers)
    req = urllib2.Request(
        uri,
        request_body,
        headers={
            'Content-Type': 'text/xml',
            'User-Agent': self.user_agent,
        },
    )

    try:
        return self.parse_response(opener.open(req))
    except (urllib2.URLError, urllib2.HTTPError) as exc:
        try:
            # Errors without a ``code`` attribute raise AttributeError here
            # and are described by the fallback below.
            if exc.code == 400:
                reason, code = 'Permission denied', exc.code
            else:
                reason, code = exc.reason, -1
            msg = "{reason} ({code})".format(reason=reason, code=code)
        except AttributeError:
            msg = "SSL error" if 'SSL' in str(exc) else str(exc)
        raise InterfaceError("Connection with Fabric failed: " + msg)
    except BadStatusLine:
        raise InterfaceError("Connection with Fabric failed: check SSL")
def _request(self, url, method='GET', data=None):
    """Send an HTTP request to the API endpoint and return the parsed reply.

    :param url: path appended to ``self._auth.endpoint``.
    :param method: HTTP method name; for ``GET`` and ``DELETE`` the encoded
        ``data`` goes into the query string, otherwise into the body.
    :param data: optional parameters, passed through ``urlencode``.
    :return: the result of ``self._parse_response`` on the response body.
    :raises ApiHandlerError: on an ``HTTPError`` (with the parsed error body
        attached) or when a ``ValueError`` is raised while handling the reply.
    """
    url = self._auth.endpoint + url
    # Work on a copy: the Content-Type added for body-carrying requests must
    # not leak into the shared auth headers used by later requests.
    headers = dict(self._auth.headers)

    if data is not None:
        data = urlencode(data)
        if method in ('GET', 'DELETE'):
            url = url + '?' + data
            data = None
        else:
            headers['Content-Type'] = POST_CONTENT_TYPE
            if sys.version_info > (3,):  # python3 needs a bytes body
                data = data.encode('utf-8')

    log.debug(method + ' ' + url)
    log.debug(data)

    try:
        opener = build_opener(HTTPHandler)
        request = Request(url, data=data, headers=headers)
        # Override the verb, which Request otherwise derives from ``data``.
        request.get_method = lambda: method
        response = opener.open(request).read()
        data = self._parse_response(response)
    except HTTPError as e:
        log.error(e)
        data = self._parse_response(e.read())
        raise ApiHandlerError('Invalid server response', data)
    except ValueError as e:
        log.error(e)
        raise ApiHandlerError('Invalid server response')

    return data
def build_opener():
    """Install a global urllib opener with a cookie jar and fixed headers.

    The opener keeps cookies in an in-memory ``CookieJar`` and sends an
    iPhone Safari User-Agent plus Referer/Origin/Host headers for
    ``passport.weibo.cn``. Nothing is returned; the opener is installed via
    ``request.install_opener``.
    """
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    default_headers = [
        ("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
        ("Referer", "https://passport.weibo.cn"),
        ("Origin", "https://passport.weibo.cn"),
        ("Host", "passport.weibo.cn"),
    ]
    opener.addheaders = default_headers
    request.install_opener(opener)
#??
def build_opener():
    """Install a global urllib opener with a cookie jar and fixed headers.

    Cookies are kept in an in-memory ``CookieJar``; every request carries a
    Firefox User-Agent plus Referer/Origin/Host headers for ``cn.v2ex.com``.
    The opener is installed globally and nothing is returned.
    """
    cookie_handler = request.HTTPCookieProcessor(http.cookiejar.CookieJar())
    opener = request.build_opener(cookie_handler)
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:49.0) Gecko/20100101 Firefox/49.0"),
        ("Referer", "http://cn.v2ex.com/signin"),
        ("Origin", "http://cn.v2ex.com"),
        ("Host", "cn.v2ex.com"),
    ]
    request.install_opener(opener)
def build_opener():
    """Install a global urllib opener with a cookie jar and fixed headers.

    The opener stores cookies in an in-memory ``CookieJar`` and sends a
    Chrome-on-macOS User-Agent plus Referer/Origin/Host headers for
    ``wx.qq.com``. It is installed with ``request.install_opener``; the
    function returns nothing.
    """
    headers = [
        ("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36"),
        ("Referer", "https://wx.qq.com/"),
        ("Origin", "https://wx.qq.com/"),
        ("Host", "wx.qq.com"),
    ]
    cookies = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(cookies))
    opener.addheaders = headers
    request.install_opener(opener)
# Get the uuid
def build_opener():
    """Install a global urllib opener with a cookie jar and fixed headers.

    An in-memory ``CookieJar`` backs the opener, and each request sends an
    iPhone Safari User-Agent plus Referer/Origin/Host headers for
    ``www.zhihu.com``. The opener is installed globally; nothing is returned.
    """
    processor = request.HTTPCookieProcessor(http.cookiejar.CookieJar())
    opener = request.build_opener(processor)
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
        ("Referer", "https://www.zhihu.com/"),
        ("Origin", "https://www.zhihu.com/"),
        ("Host", "www.zhihu.com"),
    ]
    request.install_opener(opener)
def getUrllibOpener():
    """Build and return an HTTPS opener with JSON and User-Agent headers.

    When ``pythonVersion`` is above 3.0 the HTTPS handler gets an SSL context
    with hostname checking and certificate verification switched off;
    otherwise a default ``HTTPSHandler`` is used.
    """
    if pythonVersion > 3.0:
        # NOTE(review): certificate and hostname verification are disabled
        # here - confirm this is intentional.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        https_handler = urllib2.HTTPSHandler(context=ctx)
    else:
        https_handler = urllib2.HTTPSHandler()

    opener = urllib2.build_opener(https_handler)
    opener.addheaders = [
        ('Content-Type', 'application/json'),
        ('User-Agent', 'vulners-getsploit-v%s' % __version__),
    ]
    return opener
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path.

    :param url: location to download; the file name is the last segment of
        the URL path.
    :param temp: directory the file is written into.
    :param digest: expected SHA-256 hex digest of the downloaded content.
    :return: path of the downloaded file.
    :raises HashError: if the actual digest differs from ``digest``.
    """
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    def https_opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof. Build a new list
        # rather than removing while iterating, which can skip entries.
        # NOTE(review): this only prunes ``opener.handlers``; the opener may
        # still have the handler registered for dispatch - confirm.
        opener.handlers = [handler for handler in opener.handlers
                           if not isinstance(handler, HTTPHandler)]
        return opener

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = https_opener().open(url)
    try:
        path = join(temp, urlparse(url).path.split('/')[-1])
        actual_hash = sha256()
        with open(path, 'wb') as out_file:
            for chunk in read_chunks(response, 4096):
                out_file.write(chunk)
                actual_hash.update(chunk)
    finally:
        # Release the connection even if writing or hashing fails.
        response.close()

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
def send(event, context, response_status, reason=None, response_data=None, physical_resource_id=None):
    """PUT a JSON status document to ``event['ResponseURL']``.

    The document carries ``Status``, ``Reason``, ``PhysicalResourceId``,
    ``StackId``, ``RequestId``, ``LogicalResourceId`` and ``Data``.
    NOTE(review): this looks like a CloudFormation custom-resource response -
    confirm against the caller.

    :param event: mapping providing ``StackId``, ``RequestId``,
        ``LogicalResourceId`` and ``ResponseURL``.
    :param context: object whose ``log_stream_name`` is the fallback for the
        reason text and the physical resource id.
    :param response_status: value sent as ``Status``.
    :param reason: optional reason; defaults to a pointer at the log stream.
    :param response_data: optional mapping sent as ``Data`` (default ``{}``).
    :param physical_resource_id: optional id; defaults to the log stream name.
    :return: ``True`` if the request succeeded, ``False`` on ``HTTPError``.
    """
    response_data = response_data or {}
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': reason or "See the details in CloudWatch Log Stream: " + context.log_stream_name,
            'PhysicalResourceId': physical_resource_id or context.log_stream_name,
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data
        }
    )
    # urllib on Python 3 rejects a str request body, so send bytes.
    body_bytes = response_body.encode('utf-8')

    opener = build_opener(HTTPHandler)
    request = Request(event['ResponseURL'], data=body_bytes)
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', len(body_bytes))
    # Request would default to POST because a body is present.
    request.get_method = lambda: 'PUT'
    try:
        response = opener.open(request)
        print("Status code: {}".format(response.getcode()))
        print("Status message: {}".format(response.msg))
        return True
    except HTTPError as exc:
        print("Failed executing HTTP request: {}".format(exc.code))
        return False
def __init__(self):
    """Initialise client state and install a cookie-aware global opener.

    Session fields start empty, a pseudo-random ``deviceId`` is derived from
    ``random.random()``, one handler list is created per entry of
    ``self.message_types`` (plus ``'location'`` and ``'all'``), and a urllib
    opener with a cookie jar and browser-like headers is installed globally.
    """
    self.DEBUG = False
    self.appid = 'wx782c26e4c19acffb'

    # Session/login fields, filled in later.
    self.uuid = self.base_uri = self.redirect_uri = ''
    self.uin = self.sid = self.skey = self.pass_ticket = ''
    # 'e' followed by up to 15 digits taken from the random float's repr.
    self.deviceId = 'e' + repr(random.random())[2:17]
    self.BaseRequest = {}
    self.synckey = ''
    self.SyncKey = []

    # Account and contact data.
    self.User = []
    self.MemberList = []
    self.ContactList = []
    self.GroupList = []
    self.autoReplyMode = False
    self.syncHost = ''

    # Handler registry: one list per message type plus two extra keys.
    self._handlers = {kind: [] for kind in self.message_types}
    for extra_key in ('location', 'all'):
        self._handlers[extra_key] = []
    self._filters = {}

    cookie_handler = request.HTTPCookieProcessor(CookieJar())
    opener = request.build_opener(cookie_handler)
    opener.addheaders = [
        ('User-agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.109 Safari/537.36'),
        ('Referer','https://wx2.qq.com/'),
    ]
    request.install_opener(opener)
def __init__(self, secure: bool=True) -> None:
    """Set up the handler with a default opener and no credentials.

    :param secure: stored on the instance as ``self.secure``.
    """
    transmissionrpc.HTTPHandler.__init__(self)
    # Start unauthenticated with a plain default opener.
    self.http_opener = build_opener()
    self.auth: DataDict = None
    self.secure = secure
def set_authentication(self, uri: str, login: str, password: str) -> None:
    """Rebuild the HTTPS opener and store a Basic ``Authorization`` header.

    With ``self.secure`` set, a default verifying SSL context is used;
    otherwise a TLS 1.2 context with certificate and hostname checks
    disabled. ``uri`` is accepted but not used by this method.
    """
    if not self.secure:
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False
    else:
        context = ssl.create_default_context()

    https_handler = HTTPSHandler(context=context)
    self.http_opener = build_opener(https_handler)

    # HTTP Basic credentials: base64 of "login:password".
    raw_credentials = str.encode(login + ":" + password)
    token = b64encode(raw_credentials).decode('utf-8')
    self.auth = {'Authorization': 'Basic %s' % token}
def getFile(cls, getfile, unpack=True):
    """Fetch ``getfile`` and optionally unpack a compressed payload.

    If ``cls.getProxy()`` is truthy, a global opener routing http/https
    through that proxy is installed first.

    :param getfile: URL to fetch.
    :param unpack: when true, the payload is unpacked according to the
        response Content-Type: gzip gives a ``GzipFile``, bzip2 gives a
        ``BytesIO`` of the decompressed bytes, zip (or an octet-stream
        starting with the zip magic) gives a ``BytesIO`` of the first member.
    :return: ``(data, response)``; ``(None, None)`` if the fetch failed and
        ``cls.exitWhenNoSource()`` is falsy.
    """
    def first_zip_member(raw):
        # First archive member as BytesIO; ``raw`` unchanged if the archive
        # has no members.
        archive = zipfile.ZipFile(BytesIO(raw), 'r')
        names = archive.namelist()
        if names:
            return BytesIO(archive.read(names[0]))
        return raw

    proxy_url = cls.getProxy()
    if proxy_url:
        proxy = req.ProxyHandler({'http': proxy_url, 'https': proxy_url})
        auth = req.HTTPBasicAuthHandler()
        opener = req.build_opener(proxy, auth, req.HTTPHandler)
        req.install_opener(opener)

    try:
        response = req.urlopen(getfile)
    except Exception:
        msg = "[!] Could not fetch file %s"%getfile
        if cls.exitWhenNoSource():
            sys.exit(msg)
        print(msg)
        # Nothing was fetched: return early instead of touching the
        # unbound ``response``.
        return (None, None)

    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
        # A missing header must not break the substring checks below.
        content_type = response.info().get('Content-Type') or ''
        if 'gzip' in content_type:
            data = gzip.GzipFile(fileobj=BytesIO(data))
        elif 'bzip2' in content_type:
            data = BytesIO(bz2.decompress(data))
        elif 'zip' in content_type:
            data = first_zip_member(data)
        # In case the webserver is being generic
        elif 'application/octet-stream' in content_type:
            if data[:4] == b'PK\x03\x04':  # Zip
                data = first_zip_member(data)
    return (data, response)
def request(self, host, handler, request_body, verbose=0):
    """Send an XML-RPC request and return the parsed response.

    The target URI combines ``self._scheme``, ``host`` and ``handler``;
    credentials are registered with the password manager when one is set.

    :raises InterfaceError: if opening the request raises ``URLError``,
        ``HTTPError`` or ``BadStatusLine``.
    """
    uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme,
                                              host=host,
                                              handler=handler)
    if self._passmgr:
        self._passmgr.add_password(
            None, uri, self._username, self._password)
    if self.verbose:
        _LOGGER.debug("FabricTransport: {0}".format(uri))

    url_opener = urllib2.build_opener(*self._handlers)
    http_headers = {
        'Content-Type': 'text/xml',
        'User-Agent': self.user_agent,
    }
    http_request = urllib2.Request(uri, request_body, headers=http_headers)

    try:
        raw_response = url_opener.open(http_request)
        return self.parse_response(raw_response)
    except (urllib2.URLError, urllib2.HTTPError) as exc:
        try:
            # Errors without ``code`` raise AttributeError here and fall
            # through to the string-based description below.
            status = exc.code
            if status == 400:
                reason, code = 'Permission denied', status
            else:
                reason, code = exc.reason, -1
            msg = "{reason} ({code})".format(reason=reason, code=code)
        except AttributeError:
            text = str(exc)
            msg = "SSL error" if 'SSL' in text else str(exc)
        raise InterfaceError("Connection with Fabric failed: " + msg)
    except BadStatusLine:
        raise InterfaceError("Connection with Fabric failed: check SSL")
def test_urllib_request_opener(self):
    """
    When making a request via urllib.request.OpenerDirector
        we return the original response
        we capture a span for the request
    """
    opener = build_opener()
    with override_global_tracer(self.tracer):
        resp = opener.open(URL_200)

    # The response is passed through untouched.
    body = self.to_str(resp.read())
    self.assertEqual(body, '')
    self.assertEqual(resp.getcode(), 200)

    # Exactly one span was recorded for the request.
    spans = self.tracer.writer.pop()
    self.assertEqual(len(spans), 1)
    span = spans[0]
    self.assertEqual(span.span_type, 'http')
    self.assertIsNone(span.service)
    self.assertEqual(span.name, self.SPAN_NAME)
    self.assertEqual(span.error, 0)

    expected_tags = (
        ('http.method', 'GET'),
        ('http.status_code', '200'),
        ('http.url', URL_200),
    )
    for tag_name, tag_value in expected_tags:
        self.assertEqual(span.get_tag(tag_name), tag_value)
# Additional Python2 test cases for urllib
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    """Configure the urllib2 transport.

    :param timeout: stored as ``self._timeout``; rejected when the transport
        reports no ``'timeout'`` feature.
    :param proxy: not supported; any truthy value raises.
    :param cacert: not supported; any truthy value raises.
    :param sessions: when truthy, requests go through an opener with a
        cookie jar instead of plain ``urllib2.urlopen``.
    :raises RuntimeError: for an unsupported option.
    """
    if timeout is not None:
        # Only probe the feature when a timeout was actually requested.
        if not self.supports_feature('timeout'):
            raise RuntimeError('timeout is not supported with urllib2 transport')
    if proxy:
        raise RuntimeError('proxy is not supported with urllib2 transport')
    if cacert:
        raise RuntimeError('cacert is not support with urllib2 transport')

    self.request_opener = urllib2.urlopen
    if sessions:
        cookie_handler = urllib2.HTTPCookieProcessor(CookieJar())
        self.request_opener = urllib2.build_opener(cookie_handler).open
    self._timeout = timeout
def create_cookie_opener(self):
    '''
    Build an opener that keeps cookies in an in-memory CookieJar.
    :return: a urllib opener with an HTTPCookieProcessor attached
    '''
    jar = cookiejar.CookieJar()
    return request.build_opener(request.HTTPCookieProcessor(jar))
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    """Set up the urllib2 transport and validate the requested options.

    ``timeout`` is kept in ``self._timeout`` and is only accepted when
    ``self.supports_feature('timeout')`` is truthy. ``proxy`` and ``cacert``
    are not supported. With ``sessions`` enabled, requests are opened through
    a cookie-aware opener; otherwise ``urllib2.urlopen`` is used.

    :raises RuntimeError: when an unsupported option is requested.
    """
    wants_timeout = timeout is not None
    if wants_timeout and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')
    if proxy:
        raise RuntimeError('proxy is not supported with urllib2 transport')
    if cacert:
        raise RuntimeError('cacert is not support with urllib2 transport')

    # Default to the module-level urlopen; switch to a cookie-keeping
    # opener only when sessions are requested.
    self.request_opener = urllib2.urlopen
    if sessions:
        session_opener = urllib2.build_opener(
            urllib2.HTTPCookieProcessor(CookieJar()))
        self.request_opener = session_opener.open
    self._timeout = timeout