def get_cookie(site, params, agent):
"""
type params: list;
param params: post-??????;
"""
post = urllib.urlencode(params)
cookie = None
try:
urlopener = urllib.FancyURLopener()
urlopener.addheaders = [("User-agent", agent)]
cookie = urlopener.open(site, post).info()["Set-Cookie"].split()
except Exception:
return False
return cookie
python类FancyURLopener()的实例源码
def get_cookie(site, params, agent):
"""
type site: str
param site: ?????? ?? ???????? ????? ?? ????
type params: list
param params: post-??????
type agent: str
param agent: ?????????? ?? ??????
"""
post = urllib.urlencode(params)
try:
urlopener = urllib.FancyURLopener()
urlopener.addheaders = [("User-agent", agent)]
cookie = urlopener.open(site, post).info()["Set-Cookie"].split()
except Exception:
return False
return cookie
def open_file(self, url):
path = urllib.url2pathname(urllib.unquote(url))
if os.path.isdir(path):
if path[-1] != os.sep:
url = url + '/'
indexpath = os.path.join(path, "index.html")
if os.path.exists(indexpath):
return self.open_file(url + "index.html")
try:
names = os.listdir(path)
except os.error, msg:
raise IOError, msg, sys.exc_traceback
names.sort()
s = MyStringIO("file:"+url, {'content-type': 'text/html'})
s.write('<BASE HREF="file:%s">\n' %
urllib.quote(os.path.join(path, "")))
for name in names:
q = urllib.quote(name)
s.write('<A HREF="%s">%s</A>\n' % (q, q))
s.seek(0)
return s
return urllib.FancyURLopener.open_file(self, url)
def proxyvalidator(proxylist):
finalcount = 0
for proxy in proxylist:
proxy.replace('\n', '')
try:
proxies = {'http': "http://"+proxy[:-1]}
opener = urllib.FancyURLopener(proxies)
try:
loopchk = opener.open("http://www.google.com").read()
except:
pass
except(IOError,socket.timeout), detail:
pass
ipcheck(proxy)
alivelist.append(proxy)
finalcount += 1
return alivelist
def __init__(self, *args):
urllib.FancyURLopener.__init__(self, *args)
self.errcode = 200
def http_error_default(self, url, fp, errcode, errmsg, headers):
self.errcode = errcode
return urllib.FancyURLopener.http_error_default(self, url, fp, errcode,
errmsg, headers)
def expect(self, pattern, *args, **kwargs):
print >>self.structured_log_f, "expect(" + repr(pattern) + ")"
r = pexpect.spawn.expect(self, pattern, *args, **kwargs)
print >>self.structured_log_f, "match(" + repr(self.match.group(0)) + ")"
return r
# Subclass urllib.FancyURLopener so that we can catch
# HTTP 404 errors
def __init__(self, *args):
urllib.FancyURLopener.__init__(self, *args)
self.errcode = 200
def http_error_default(self, url, fp, errcode, errmsg, headers):
self.errcode = errcode
return urllib.FancyURLopener.http_error_default(self, url, fp, errcode,
errmsg, headers)
def Download(link, filename):
subtitle_list = []
exts = [".srt", ".sub", ".txt", ".smi", ".ssa", ".ass"]
if link:
downloadlink = link
log(__name__, "Downloadlink %s" % link)
class MyOpener(urllib.FancyURLopener):
version = "User-Agent=Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3 ( .NET CLR 3.5.30729)"
my_urlopener = MyOpener()
my_urlopener.addheader('Referer', link)
postparams = None
log(__name__, "Fetching subtitles using url '%s' with referer header '%s' and post parameters '%s'" % (link, link, postparams))
response = my_urlopener.open(link, postparams)
local_tmp_file = os.path.join(__temp__, "sub.srt")
if xbmcvfs.exists(__temp__):
shutil.rmtree(__temp__)
xbmcvfs.mkdirs(__temp__)
try:
log(__name__, "Saving subtitles to '%s'" % local_tmp_file)
local_file_handle = open(local_tmp_file, "wb")
local_file_handle.write(response.read())
local_file_handle.close()
subtitle_list.append(local_tmp_file)
log(__name__, "=== returning subtitle file %s" % file)
except:
log(__name__, "Failed to save subtitle to %s" % local_tmp_file)
return subtitle_list
def wsopen(self, url, post, **params):
noparam = params.pop('noparam',False)
if noparam:
params = {}
else:
if self.user is not None:
params['user'] = self.user
if self.password is not None:
params.pop('hmac', None)
HMAC=hmac.new(self.password)
for k,v in sorted(params.items()):
HMAC.update("%s=%s" % (k,v))
params.update({'hmac':HMAC.hexdigest()})
query = urllib.urlencode(params)
if post:
body = query
elif query:
url = "{}?{}".format(url, query)
if self.debug:
if post:
print("POST:\n{}\n{!r}\n".format(url, body), file=sys.stderr)
else:
print("GET:\n{}\n".format(url), file=sys.stderr)
class URLopener(urllib.FancyURLopener):
def http_error_default(self, url, fp, errcode, errmsg, headers):
return urllib.addinfourl(fp, headers, "http:" + url, errcode)
try:
urllib._urlopener = URLopener()
if post:
resp = urllib.urlopen(url, body)
else:
resp = urllib.urlopen(url)
except IOError as e:
raise WSError(url, msg=e)
if self.debug:
print("RESPONSE:\n{}\n{}".format(resp.getcode(), resp.info()), file=sys.stderr)
if resp.getcode() != 200:
raise WSError(url, resp.getcode(), resp.read())
return resp
def process(self):
encoded_args = urllib.urlencode(self.parameters)
if self.testmode == True:
url = 'https://test.authorize.net/gateway/transact.dll'
else:
url = 'https://secure.authorize.net/gateway/transact.dll'
if self.proxy is None:
self.results += str(urllib.urlopen(
url, encoded_args).read()).split(self.delimiter)
else:
opener = urllib.FancyURLopener(self.proxy)
opened = opener.open(url, encoded_args)
try:
self.results += str(opened.read()).split(self.delimiter)
finally:
opened.close()
Results = namedtuple('Results', 'ResultResponse ResponseSubcode ResponseCode ResponseText AuthCode \
AVSResponse TransactionID InvoiceNumber Description Amount PaymentMethod \
TransactionType CustomerID CHFirstName CHLastName Company BillingAddress \
BillingCity BillingState BillingZip BillingCountry Phone Fax Email ShippingFirstName \
ShippingLastName ShippingCompany ShippingAddress ShippingCity ShippingState \
ShippingZip ShippingCountry TaxAmount DutyAmount FreightAmount TaxExemptFlag \
PONumber MD5Hash CVVResponse CAVVResponse')
self.response = Results(*tuple(r for r in self.results)[0:40])
if self.getResultResponseFull() == 'Approved':
self.error = False
self.success = True
self.declined = False
elif self.getResultResponseFull() == 'Declined':
self.error = False
self.success = False
self.declined = True
else:
raise AIM.AIMError(self.response.ResponseText)
def process(self):
encoded_args = urllib.urlencode(self.parameters)
if self.proxy is None:
results = str(urllib.urlopen(
self.url, encoded_args).read()).split(self.delimiter)
else:
opener = urllib.FancyURLopener(self.proxy)
opened = opener.open(self.url, encoded_args)
try:
results = str(opened.read()).split(self.delimiter)
finally:
opened.close()
for result in results:
(key, val) = result.split('=')
self.results[key] = val
if self.results['response'] == '1':
self.error = False
self.success = True
self.declined = False
elif self.results['response'] == '2':
self.error = False
self.success = False
self.declined = True
elif self.results['response'] == '3':
self.error = True
self.success = False
self.declined = False
else:
self.error = True
self.success = False
self.declined = False
raise DowCommerce.DowCommerceError(self.results)
def fetch_url(src, dst):
"""
Fetch file from URL src and save it to dst.
"""
# we do not use the nicer sys.version_info.major
# for compatibility with Python < 2.7
if sys.version_info[0] > 2:
import urllib.request
class URLopener(urllib.request.FancyURLopener):
def http_error_default(self, url, fp, errcode, errmsg, headers):
sys.stderr.write("ERROR: could not fetch {0}\n".format(url))
sys.exit(-1)
else:
import urllib
class URLopener(urllib.FancyURLopener):
def http_error_default(self, url, fp, errcode, errmsg, headers):
sys.stderr.write("ERROR: could not fetch {0}\n".format(url))
sys.exit(-1)
dirname = os.path.dirname(dst)
if dirname != '':
if not os.path.isdir(dirname):
os.makedirs(dirname)
opener = URLopener()
opener.retrieve(src, dst)
def __init__(self):
super(FancyURLopener, self).__init__()
def process(self):
encoded_args = urllib.urlencode(self.parameters)
if self.testmode == True:
url = 'https://test.authorize.net/gateway/transact.dll'
else:
url = 'https://secure.authorize.net/gateway/transact.dll'
if self.proxy is None:
self.results += str(urllib.urlopen(
url, encoded_args).read()).split(self.delimiter)
else:
opener = urllib.FancyURLopener(self.proxy)
opened = opener.open(url, encoded_args)
try:
self.results += str(opened.read()).split(self.delimiter)
finally:
opened.close()
Results = namedtuple('Results', 'ResultResponse ResponseSubcode ResponseCode ResponseText AuthCode \
AVSResponse TransactionID InvoiceNumber Description Amount PaymentMethod \
TransactionType CustomerID CHFirstName CHLastName Company BillingAddress \
BillingCity BillingState BillingZip BillingCountry Phone Fax Email ShippingFirstName \
ShippingLastName ShippingCompany ShippingAddress ShippingCity ShippingState \
ShippingZip ShippingCountry TaxAmount DutyAmount FreightAmount TaxExemptFlag \
PONumber MD5Hash CVVResponse CAVVResponse')
self.response = Results(*tuple(r for r in self.results)[0:40])
if self.getResultResponseFull() == 'Approved':
self.error = False
self.success = True
self.declined = False
elif self.getResultResponseFull() == 'Declined':
self.error = False
self.success = False
self.declined = True
else:
raise AIM.AIMError(self.response.ResponseText)
def process(self):
encoded_args = urllib.urlencode(self.parameters)
if self.proxy is None:
results = str(urllib.urlopen(
self.url, encoded_args).read()).split(self.delimiter)
else:
opener = urllib.FancyURLopener(self.proxy)
opened = opener.open(self.url, encoded_args)
try:
results = str(opened.read()).split(self.delimiter)
finally:
opened.close()
for result in results:
(key, val) = result.split('=')
self.results[key] = val
if self.results['response'] == '1':
self.error = False
self.success = True
self.declined = False
elif self.results['response'] == '2':
self.error = False
self.success = False
self.declined = True
elif self.results['response'] == '3':
self.error = True
self.success = False
self.declined = False
else:
self.error = True
self.success = False
self.declined = False
raise DowCommerce.DowCommerceError(self.results)
def process(self):
encoded_args = urllib.urlencode(self.parameters)
if self.testmode == True:
url = 'https://test.authorize.net/gateway/transact.dll'
else:
url = 'https://secure.authorize.net/gateway/transact.dll'
if self.proxy is None:
self.results += str(urllib.urlopen(
url, encoded_args).read()).split(self.delimiter)
else:
opener = urllib.FancyURLopener(self.proxy)
opened = opener.open(url, encoded_args)
try:
self.results += str(opened.read()).split(self.delimiter)
finally:
opened.close()
Results = namedtuple('Results', 'ResultResponse ResponseSubcode ResponseCode ResponseText AuthCode \
AVSResponse TransactionID InvoiceNumber Description Amount PaymentMethod \
TransactionType CustomerID CHFirstName CHLastName Company BillingAddress \
BillingCity BillingState BillingZip BillingCountry Phone Fax Email ShippingFirstName \
ShippingLastName ShippingCompany ShippingAddress ShippingCity ShippingState \
ShippingZip ShippingCountry TaxAmount DutyAmount FreightAmount TaxExemptFlag \
PONumber MD5Hash CVVResponse CAVVResponse')
self.response = Results(*tuple(r for r in self.results)[0:40])
if self.getResultResponseFull() == 'Approved':
self.error = False
self.success = True
self.declined = False
elif self.getResultResponseFull() == 'Declined':
self.error = False
self.success = False
self.declined = True
else:
raise AIM.AIMError(self.response.ResponseText)
def process(self):
encoded_args = urllib.urlencode(self.parameters)
if self.proxy is None:
results = str(urllib.urlopen(
self.url, encoded_args).read()).split(self.delimiter)
else:
opener = urllib.FancyURLopener(self.proxy)
opened = opener.open(self.url, encoded_args)
try:
results = str(opened.read()).split(self.delimiter)
finally:
opened.close()
for result in results:
(key, val) = result.split('=')
self.results[key] = val
if self.results['response'] == '1':
self.error = False
self.success = True
self.declined = False
elif self.results['response'] == '2':
self.error = False
self.success = False
self.declined = True
elif self.results['response'] == '3':
self.error = True
self.success = False
self.declined = False
else:
self.error = True
self.success = False
self.declined = False
raise DowCommerce.DowCommerceError(self.results)
def process(self):
encoded_args = urllib.urlencode(self.parameters)
if self.testmode == True:
url = 'https://test.authorize.net/gateway/transact.dll'
else:
url = 'https://secure.authorize.net/gateway/transact.dll'
if self.proxy is None:
self.results += str(urllib.urlopen(
url, encoded_args).read()).split(self.delimiter)
else:
opener = urllib.FancyURLopener(self.proxy)
opened = opener.open(url, encoded_args)
try:
self.results += str(opened.read()).split(self.delimiter)
finally:
opened.close()
Results = namedtuple('Results', 'ResultResponse ResponseSubcode ResponseCode ResponseText AuthCode \
AVSResponse TransactionID InvoiceNumber Description Amount PaymentMethod \
TransactionType CustomerID CHFirstName CHLastName Company BillingAddress \
BillingCity BillingState BillingZip BillingCountry Phone Fax Email ShippingFirstName \
ShippingLastName ShippingCompany ShippingAddress ShippingCity ShippingState \
ShippingZip ShippingCountry TaxAmount DutyAmount FreightAmount TaxExemptFlag \
PONumber MD5Hash CVVResponse CAVVResponse')
self.response = Results(*tuple(r for r in self.results)[0:40])
if self.getResultResponseFull() == 'Approved':
self.error = False
self.success = True
self.declined = False
elif self.getResultResponseFull() == 'Declined':
self.error = False
self.success = False
self.declined = True
else:
raise AIM.AIMError(self.response.ResponseText)
def process(self):
encoded_args = urllib.urlencode(self.parameters)
if self.proxy is None:
results = str(urllib.urlopen(
self.url, encoded_args).read()).split(self.delimiter)
else:
opener = urllib.FancyURLopener(self.proxy)
opened = opener.open(self.url, encoded_args)
try:
results = str(opened.read()).split(self.delimiter)
finally:
opened.close()
for result in results:
(key, val) = result.split('=')
self.results[key] = val
if self.results['response'] == '1':
self.error = False
self.success = True
self.declined = False
elif self.results['response'] == '2':
self.error = False
self.success = False
self.declined = True
elif self.results['response'] == '3':
self.error = True
self.success = False
self.declined = False
else:
self.error = True
self.success = False
self.declined = False
raise DowCommerce.DowCommerceError(self.results)
def __init__(self, *args):
urllib.FancyURLopener.__init__(self, *args)
self.errcode = 200
def http_error_default(self, url, fp, errcode, errmsg, headers):
self.errcode = errcode
return urllib.FancyURLopener.http_error_default(self, url, fp, errcode,
errmsg, headers)
def test_redirect_limit_independent(self):
# Ticket #12923: make sure independent requests each use their
# own retry limit.
for i in range(urllib.FancyURLopener().maxtries):
self.fakehttp(b'''HTTP/1.1 302 Found
Location: file://guidocomputer.athome.com:/python/license
Connection: close
''')
try:
self.assertRaises(IOError, urllib.urlopen,
"http://something")
finally:
self.unfakehttp()
def test_getcode(self):
# test getcode() with the fancy opener to get 404 error codes
URL = "http://www.pythontest.net/XXXinvalidXXX"
open_url = urllib.FancyURLopener().open(URL)
try:
code = open_url.getcode()
finally:
open_url.close()
self.assertEqual(code, 404)
def __init__(*args):
self = args[0]
apply(urllib.FancyURLopener.__init__, args)
self.addheaders = [
('User-agent', 'Python-webchecker/%s' % __version__),
]
def __init__(self, *args):
urllib.FancyURLopener.__init__(self, *args)
self.errcode = 200
def http_error_default(self, url, fp, errcode, errmsg, headers):
self.errcode = errcode
return urllib.FancyURLopener.http_error_default(self, url, fp, errcode,
errmsg, headers)
def test_redirect_limit_independent(self):
# Ticket #12923: make sure independent requests each use their
# own retry limit.
for i in range(urllib.FancyURLopener().maxtries):
self.fakehttp(b'''HTTP/1.1 302 Found
Location: file://guidocomputer.athome.com:/python/license
Connection: close
''')
try:
self.assertRaises(IOError, urllib.urlopen,
"http://something")
finally:
self.unfakehttp()
def test_getcode(self):
# test getcode() with the fancy opener to get 404 error codes
URL = "http://www.pythontest.net/XXXinvalidXXX"
open_url = urllib.FancyURLopener().open(URL)
try:
code = open_url.getcode()
finally:
open_url.close()
self.assertEqual(code, 404)