def request(self, endpoint, post=None):
buffer = BytesIO()
ch = pycurl.Curl()
ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
ch.setopt(pycurl.USERAGENT, self.userAgent)
ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
ch.setopt(pycurl.FOLLOWLOCATION, True)
ch.setopt(pycurl.HEADER, True)
ch.setopt(pycurl.VERBOSE, False)
ch.setopt(pycurl.COOKIEFILE, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))
ch.setopt(pycurl.COOKIEJAR, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))
if post is not None:
ch.setopt(pycurl.POST, True)
ch.setopt(pycurl.POSTFIELDS, post)
if self.proxy:
ch.setopt(pycurl.PROXY, self.proxyHost)
if self.proxyAuth:
ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)
ch.perform()
resp = buffer.getvalue()
header_len = ch.getinfo(pycurl.HEADER_SIZE)
header = resp[0: header_len]
body = resp[header_len:]
ch.close()
if self.debug:
print("REQUEST: " + endpoint)
if post is not None:
if not isinstance(post, list):
print("DATA: " + str(post))
print("RESPONSE: " + body)
return [header, json_decode(body)]
python类WRITEFUNCTION的实例源码
def CurlPOST(url, data, cookie):
c = pycurl.Curl()
b = StringIO.StringIO()
c.setopt(pycurl.URL, url)
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.HTTPHEADER,['Content-Type: application/json'])
# c.setopt(pycurl.TIMEOUT, 10)
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.COOKIEFILE, cookie)
c.setopt(pycurl.COOKIEJAR, cookie)
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
html = b.getvalue()
b.close()
c.close()
return html
def test_post(self):
curl = CurlStub(b"result")
result = fetch("http://example.com", post=True, curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.POST: True,
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_post_data(self):
curl = CurlStub(b"result")
result = fetch("http://example.com", post=True, data="data", curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options[pycurl.READFUNCTION](), b"data")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.POST: True,
pycurl.POSTFIELDSIZE: 4,
pycurl.READFUNCTION: Any(),
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_cainfo(self):
curl = CurlStub(b"result")
result = fetch("https://example.com", cainfo="cainfo", curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"https://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.CAINFO: b"cainfo",
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_timeouts(self):
curl = CurlStub(b"result")
result = fetch("http://example.com", connect_timeout=5,
total_timeout=30, curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 5,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 30,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_pycurl_insecure(self):
curl = CurlStub(b"result")
result = fetch("http://example.com/get-ca-cert", curl=curl,
insecure=True)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com/get-ca-cert",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.SSL_VERIFYPEER: False,
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def postXmlSSL(self, xml, url, second=30, cert=True, post=True):
"""????"""
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.TIMEOUT, second)
# ????
# ?????cert ? key ??????.pem??
# ?????PEM?????
if cert:
self.curl.setopt(pycurl.SSLKEYTYPE, "PEM")
self.curl.setopt(pycurl.SSLKEY, WxPayConf_pub.SSLKEY_PATH)
self.curl.setopt(pycurl.SSLCERTTYPE, "PEM")
self.curl.setopt(pycurl.SSLCERT, WxPayConf_pub.SSLCERT_PATH)
# post????
if post:
self.curl.setopt(pycurl.POST, True)
self.curl.setopt(pycurl.POSTFIELDS, xml)
buff = StringIO()
self.curl.setopt(pycurl.WRITEFUNCTION, buff.write)
self.curl.perform()
return buff.getvalue()
def _get_url(self, url):
if self.API_TOKEN == None:
logging.error('none token') # 3 For ERROR level
return
try:
c = pycurl.Curl()
c.setopt(pycurl.CAINFO, certifi.where())
c.setopt(pycurl.URL, url)
b = StringIO.StringIO()
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
c.setopt(pycurl.CUSTOMREQUEST, "GET")
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.perform()
result = b.getvalue()
logging.debug('result')
except Exception as e:
logging.error(e.message)
logging.error('go error')
pass
return result
def __init__(self, uploader):
self.handle = pycurl.Curl()
self.response_headers = {}
self.output = six.StringIO()
self.status_code = None
self.handle.setopt(pycurl.CAINFO, certifi.where())
self.handle.setopt(pycurl.URL, uploader.url)
self.handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
self.handle.setopt(pycurl.UPLOAD, 1)
self.handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH')
self.file = uploader.get_file_stream()
self.file.seek(uploader.offset)
self.handle.setopt(pycurl.READFUNCTION, self.file.read)
self.handle.setopt(pycurl.WRITEFUNCTION, self.output.write)
self.handle.setopt(pycurl.INFILESIZE, uploader.request_length)
headers = ["upload-offset: {}".format(uploader.offset),
"Content-Type: application/offset+octet-stream"] + uploader.headers_as_list
self.handle.setopt(pycurl.HTTPHEADER, headers)
def get_page_data(url, head = None, curl = None):
stream_buffer = StringIO()
if not curl:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)#curl doesn't support unicode
if head:
curl.setopt(pycurl.HTTPHEADER, head)#must be list, not dict
curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
curl.setopt(pycurl.CUSTOMREQUEST,"GET")
curl.setopt(pycurl.CONNECTTIMEOUT, 30)
curl.setopt(pycurl.TIMEOUT, 30)
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
curl.setopt(pycurl.SSL_VERIFYHOST, 0)
curl.perform()
page_data =stream_buffer.getvalue()
stream_buffer.close()
return page_data
def post_page_data(url, data = None, head = None, curl = None):
stream_buffer = StringIO()
if not curl:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)#curl doesn't support unicode
if head:
curl.setopt(pycurl.HTTPHEADER, head)#must be list, not dict
curl.setopt(pycurl.POSTFIELDS, data)
curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
curl.setopt(pycurl.CUSTOMREQUEST,"POST")
# curl.setopt(pycurl.CONNECTTIMEOUT, 30)
# curl.setopt(pycurl.TIMEOUT, 30)
curl.perform()
page_data = stream_buffer.getvalue()
stream_buffer.close()
return page_data
def getlat4city():
avglat = -1
sp_url = "http://www.super-ping.com/ping.php?node=" + CITY + "&ping=" + WAN_IP
sp_refer_url = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en"
sp_http_headers = [ 'Referer: ' + sp_refer_url, 'X-Requested-With: XMLHttpRequest']
crl = pyc.Curl()
sio = StringIO()
crl.setopt(pyc.URL, sp_url)
crl.setopt(pyc.HTTPHEADER, sp_http_headers)
crl.setopt(pyc.WRITEFUNCTION, sio.write)
crl.perform()
crl.close()
lat_http_result = sio.getvalue() #process http result only if html
if lat_http_result.strip() != "-" and lat_http_result.strip() != "super-ping.com":
fstring="ping-avg'>"
lstring="</div>"
start = lat_http_result.index(fstring) + len(fstring)
end = lat_http_result.index(lstring,start)
avglat = lat_http_result[start:end]
return float(avglat)
def curl(url, debug=False, **kwargs):
while 1:
try:
s = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
c.setopt(pycurl.REFERER, url)
c.setopt(pycurl.FOLLOWLOCATION, True)
c.setopt(pycurl.TIMEOUT, 60)
c.setopt(pycurl.ENCODING, 'gzip')
c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')
c.setopt(pycurl.NOSIGNAL, True)
c.setopt(pycurl.WRITEFUNCTION, s.write)
for k, v in kwargs.iteritems():
c.setopt(vars(pycurl)[k], v)
c.perform()
c.close()
return s.getvalue()
except:
if debug:
raise
continue
def __init__(self, *args, **kwargs):
self.c = pycurl.Curl()
Request.__init__(self, *args, **kwargs)
self.rep = io.StringIO()
self.last_url = None
self.last_effective_url = None
self.header = ""
# cookiejar defines the context
self.cj = self.context
self.setopt(pycurl.WRITEFUNCTION, self.write)
self.setopt(pycurl.HEADERFUNCTION, self.write_header)
# TODO: Rename to curl
def scan(self, time):
scan_time,mz,scan_name,st,scan_mode = min(self.scans(), key=lambda si: abs(time - si[0]))
self.crl.setopt(pycurl.HTTPGET, True)
self.crl.setopt(pycurl.URL, str(scan_name + '.txt'))
response = cStringIO.StringIO()
self.crl.setopt(pycurl.WRITEFUNCTION, response.write)
for i in range(5):
#print 'scan %d' % i
self.crl.perform()
if response.getvalue():
break
scan = response.getvalue().splitlines()
# how to get charge for an mzURL scan?
return mzScan([tuple(float(v) for v in s.split()) for s in scan],
scan_time, mode=scan_mode, mz=mz)
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None):
xic_url = str(self.data_file + ('/ric/%s-%s/%s-%s.txt' % (start_time, stop_time,
start_mz, stop_mz)))
self.crl.setopt(pycurl.HTTPGET, True)
self.crl.setopt(pycurl.URL, xic_url)
response = cStringIO.StringIO()
self.crl.setopt(pycurl.WRITEFUNCTION, response.write)
for i in range(5):
#print 'xic %d' % i
self.crl.perform()
if response.getvalue():
break
scan = response.getvalue().splitlines()
return [tuple(float(v) for v in s.split()) for s in scan]
def test_gzip(url):
t = Test()
c = pycurl.Curl()
c.setopt(pycurl.WRITEFUNCTION,t.callback)
c.setopt(pycurl.ENCODING, 'gzip')
c.setopt(pycurl.URL,url)
c.setopt(pycurl.USERAGENT,"User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)")
c.perform()
TOTAL_TIME = c.getinfo(c.TOTAL_TIME)
#print "????????%.2f ms" %(TOTAL_TIME*1000)
return TOTAL_TIME * 1000
def searchIP(self, query, pages, queue, STOP_ME):
if self.API_TOKEN == None:
print "please config your API_TOKEN"
sys.exit()
for page in range(1, pages+1):
b = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, "%s?query=%s&page=%s" % (self.API_URL, query, page))
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.CUSTOMREQUEST, "GET")
c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
c.perform()
hosts = json.loads(b.getvalue())
for host in hosts['matches']:
queue.put(host["ip"])
STOP_ME[0] = True
def get (url, user_agent=UA, referrer=None):
"""Make a GET request of the url using pycurl and return the data
(which is None if unsuccessful)"""
data = None
databuffer = StringIO()
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)
curl.setopt(pycurl.FOLLOWLOCATION, 1)
curl.setopt(pycurl.CONNECTTIMEOUT, 5)
curl.setopt(pycurl.TIMEOUT, 8)
curl.setopt(pycurl.WRITEFUNCTION, databuffer.write)
curl.setopt(pycurl.COOKIEFILE, '')
if user_agent:
curl.setopt(pycurl.USERAGENT, user_agent)
if referrer is not None:
curl.setopt(pycurl.REFERER, referrer)
try:
curl.perform()
data = databuffer.getvalue()
except Exception:
pass
curl.close()
return data
def request(self, endpoint, headers=None, post=None, first=True):
buffer = BytesIO()
ch = pycurl.Curl()
ch.setopt(pycurl.URL, endpoint)
ch.setopt(pycurl.USERAGENT, self.userAgent)
ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
ch.setopt(pycurl.FOLLOWLOCATION, True)
ch.setopt(pycurl.HEADER, True)
if headers:
ch.setopt(pycurl.HTTPHEADER, headers)
ch.setopt(pycurl.VERBOSE, self.debug)
ch.setopt(pycurl.SSL_VERIFYPEER, False)
ch.setopt(pycurl.SSL_VERIFYHOST, False)
ch.setopt(pycurl.COOKIEFILE, self.settingsPath + self.username + '-cookies.dat')
ch.setopt(pycurl.COOKIEJAR, self.settingsPath + self.username + '-cookies.dat')
if post:
import urllib
ch.setopt(pycurl.POST, len(post))
ch.setopt(pycurl.POSTFIELDS, urllib.urlencode(post))
ch.perform()
resp = buffer.getvalue()
header_len = ch.getinfo(pycurl.HEADER_SIZE)
header = resp[0: header_len]
body = resp[header_len:]
ch.close()
if self.debug:
import urllib
print("REQUEST: " + endpoint)
if post is not None:
if not isinstance(post, list):
print('DATA: ' + urllib.unquote_plus(json.dumps(post)))
print("RESPONSE: " + body + "\n")
return [header, json_decode(body)]
def CurlGET(url, cookie):
c = pycurl.Curl()
b = StringIO.StringIO()
c.setopt(pycurl.URL, url)
# c.setopt(pycurl.TIMEOUT, 10)
# c.setopt(pycurl.POST, 1)
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.COOKIEFILE, cookie)
c.setopt(pycurl.COOKIEJAR, cookie)
c.perform()
html = b.getvalue()
b.close()
c.close()
return html
def request(self, url, method, body, headers):
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
if 'proxy_host' in self.proxy:
c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
if 'proxy_port' in self.proxy:
c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
if 'proxy_user' in self.proxy:
c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
self.buf = StringIO()
c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
#c.setopt(pycurl.READFUNCTION, self.read)
#self.body = StringIO(body)
#c.setopt(pycurl.HEADERFUNCTION, self.header)
if self.cacert:
c.setopt(c.CAINFO, self.cacert)
c.setopt(pycurl.SSL_VERIFYPEER, self.cacert and 1 or 0)
c.setopt(pycurl.SSL_VERIFYHOST, self.cacert and 2 or 0)
c.setopt(pycurl.CONNECTTIMEOUT, self.timeout)
c.setopt(pycurl.TIMEOUT, self.timeout)
if method == 'POST':
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, body)
if headers:
hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
log.debug(hdrs)
c.setopt(pycurl.HTTPHEADER, hdrs)
c.perform()
c.close()
return {}, self.buf.getvalue()
def __search_video_id(self, query):
params = {
"key": self.key,
"part": "id,snippet",
"order": "relevance",
"type": "video",
"videoSyndicated": "true",
"maxResults": 3,
"q": query
}
buf = BytesIO()
client = pycurl.Curl()
client.setopt(pycurl.URL, self.SERVICE_HOST + "/youtube/v3/search?" +
urlencode(params))
client.setopt(pycurl.WRITEFUNCTION, buf.write)
client.perform()
client.close()
body = json.loads(buf.getvalue().decode("utf-8"))
buf.close()
if "error" in body:
raise Exception("query error: {0}".format(body))
if len(body["items"]) == 0:
raise Exception("result not found")
video_id = body["items"][0]["id"]["videoId"]
return video_id
def __get_video_duration(self, video_id):
params = {
"key": self.key,
"part": "contentDetails",
"id": video_id
}
buf = BytesIO()
client = pycurl.Curl()
client.setopt(pycurl.URL, self.SERVICE_HOST + "/youtube/v3/videos?" +
urlencode(params))
client.setopt(pycurl.WRITEFUNCTION, buf.write)
client.perform()
client.close()
body = json.loads(buf.getvalue().decode("utf-8"))
buf.close()
if "error" in body:
raise Exception("query error: {0}".format(body))
if len(body["items"]) == 0:
raise Exception("result not found")
duration = body["items"][0]["contentDetails"]["duration"]
duration_seconds = isodate.parse_duration(duration).seconds
return duration_seconds
def __search(self, keyword, search_type):
offset = 0
result_key = search_type + "s"
result = []
while True:
self.logger.info("search " + keyword + ", offset = " + str(offset))
params = {
"q": keyword,
"type": search_type,
"offset": offset,
"limit": 50
}
buf = BytesIO()
client = pycurl.Curl()
client.setopt(pycurl.URL, self.service_host +
"/v1/search" + "?" + urlencode(params))
client.setopt(pycurl.WRITEFUNCTION, buf.write)
client.perform()
client.close()
body = json.loads(buf.getvalue().decode("utf-8"))
buf.close()
if body[result_key]["total"] > 0:
for data in body[result_key]["items"]:
result.append(data)
if body[result_key]["total"] > \
body[result_key]["limit"] * (offset + 1):
offset = offset + 1
else:
break
self.logger.info("Done")
return result
def get_tracks_by_album_id(self, album_id):
self.logger.info("Get tracks of the album(" +
album_id + ") from Spotify.")
if not album_id:
return None
offset = 0
tracks = []
while True:
params = {
"offset": offset,
"limit": 50
}
buf = BytesIO()
client = pycurl.Curl()
client.setopt(pycurl.URL, self.service_host + "/v1/albums/" +
album_id + "/tracks" + "?" + urlencode(params))
client.setopt(pycurl.WRITEFUNCTION, buf.write)
client.perform()
client.close()
body = json.loads(buf.getvalue().decode("utf-8"))
buf.close()
if body["total"] > 0:
for data in body["items"]:
tracks.append(
Track(data["name"], None, data["track_number"]))
if body["total"] > body["limit"] * (offset + 1):
offset = offset + 1
else:
break
return tracks
def process(self, prio, obj):
self.pause.wait()
c = obj.to_http_object(self.freelist.get())
if self._proxies: c = self._set_proxy(c, obj)
c.response_queue = ((StringIO(), StringIO(), obj))
c.setopt(pycurl.WRITEFUNCTION, c.response_queue[0].write)
c.setopt(pycurl.HEADERFUNCTION, c.response_queue[1].write)
with self.mutex_multi:
self.m.add_handle(c)
def head(self):
conn=pycurl.Curl()
conn.setopt(pycurl.SSL_VERIFYPEER,False)
conn.setopt(pycurl.SSL_VERIFYHOST,0)
conn.setopt(pycurl.URL,self.completeUrl)
conn.setopt(pycurl.NOBODY, True) # para hacer un pedido HEAD
conn.setopt(pycurl.WRITEFUNCTION, self.header_callback)
conn.perform()
rp=Response()
rp.parseResponse(self.__performHead)
self.response=rp
def request(self, url, method, body, headers):
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
if 'proxy_host' in self.proxy:
c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
if 'proxy_port' in self.proxy:
c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
if 'proxy_user' in self.proxy:
c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
self.buf = StringIO()
c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
#c.setopt(pycurl.READFUNCTION, self.read)
#self.body = StringIO(body)
#c.setopt(pycurl.HEADERFUNCTION, self.header)
if self.cacert:
c.setopt(c.CAINFO, self.cacert)
c.setopt(pycurl.SSL_VERIFYPEER, self.cacert and 1 or 0)
c.setopt(pycurl.SSL_VERIFYHOST, self.cacert and 2 or 0)
c.setopt(pycurl.CONNECTTIMEOUT, self.timeout / 6)
c.setopt(pycurl.TIMEOUT, self.timeout)
if method == 'POST':
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, body)
if headers:
hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
log.debug(hdrs)
c.setopt(pycurl.HTTPHEADER, hdrs)
c.perform()
c.close()
return {}, self.buf.getvalue()