def file(self, path, output, args={}, progress_callback=lambda *x: None):
    """Download a file from the file server, writing the body to ``output``.

    The request URL is the percent-quoted ``path`` joined onto
    ``self.file_url``; ``args`` is appended as a query string when it is
    non-empty.  A copy of ``args`` is stored in ``self.web_cache[path]``
    before the request is performed.

    Args:
        path: File path, resolved relative to ``self.file_url``.
        output: Object handed to curl as ``WRITEDATA``.
        args: Query-string parameters.
        progress_callback: Callable handed to curl as ``XFERINFOFUNCTION``.

    Raises:
        ServerError: If the response status code is not 200.
    """
    # NOTE(review): the log texts look like they were mangled by an encoding
    # conversion; they are runtime output and therefore kept verbatim.
    self.logger.debug('??????????')
    self.web_cache[path] = dict(args)

    target = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
    if args:
        target = target + '?' + urllib.parse.urlencode(args)
    self.logger.debug('HTTP ?????{}'.format(target))

    # Applied in this exact order on the shared curl handle.
    transfer_options = (
        (pycurl.URL, target),
        (pycurl.COOKIE, self.web_cookie),
        (pycurl.NOBODY, False),
        (pycurl.NOPROGRESS, False),  # progress reporting stays enabled
        (pycurl.WRITEDATA, output),
        (pycurl.HEADERFUNCTION, lambda *x: None),  # headers are discarded
        (pycurl.XFERINFOFUNCTION, progress_callback),
    )
    for option, value in transfer_options:
        self.curl.setopt(option, value)
    self.curl.perform()

    code = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if code != 200:
        raise ServerError(code)
python类NOBODY的实例源码
def file_size(self, path, args={}):
    """Ask the file server for the size of a file without downloading it.

    The request is sent with curl's ``NOBODY`` option enabled, so no
    response body is requested.  A copy of ``args`` is stored in
    ``self.web_cache[path]`` before the request is performed.

    Args:
        path: File path, resolved relative to ``self.file_url``.
        args: Query-string parameters, appended when non-empty.

    Returns:
        The value curl reports for ``CONTENT_LENGTH_DOWNLOAD``.

    Raises:
        ServerError: If the response status code is not 200.
    """
    # NOTE(review): the log texts look like they were mangled by an encoding
    # conversion; they are runtime output and therefore kept verbatim.
    self.logger.debug('????????????')
    self.web_cache[path] = dict(args)

    target = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
    if args:
        target = target + '?' + urllib.parse.urlencode(args)
    self.logger.debug('HTTP ?????{}'.format(target))

    # Applied in this exact order on the shared curl handle.
    probe_options = (
        (pycurl.URL, target),
        (pycurl.COOKIE, self.web_cookie),
        (pycurl.NOBODY, True),  # headers only
        (pycurl.NOPROGRESS, True),
        (pycurl.WRITEDATA, io.BytesIO()),  # throw-away sink
        (pycurl.HEADERFUNCTION, lambda *x: None),
        (pycurl.XFERINFOFUNCTION, lambda *x: None),
    )
    for option, value in probe_options:
        self.curl.setopt(option, value)
    self.curl.perform()

    code = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if code != 200:
        raise ServerError(code)
    return self.curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD)
