def put(url, data, headers=None):
    """Make a PUT request to the url, using data in the message body,
    with the additional headers, if any.

    Args:
        url: Target URL.
        data: Request body; ``len(data)`` is announced as the upload size.
        headers: Optional mapping of header name to header value.

    Returns:
        The HTTP status code reported by curl, or -1 when the transfer
        failed before an HTTP response was obtained.
    """
    reply = -1  # default, non-http response
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)
        if headers:
            # %-formatting also accepts non-string header values.
            curl.setopt(pycurl.HTTPHEADER,
                        ['%s: %s' % (k, v) for k, v in headers.items()])
        curl.setopt(pycurl.PUT, 1)
        curl.setopt(pycurl.INFILESIZE, len(data))
        databuffer = StringIO(data)
        curl.setopt(pycurl.READFUNCTION, databuffer.read)
        try:
            curl.perform()
            reply = curl.getinfo(pycurl.HTTP_CODE)
        except Exception:
            # Deliberate best effort: a failed transfer is reported to the
            # caller through the -1 default instead of an exception.
            pass
    finally:
        # Release the handle even when one of the setopt calls raises.
        curl.close()
    return reply
# Example source code using the Python pycurl HTTPHEADER option
def CurlPOST(url, data, cookie):
    """POST ``data`` to ``url`` with a JSON content type and return the body.

    Args:
        url: Target URL.
        data: Request body handed to curl as POSTFIELDS.
        cookie: Path of the cookie file; it is used both as the file cookies
            are read from (COOKIEFILE) and the file they are saved to
            (COOKIEJAR).

    Returns:
        The response body collected from curl's write callback.

    Raises:
        pycurl.error: propagated from ``perform()``; the handle and buffer
            are released first.
    """
    c = pycurl.Curl()
    b = StringIO.StringIO()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json'])
        # c.setopt(pycurl.TIMEOUT, 10)
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.COOKIEFILE, cookie)
        c.setopt(pycurl.COOKIEJAR, cookie)
        c.setopt(pycurl.POSTFIELDS, data)
        c.perform()
        return b.getvalue()
    finally:
        # Runs on success and on failure so neither object is leaked.
        b.close()
        c.close()
def test_headers(self):
    """fetch() passes a headers dict to curl as "name: value" HTTPHEADER entries."""
    stub = CurlStub(b"result")
    request_headers = {"a": "1", "b": "2"}
    body = fetch("http://example.com", headers=request_headers, curl=stub)
    self.assertEqual(body, b"result")

    # Full option set the stub is expected to have recorded.
    expected_options = {
        pycurl.URL: b"http://example.com",
        pycurl.FOLLOWLOCATION: 1,
        pycurl.MAXREDIRS: 5,
        pycurl.CONNECTTIMEOUT: 30,
        pycurl.LOW_SPEED_LIMIT: 1,
        pycurl.LOW_SPEED_TIME: 600,
        pycurl.NOSIGNAL: 1,
        pycurl.WRITEFUNCTION: Any(),
        pycurl.HTTPHEADER: ["a: 1", "b: 2"],
        pycurl.DNS_CACHE_TIMEOUT: 0,
        pycurl.ENCODING: b"gzip,deflate",
    }
    self.assertEqual(stub.options, expected_options)
def create_virtual_endpoints(self, data, category):
    '''
    Description:
        Create a virtual endpoint by POSTing ``data`` to
        ``virtualEndpoints/?category=<category>`` below ``self.url``.
        ``data`` and ``category`` are also stored on the instance.
    category:
        SENSOR
        METER
        GAUGE
        ONOFF
        LEVEL_CONTROL
    '''
    self.data = data
    self.category = category
    api_url = self.url + "virtualEndpoints/?category=" + self.category
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, api_url)
        # The charset is a parameter of Content-Type. Passed as its own
        # list entry it has no "name:" part and is not a valid header line.
        c.setopt(pycurl.HTTPHEADER, ['Accept: application/json',
                                     'Content-Type: application/json; charset=UTF-8'])
        c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.POSTFIELDS, self.data)
        c.setopt(pycurl.VERBOSE, 1)
        c.perform()
    finally:
        # Release the handle whether or not the transfer succeeded.
        c.close()
def create_rooms(self, data):
    '''
    Description:
        Create a room by POSTing ``data`` to ``rooms/`` below ``self.url``.
        ``data`` is also stored on the instance.
    '''
    self.data = data
    api_url = self.url + "rooms/"
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, api_url)
        # The charset is a parameter of Content-Type. Passed as its own
        # list entry it has no "name:" part and is not a valid header line.
        c.setopt(pycurl.HTTPHEADER, ['Accept: application/json',
                                     'Content-Type: application/json; charset=UTF-8'])
        c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.POSTFIELDS, self.data)
        c.setopt(pycurl.VERBOSE, 1)
        c.perform()
    finally:
        # Release the handle whether or not the transfer succeeded.
        c.close()
