def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
    if url is None:
        return None
    try:
        req = request.Request(url, headers=headers or {}, data=data)
        cookie = cookiejar.CookieJar()
        cookie_process = request.HTTPCookieProcessor(cookie)
        # Register the cookie processor so cookies actually round-trip.
        opener = request.build_opener(cookie_process)
        if proxy:
            proxies = {urlparse(url).scheme: proxy}
            opener.add_handler(request.ProxyHandler(proxies))
        content = opener.open(req).read()
    except error.URLError as e:
        print('HtmlDownLoader download error:', e.reason)
        content = None
        if retry_count > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # An HTTPError with a 5xx code means the failure is on the
                # server side, so retry the download a limited number of times.
                return self.download(url, retry_count - 1, headers, proxy, data)
    return content
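# A minimal standalone sketch of the same opener wiring (cookies first,
# proxy optional). The function name and URL below are illustrative
# assumptions, not part of the original code.
from http import cookiejar
from urllib import request
from urllib.parse import urlparse

def open_with_cookies(url, proxy=None, headers=None):
    jar = cookiejar.CookieJar()
    handlers = [request.HTTPCookieProcessor(jar)]
    if proxy:
        handlers.append(request.ProxyHandler({urlparse(url).scheme: proxy}))
    opener = request.build_opener(*handlers)
    return opener.open(request.Request(url, headers=headers or {})).read()

# body = open_with_cookies('https://example.com/', headers={'User-Agent': 'Mozilla/5.0'})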
def get_response(url, faker=False):
    logging.debug('get_response: %s' % url)
    # Install cookies into the default opener, if any are configured.
    if cookies:
        opener = request.build_opener(request.HTTPCookieProcessor(cookies))
        request.install_opener(opener)
    if faker:
        response = request.urlopen(request.Request(url, headers=fake_headers), None)
    else:
        response = request.urlopen(url)
    data = response.read()
    if response.info().get('Content-Encoding') == 'gzip':
        data = ungzip(data)
    elif response.info().get('Content-Encoding') == 'deflate':
        data = undeflate(data)
    response.data = data
    return response
# DEPRECATED in favor of get_content()
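# get_response() relies on ungzip() and undeflate() helpers that are not
# shown in this excerpt. A plausible standard-library implementation (an
# assumption, not the project's actual code) would be:
import gzip
import zlib

def ungzip(data):
    # Decompress a gzip-encoded HTTP body.
    return gzip.decompress(data)

def undeflate(data):
    # Deflate bodies come in zlib-wrapped and raw variants; try both.
    try:
        return zlib.decompress(data)
    except zlib.error:
        return zlib.decompress(data, -zlib.MAX_WBITS)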
async def youtube(self, ctx, *, ytsearch: str):
    """Does a little YouTube search."""
    opener = request.build_opener()
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]
    search = "+".join(ytsearch.split())
    errorthing = ytsearch
    url = 'https://www.youtube.com/results?search_query={}'.format(search)
    ourUrl = opener.open(url).read()
    await self.bot.type()
    soup = bs(ourUrl, "html.parser")
    # Raw strings so the regex backslashes are not eaten by Python.
    alexpls = re.findall(r'"(/watch\?v=.*?)"',
                         str(soup.find_all('a',
                                           attrs={'href': re.compile(r'^/watch\?v=.*')})))
    try:
        await self.bot.say('{}: https://www.youtube.com{}'.format(ctx.message.author.mention, alexpls[0]))
    except IndexError:
        await self.bot.say('Sorry, I could not find any results containing the name `{}`'.format(errorthing))
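# The manual split/join above only handles spaces; urllib.parse.quote_plus
# covers '&', '#', unicode, and the rest. A small sketch of the safer query
# construction (the helper name is illustrative):
from urllib.parse import quote_plus

def build_search_url(query):
    # Percent-encode the query so special characters survive intact.
    return 'https://www.youtube.com/results?search_query=' + quote_plus(query)

# build_search_url('rick astley #1') -> '...search_query=rick+astley+%231'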
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    if (timeout is not None) and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')
    if proxy:
        raise RuntimeError('proxy is not supported with urllib2 transport')
    if cacert:
        raise RuntimeError('cacert is not supported with urllib2 transport')
    handlers = []
    if ((sys.version_info[0] == 2 and sys.version_info >= (2, 7, 9)) or
            (sys.version_info[0] == 3 and sys.version_info >= (3, 2, 0))):
        # These interpreters verify certificates by default; disable
        # verification explicitly to preserve the old transport behaviour.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        handlers.append(urllib2.HTTPSHandler(context=context))
    if sessions:
        handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))
    opener = urllib2.build_opener(*handlers)
    self.request_opener = opener.open
    self._timeout = timeout
def follow_redirects(link, sites=None):
    """Follow redirects for the link as long as the redirects stay on the
    given sites, and return the resolved link."""
    def follow(url):
        return sites is None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None

        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            r.get_method = lambda: 'HEAD'
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda: 'HEAD'
    try:
        with contextlib.closing(opener.open(req, timeout=1)) as site:
            return site.url
    except:
        return redirect_handler.last_url if redirect_handler.last_url else link
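# A hedged usage sketch (Python 2, matching the urllib2/urlparse imports the
# function expects; the URL and site list are placeholders):
import contextlib
import urllib2
import urlparse

resolved = follow_redirects('http://t.co/abc123', sites=['t.co', 'example.com'])
print(resolved)  # final URL, or the last on-site hop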
def __init__(self):
    self.articles = []
    self.query = None
    self.cjar = MozillaCookieJar()
    # If we have a cookie file, load it:
    if ScholarConf.COOKIE_JAR_FILE and \
       os.path.exists(ScholarConf.COOKIE_JAR_FILE):
        try:
            self.cjar.load(ScholarConf.COOKIE_JAR_FILE,
                           ignore_discard=True)
            ScholarUtils.log('info', 'loaded cookies file')
        except Exception as msg:
            ScholarUtils.log('warn', 'could not load cookies file: %s' % msg)
            self.cjar = MozillaCookieJar()  # Just to be safe
    self.opener = build_opener(HTTPCookieProcessor(self.cjar))
    self.settings = None  # Last settings object, if any
def get_access_token(self, code, state=None):
    '''
    In the callback url (http://host/callback?code=123&state=xyz),
    use code and state to get an access token.
    '''
    kw = dict(client_id=self._client_id, client_secret=self._client_secret, code=code)
    if self._redirect_uri:
        kw['redirect_uri'] = self._redirect_uri
    if state:
        kw['state'] = state
    opener = build_opener(HTTPSHandler)
    request = Request('https://github.com/login/oauth/access_token', data=_encode_params(kw))
    request.get_method = _METHOD_MAP['POST']
    request.add_header('Accept', 'application/json')
    try:
        response = opener.open(request, timeout=TIMEOUT)
        r = _parse_json(response.read())
        if 'error' in r:
            raise ApiAuthError(str(r.error))
        return str(r.access_token)
    except HTTPError:
        raise ApiAuthError('HTTPError while getting access token')
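# _encode_params() is referenced but not defined in this excerpt. A plausible
# implementation (an assumption, not the library's actual code) URL-encodes
# the keyword arguments and returns bytes, as Request's data= argument
# requires on Python 3:
from urllib.parse import urlencode

def _encode_params(params):
    # application/x-www-form-urlencoded body for the POST request
    return urlencode(params).encode('utf-8')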
def cookie_friendly_download(referer_url, file_url, store_dir='.', timeout=1000):
    import os
    from os import path
    from http.cookiejar import CookieJar
    from urllib import request
    cj = CookieJar()
    cp = request.HTTPCookieProcessor(cj)
    opener = request.build_opener(cp)
    # Visit the referer first so the server-set cookies land in the jar.
    with opener.open(referer_url):
        pass
    with opener.open(file_url, timeout=timeout) as fin:
        file_bin = fin.read()
        filename = fin.headers['Content-Disposition']
        filename = filename.split(';')[-1].split('=')[1]
    os.makedirs(store_dir, exist_ok=True)
    with open(path.join(store_dir, filename), mode='wb') as fout:
        fout.write(file_bin)
    return path.join(store_dir, filename)
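# Usage sketch (the URLs are placeholders, not from the original source):
# the first request establishes the session cookie that gates the file.
saved = cookie_friendly_download(
    'https://example.com/downloads',           # page that sets the cookie
    'https://example.com/files/report.zip',    # cookie-gated file
    store_dir='downloads',
    timeout=60,
)
print('saved to', saved)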
def get_req(self, start_size, end_size):
    '''Open a ranged request (socket) for this batch's byte span.'''
    logger.debug('DownloadBatch.get_req: %s, %s' % (start_size, end_size))
    opener = request.build_opener()
    content_range = 'bytes={0}-{1}'.format(start_size, end_size)
    opener.addheaders = [
        ('Range', content_range),
        ('User-Agent', const.USER_AGENT),
        ('Referer', const.PAN_REFERER),
    ]
    for i in range(RETRIES):
        try:
            return opener.open(self.url, timeout=self.timeout)
        except OSError:
            # Log and retry; only give up once the attempts are exhausted.
            logger.error(traceback.format_exc())
        except Exception:
            logger.error(traceback.format_exc())
    self.queue.put((self.id_, BATCH_ERROR), block=False)
    return None
def authenticate(top_level_url=u'https://api.github.com'):
    try:
        if 'GH_AUTH_USER' not in os.environ:
            try:
                username = raw_input(u'Username: ')
            except NameError:
                username = input(u'Username: ')
        else:
            username = os.environ['GH_AUTH_USER']
        if 'GH_AUTH_PASS' not in os.environ:
            password = getpass.getpass(u'Password: ')
        else:
            password = os.environ['GH_AUTH_PASS']
    except KeyboardInterrupt:
        sys.exit(u'')
    try:
        import urllib.request as urllib_alias
    except ImportError:
        import urllib2 as urllib_alias
    password_mgr = urllib_alias.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, top_level_url, username, password)
    handler = urllib_alias.HTTPBasicAuthHandler(password_mgr)
    opener = urllib_alias.build_opener(handler)
    urllib_alias.install_opener(opener)
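# Because authenticate() ends with install_opener(), every later urlopen()
# call authenticates automatically once the server answers with a 401
# challenge. A hedged usage sketch:
try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen         # Python 2

authenticate()  # prompts, or reads GH_AUTH_USER / GH_AUTH_PASS
print(urlopen('https://api.github.com/user').read())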
def supports_site(url):
    """
    The RSS crawler supports every site that publishes an RSS feed.
    Determines if this crawler works on the given url.

    :param str url: The url to test
    :return bool: Whether this crawler works on the given url
    """
    # Follow redirects
    opener = urllib2.build_opener(urllib2.HTTPRedirectHandler)
    redirect = opener.open(url).url
    response = urllib2.urlopen(redirect).read()
    # Check if a standard rss feed exists
    return re.search(
        r'(<link[^>]*href[^>]*type ?= ?"application\/rss\+xml"|' +
        r'<link[^>]*type ?= ?"application\/rss\+xml"[^>]*href)',
        response.decode('utf-8')) is not None
def main(which_days):
    for day in which_days:
        day_input_file = os.path.join(root_dir, 'input_{0:02d}.txt'.format(day))
        if not os.path.exists(day_input_file):
            session_token = os.environ.get("AOC_SESSION_TOKEN")
            if session_token is None:
                raise ValueError("Must set AOC_SESSION_TOKEN environment variable!")
            url = 'https://adventofcode.com/2016/day/{0}/input'.format(day)
            opener = build_opener()
            opener.addheaders.append(('Cookie', 'session={0}'.format(session_token)))
            response = opener.open(url)
            with open(day_input_file, 'w') as f:
                f.write(response.read().decode("utf-8"))
        print("Solutions to Day {0:02d}\n-------------------".format(day))
        # Horrible way to run scripts, but I did not want to rewrite old solutions.
        day_module = __import__('{0:02d}'.format(day))
        print('')
def proxyurllib():
    print(COLOR_GREEN + '-' * 30 + COLOR_NONE)
    # TODO: make the proxy configurable
    handler = request.ProxyHandler({'http': 'http://10.112.5.173:49908'})
    '''
    proxy_auth_handler = urllib.request.ProxyBasicAuthHandler()
    proxy_auth_handler.add_password('realm', 'host', 'username', 'password')
    '''
    opener = request.build_opener(handler)
    request.install_opener(opener)
    # After install_opener(), urlopen() uses this opener for every URL;
    # without it, urlopen() falls back to the default opener.
    google = request.urlopen('http://www.google.com')
    print(google.read())
    print("System proxies:", request.getproxies())
#proxyurllib()
# FIXME: handle robots.txt
def make_request(self,
                 method,
                 request_body=None,
                 query_params=None,
                 request_headers=None):
    method = method.upper()
    if request_headers:
        self._set_headers(request_headers)
    request_body = json.dumps(request_body) if request_body else None
    query_params = query_params if query_params else None
    opener = urllib.build_opener()
    request = urllib.Request(self._build_url(query_params),
                             data=request_body)
    for key, value in self.request_headers.iteritems():
        request.add_header(key, value)
    request.get_method = lambda: method
    self._response = opener.open(request)
    self._set_response(self._response)
    self._reset()
def make_request(*args):
    if platform.system() == "Windows":  # pragma: no cover
        sctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        sh = urllib2.HTTPSHandler(debuglevel=0, context=sctx)
        opener = urllib2.build_opener(sh)
    else:
        opener = build_opener()
    opener.addheaders = [('User-agent',
                          'Mozilla/5.0' + str(random.randrange(1000000)))]
    try:
        return opener.open(*args).read().strip()
    except Exception as e:
        try:
            p = e.read().strip()  # HTTPError exposes the response body
        except:
            p = e
        raise Exception(p)
def index(request):
    if request.method == "GET":
        try:
            ssl._create_default_https_context = ssl._create_unverified_context
            opener = wdf_urllib.build_opener(
                wdf_urllib.HTTPCookieProcessor(CookieJar()))
            wdf_urllib.install_opener(opener)
        except:
            pass
        uuid = getUUID()
        url = 'https://login.weixin.qq.com/qrcode/' + uuid
        params = {
            't': 'webwx',
            '_': int(time.time()),
        }
        # Use a distinct name so the Django `request` argument is not shadowed.
        qr_request = getRequest(url=url, data=urlencode(params))
        response = wdf_urllib.urlopen(qr_request)
        context = {
            'uuid': uuid,
            'response': response.read(),
            'delyou': '',
        }
        return render_to_response('index.html', context)
def send_signal(event, response_status, reason, response_data=None):
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': str(reason or 'ReasonCanNotBeNone'),
            'PhysicalResourceId': event.get('PhysicalResourceId', event['LogicalResourceId']),
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data or {}
        },
        sort_keys=True,
    )
    logging.debug(response_body)
    opener = build_opener(HTTPHandler)
    # Encode the body so the PUT works on Python 3 as well as Python 2.
    request = Request(event['ResponseURL'], data=response_body.encode('utf-8'))
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', str(len(response_body)))
    request.get_method = lambda: 'PUT'
    opener.open(request)
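# send_signal() implements the CloudFormation custom-resource callback. A
# minimal Lambda handler sketch that reports success or failure (the handler
# name and the resource work are illustrative, not from the original source):
def lambda_handler(event, context):
    try:
        # ... create/update/delete the actual resource here ...
        send_signal(event, 'SUCCESS', 'resource handled', response_data={'Ok': True})
    except Exception as exc:
        send_signal(event, 'FAILED', exc)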
def __init__(self, writing=WRITING_NATIVE, opener=None, retry_times=4, executor=_g_executor,
             timeout=4, service_urls=('http://translate.google.com',), debug=False):
    self._DEBUG = debug
    self._MIN_TASKS_FOR_CONCURRENT = 2
    self._opener = opener
    self._languages = None
    self._TIMEOUT = timeout
    if not self._opener:
        debuglevel = self._DEBUG and 1 or 0
        self._opener = build_opener(
            HTTPHandler(debuglevel=debuglevel),
            HTTPSHandler(debuglevel=debuglevel))
    self._RETRY_TIMES = retry_times
    self._executor = executor
    self._writing = writing
    if _is_sequence(service_urls):
        self._service_urls = service_urls
    else:
        self._service_urls = (service_urls,)
def __init__(self):
    self.articles = []
    self.query = None
    self.cjar = MozillaCookieJar()
    # If we have a cookie file, load it:
    if ScholarConf.COOKIE_JAR_FILE and \
       os.path.exists(ScholarConf.COOKIE_JAR_FILE):
        try:
            self.cjar.load(ScholarConf.COOKIE_JAR_FILE,
                           ignore_discard=True)
            print("Using cookie file")
            ScholarUtils.log('info', 'loaded cookies file')
        except Exception as msg:
            print("Ignoring cookie file: %s" % msg)
            ScholarUtils.log('warn', 'could not load cookies file: %s' % msg)
            self.cjar = MozillaCookieJar()  # Just to be safe
    self.opener = build_opener(HTTPCookieProcessor(self.cjar))
    self.settings = None  # Last settings object, if any
def __get_cookies(self, req):
    cookies = cookiejar.CookieJar()
    handler = request.HTTPCookieProcessor(cookies)
    opener = request.build_opener(handler)
    try:
        with opener.open(req) as f:
            if f.code == 200:
                pattern = re.compile(r"<input.*?type='hidden'.*?name='csrfmiddlewaretoken'.*?value='(.*?)'.*>")
                try:
                    self.csrfmiddlewaretoken = pattern.search(f.read().decode("utf-8")).group(1)
                    print("Retrieved cookies and csrfmiddlewaretoken successfully")
                except:
                    print("Retrieved cookies successfully")
                return cookies
            else:
                print("Lost cookies")
    except error.URLError as e:
        if hasattr(e, "reason"):
            print("We failed to reach the server; check the url and the reason below.")
            print("Reason: {}".format(e.reason))
        elif hasattr(e, "code"):
            print("The server couldn't fulfill the request.")
            print("Error code: {}".format(e.code))
        exit()
def __init__(self, server, port, username, password):
    """ Connection Class init call """
    self.server = server
    self.port = port
    self.username = username
    self.password = password
    self.url = 'https://{0}:{1}'.format(self.server, self.port)
    self.api = '/api/1.1/xml'
    self.authtoken = ''
    self.response = None
    self.sync_id = ''
    # Force urllib2 to not use a proxy.
    proxy_handler = urllib2.ProxyHandler({})
    opener = urllib2.build_opener(proxy_handler)
    urllib2.install_opener(opener)
    self.login()
# Gets called in __init__
def default_urllib2_opener(config):
    if config is not None:
        proxy_server = config.get("http", "proxy")
    else:
        proxy_server = None
    handlers = []
    if proxy_server is not None:
        handlers.append(urllib2.ProxyHandler({"http": proxy_server}))
    opener = urllib2.build_opener(*handlers)
    if config is not None:
        user_agent = config.get("http", "useragent")
    else:
        user_agent = None
    if user_agent is None:
        user_agent = default_user_agent_string()
    opener.addheaders = [('User-agent', user_agent)]
    return opener
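# A usage sketch: with no config the opener makes direct connections and
# sends the library's default User-Agent (the URL is a placeholder):
opener = default_urllib2_opener(None)
resp = opener.open('http://example.com/')
print('%s %d bytes' % (resp.getcode(), len(resp.read())))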
def make_request(*args):
    opener = build_opener()
    opener.addheaders = [('User-agent',
                          'Mozilla/5.0' + str(random.randrange(1000000)))]
    try:
        return opener.open(*args).read().strip()
    except Exception as e:
        try:
            p = e.read().strip()  # HTTPError exposes the response body
        except:
            p = e
        raise Exception(p)
def login(self, username, pwd, cookie_file):
    """
    Login with username, password and cookies.
    (1) If the cookie file exists, try to load cookies from it;
    (2) if no cookies are found, do a fresh login.
    """
    # If cookie file exists then try to load cookies
    if os.path.exists(cookie_file):
        try:
            cookie_jar = cookielib.LWPCookieJar(cookie_file)
            cookie_jar.load(ignore_discard=True, ignore_expires=True)
            loaded = 1
        except cookielib.LoadError:
            loaded = 0
            LOG.info('Loading cookies error')
        # install loaded cookies for urllib2
        if loaded:
            cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
            opener = urllib2.build_opener(cookie_support,
                                          urllib2.HTTPHandler)
            urllib2.install_opener(opener)
            LOG.info('Loading cookies success')
            return 1
        else:
            return self.do_login(username, pwd, cookie_file)
    else:  # If no cookies found
        return self.do_login(username, pwd, cookie_file)