def web_redirect(self, path, args={}):
    """Request a page that must answer with a 302 and return its target.

    The request URL is the percent-quoted ``path`` joined onto
    ``self.web_url``; ``args`` is appended as a query string when it is
    non-empty.  A copy of ``args`` is stored in ``self.web_cache[path]``
    before the request is performed.

    Args:
        path: Page path, resolved relative to ``self.web_url``.
        args: Query-string parameters.

    Returns:
        The decoded value of the response's ``Location`` header, or
        ``None`` when the response carries no such header.

    Raises:
        ServerError: If the response status code is not 302.
    """
    # NOTE(review): the log texts look like they were mangled by an encoding
    # conversion; they are runtime output and therefore kept verbatim.
    self.logger.debug('????????????')
    self.web_cache[path] = dict(args)
    url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
    if args:
        url += '?' + urllib.parse.urlencode(args)
    self.logger.debug('HTTP ?????{}'.format(url))

    headers = io.BytesIO()
    self.curl.setopt(pycurl.URL, url)
    self.curl.setopt(pycurl.COOKIE, self.web_cookie)
    self.curl.setopt(pycurl.NOBODY, False)
    self.curl.setopt(pycurl.NOPROGRESS, True)
    self.curl.setopt(pycurl.WRITEDATA, NoneIO())
    # Raw header lines are collected so the redirect target can be read.
    self.curl.setopt(pycurl.HEADERFUNCTION, headers.write)
    self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
    self.curl.perform()

    status = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if status != 302:
        raise ServerError(status)

    for header_line in headers.getvalue().split(b'\r\n'):
        name, colon, value = header_line.partition(b':')
        # HTTP field names are case-insensitive (HTTP/2 always sends them
        # in lower case), so compare the name without regard to case.
        if colon and name.lower() == b'location':
            return value.strip().decode()
    return None
def api(self, args, encoding='utf-8', allow_return_none=False):
    """Call the JSON API and return the decoded response.

    ``args`` is merged over ``self.api_args`` to form the query string that
    is appended to ``self.api_url``.  When ``args['mode']`` is
    ``'semester'``, the requested semester is remembered in
    ``self.api_cache``.

    Args:
        args: Request parameters (a mapping; ``.get`` is called on it).
        encoding: Encoding used to decode the response body.
        allow_return_none: When true and the requested semester equals
            ``self.api_cache``, skip the request and return ``None``.

    Returns:
        The parsed JSON value, or ``None`` when the request was skipped.

    Raises:
        ServerError: If the response status code is not 200.
        NotJSONError: If the body is not valid JSON; carries the decoded body.
    """
    # NOTE(review): the log texts look like they were mangled by an encoding
    # conversion; they are runtime output and therefore kept verbatim.
    self.logger.debug('???? API ??')
    if args.get('mode', '') == 'semester':
        requested = args.get('semester', '')
        if allow_return_none and self.api_cache == requested:
            self.logger.debug('????? {} ?? API ??'.format(requested))
            return
        self.api_cache = requested

    # Caller-supplied values override the defaults in self.api_args.
    merged = dict(self.api_args)
    merged.update(args)
    endpoint = self.api_url + '?' + urllib.parse.urlencode(merged)
    body = io.BytesIO()
    self.logger.debug('HTTP ?????{}'.format(endpoint))

    # Applied in this exact order on the shared curl handle.
    request_options = (
        (pycurl.URL, endpoint),
        (pycurl.COOKIE, self.api_cookie),
        (pycurl.NOBODY, False),
        (pycurl.NOPROGRESS, True),
        (pycurl.WRITEDATA, body),
        (pycurl.HEADERFUNCTION, lambda *x: None),
        (pycurl.XFERINFOFUNCTION, lambda *x: None),
    )
    for option, value in request_options:
        self.curl.setopt(option, value)
    self.curl.perform()

    code = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if code != 200:
        raise ServerError(code)

    raw = body.getvalue()
    try:
        return json.loads(raw.decode(encoding))
    except json.decoder.JSONDecodeError:
        raise NotJSONError(raw.decode(encoding))