def put_attributes_config(self, data, uuid):
    '''
    Description:
        Modify an attribute by sending ``data`` to
        ``attributes/<uuid>/config`` below ``self.url``.
        ``data`` and ``uuid`` are also stored on the instance.
    '''
    self.data = data
    self.uuid = uuid
    api_url = self.url + "attributes/" + self.uuid + "/config"
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, api_url)
        # The charset is a parameter of Content-Type. Passed as its own
        # list entry it has no "name:" part and is not a valid header line.
        c.setopt(pycurl.HTTPHEADER, ['Accept: application/json',
                                     'Content-Type: application/json; charset=UTF-8'])
        c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
        # NOTE(review): this issues a POST although the method name says
        # "put" - confirm which verb the API expects.
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.POSTFIELDS, self.data)
        c.setopt(pycurl.VERBOSE, 1)
        c.perform()
    finally:
        # Release the handle whether or not the transfer succeeded.
        c.close()
def put_attributes(self, data, uuid):
    '''
    Description:
        Set an attribute value with application/json content by sending a
        PUT with ``data`` to ``attributes/<uuid>/value`` below ``self.url``.
        ``data`` and ``uuid`` are also stored on the instance.
    '''
    self.data = data
    self.uuid = uuid
    api_url = self.url + "attributes/" + self.uuid + "/value"
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, api_url)
        # The charset is a parameter of Content-Type. Passed as its own
        # list entry it has no "name:" part and is not a valid header line.
        c.setopt(pycurl.HTTPHEADER, ['Accept: application/json',
                                     'Content-Type: application/json; charset=UTF-8'])
        c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
        c.setopt(pycurl.CUSTOMREQUEST, "PUT")
        c.setopt(pycurl.POSTFIELDS, self.data)
        c.setopt(pycurl.VERBOSE, 1)
        c.perform()
    finally:
        # Release the handle whether or not the transfer succeeded.
        c.close()
def initHandle(self):
    """Apply the options shared by every request to the curl handle ``self.c``."""
    for option, value in (
        (pycurl.FOLLOWLOCATION, 1),
        (pycurl.MAXREDIRS, 5),
        (pycurl.CONNECTTIMEOUT, 30),
        (pycurl.NOSIGNAL, 1),
        (pycurl.NOPROGRESS, 1),
    ):
        self.c.setopt(option, value)

    # AUTOREFERER is only set when this pycurl build exposes the constant.
    if hasattr(pycurl, "AUTOREFERER"):
        self.c.setopt(pycurl.AUTOREFERER, 1)

    for option, value in (
        (pycurl.SSL_VERIFYPEER, 0),  # peer certificate verification is off
        (pycurl.LOW_SPEED_TIME, 30),
        (pycurl.LOW_SPEED_LIMIT, 5),
        (pycurl.USERAGENT,
         "Mozilla/5.0 (Windows NT 6.1; Win64; x64;en; rv:5.0) Gecko/20110619 Firefox/5.0"),
    ):
        self.c.setopt(option, value)

    # Compressed encodings are requested only when field 7 of version_info()
    # is truthy (presumably the libz version - TODO confirm).
    if pycurl.version_info()[7]:
        self.c.setopt(pycurl.ENCODING, "gzip, deflate")

    default_headers = [
        "Accept: */*",
        "Accept-Language: en-US,en",
        "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7",
        "Connection: keep-alive",
        "Keep-Alive: 300",
        "Expect:",
    ]
    self.c.setopt(pycurl.HTTPHEADER, default_headers)
def handle_free(self, pyfile):
    """Load the free-download page, solve the captcha and extract the link.

    The download URL is requested with an ``X-Requested-With`` header, which
    is set back to an empty value afterwards. ``self.link`` is assigned only
    when ``LINK_FREE_PATTERN`` matches the loaded data.
    """
    free_page_url = "http://turbobit.net/download/free/%s" % self.info['pattern']['ID']
    self.data = self.load(free_page_url)

    rt_update = self.get_rt_update()
    self.solve_captcha()

    ajax_header = ["X-Requested-With: XMLHttpRequest"]
    self.req.http.c.setopt(pycurl.HTTPHEADER, ajax_header)
    self.data = self.load(self.get_download_url(rt_update))
    self.req.http.c.setopt(pycurl.HTTPHEADER, ["X-Requested-With:"])

    found = re.search(self.LINK_FREE_PATTERN, self.data)
    if found is None:
        return
    self.link = found.group(1)
