def put_attributes(self, data, uuid):
'''
Description:
set attribute value with application/json content
'''
self.data = data
self.uuid = uuid
uri = "attributes/" + self.uuid + "/value"
api_url = self.url + uri
c = pycurl.Curl()
c.setopt(pycurl.URL, api_url)
c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8'])
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(pycurl.CUSTOMREQUEST, "PUT")
c.setopt(pycurl.POSTFIELDS, self.data)
c.setopt(pycurl.VERBOSE, 1)
c.perform()
python类CUSTOMREQUEST的实例源码
def _get_url(self, url):
if self.API_TOKEN == None:
logging.error('none token') # 3 For ERROR level
return
try:
c = pycurl.Curl()
c.setopt(pycurl.CAINFO, certifi.where())
c.setopt(pycurl.URL, url)
b = StringIO.StringIO()
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
c.setopt(pycurl.CUSTOMREQUEST, "GET")
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.perform()
result = b.getvalue()
logging.debug('result')
except Exception as e:
logging.error(e.message)
logging.error('go error')
pass
return result
def __init__(self, uploader):
self.handle = pycurl.Curl()
self.response_headers = {}
self.output = six.StringIO()
self.status_code = None
self.handle.setopt(pycurl.CAINFO, certifi.where())
self.handle.setopt(pycurl.URL, uploader.url)
self.handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
self.handle.setopt(pycurl.UPLOAD, 1)
self.handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH')
self.file = uploader.get_file_stream()
self.file.seek(uploader.offset)
self.handle.setopt(pycurl.READFUNCTION, self.file.read)
self.handle.setopt(pycurl.WRITEFUNCTION, self.output.write)
self.handle.setopt(pycurl.INFILESIZE, uploader.request_length)
headers = ["upload-offset: {}".format(uploader.offset),
"Content-Type: application/offset+octet-stream"] + uploader.headers_as_list
self.handle.setopt(pycurl.HTTPHEADER, headers)
def get_page_data(url, head = None, curl = None):
stream_buffer = StringIO()
if not curl:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)#curl doesn't support unicode
if head:
curl.setopt(pycurl.HTTPHEADER, head)#must be list, not dict
curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
curl.setopt(pycurl.CUSTOMREQUEST,"GET")
curl.setopt(pycurl.CONNECTTIMEOUT, 30)
curl.setopt(pycurl.TIMEOUT, 30)
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
curl.setopt(pycurl.SSL_VERIFYHOST, 0)
curl.perform()
page_data =stream_buffer.getvalue()
stream_buffer.close()
return page_data
def post_page_data(url, data = None, head = None, curl = None):
stream_buffer = StringIO()
if not curl:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)#curl doesn't support unicode
if head:
curl.setopt(pycurl.HTTPHEADER, head)#must be list, not dict
curl.setopt(pycurl.POSTFIELDS, data)
curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
curl.setopt(pycurl.CUSTOMREQUEST,"POST")
# curl.setopt(pycurl.CONNECTTIMEOUT, 30)
# curl.setopt(pycurl.TIMEOUT, 30)
curl.perform()
page_data = stream_buffer.getvalue()
stream_buffer.close()
return page_data
def searchIP(self, query, pages, queue, STOP_ME):
if self.API_TOKEN == None:
print "please config your API_TOKEN"
sys.exit()
for page in range(1, pages+1):
b = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, "%s?query=%s&page=%s" % (self.API_URL, query, page))
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.CUSTOMREQUEST, "GET")
c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
c.perform()
hosts = json.loads(b.getvalue())
for host in hosts['matches']:
queue.put(host["ip"])
STOP_ME[0] = True
def _init_method(self):
if self.m_request.m_method == "GET":
self.m_handle.setopt(pycurl.HTTPGET, 1)
elif self.m_request.m_method == "PUT":
self.m_handle.setopt(pycurl.PUT, 1)
elif self.m_request.m_method == "POST":
if self.m_request.m_data:
l_data = self.m_request.m_data
self.m_handle.setopt(pycurl.POSTFIELDS, l_data)
else:
self.m_handle.setopt(pycurl.CUSTOMREQUEST, "POST")
elif self.m_request.m_method == "HEAD":
self.m_handle.setopt(pycurl.NOBODY, 1)
elif self.m_request.m_method == "DELETE":
self.m_handle.setopt(pycurl.CUSTOMREQUEST, "DELETE")
def _login(self):
try:
c = pycurl.Curl()
c.setopt(pycurl.CAINFO, certifi.where())
c.setopt(pycurl.URL, self.url)
b = StringIO.StringIO()
c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.MAXREDIRS, 5)
c.setopt(pycurl.CUSTOMREQUEST, "POST")
c.setopt(pycurl.POSTFIELDS, self.post_data)
c.perform()
if b.getvalue():
logging.info('success login') # For INFO level
self.API_TOKEN = json.loads(b.getvalue())["access_token"]
self.save_token()
else:
logging.warning('success fail,get null result') #2 For WARNING level
logging.debug(self.API_TOKEN)
b.close()
c.close()
except pycurl.E_HTTP_POST_ERROR:
logging.error(str(pycurl.E_HTTP_POST_ERROR))
except Exception as e:
logging.error('please check your password or username')
logging.error(e.message) #3 For ERROR level
pass
def getToken(self):
user_auth = '{"username": "%s","password": "%s"}' % (self.USERNAME, self.PASSWORD)
b = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, "http://api.zoomeye.org/user/login")
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.CUSTOMREQUEST, "POST")
c.setopt(pycurl.POSTFIELDS, user_auth)
c.perform()
ReturnData = json.loads(b.getvalue())
self.API_TOKEN = ReturnData['access_token']
b.close()
c.close()
def to_pycurl_object(c, req):
c.setopt(pycurl.MAXREDIRS, 5)
c.setopt(pycurl.WRITEFUNCTION, req.body_callback)
c.setopt(pycurl.HEADERFUNCTION, req.header_callback)
c.setopt(pycurl.NOSIGNAL, 1)
c.setopt(pycurl.SSL_VERIFYPEER, False)
c.setopt(pycurl.SSL_VERIFYHOST, 0)
c.setopt(pycurl.URL,req.completeUrl)
if req.getConnTimeout():
c.setopt(pycurl.CONNECTTIMEOUT, req.getConnTimeout())
if req.getTotalTimeout():
c.setopt(pycurl.TIMEOUT, req.getTotalTimeout())
authMethod, userpass = req.getAuth()
if authMethod or userpass:
if authMethod == "basic":
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
elif authMethod == "ntlm":
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_NTLM)
elif authMethod == "digest":
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST)
c.setopt(pycurl.USERPWD, userpass)
c.setopt(pycurl.HTTPHEADER, req.getHeaders())
if req.method == "POST":
c.setopt(pycurl.POSTFIELDS, req.postdata)
if req.method != "GET" and req.method != "POST":
c.setopt(pycurl.CUSTOMREQUEST, req.method)
if req.method == "HEAD":
c.setopt(pycurl.NOBODY, True)
if req.followLocation:
c.setopt(pycurl.FOLLOWLOCATION, 1)
proxy = req.getProxy()
if proxy != None:
c.setopt(pycurl.PROXY, proxy)
if req.proxytype=="SOCKS5":
c.setopt(pycurl.PROXYTYPE,pycurl.PROXYTYPE_SOCKS5)
elif req.proxytype=="SOCKS4":
c.setopt(pycurl.PROXYTYPE,pycurl.PROXYTYPE_SOCKS4)
req.delHeader("Proxy-Connection")
return c
def request(self, method, url, headers, post_data=None):
s = util.StringIO.StringIO()
rheaders = util.StringIO.StringIO()
curl = pycurl.Curl()
proxy = self._get_proxy(url)
if proxy:
if proxy.hostname:
curl.setopt(pycurl.PROXY, proxy.hostname)
if proxy.port:
curl.setopt(pycurl.PROXYPORT, proxy.port)
if proxy.username or proxy.password:
curl.setopt(
pycurl.PROXYUSERPWD,
"%s:%s" % (proxy.username, proxy.password))
if method == 'get':
curl.setopt(pycurl.HTTPGET, 1)
elif method == 'post':
curl.setopt(pycurl.POST, 1)
curl.setopt(pycurl.POSTFIELDS, post_data)
else:
curl.setopt(pycurl.CUSTOMREQUEST, method.upper())
# pycurl doesn't like unicode URLs
curl.setopt(pycurl.URL, util.utf8(url))
curl.setopt(pycurl.WRITEFUNCTION, s.write)
curl.setopt(pycurl.HEADERFUNCTION, rheaders.write)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.CONNECTTIMEOUT, 30)
curl.setopt(pycurl.TIMEOUT, 80)
curl.setopt(pycurl.HTTPHEADER, ['%s: %s' % (k, v)
for k, v in headers.items()])
if self._verify_ssl_certs:
curl.setopt(pycurl.CAINFO, os.path.join(
os.path.dirname(__file__), 'data/ca-certificates.crt'))
else:
curl.setopt(pycurl.SSL_VERIFYHOST, False)
try:
curl.perform()
except pycurl.error as e:
self._handle_request_error(e)
rbody = s.getvalue()
rcode = curl.getinfo(pycurl.RESPONSE_CODE)
return rbody, rcode, self.parse_headers(rheaders.getvalue())
def to_pycurl_object(c, req):
c.setopt(pycurl.MAXREDIRS, 5)
c.setopt(pycurl.WRITEFUNCTION, req.body_callback)
c.setopt(pycurl.HEADERFUNCTION, req.header_callback)
c.setopt(pycurl.NOSIGNAL, 1)
c.setopt(pycurl.SSL_VERIFYPEER, False)
c.setopt(pycurl.SSL_VERIFYHOST, 0)
c.setopt(pycurl.URL,req.completeUrl)
if req.getConnTimeout():
c.setopt(pycurl.CONNECTTIMEOUT, req.getConnTimeout())
if req.getTotalTimeout():
c.setopt(pycurl.TIMEOUT, req.getTotalTimeout())
authMethod, userpass = req.getAuth()
if authMethod or userpass:
if authMethod == "basic":
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
elif authMethod == "ntlm":
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_NTLM)
elif authMethod == "digest":
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST)
c.setopt(pycurl.USERPWD, userpass)
c.setopt(pycurl.HTTPHEADER, req.getHeaders())
if req.method == "POST":
c.setopt(pycurl.POSTFIELDS, req.postdata)
if req.method != "GET" and req.method != "POST":
c.setopt(pycurl.CUSTOMREQUEST, req.method)
if req.method == "HEAD":
c.setopt(pycurl.NOBODY, True)
if req.followLocation:
c.setopt(pycurl.FOLLOWLOCATION, 1)
proxy = req.getProxy()
if proxy != None:
c.setopt(pycurl.PROXY, proxy)
if req.proxytype=="SOCKS5":
c.setopt(pycurl.PROXYTYPE,pycurl.PROXYTYPE_SOCKS5)
elif req.proxytype=="SOCKS4":
c.setopt(pycurl.PROXYTYPE,pycurl.PROXYTYPE_SOCKS4)
req.delHeader("Proxy-Connection")
return c
def load(self, url, get={}, post={}, referer=True, cookies=True,
just_header=False, multipart=False, decode=False):
"""
Load and returns a given page.
"""
self.set_request_context(url, get, post, referer, cookies, multipart)
# TODO: use http/rfc message instead
self.header = ""
if "header" in self.options:
# TODO
# print("custom header not implemented")
self.setopt(pycurl.HTTPHEADER, self.options['header'])
if just_header:
self.setopt(pycurl.FOLLOWLOCATION, 0)
self.setopt(pycurl.NOBODY, 1) # TODO: nobody= no post?
# overwrite HEAD request, we want a common request type
if post:
self.setopt(pycurl.CUSTOMREQUEST, 'POST')
else:
self.setopt(pycurl.CUSTOMREQUEST, 'GET')
try:
self.c.perform()
rep = self.header
finally:
self.setopt(pycurl.FOLLOWLOCATION, 1)
self.setopt(pycurl.NOBODY, 0)
self.unsetopt(pycurl.CUSTOMREQUEST)
else:
self.c.perform()
rep = self.get_response()
self.setopt(pycurl.POSTFIELDS, '')
self.last_url = safequote(url)
self.last_effective_url = self.c.getinfo(pycurl.EFFECTIVE_URL)
if self.last_effective_url:
self.last_url = self.last_effective_url
self.code = self.verify_header()
if cookies:
self.parse_cookies()
if decode:
rep = self.decode_response(rep)
return rep