def web(self, path, args={}, encoding=None, allow_return_none=False):
    """Fetch a web page and return it parsed as an HTML tree.

    The request URL is the percent-quoted ``path`` joined onto
    ``self.web_url``; ``args`` is appended as a query string when it is
    non-empty.  A copy of ``args`` is stored in ``self.web_cache[path]``
    before the request is performed.

    Args:
        path: Page path, resolved relative to ``self.web_url``.
        args: Query-string parameters.
        encoding: Passed to ``etree.HTMLParser`` as ``encoding``.
        allow_return_none: When true and ``self.web_cache[path]`` already
            equals ``args``, skip the request and return ``None``.

    Returns:
        The result of ``etree.parse`` on the response body (comments
        removed), or ``None`` when the request was skipped.

    Raises:
        ServerError: If the response status code is not 200.
    """
    # NOTE(review): the log texts look like they were mangled by an encoding
    # conversion; they are runtime output and therefore kept verbatim.
    self.logger.debug('????????')
    if allow_return_none:
        if path in self.web_cache and self.web_cache[path] == args:
            self.logger.debug('????? {} ????'.format(path))
            self.logger.debug('???{}'.format(args))
            return
    self.web_cache[path] = dict(args)

    url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
    if args:
        url += '?' + urllib.parse.urlencode(args)
    self.logger.debug('HTTP ?????{}'.format(url))

    data = io.BytesIO()
    self.curl.setopt(pycurl.URL, url)
    self.curl.setopt(pycurl.COOKIE, self.web_cookie)
    self.curl.setopt(pycurl.NOBODY, False)
    self.curl.setopt(pycurl.NOPROGRESS, True)
    self.curl.setopt(pycurl.WRITEDATA, data)
    self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
    self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
    self.curl.perform()

    status = self.curl.getinfo(pycurl.RESPONSE_CODE)
    if status != 200:
        raise ServerError(status)

    # Rewind to the start before parsing.  The offset is written as a plain
    # 0: io.SEEK_SET is a *whence* constant and only happens to equal 0.
    data.seek(0)
    return etree.parse(data, etree.HTMLParser(
        encoding=encoding, remove_comments=True))
def head(self):
    """Send a HEAD request to ``self.completeUrl`` and store the response.

    A fresh curl handle is created for the request and released again
    afterwards, including when the transfer raises.  The parsed result is
    stored in ``self.response``.
    """
    conn = pycurl.Curl()
    try:
        # NOTE: peer and host certificate verification are switched off.
        conn.setopt(pycurl.SSL_VERIFYPEER, False)
        conn.setopt(pycurl.SSL_VERIFYHOST, 0)
        conn.setopt(pycurl.URL, self.completeUrl)
        conn.setopt(pycurl.NOBODY, True)  # issue a HEAD request
        conn.setopt(pycurl.WRITEFUNCTION, self.header_callback)
        conn.perform()
    finally:
        # Release the handle deterministically instead of waiting for
        # garbage collection.
        conn.close()

    rp = Response()
    # presumably header_callback fills this buffer during perform() --
    # TODO confirm against the rest of the class.
    rp.parseResponse(self.__performHead)
    self.response = rp
def load(self, url, get={}, post={}, referer=True, cookies=True, just_header=False, multipart=False, decode=False):
    """Perform the request for ``url`` and return the page or its headers.

    Args:
        url, get, post, referer, cookies, multipart: Forwarded unchanged to
            ``self.setRequestContext``.
        just_header: When true, perform a body-less request with redirect
            following disabled and return ``self.header``; otherwise return
            ``self.getResponse()``.
        decode: When true, pass the result through ``self.decodeResponse``.

    Returns:
        The header text or the response body, optionally decoded.

    Side effects:
        Resets ``self.header``, updates ``self.lastEffectiveURL`` and
        ``self.code``, and calls ``self.addCookies()``.
    """
    self.setRequestContext(url, get, post, referer, cookies, multipart)

    self.header = ""

    self.c.setopt(pycurl.HTTPHEADER, self.headers)

    if just_header:
        self.c.setopt(pycurl.FOLLOWLOCATION, 0)
        self.c.setopt(pycurl.NOBODY, 1)
        try:
            self.c.perform()
            rep = self.header
        finally:
            # Restore the handle even when perform() raises, so a failed
            # header probe cannot leave it stuck in no-body mode.
            self.c.setopt(pycurl.FOLLOWLOCATION, 1)
            self.c.setopt(pycurl.NOBODY, 0)
    else:
        self.c.perform()
        rep = self.getResponse()

    # Clear the POST payload so it is not carried over to the next request.
    self.c.setopt(pycurl.POSTFIELDS, "")
    self.lastEffectiveURL = self.c.getinfo(pycurl.EFFECTIVE_URL)
    self.code = self.verifyHeader()

    self.addCookies()

    if decode:
        rep = self.decodeResponse(rep)

    return rep