def api_request(self, method, data=None, get=None, post=None):
    """Call the API method ``method`` and return the decoded JSON reply.

    When the account info holds a session, the request is signed: a SHA-1
    digest over timestamp + method + session key is sent together with the
    session token and the timestamp as ``X-DL-*`` headers.

    Args:
        method: API method name, appended to ``self.API_URL``.
        data: Unused by this method.
        get: Optional dict of query parameters passed to ``self.load``.
        post: Optional dict of POST parameters passed to ``self.load``.

    Returns:
        The object parsed from the JSON text returned by ``self.load``.
    """
    # A fresh dict per call: a mutable default would be shared between
    # calls and keep whatever a previous call (or load()) put into it.
    get = {} if get is None else get
    post = {} if post is None else post

    session = self.account.info['data'].get('session', None)
    if session:
        # Timestamp is the local clock minus the stored session offset.
        ts = str(int(time.time() - float(session['tsd'])))
        sha1 = Crypto.Hash.SHA.new()
        sha1.update(ts + method + session['key'])
        sign = sha1.hexdigest()
        self.req.http.c.setopt(pycurl.HTTPHEADER, ["X-DL-TOKEN: " + session['token'],
                                                   "X-DL-SIGN: " + sign,
                                                   "X-DL-TS: " + ts])
    json_data = self.load(self.API_URL + method, get=get, post=post)
    return json.loads(json_data)
def api_request(self, method, data=None, get=None, post=None):
    """Call the API method ``method`` and return the decoded JSON reply.

    When ``self.info`` holds a session, the request is signed: a SHA-1
    digest over timestamp + method + session key is sent together with the
    session token and the timestamp as ``X-DL-*`` headers.

    Args:
        method: API method name, appended to ``self.API_URL``.
        data: Unused by this method.
        get: Optional dict of query parameters passed to ``self.load``.
        post: Optional dict of POST parameters passed to ``self.load``.

    Returns:
        The object parsed from the JSON text returned by ``self.load``.
    """
    # A fresh dict per call: a mutable default would be shared between
    # calls and keep whatever a previous call (or load()) put into it.
    get = {} if get is None else get
    post = {} if post is None else post

    session = self.info['data'].get('session', None)
    if session:
        # Timestamp is the local clock minus the stored session offset.
        ts = str(int(time.time() - float(session['tsd'])))
        sha1 = Crypto.Hash.SHA.new()
        sha1.update(ts + method + session['key'])
        sign = sha1.hexdigest()
        self.req.http.c.setopt(pycurl.HTTPHEADER, ["X-DL-TOKEN: " + session['token'],
                                                   "X-DL-SIGN: " + sign,
                                                   "X-DL-TS: " + ts])
    json_data = self.load(self.API_URL + method, get=get, post=post)
    return json.loads(json_data)
def api_response(self, url, post_data):
    """POST ``post_data`` as JSON to ``url`` and return the decoded reply.

    The shared curl handle is switched to JSON request headers for the call
    and switched back to the default header set afterwards, also when the
    request or the JSON decoding fails.

    Args:
        url: Target URL handed to ``self.load``.
        post_data: Object serialised with ``json.dumps`` as the POST body.

    Returns:
        The object parsed from the JSON response.
    """
    self.req.http.c.setopt(pycurl.HTTPHEADER, ["Accept: application/json, text/plain, */*",
                                               "Content-Type: application/json"])
    try:
        res = json.loads(self.load(url, post=json.dumps(post_data)))
    except (BadHeader, ValueError) as e:
        # ``message`` does not exist on Python 3 exceptions; fall back to str().
        message = getattr(e, "message", None) or str(e)
        self.log_error(message)
        # NOTE(review): self.fail() is presumably expected to raise; if it
        # returns, ``res`` is unbound at the return below - confirm.
        self.fail(message)
    finally:
        # Headers back to normal, on the failure path as well, so later
        # requests on this handle do not keep the JSON headers.
        self.req.http.c.setopt(pycurl.HTTPHEADER, ["Accept: */*",
                                                   "Accept-Language: en-US,en",
                                                   "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7",
                                                   "Connection: keep-alive",
                                                   "Keep-Alive: 300",
                                                   "Expect:"])
    return res
def _get_url(self, url):
    """GET ``url`` with a JWT ``Authorization`` header and return the body.

    Args:
        url: URL to fetch.

    Returns:
        The response body, or ``None`` when no API token is configured or
        the request fails (the failure is logged).
    """
    if self.API_TOKEN is None:
        logging.error('none token')  # 3 For ERROR level
        return None

    # Bound up front so the failure path returns None instead of raising
    # UnboundLocalError at the final return.
    result = None
    c = None
    try:
        c = pycurl.Curl()
        c.setopt(pycurl.CAINFO, certifi.where())
        c.setopt(pycurl.URL, url)
        b = StringIO.StringIO()
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
        c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
        c.setopt(pycurl.CUSTOMREQUEST, "GET")
        c.setopt(pycurl.FOLLOWLOCATION, 1)
        c.perform()
        result = b.getvalue()
        logging.debug('result')
    except Exception as e:
        # Best effort: the error is logged and None is returned.
        # ``message`` does not exist on Python 3 exceptions.
        logging.error(getattr(e, 'message', None) or str(e))
        logging.error('go error')
    finally:
        if c is not None:
            c.close()
    return result
