def request(self, url, method, body, headers):
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
if 'proxy_host' in self.proxy:
c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
if 'proxy_port' in self.proxy:
c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
if 'proxy_user' in self.proxy:
c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
self.buf = StringIO()
c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
#c.setopt(pycurl.READFUNCTION, self.read)
#self.body = StringIO(body)
#c.setopt(pycurl.HEADERFUNCTION, self.header)
if self.cacert:
c.setopt(c.CAINFO, self.cacert)
c.setopt(pycurl.SSL_VERIFYPEER, self.cacert and 1 or 0)
c.setopt(pycurl.SSL_VERIFYHOST, self.cacert and 2 or 0)
c.setopt(pycurl.CONNECTTIMEOUT, self.timeout / 6)
c.setopt(pycurl.TIMEOUT, self.timeout)
if method == 'POST':
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, body)
if headers:
hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
log.debug(hdrs)
c.setopt(pycurl.HTTPHEADER, hdrs)
c.perform()
c.close()
return {}, self.buf.getvalue()
python类WRITEFUNCTION的实例源码
def perform(self):
if self.error:
raise self.error
if self.performed:
raise AssertionError("Can't perform twice")
self.options[pycurl.WRITEFUNCTION](self.result)
self.performed = True
def test_basic(self):
curl = CurlStub(b"result")
result = fetch("http://example.com", curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_create_curl(self):
curls = []
def pycurl_Curl():
curl = CurlStub(b"result")
curls.append(curl)
return curl
Curl = pycurl.Curl
try:
pycurl.Curl = pycurl_Curl
result = fetch("http://example.com")
curl = curls[0]
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
finally:
pycurl.Curl = Curl
def request(self, url, method, body, headers):
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
if 'proxy_host' in self.proxy:
c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
if 'proxy_port' in self.proxy:
c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
if 'proxy_user' in self.proxy:
c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
self.buf = StringIO()
c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
#c.setopt(pycurl.READFUNCTION, self.read)
#self.body = StringIO(body)
#c.setopt(pycurl.HEADERFUNCTION, self.header)
if self.cacert:
c.setopt(c.CAINFO, self.cacert)
c.setopt(pycurl.SSL_VERIFYPEER, self.cacert and 1 or 0)
c.setopt(pycurl.SSL_VERIFYHOST, self.cacert and 2 or 0)
c.setopt(pycurl.CONNECTTIMEOUT, self.timeout / 6)
c.setopt(pycurl.TIMEOUT, self.timeout)
if method == 'POST':
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, body)
if headers:
hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
log.debug(hdrs)
c.setopt(pycurl.HTTPHEADER, hdrs)
c.perform()
c.close()
return {}, self.buf.getvalue()
def __init__(self, cookies=None, options=None):
self.c = pycurl.Curl()
self.rep = StringIO()
self.cj = cookies #cookiejar
self.lastURL = None
self.lastEffectiveURL = None
self.abort = False
self.code = 0 # last http code
self.header = ""
self.headers = [] #temporary request header
self.initHandle()
self.setInterface(options)
self.c.setopt(pycurl.WRITEFUNCTION, self.write)
self.c.setopt(pycurl.HEADERFUNCTION, self.writeHeader)
self.log = getLogger("log")
def cleanup(self):
self.m_data = io.BytesIO()
self.m_headers = {}
self.m_response = TCPResponse()
self.m_handle.setopt(pycurl.WRITEFUNCTION, self.m_data.write)
def handle_request(self):
curl_handle = pycurl.Curl()
# set default options.
curl_handle.setopt(pycurl.URL, self.request_url)
curl_handle.setopt(pycurl.REFERER, self.request_url)
curl_handle.setopt(pycurl.USERAGENT, self.useragent)
curl_handle.setopt(pycurl.TIMEOUT, self.curlopts['TIMEOUT'])
curl_handle.setopt(pycurl.CONNECTTIMEOUT, self.curlopts['CONNECTTIMEOUT'])
curl_handle.setopt(pycurl.HEADER, True)
#curl_handle.setopt(pycurl.VERBOSE, 1)
curl_handle.setopt(pycurl.FOLLOWLOCATION, 1)
curl_handle.setopt(pycurl.MAXREDIRS, 5)
if(self.request_headers and len(self.request_headers) > 0):
tmplist = list()
for(key, value) in self.request_headers.items():
tmplist.append(key + ':' + value)
curl_handle.setopt(pycurl.HTTPHEADER, tmplist)
#??????POST
curl_handle.setopt(pycurl.HTTPPROXYTUNNEL, 1)
curl_handle.setopt(pycurl.POSTFIELDS, self.request_body)
response = StringIO.StringIO()
curl_handle.setopt(pycurl.WRITEFUNCTION, response.write)
try:
curl_handle.perform()
except pycurl.error as error:
raise ChannelException(error, 5)
self.response_code = curl_handle.getinfo(curl_handle.HTTP_CODE)
header_size = curl_handle.getinfo(curl_handle.HEADER_SIZE)
resp_str = response.getvalue()
self.response_headers = resp_str[0 : header_size]
self.response_body = resp_str[header_size : ]
response.close()
curl_handle.close()
def _login(self):
try:
c = pycurl.Curl()
c.setopt(pycurl.CAINFO, certifi.where())
c.setopt(pycurl.URL, self.url)
b = StringIO.StringIO()
c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.MAXREDIRS, 5)
c.setopt(pycurl.CUSTOMREQUEST, "POST")
c.setopt(pycurl.POSTFIELDS, self.post_data)
c.perform()
if b.getvalue():
logging.info('success login') # For INFO level
self.API_TOKEN = json.loads(b.getvalue())["access_token"]
self.save_token()
else:
logging.warning('success fail,get null result') #2 For WARNING level
logging.debug(self.API_TOKEN)
b.close()
c.close()
except pycurl.E_HTTP_POST_ERROR:
logging.error(str(pycurl.E_HTTP_POST_ERROR))
except Exception as e:
logging.error('please check your password or username')
logging.error(e.message) #3 For ERROR level
pass
def request(self, url, method, body, headers):
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
if 'proxy_host' in self.proxy:
c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
if 'proxy_port' in self.proxy:
c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
if 'proxy_user' in self.proxy:
c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
self.buf = StringIO()
c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
#c.setopt(pycurl.READFUNCTION, self.read)
#self.body = StringIO(body)
#c.setopt(pycurl.HEADERFUNCTION, self.header)
if self.cacert:
c.setopt(c.CAINFO, self.cacert)
c.setopt(pycurl.SSL_VERIFYPEER, self.cacert and 1 or 0)
c.setopt(pycurl.SSL_VERIFYHOST, self.cacert and 2 or 0)
c.setopt(pycurl.CONNECTTIMEOUT, self.timeout)
c.setopt(pycurl.TIMEOUT, self.timeout)
if method == 'POST':
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, body)
if headers:
hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
log.debug(hdrs)
c.setopt(pycurl.HTTPHEADER, hdrs)
c.perform()
c.close()
return {}, self.buf.getvalue()
def getPage (self, url, requestHeader = []) :
resultFormate = StringIO.StringIO()
fakeIp = self.fakeIp()
requestHeader.append('CLIENT-IP:' + fakeIp)
requestHeader.append('X-FORWARDED-FOR:' + fakeIp)
try:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url.strip())
curl.setopt(pycurl.ENCODING, 'gzip,deflate')
curl.setopt(pycurl.HEADER, 1)
curl.setopt(pycurl.TIMEOUT, 120)
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
curl.setopt(pycurl.SSL_VERIFYHOST, 0)
curl.setopt(pycurl.HTTPHEADER, requestHeader)
curl.setopt(pycurl.WRITEFUNCTION, resultFormate.write)
curl.perform()
headerSize = curl.getinfo(pycurl.HEADER_SIZE)
curl.close()
header = resultFormate.getvalue()[0 : headerSize].split('\r\n')
body = resultFormate.getvalue()[headerSize : ]
except Exception, e:
header = ''
body = ''
return header, body
def process(self, prio, obj):
self.pause.wait()
c = obj.to_http_object(self.freelist.get())
if self._proxies: c = self._set_proxy(c, obj)
c.response_queue = ((StringIO(), StringIO(), obj))
c.setopt(pycurl.WRITEFUNCTION, c.response_queue[0].write)
c.setopt(pycurl.HEADERFUNCTION, c.response_queue[1].write)
with self.mutex_multi:
self.m.add_handle(c)
def head(self):
conn=pycurl.Curl()
conn.setopt(pycurl.SSL_VERIFYPEER,False)
conn.setopt(pycurl.SSL_VERIFYHOST,0)
conn.setopt(pycurl.URL,self.completeUrl)
conn.setopt(pycurl.NOBODY, True) # para hacer un pedido HEAD
conn.setopt(pycurl.WRITEFUNCTION, self.header_callback)
conn.perform()
rp=Response()
rp.parseResponse(self.__performHead)
self.response=rp
def setUrl(self, url):
self.b = StringIO.StringIO()
self.c.setopt(pycurl.WRITEFUNCTION, self.b.write)
self.c.setopt(pycurl.URL, url)
def request(self, url, method, body, headers):
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
if 'proxy_host' in self.proxy:
c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
if 'proxy_port' in self.proxy:
c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
if 'proxy_user' in self.proxy:
c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
self.buf = StringIO()
c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
#c.setopt(pycurl.READFUNCTION, self.read)
#self.body = StringIO(body)
#c.setopt(pycurl.HEADERFUNCTION, self.header)
if self.cacert:
c.setopt(c.CAINFO, self.cacert)
c.setopt(pycurl.SSL_VERIFYPEER, self.cacert and 1 or 0)
c.setopt(pycurl.SSL_VERIFYHOST, self.cacert and 2 or 0)
c.setopt(pycurl.CONNECTTIMEOUT, self.timeout / 6)
c.setopt(pycurl.TIMEOUT, self.timeout)
if method == 'POST':
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, body)
if headers:
hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
log.debug(hdrs)
c.setopt(pycurl.HTTPHEADER, hdrs)
c.perform()
c.close()
return {}, self.buf.getvalue()
def curlRequest(self, url, headers = False, post = False, returnHeaders=True):
ch = pycurl.Curl()
ch.setopt(pycurl.URL, url)
hdrs = [
"Host: poloniex.com",
"Connection: close",
"User-Agent: Mozilla/5.0 (CLI; Linux x86_64) polproxy",
"accept: application/json"
]
if post != False:
ch.setopt(pycurl.POSTFIELDS, post)
hdrs = hdrs + ["content-type: application/x-www-form-urlencoded", "content-length: " + str(len(post))]
if headers != False:
hdrs = hdrs + headers
ch.setopt(pycurl.HTTPHEADER, hdrs)
ch.setopt(pycurl.SSL_VERIFYHOST, 0)
ch.setopt(pycurl.FOLLOWLOCATION, True)
ch.setopt(pycurl.CONNECTTIMEOUT, 5)
ch.setopt(pycurl.TIMEOUT, 5)
ret = BytesIO()
if returnHeaders:
ch.setopt(pycurl.HEADERFUNCTION, ret.write)
ch.setopt(pycurl.WRITEFUNCTION, ret.write)
try:
ch.perform()
except:
return ""
ch.close()
return ret.getvalue().decode("ISO-8859-1")
def getpage(url):
c=pycurl.Curl()
print url
c.setopt(pycurl.URL,url)
b=StringIO.StringIO()
print '1'
c.setopt(pycurl.WRITEFUNCTION,b.write)
print '2'
c.perform()
print '-----'
# f=open('%s/posts.txt' % setting.CURRENT_PATH,'wb')
# f.write(b.getvalue())
# f.close()
return b.getvalue()