def _init_method(self):
    """Configure ``self.m_handle`` for the HTTP method of ``self.m_request``.

    Handles GET, PUT, POST, HEAD and DELETE; any other method leaves the
    handle untouched.
    """
    request = self.m_request
    verb = request.m_method

    if verb == "GET":
        self.m_handle.setopt(pycurl.HTTPGET, 1)
        return

    if verb == "PUT":
        self.m_handle.setopt(pycurl.PUT, 1)
        return

    if verb == "POST":
        payload = request.m_data
        if payload:
            self.m_handle.setopt(pycurl.POSTFIELDS, payload)
        else:
            # No payload: only the request verb is forced to POST.
            self.m_handle.setopt(pycurl.CUSTOMREQUEST, "POST")
        return

    if verb == "HEAD":
        self.m_handle.setopt(pycurl.NOBODY, 1)
        return

    if verb == "DELETE":
        self.m_handle.setopt(pycurl.CUSTOMREQUEST, "DELETE")
def head(self):
    """Send a HEAD request to ``self.completeUrl`` and store the response.

    A fresh curl handle is created for the request and released again
    afterwards, including when the transfer raises.  The parsed result is
    stored in ``self.response``.
    """
    conn = pycurl.Curl()
    try:
        # NOTE: peer and host certificate verification are switched off.
        conn.setopt(pycurl.SSL_VERIFYPEER, False)
        conn.setopt(pycurl.SSL_VERIFYHOST, 0)
        conn.setopt(pycurl.URL, self.completeUrl)
        conn.setopt(pycurl.NOBODY, True)  # issue a HEAD request
        conn.setopt(pycurl.WRITEFUNCTION, self.header_callback)
        conn.perform()
    finally:
        # Release the handle deterministically instead of waiting for
        # garbage collection.
        conn.close()

    rp = Response()
    # presumably header_callback fills this buffer during perform() --
    # TODO confirm against the rest of the class.
    rp.parseResponse(self.__performHead)
    self.response = rp
def to_pycurl_object(c, req):
    """Apply the settings of request ``req`` to curl handle ``c``.

    Sets callbacks, URL, optional timeouts, authentication, headers,
    request method, redirect following and proxy on ``c``.

    Args:
        c: The curl handle to configure (``setopt`` is called on it).
        req: The request object whose settings are read.

    Returns:
        The same handle ``c``.
    """
    # NOTE: peer and host certificate verification are switched off.
    base_options = (
        (pycurl.MAXREDIRS, 5),
        (pycurl.WRITEFUNCTION, req.body_callback),
        (pycurl.HEADERFUNCTION, req.header_callback),
        (pycurl.NOSIGNAL, 1),
        (pycurl.SSL_VERIFYPEER, False),
        (pycurl.SSL_VERIFYHOST, 0),
        (pycurl.URL, req.completeUrl),
    )
    for option, value in base_options:
        c.setopt(option, value)

    # Timeouts are applied only when the request defines a truthy value.
    conn_timeout = req.getConnTimeout()
    if conn_timeout:
        c.setopt(pycurl.CONNECTTIMEOUT, conn_timeout)
    total_timeout = req.getTotalTimeout()
    if total_timeout:
        c.setopt(pycurl.TIMEOUT, total_timeout)

    authMethod, userpass = req.getAuth()
    if authMethod or userpass:
        auth_modes = {
            "basic": pycurl.HTTPAUTH_BASIC,
            "ntlm": pycurl.HTTPAUTH_NTLM,
            "digest": pycurl.HTTPAUTH_DIGEST,
        }
        # An unrecognised method leaves HTTPAUTH as it was.
        if authMethod in auth_modes:
            c.setopt(pycurl.HTTPAUTH, auth_modes[authMethod])
        c.setopt(pycurl.USERPWD, userpass)

    c.setopt(pycurl.HTTPHEADER, req.getHeaders())

    verb = req.method
    if verb == "POST":
        c.setopt(pycurl.POSTFIELDS, req.postdata)
    if verb not in ("GET", "POST"):
        c.setopt(pycurl.CUSTOMREQUEST, verb)
    if verb == "HEAD":
        c.setopt(pycurl.NOBODY, True)

    if req.followLocation:
        c.setopt(pycurl.FOLLOWLOCATION, 1)

    proxy = req.getProxy()
    if proxy != None:
        c.setopt(pycurl.PROXY, proxy)
        proxy_kind = req.proxytype
        if proxy_kind == "SOCKS5":
            c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
        elif proxy_kind == "SOCKS4":
            c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
        # NOTE(review): the source lost its indentation; this call is assumed
        # to run for every configured proxy -- confirm against upstream.
        req.delHeader("Proxy-Connection")

    return c