def __init__(self, uploader):
    """Build the curl handle that PATCHes data from ``uploader`` to its URL.

    The upload reads from the uploader's file stream, positioned at
    ``uploader.offset``; the response body is collected in ``self.output``.
    """
    curl = pycurl.Curl()
    self.handle = curl
    self.response_headers = {}
    self.output = six.StringIO()
    self.status_code = None

    curl.setopt(pycurl.CAINFO, certifi.where())
    curl.setopt(pycurl.URL, uploader.url)
    curl.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
    curl.setopt(pycurl.UPLOAD, 1)
    curl.setopt(pycurl.CUSTOMREQUEST, 'PATCH')

    # Start reading the file at the position the upload has reached.
    stream = uploader.get_file_stream()
    self.file = stream
    stream.seek(uploader.offset)
    curl.setopt(pycurl.READFUNCTION, stream.read)
    curl.setopt(pycurl.WRITEFUNCTION, self.output.write)
    curl.setopt(pycurl.INFILESIZE, uploader.request_length)

    own_headers = [
        "upload-offset: {}".format(uploader.offset),
        "Content-Type: application/offset+octet-stream",
    ]
    curl.setopt(pycurl.HTTPHEADER, own_headers + uploader.headers_as_list)
def transfer(ipaddr, username, password, commandfile):
    """Transfer ``commandfile`` to the camera's ``/admin/remoteconfig`` URL.

    The file is sent as the body of an HTTP POST using basic authentication.

    Args:
        ipaddr: Address of the camera, used to build the URL.
        username: User name for HTTP basic authentication.
        password: Password for HTTP basic authentication.
        commandfile: Path of the file whose contents are uploaded.

    Returns:
        ``(True, response_body)`` on success, ``(False, '')`` when curl
        reports an error (the error text is printed).
    """
    storage = StringIO()
    filesize = os.path.getsize(commandfile)
    c = pycurl.Curl()
    try:
        # ``with`` closes the file on the error path too.
        with open(commandfile, 'rb') as f:
            c.setopt(c.URL, 'http://' + ipaddr + '/admin/remoteconfig')
            c.setopt(c.POST, 1)
            c.setopt(c.CONNECTTIMEOUT, 5)
            c.setopt(c.TIMEOUT, TIMEOUT)
            c.setopt(c.FAILONERROR, True)
            c.setopt(pycurl.POSTFIELDSIZE, filesize)
            c.setopt(pycurl.READFUNCTION, FileReader(f).read_callback)
            c.setopt(c.WRITEFUNCTION, storage.write)
            # A header line needs its "Name:" part; the bare media type on
            # its own is not a valid header.
            c.setopt(pycurl.HTTPHEADER,
                     ["Content-Type: application/x-www-form-urlencoded"])
            c.setopt(c.VERBOSE, VERBOSE)
            c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
            c.setopt(pycurl.USERPWD, username + ':' + password)
            try:
                c.perform()
            except pycurl.error as error:
                # pycurl.error carries (errno, errstr) in ``args``.
                errstr = error.args[1] if len(error.args) > 1 else str(error)
                print('An error occurred:  %s' % errstr)
                return False, ''
    finally:
        # Release the handle on the error return as well.
        c.close()
    return True, storage.getvalue()
# ***************************************************************
# *** Main program ***
# ***************************************************************
def get_page_data(url, head = None, curl = None):
    """GET ``url`` and return the response body.

    Args:
        url: URL to fetch (curl doesn't support unicode).
        head: Optional list of header lines (must be a list, not a dict).
        curl: Optional existing curl handle; a new one is created when
            this is falsy.

    Returns:
        Everything curl wrote to the response buffer.
    """
    sink = StringIO()
    handle = curl if curl else pycurl.Curl()

    handle.setopt(pycurl.URL, url)
    if head:
        handle.setopt(pycurl.HTTPHEADER, head)
    handle.setopt(pycurl.WRITEFUNCTION, sink.write)
    handle.setopt(pycurl.CUSTOMREQUEST, "GET")
    # 30 seconds to connect and 30 seconds for the whole transfer.
    handle.setopt(pycurl.CONNECTTIMEOUT, 30)
    handle.setopt(pycurl.TIMEOUT, 30)
    # Certificate and host name verification are both switched off.
    handle.setopt(pycurl.SSL_VERIFYPEER, 0)
    handle.setopt(pycurl.SSL_VERIFYHOST, 0)
    handle.perform()

    body = sink.getvalue()
    sink.close()
    return body
def post_page_data(url, data = None, head = None, curl = None):
    """POST ``data`` to ``url`` and return the response body.

    Args:
        url: URL to post to (curl doesn't support unicode).
        data: Request body; ``None`` (the default) sends an empty body.
        head: Optional list of header lines (must be a list, not a dict).
        curl: Optional existing curl handle; a new one is created when
            this is falsy.

    Returns:
        Everything curl wrote to the response buffer.
    """
    stream_buffer = StringIO()
    if not curl:
        curl = pycurl.Curl()
    curl.setopt(pycurl.URL, url)
    if head:
        curl.setopt(pycurl.HTTPHEADER, head)
    # POSTFIELDS does not accept None, so the default maps to an empty body
    # and calling without ``data`` no longer fails inside setopt.
    curl.setopt(pycurl.POSTFIELDS, "" if data is None else data)
    curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
    curl.setopt(pycurl.CUSTOMREQUEST, "POST")
    # No connect or transfer timeout is set here.
    curl.perform()
    page_data = stream_buffer.getvalue()
    stream_buffer.close()
    return page_data
def setup_curl_for_post(c, p, data_buf, headers=None, share=None):
    """Configure curl handle ``c`` for a JSON POST described by ``p``.

    After the basic setup, ``p['httpheader']`` (defaulting to JSON
    Accept/Content-type lines) is applied, POST is enabled, and a truthy
    ``p['postfields']`` is serialised to JSON as the request body.

    Returns:
        The same handle ``c``.
    """
    setup_curl_basic(c, p, data_buf, headers, share)

    json_headers = ['Accept: application/json', "Content-type: application/json"]
    header_lines = p.get('httpheader', json_headers)
    if header_lines:
        # c.setopt(pycurl.HEADER, p.get('header', 1))
        c.setopt(pycurl.HTTPHEADER, header_lines)

    # POST301 was added in libcurl 7.17.1, so it may be missing.
    post301_option = getattr(pycurl, 'POST301', None)
    if post301_option is not None:
        c.setopt(post301_option, True)

    c.setopt(pycurl.POST, 1)

    payload = p.get('postfields')
    if payload:
        encoded = json.dumps(payload, indent=2, ensure_ascii=False)
        c.setopt(pycurl.POSTFIELDS, encoded)
    return c
def getlat4city():
    """Query super-ping.com for ``CITY`` / ``WAN_IP`` and return the latency.

    Returns:
        The text between ``ping-avg'>`` and the next ``</div>`` of the
        response as a float, or ``-1.0`` when the stripped response is
        ``-`` or ``super-ping.com``.
    """
    query_url = "".join(["http://www.super-ping.com/ping.php?node=", CITY, "&ping=", WAN_IP])
    referer_url = "".join(["http://www.super-ping.com/?ping=", WAN_IP, "&locale=en"])
    request_headers = ['Referer: ' + referer_url, 'X-Requested-With: XMLHttpRequest']

    handle = pyc.Curl()
    response = StringIO()
    handle.setopt(pyc.URL, query_url)
    handle.setopt(pyc.HTTPHEADER, request_headers)
    handle.setopt(pyc.WRITEFUNCTION, response.write)
    handle.perform()
    handle.close()

    text = response.getvalue()
    latency = -1
    # Process the http result only if it is html.
    if text.strip() not in ("-", "super-ping.com"):
        open_marker = "ping-avg'>"
        close_marker = "</div>"
        begin = text.index(open_marker) + len(open_marker)
        finish = text.index(close_marker, begin)
        latency = text[begin:finish]
    return float(latency)
def Curl(url, headers):
    """Fetch ``url`` with the given header lines, retrying until it succeeds.

    Every failed attempt is printed and retried immediately. When the
    response contains the block marker, the function sleeps 600 seconds
    before trying again.

    Args:
        url: URL to fetch.
        headers: List of header lines passed to HTTPHEADER.

    Returns:
        The response body of the first attempt without the marker.
    """
    while 1:
        try:
            c = pycurl.Curl()
            try:
                c.setopt(pycurl.REFERER, 'http://weixin.sogou.com/')
                c.setopt(pycurl.FOLLOWLOCATION, True)
                c.setopt(pycurl.MAXREDIRS, 5)
                c.setopt(pycurl.CONNECTTIMEOUT, 60)
                c.setopt(pycurl.TIMEOUT, 120)
                c.setopt(pycurl.ENCODING, 'gzip,deflate')
                buf = StringIO.StringIO()
                c.setopt(pycurl.URL, url)
                c.setopt(pycurl.HTTPHEADER, headers)
                c.setopt(c.WRITEFUNCTION, buf.write)
                c.perform()
                html = buf.getvalue()
            finally:
                # One handle is created per attempt; close it every time so
                # the retry loop does not pile up open handles.
                c.close()
            # NOTE(review): the marker and message below look mis-encoded
            # in this copy of the source - confirm the intended text.
            if '??????' in html:
                print(u'??????,??10??')
                time.sleep(600)
            else:
                return html
        except Exception as e:
            print('%s curl(url) %s' % (url, e))
            continue