def process(self, pyfile):
    """Download ``pyfile.url`` or, for a directory, queue its entries.

    Steps, as visible in this method:

    1. Derive ``pyfile.name`` from the last path segment of the URL.
    2. If the URL's netloc contains no ``@``, add credentials from a
       matching account or from the package password (when it contains
       ``:``); otherwise log that an anonymous logon is used.
    3. Probe the URL with a header-only ``self.load``.
    4. With a ``content-length`` header: set ``pyfile.size`` and download.
       Otherwise, if the raw header matches the directory pattern, list the
       directory and add its entries as a new package; else fail.

    Args:
        pyfile: The file object being processed; its ``url``, ``name`` and
            ``size`` attributes are read and/or written.
    """
    p_url = urlparse.urlparse(pyfile.url)
    netloc = p_url.netloc

    pyfile.name = parse_name(p_url.path.rpartition('/')[2])

    if "@" not in netloc:
        #@TODO: Recheck in 0.4.10
        if self.account:
            servers = [x['login'] for x in self.account.getAllAccounts()]
        else:
            servers = []

        if netloc in servers:
            self.log_debug("Logging on to %s" % netloc)
            self.req.addAuth(self.account.get_login('password'))

        else:
            pwd = self.get_password()
            if ':' in pwd:
                self.log_debug("Logging on to %s" % netloc)
                self.req.addAuth(pwd)
            else:
                self.log_debug("Using anonymous logon")

    try:
        headers = self.load(pyfile.url, just_header=True)

    # "as" form: accepted by Python 2.6+ and required by Python 3.
    except pycurl.error as e:
        # presumably self.fail() raises; otherwise `headers` would be
        # unbound below -- TODO confirm.
        if "530" in e.args[1]:
            self.fail(_("Authorization required"))
        else:
            self.fail(_("Error %d: %s") % e.args)

    # Make sure the handle is back in normal (body) mode after the probe.
    self.req.http.c.setopt(pycurl.NOBODY, 0)
    self.log_debug(self.req.http.header)

    if "content-length" in headers:
        pyfile.size = headers.get("content-length")
        self.download(pyfile.url)

    else:
        #: Naive ftp directory listing
        if re.search(r'^25\d.*?"', self.req.http.header, re.M):
            pyfile.url = pyfile.url.rstrip('/')
            pkgname = "/".join([pyfile.package().name,
                                urlparse.urlparse(pyfile.url).path.rpartition('/')[2]])

            pyfile.url += '/'

            self.req.http.c.setopt(48, 1)  #: CURLOPT_DIRLISTONLY
            res = self.load(pyfile.url, decode=False)

            links = [pyfile.url + x for x in res.splitlines()]
            self.log_debug("LINKS", links)

            self.pyload.api.addPackage(pkgname, links)

        else:
            self.fail(_("Unexpected server response"))