# Fetch a page with an HTTP POST and return the response body
def getHtml(url, headers):
    """POST the module-level ``data`` to ``url`` and return the response body.

    Args:
        url: URL to request.
        headers: List of header lines passed to HTTPHEADER.
    """
    c = pycurl.Curl()                  # new curl handle
    c.fp = StringIO.StringIO()         # buffer that collects the response body
    for option, value in (
        (pycurl.URL, url),                    # target URL
        (pycurl.FOLLOWLOCATION, True),        # follow redirects
        (pycurl.MAXREDIRS, 5),                # at most 5 redirects
        (pycurl.CONNECTTIMEOUT, 60),          # connect timeout
        (pycurl.TIMEOUT, 120),                # overall timeout
        (pycurl.ENCODING, 'gzip,deflate'),    # accepted content encodings
        (pycurl.HTTPHEADER, headers),         # request header lines
        (pycurl.POST, 1),                     # send the request as a POST
        (pycurl.POSTFIELDS, data),            # POST body
        (c.WRITEFUNCTION, c.fp.write),        # write the response into the buffer
    ):
        c.setopt(option, value)
    c.perform()                        # run the request
    return c.fp.getvalue()
def handle_free(self, pyfile):
    """Load the free-download page, solve the captcha and extract the link.

    The download URL is requested with an ``X-Requested-With`` header, which
    is set back to an empty value afterwards. ``self.link`` is assigned only
    when ``LINK_FREE_PATTERN`` matches the loaded data.
    """
    free_page_url = "http://turbobit.net/download/free/%s" % self.info['pattern']['ID']
    self.data = self.load(free_page_url)

    rt_update = self.get_rt_update()
    self.solve_captcha()

    ajax_header = ["X-Requested-With: XMLHttpRequest"]
    self.req.http.c.setopt(pycurl.HTTPHEADER, ajax_header)
    self.data = self.load(self.get_download_url(rt_update))
    self.req.http.c.setopt(pycurl.HTTPHEADER, ["X-Requested-With:"])

    found = re.search(self.LINK_FREE_PATTERN, self.data)
    if found is None:
        return
    self.link = found.group(1)
def api_request(self, method, data=None, get=None, post=None):
    """Call the API method ``method`` and return the decoded JSON reply.

    When the account info holds a session, the request is signed: a SHA-1
    digest over timestamp + method + session key is sent together with the
    session token and the timestamp as ``X-DL-*`` headers.

    Args:
        method: API method name, appended to ``self.API_URL``.
        data: Unused by this method.
        get: Optional dict of query parameters passed to ``self.load``.
        post: Optional dict of POST parameters passed to ``self.load``.

    Returns:
        The object parsed from the JSON text returned by ``self.load``.
    """
    # A fresh dict per call: a mutable default would be shared between
    # calls and keep whatever a previous call (or load()) put into it.
    get = {} if get is None else get
    post = {} if post is None else post

    session = self.account.info['data'].get('session', None)
    if session:
        # Timestamp is the local clock minus the stored session offset.
        ts = str(int(time.time() - float(session['tsd'])))
        sha1 = Crypto.Hash.SHA.new()
        sha1.update(ts + method + session['key'])
        sign = sha1.hexdigest()
        self.req.http.c.setopt(pycurl.HTTPHEADER, ["X-DL-TOKEN: " + session['token'],
                                                   "X-DL-SIGN: " + sign,
                                                   "X-DL-TS: " + ts])
    json_data = self.load(self.API_URL + method, get=get, post=post)
    return json.loads(json_data)
def api_response(self, url, post_data):
    """POST ``post_data`` as JSON to ``url`` and return the decoded reply.

    The shared curl handle is switched to JSON request headers for the call
    and switched back to the default header set afterwards, also when the
    request or the JSON decoding fails.

    Args:
        url: Target URL handed to ``self.load``.
        post_data: Object serialised with ``json.dumps`` as the POST body.

    Returns:
        The object parsed from the JSON response.
    """
    self.req.http.c.setopt(pycurl.HTTPHEADER, ["Accept: application/json, text/plain, */*",
                                               "Content-Type: application/json"])
    try:
        res = json.loads(self.load(url, post=json.dumps(post_data)))
    except (BadHeader, ValueError) as e:
        # ``message`` does not exist on Python 3 exceptions; fall back to str().
        message = getattr(e, "message", None) or str(e)
        self.log_error(message)
        # NOTE(review): self.fail() is presumably expected to raise; if it
        # returns, ``res`` is unbound at the return below - confirm.
        self.fail(message)
    finally:
        # Headers back to normal, on the failure path as well, so later
        # requests on this handle do not keep the JSON headers.
        self.req.http.c.setopt(pycurl.HTTPHEADER, ["Accept: */*",
                                                   "Accept-Language: en-US,en",
                                                   "Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7",
                                                   "Connection: keep-alive",
                                                   "Keep-Alive: 300",
                                                   "Expect:"])
    return res
def searchIP(self, query, pages, queue, STOP_ME):
    """Run ``query`` against the API and push every matching IP to ``queue``.

    Pages 1..``pages`` are fetched one after another with a JWT
    ``Authorization`` header. The process exits when no API token is
    configured.

    Args:
        query: Query string placed in the request URL as given.
        pages: Number of result pages to fetch.
        queue: Object with a ``put`` method that receives each host IP.
        STOP_ME: Mutable sequence; element 0 is set to ``True`` when done.
    """
    if self.API_TOKEN is None:
        print("please config your API_TOKEN")
        sys.exit()
    for page in range(1, pages + 1):
        b = StringIO.StringIO()
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, "%s?query=%s&page=%s" % (self.API_URL, query, page))
            c.setopt(pycurl.WRITEFUNCTION, b.write)
            c.setopt(pycurl.FOLLOWLOCATION, 1)
            c.setopt(pycurl.CUSTOMREQUEST, "GET")
            c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
            c.perform()
        finally:
            # One handle per page: close it so handles do not accumulate.
            c.close()
        hosts = json.loads(b.getvalue())
        for host in hosts['matches']:
            queue.put(host["ip"])
    # NOTE(review): the original indentation is lost; the flag is assumed to
    # be set once, after all pages have been processed - confirm.
    STOP_ME[0] = True
def fetchURL(self, url, headers=None):
    """Fetch ``url`` through SafeURL, optionally with custom request headers.

    Args:
        url: URL to fetch.
        headers: Optional JSON text encoding a list of ``"Name: value"``
            strings. Entries that are malformed are skipped; if the text
            is not usable at all the custom headers are ignored.

    Returns:
        Whatever ``SafeURL.execute`` returns for ``url``.
    """
    sc = safeurl.SafeURL()
    if headers:
        newHeaders = []
        try:
            for header in json_decode(headers):
                try:
                    # Split on the first ":" only, the value may contain more.
                    key, sep, val = header.partition(":")
                    key = key.strip()
                    val = val.strip()
                    # ignore if invalid format (lacks : or has no name)
                    if not sep or not key:
                        continue
                    # Header text comes from the caller: refuse line breaks
                    # so one entry cannot smuggle in additional headers.
                    if "\r" in header or "\n" in header:
                        continue
                    newHeaders.append("{0}: {1}".format(key, val))
                except Exception:
                    # ignore entries that are not usable strings
                    continue
        # ignore if not json
        except Exception:
            print("ignoring custom headers")
        if newHeaders:
            sc._handle.setopt(pycurl.HTTPHEADER, newHeaders)
    res = sc.execute(url)
    return res
def sendAsset(pubKey, privKey, recipient, assetId, amount, txfee):
    """Sign an asset transfer and POST it to the node's broadcast endpoint.

    The signed byte string is assembled from the decoded public key, asset
    id, timestamp, amount, fee and recipient; the signature is sent with
    the same fields as a JSON document.
    """
    timestamp = int(time.time() * 1000)  # milliseconds

    # Byte layout of the data that gets signed, in order.
    pieces = [
        '\4',
        base58.b58decode(pubKey),
        '\1',
        base58.b58decode(assetId),
        '\0',
        struct.pack(">Q", timestamp),
        struct.pack(">Q", amount),
        struct.pack(">Q", txfee),
        base58.b58decode(recipient),
        '\0\0',
    ]
    sData = ''.join(pieces)

    random64 = os.urandom(64)
    raw_signature = curve.calculateSignature(random64, base58.b58decode(privKey), sData)
    signature = base58.b58encode(raw_signature)

    payload = {
        "assetId": assetId,
        "senderPublicKey": pubKey,
        "recipient": recipient,
        "amount": amount,
        "fee": txfee,
        "timestamp": timestamp,
        "attachment": "",
        "signature": signature
    }
    data = json.dumps(payload)

    endpoint = "http://%s:%s/assets/broadcast/transfer" % (NODE_IP, NODE_PORT)
    c = pycurl.Curl()
    c.setopt(pycurl.URL, endpoint)
    c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json', 'Accept: application/json'])
    c.setopt(pycurl.POST, 1)
    c.setopt(pycurl.POSTFIELDS, data)
    c.perform()
    c.close()