def to_pycurl_object(c, req):
    """Apply the settings of request ``req`` to curl handle ``c``.

    Sets callbacks, URL, optional timeouts, authentication, headers,
    request method, redirect following and proxy on ``c``.

    Args:
        c: The curl handle to configure (``setopt`` is called on it).
        req: The request object whose settings are read.

    Returns:
        The same handle ``c``.
    """
    # NOTE: peer and host certificate verification are switched off.
    base_options = (
        (pycurl.MAXREDIRS, 5),
        (pycurl.WRITEFUNCTION, req.body_callback),
        (pycurl.HEADERFUNCTION, req.header_callback),
        (pycurl.NOSIGNAL, 1),
        (pycurl.SSL_VERIFYPEER, False),
        (pycurl.SSL_VERIFYHOST, 0),
        (pycurl.URL, req.completeUrl),
    )
    for option, value in base_options:
        c.setopt(option, value)

    # Timeouts are applied only when the request defines a truthy value.
    conn_timeout = req.getConnTimeout()
    if conn_timeout:
        c.setopt(pycurl.CONNECTTIMEOUT, conn_timeout)
    total_timeout = req.getTotalTimeout()
    if total_timeout:
        c.setopt(pycurl.TIMEOUT, total_timeout)

    authMethod, userpass = req.getAuth()
    if authMethod or userpass:
        auth_modes = {
            "basic": pycurl.HTTPAUTH_BASIC,
            "ntlm": pycurl.HTTPAUTH_NTLM,
            "digest": pycurl.HTTPAUTH_DIGEST,
        }
        # An unrecognised method leaves HTTPAUTH as it was.
        if authMethod in auth_modes:
            c.setopt(pycurl.HTTPAUTH, auth_modes[authMethod])
        c.setopt(pycurl.USERPWD, userpass)

    c.setopt(pycurl.HTTPHEADER, req.getHeaders())

    verb = req.method
    if verb == "POST":
        c.setopt(pycurl.POSTFIELDS, req.postdata)
    if verb not in ("GET", "POST"):
        c.setopt(pycurl.CUSTOMREQUEST, verb)
    if verb == "HEAD":
        c.setopt(pycurl.NOBODY, True)

    if req.followLocation:
        c.setopt(pycurl.FOLLOWLOCATION, 1)

    proxy = req.getProxy()
    if proxy != None:
        c.setopt(pycurl.PROXY, proxy)
        proxy_kind = req.proxytype
        if proxy_kind == "SOCKS5":
            c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
        elif proxy_kind == "SOCKS4":
            c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS4)
        # NOTE(review): the source lost its indentation; this call is assumed
        # to run for every configured proxy -- confirm against upstream.
        req.delHeader("Proxy-Connection")

    return c