def request(self, endpoint, headers=None, post=None, first=True):
    """Send a GET (or a POST when ``post`` is truthy) to ``endpoint``.

    Cookies are read from and saved to a per-user cookie file. Certificate
    and host name verification are switched off.

    Args:
        endpoint: URL to request.
        headers: Optional list of header lines.
        post: Optional form data, url-encoded as the POST body.
        first: Unused by this method.

    Returns:
        ``[header, body]`` where ``header`` is the raw response header text
        and ``body`` is the JSON-decoded response body.
    """
    received = BytesIO()
    ch = pycurl.Curl()
    ch.setopt(pycurl.URL, endpoint)
    ch.setopt(pycurl.USERAGENT, self.userAgent)
    ch.setopt(pycurl.WRITEFUNCTION, received.write)
    ch.setopt(pycurl.FOLLOWLOCATION, True)
    # Response headers are written to the same buffer, ahead of the body.
    ch.setopt(pycurl.HEADER, True)
    if headers:
        ch.setopt(pycurl.HTTPHEADER, headers)

    ch.setopt(pycurl.VERBOSE, self.debug)
    ch.setopt(pycurl.SSL_VERIFYPEER, False)
    ch.setopt(pycurl.SSL_VERIFYHOST, False)
    ch.setopt(pycurl.COOKIEFILE, self.settingsPath + self.username + '-cookies.dat')
    ch.setopt(pycurl.COOKIEJAR, self.settingsPath + self.username + '-cookies.dat')

    if post:
        import urllib
        ch.setopt(pycurl.POST, len(post))
        ch.setopt(pycurl.POSTFIELDS, urllib.urlencode(post))

    ch.perform()
    raw = received.getvalue()
    # Split the buffer at the header size reported by curl.
    split_at = ch.getinfo(pycurl.HEADER_SIZE)
    header = raw[:split_at]
    body = raw[split_at:]
    ch.close()

    if self.debug:
        import urllib
        print("REQUEST: " + endpoint)
        if post is not None and not isinstance(post, list):
            print('DATA: ' + urllib.unquote_plus(json.dumps(post)))
        print("RESPONSE: " + body + "\n")

    return [header, json_decode(body)]
def request(self, url, method, body, headers):
    """Perform an HTTP request through curl and return ``({}, response_body)``.

    Args:
        url: URL to request.
        method: ``'POST'`` sends ``body`` as the POST fields; any other
            value leaves curl's request method untouched.
        body: Request body used for POST.
        headers: Optional mapping of header name to value.

    Returns:
        A tuple of an empty dict and the response body text.
    """
    proxy = self.proxy
    c = pycurl.Curl()
    c.setopt(pycurl.URL, url)

    # Proxy settings are applied only for the keys that are present.
    if 'proxy_host' in proxy:
        c.setopt(pycurl.PROXY, proxy['proxy_host'])
    if 'proxy_port' in proxy:
        c.setopt(pycurl.PROXYPORT, proxy['proxy_port'])
    if 'proxy_user' in proxy:
        c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % proxy)

    self.buf = StringIO()
    c.setopt(pycurl.WRITEFUNCTION, self.buf.write)

    # Certificate checks are enabled only when a CA bundle is configured.
    if self.cacert:
        c.setopt(c.CAINFO, self.cacert)
    c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
    c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)

    c.setopt(pycurl.CONNECTTIMEOUT, self.timeout)
    c.setopt(pycurl.TIMEOUT, self.timeout)

    if method == 'POST':
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.POSTFIELDS, body)

    if headers:
        hdrs = ['%s: %s' % item for item in headers.items()]
        log.debug(hdrs)
        c.setopt(pycurl.HTTPHEADER, hdrs)

    c.perform()
    c.close()
    return {}, self.buf.getvalue()
def request(self, url, method, body, headers):
    """Perform an HTTP request through curl and return ``({}, response_body)``.

    The connect timeout is one sixth of ``self.timeout`` (whole seconds);
    the overall transfer timeout is ``self.timeout``.

    Args:
        url: URL to request.
        method: ``'POST'`` sends ``body`` as the POST fields; any other
            value leaves curl's request method untouched.
        body: Request body used for POST.
        headers: Optional mapping of header name to value.

    Returns:
        A tuple of an empty dict and the response body text.

    Raises:
        pycurl.error: propagated from the transfer; the handle is closed
            first.
    """
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        if 'proxy_host' in self.proxy:
            c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
        if 'proxy_port' in self.proxy:
            c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
        if 'proxy_user' in self.proxy:
            c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
        self.buf = StringIO()
        c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
        #c.setopt(pycurl.READFUNCTION, self.read)
        #self.body = StringIO(body)
        #c.setopt(pycurl.HEADERFUNCTION, self.header)
        # Certificate checks are enabled only when a CA bundle is configured.
        if self.cacert:
            c.setopt(c.CAINFO, self.cacert)
        c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
        c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
        # Floor division keeps the value an integer number of seconds;
        # plain "/" yields a float under true division.
        c.setopt(pycurl.CONNECTTIMEOUT, int(self.timeout // 6))
        c.setopt(pycurl.TIMEOUT, self.timeout)
        if method == 'POST':
            c.setopt(pycurl.POST, 1)
            c.setopt(pycurl.POSTFIELDS, body)
        if headers:
            hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
            log.debug(hdrs)
            c.setopt(pycurl.HTTPHEADER, hdrs)
        c.perform()
    finally:
        # Close the handle when the transfer raises as well.
        c.close()
    return {}, self.buf.getvalue()