def load(self, url, get={}, post={}, referer=True, cookies=True,
         just_header=False, multipart=False, decode=False):
    """
    Fetch ``url`` and return either the response or only its headers.

    Args:
        url, get, post, referer, cookies, multipart: Forwarded unchanged to
            ``self.set_request_context``.
        just_header: When true, perform a body-less request with redirect
            following disabled and return ``self.header``; otherwise return
            ``self.get_response()``.
        cookies: Also controls whether ``self.parse_cookies()`` is called.
        decode: When true, pass the result through ``self.decode_response``.

    Returns:
        The header text or the response body, optionally decoded.

    Side effects:
        Resets ``self.header`` and updates ``self.last_url``,
        ``self.last_effective_url`` and ``self.code``.
    """
    self.set_request_context(url, get, post, referer, cookies, multipart)

    # TODO: use http/rfc message instead
    self.header = ""

    if "header" in self.options:
        # TODO
        # print("custom header not implemented")
        self.setopt(pycurl.HTTPHEADER, self.options['header'])

    if not just_header:
        self.c.perform()
        rep = self.get_response()
    else:
        self.setopt(pycurl.FOLLOWLOCATION, 0)
        self.setopt(pycurl.NOBODY, 1)  # TODO: nobody= no post?

        # overwrite HEAD request, we want a common request type
        self.setopt(pycurl.CUSTOMREQUEST, 'POST' if post else 'GET')

        try:
            self.c.perform()
            rep = self.header
        finally:
            # Put the handle back into its normal mode even on failure.
            self.setopt(pycurl.FOLLOWLOCATION, 1)
            self.setopt(pycurl.NOBODY, 0)
            self.unsetopt(pycurl.CUSTOMREQUEST)

    self.setopt(pycurl.POSTFIELDS, '')

    # The requested URL is the fallback; the effective URL wins when curl
    # reports one.
    self.last_url = safequote(url)
    effective = self.c.getinfo(pycurl.EFFECTIVE_URL)
    self.last_effective_url = effective
    if effective:
        self.last_url = effective
    self.code = self.verify_header()

    if cookies:
        self.parse_cookies()

    return self.decode_response(rep) if decode else rep
def process(self, pyfile):
    """Download ``pyfile.url`` or, for a directory, queue its entries.

    Steps, as visible in this method:

    1. Derive ``pyfile.name`` from the last path segment of the URL.
    2. If the URL's netloc contains no ``@``, add credentials from a
       matching account or from the package password (when it contains
       ``:``); otherwise log that an anonymous logon is used.
    3. Probe the URL with a header-only ``self.load``.
    4. With a ``content-length`` header: set ``pyfile.size`` and download.
       Otherwise, if the raw header matches the directory pattern, list the
       directory and add its entries as a new package; else fail.

    Args:
        pyfile: The file object being processed; its ``url``, ``name`` and
            ``size`` attributes are read and/or written.
    """
    p_url = urlparse.urlparse(pyfile.url)
    netloc = p_url.netloc

    pyfile.name = parse_name(p_url.path.rpartition('/')[2])

    if "@" not in netloc:
        #@TODO: Recheck in 0.4.10
        if self.account:
            servers = [x['login'] for x in self.account.getAllAccounts()]
        else:
            servers = []

        if netloc in servers:
            self.log_debug("Logging on to %s" % netloc)
            self.req.addAuth(self.account.get_login('password'))

        else:
            pwd = self.get_password()
            if ':' in pwd:
                self.log_debug("Logging on to %s" % netloc)
                self.req.addAuth(pwd)
            else:
                self.log_debug("Using anonymous logon")

    try:
        headers = self.load(pyfile.url, just_header=True)

    # "as" form: accepted by Python 2.6+ and required by Python 3.
    except pycurl.error as e:
        # presumably self.fail() raises; otherwise `headers` would be
        # unbound below -- TODO confirm.
        if "530" in e.args[1]:
            self.fail(_("Authorization required"))
        else:
            self.fail(_("Error %d: %s") % e.args)

    # Make sure the handle is back in normal (body) mode after the probe.
    self.req.http.c.setopt(pycurl.NOBODY, 0)
    self.log_debug(self.req.http.header)

    if "content-length" in headers:
        pyfile.size = headers.get("content-length")
        self.download(pyfile.url)

    else:
        #: Naive ftp directory listing
        if re.search(r'^25\d.*?"', self.req.http.header, re.M):
            pyfile.url = pyfile.url.rstrip('/')
            pkgname = "/".join([pyfile.package().name,
                                urlparse.urlparse(pyfile.url).path.rpartition('/')[2]])

            pyfile.url += '/'

            self.req.http.c.setopt(48, 1)  #: CURLOPT_DIRLISTONLY
            res = self.load(pyfile.url, decode=False)

            links = [pyfile.url + x for x in res.splitlines()]
            self.log_debug("LINKS", links)

            self.pyload.api.addPackage(pkgname, links)

        else:
            self.fail(_("Unexpected server response"))