def __init__(self, uploader):
    """Configure a pycurl handle for one resumable upload chunk.

    Sets up an HTTP PATCH to ``uploader.url`` that streams
    ``uploader.request_length`` bytes from the uploader's file stream,
    starting at ``uploader.offset``.  The "upload-offset" header and
    "application/offset+octet-stream" content type suggest the tus
    resumable-upload protocol — TODO confirm against the uploader class.
    """
    self.handle = pycurl.Curl()
    self.response_headers = {}
    # Response body accumulates here (six.StringIO: py2/py3 compatible buffer).
    self.output = six.StringIO()
    self.status_code = None
    # Validate TLS peers against certifi's CA bundle.
    self.handle.setopt(pycurl.CAINFO, certifi.where())
    self.handle.setopt(pycurl.URL, uploader.url)
    self.handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
    self.handle.setopt(pycurl.UPLOAD, 1)
    # UPLOAD alone would issue a PUT; force PATCH instead.
    self.handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH')
    self.file = uploader.get_file_stream()
    # Resume reading from the current upload offset.
    self.file.seek(uploader.offset)
    self.handle.setopt(pycurl.READFUNCTION, self.file.read)
    self.handle.setopt(pycurl.WRITEFUNCTION, self.output.write)
    self.handle.setopt(pycurl.INFILESIZE, uploader.request_length)
    headers = ["upload-offset: {}".format(uploader.offset),
               "Content-Type: application/offset+octet-stream"] + uploader.headers_as_list
    self.handle.setopt(pycurl.HTTPHEADER, headers)
# Collected example snippets demonstrating pycurl's Curl() handle API.
def transfer(ipaddr, username, password, commandfile):
#transfers commandfile to camera
storage = StringIO()
c = pycurl.Curl()
c.setopt(c.URL, 'http://' + ipaddr + '/admin/remoteconfig')
c.setopt(c.POST, 1)
c.setopt(c.CONNECTTIMEOUT, 5)
c.setopt(c.TIMEOUT, TIMEOUT)
filesize = os.path.getsize(commandfile)
f = open(commandfile, 'rb')
c.setopt(c.FAILONERROR, True)
c.setopt(pycurl.POSTFIELDSIZE, filesize)
c.setopt(pycurl.READFUNCTION, FileReader(f).read_callback)
c.setopt(c.WRITEFUNCTION, storage.write)
c.setopt(pycurl.HTTPHEADER, ["application/x-www-form-urlencoded"])
c.setopt(c.VERBOSE, VERBOSE)
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
c.setopt(pycurl.USERPWD, username + ':' + password)
try:
c.perform()
except pycurl.error, error:
errno, errstr = error
print 'An error occurred: ', errstr
return False, ''
c.close()
content = storage.getvalue()
f.close()
return True, content
# ***************************************************************
# *** Main program ***
# ***************************************************************
def get_page_data(url, head=None, curl=None):
    """Perform an HTTP GET on *url* and return the raw response body.

    :param url: target URL (must not be unicode — libcurl rejects it)
    :param head: optional list of "Name: value" header strings (not a dict)
    :param curl: optional pre-built pycurl handle to reuse
    """
    buf = StringIO()
    handle = curl if curl else pycurl.Curl()
    handle.setopt(pycurl.URL, url)
    if head:
        handle.setopt(pycurl.HTTPHEADER, head)
    handle.setopt(pycurl.WRITEFUNCTION, buf.write)
    handle.setopt(pycurl.CUSTOMREQUEST, "GET")
    handle.setopt(pycurl.CONNECTTIMEOUT, 30)
    handle.setopt(pycurl.TIMEOUT, 30)
    # TLS certificate checks are disabled here, matching the original intent.
    handle.setopt(pycurl.SSL_VERIFYPEER, 0)
    handle.setopt(pycurl.SSL_VERIFYHOST, 0)
    handle.perform()
    body = buf.getvalue()
    buf.close()
    return body
def post_page_data(url, data=None, head=None, curl=None):
    """Perform an HTTP POST of *data* to *url* and return the response body.

    :param url: target URL (must not be unicode — libcurl rejects it)
    :param data: urlencoded POST body string
    :param head: optional list of "Name: value" header strings (not a dict)
    :param curl: optional pre-built pycurl handle to reuse
    """
    buf = StringIO()
    handle = curl if curl else pycurl.Curl()
    handle.setopt(pycurl.URL, url)
    if head:
        handle.setopt(pycurl.HTTPHEADER, head)
    handle.setopt(pycurl.POSTFIELDS, data)
    handle.setopt(pycurl.WRITEFUNCTION, buf.write)
    handle.setopt(pycurl.CUSTOMREQUEST, "POST")
    # NOTE: no connect/total timeouts are set (they were commented out
    # in the original), so this call can block indefinitely.
    handle.perform()
    body = buf.getvalue()
    buf.close()
    return body
def getlat4city():
    """Query super-ping.com for the average latency from CITY to WAN_IP.

    Scrapes the "ping-avg" value out of the returned HTML fragment.
    Returns -1.0 when the service reports no usable result.
    Raises ValueError if the expected markers are missing from the page.
    """
    avg = -1
    ping_url = "http://www.super-ping.com/ping.php?node=" + CITY + "&ping=" + WAN_IP
    referer = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en"
    hdrs = ['Referer: ' + referer, 'X-Requested-With: XMLHttpRequest']
    out = StringIO()
    handle = pyc.Curl()
    handle.setopt(pyc.URL, ping_url)
    handle.setopt(pyc.HTTPHEADER, hdrs)
    handle.setopt(pyc.WRITEFUNCTION, out.write)
    handle.perform()
    handle.close()
    body = out.getvalue()
    stripped = body.strip()
    # "-" or the bare site name means the node produced no measurement.
    if stripped != "-" and stripped != "super-ping.com":
        start_marker = "ping-avg'>"
        begin = body.index(start_marker) + len(start_marker)
        end = body.index("</div>", begin)
        avg = body[begin:end]
    return float(avg)
def getWant(line):#??????
keyword = keyword_list[line]
url = url_list[line]
try:
c = pycurl.Curl()
c.setopt(c.URL,url)
c.setopt(c.CONNECTTIMEOUT, 60)
c.setopt(c.TIMEOUT,120)
b = StringIO.StringIO()
c.setopt(c.WRITEFUNCTION,b.write)
c.perform()
html = b.getvalue()
mutex.acquire()
global match
global all
global percentage
if(getIfmatch(html)):
match += 1
else:
pass
all += 1
print 'all: '+str(all)+' match: '+str(match)+', percentage '+'%.1f'%((float(match)/all)*100)+'%'
mutex.release()
except:
print '%s Empty reply from server' %keyword
def Curl(url,headers):
while 1:
try:
c = pycurl.Curl()
c.setopt(pycurl.REFERER, 'http://weixin.sogou.com/')
c.setopt(pycurl.FOLLOWLOCATION, True)
c.setopt(pycurl.MAXREDIRS,5)
c.setopt(pycurl.CONNECTTIMEOUT, 60)
c.setopt(pycurl.TIMEOUT,120)
c.setopt(pycurl.ENCODING, 'gzip,deflate')
c.fp = StringIO.StringIO()
c.setopt(pycurl.URL, url)
c.setopt(pycurl.HTTPHEADER,headers)
c.setopt(c.WRITEFUNCTION, c.fp.write)
c.perform()
html = c.fp.getvalue()
if '??????' in html:
print u'??????,??10??'
time.sleep(600)
else:
return html
except Exception, e:
print url,'curl(url)',e
continue
# --- WeChat article scraper (comment reconstructed from mojibake) ---
def getArticleInfo(url):
    """Scrape a WeChat (mp.weixin.qq.com) article for its metadata.

    Fetches the article page for title/date, then the getcomment endpoint
    for read/like counters.  Depends on the module-level Curl() fetch
    helper and a search() helper (presumably returns the first regex group
    as bytes — TODO confirm) defined elsewhere in this file.

    :returns: (title, read_num, like_num, datetime)
    """
    headers = [
        "User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36",
        "Cookie:RK=EPnrvyKs33; pgv_pvi=9085208576; pac_uid=1_2273358437; noticeLoginFlag=1; ts_uid=864776808; ptcz=5eb5cdef0bba881b5fdad31c18353d40fd0242f5a873f155197343eaec8b3730; pt2gguin=o2273358437; o_cookie=2273358437; tvfe_boss_uuid=1a889f93cefc98b9; pgv_pvid=2616135824",
    ]
    html = Curl(url,headers)
    title = search('<title>(.*?)</title>',html).decode('utf-8','ignore')
    # NOTE: the local name ``datetime`` shadows the stdlib module name.
    datetime = search('<em id="post-date" class="rich_media_meta rich_media_meta_text">(.*?)</em>',html).decode('utf-8','ignore')
    # Rewrite the article URL into the getcomment endpoint to get counters.
    num_url= url.replace('http://mp.weixin.qq.com/s','http://mp.weixin.qq.com/mp/getcomment') +'&&uin=&key=&pass_ticket=&wxtoken=&devicetype=&clientversion=0&x5=0'
    num_html = Curl(num_url,headers)
    # SECURITY: eval() on an HTTP response executes arbitrary remote code;
    # this should be json.loads() — flagged for review, behavior unchanged.
    dict_weixin = eval(num_html)
    read_num = dict_weixin['read_num']
    like_num = dict_weixin['like_num']
    return title,read_num,like_num,datetime
# --- Generic retrying HTTP GET helper (comment reconstructed from mojibake) ---
def curl(url, debug=False, **kwargs):
    """GET *url*, retrying forever until a response is obtained.

    :param url: target URL (also sent as the Referer)
    :param debug: when True, re-raise instead of retrying on any error
    :param kwargs: extra pycurl options by attribute name, e.g.
                   ``TIMEOUT=30`` → ``setopt(pycurl.TIMEOUT, 30)``
    :returns: response body string
    """
    while 1:
        s = StringIO.StringIO()
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, url)
            c.setopt(pycurl.REFERER, url)
            c.setopt(pycurl.FOLLOWLOCATION, True)
            c.setopt(pycurl.TIMEOUT, 60)
            c.setopt(pycurl.ENCODING, 'gzip')
            c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')
            c.setopt(pycurl.NOSIGNAL, True)
            c.setopt(pycurl.WRITEFUNCTION, s.write)
            # Resolve option names like "TIMEOUT" to pycurl constants.
            for k, v in kwargs.iteritems():
                c.setopt(vars(pycurl)[k], v)
            c.perform()
            return s.getvalue()
        except:
            if debug:
                raise
            continue
        finally:
            # BUG FIX: the handle was only closed on success; every retried
            # failure leaked one libcurl handle.
            c.close()
def curl(source_url, is_post, cookie):
    """Fetch a web page with pycurl and return the response body as bytes.

    :param source_url: target URL; for POST, everything after the first
                       '?' is sent as the request body
    :param is_post: 2 means POST; any other value means GET
    :param cookie: raw Cookie header value
    :returns: response bytes, or b'' on any failure
    """
    buffer = BytesIO()
    c = pycurl.Curl()
    c.setopt(c.ENCODING, 'gzip,deflate')
    c.setopt(c.COOKIE, cookie)
    c.setopt(c.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:42.0) Gecko/20100101 Firefox/42.0')
    try:
        if is_post == 2:  # POST: the query string becomes the body
            url, query = source_url.split('?', 1)
            c.setopt(c.URL, url)
            c.setopt(c.POSTFIELDS, query)
        else:
            c.setopt(c.URL, source_url)
        c.setopt(c.WRITEDATA, buffer)
        c.perform()
        return buffer.getvalue()
    except Exception:
        # BUG FIX: return empty *bytes* to match the success-path type
        # (the original returned str '' while success returned bytes).
        return b''
    finally:
        # BUG FIX: the handle leaked whenever perform() (or the URL split)
        # raised; close it on every path.
        c.close()
def close(self):
    """
    Cleanup.

    Closes every chunk, then tears down the curl multi manager and any
    cached info attribute.
    """
    for chunk in self.chunks:
        self.close_chunk(chunk)
    # Workaround: pycurl segfaults when closing multi, that never had
    # any curl handles — so add and remove one dummy handle first.
    if hasattr(self, 'manager'):
        with closing(pycurl.Curl()) as c:
            self.__manager.add_handle(c)
            self.__manager.remove_handle(c)
    self.chunks = []
    # NOTE(review): the guard checks 'manager' but the attribute used is
    # the name-mangled self.__manager — verify in the enclosing class that
    # these refer to the same attribute (e.g. via a property alias).
    if hasattr(self, 'manager'):
        self.__manager.close()
        del self.__manager
    if hasattr(self, "info"):
        del self.info
def __init__(self, *args, **kwargs):
    """Set up the pycurl handle and response buffers for this request."""
    # NOTE(review): the handle is created before Request.__init__ —
    # the base init presumably relies on self.c existing; keep this order.
    self.c = pycurl.Curl()
    Request.__init__(self, *args, **kwargs)
    self.rep = io.StringIO()        # response body buffer
    self.last_url = None            # URL of the previous request
    self.last_effective_url = None  # final URL after redirects
    self.header = ""                # accumulated raw response headers
    # cookiejar defines the context
    self.cj = self.context
    # Route body and header data into our own accumulators.
    self.setopt(pycurl.WRITEFUNCTION, self.write)
    self.setopt(pycurl.HEADERFUNCTION, self.write_header)
# TODO: Rename to curl
def _curl_a_link(self, target_url, post_target, commit_date=None):
    """GET *target_url* and return the response parsed as JSON.

    :param target_url: URL expected to return a JSON document
    :param post_target: accepted but currently unused
    :param commit_date: accepted but currently unused
    :returns: the decoded JSON object
    """
    buffer = StringIO()
    c = Curl()
    try:
        c.setopt(c.URL, target_url)
        c.setopt(c.WRITEDATA, buffer)
        c.perform()
    finally:
        # BUG FIX: release the handle even if perform() raises.
        c.close()
    # (Removed an unreachable `pass` that followed the original return.)
    return json.loads(buffer.getvalue())
def test_gzip(url):
    """Time a gzip-encoded GET of *url*.

    :returns: total transfer time in milliseconds
    """
    t = Test()
    c = pycurl.Curl()
    c.setopt(pycurl.WRITEFUNCTION, t.callback)
    c.setopt(pycurl.ENCODING, 'gzip')
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.USERAGENT, "User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)")
    try:
        c.perform()
        total_time = c.getinfo(c.TOTAL_TIME)
    finally:
        # BUG FIX: the curl handle was never closed.
        c.close()
    return total_time * 1000
def curl_read(url):
    """GET *url* with 20s timeouts and inspect the response.

    :returns: (ok, is_hit, size) — ok is True only for HTTP 200; is_hit
              comes from handle_response(); size is the body length.
              Any failure yields (False, False, 0).
    """
    try:
        c = pycurl.Curl()
        c.setopt(c.URL, url)
        resp = StringIO()
        headers = StringIO()
        c.setopt(c.WRITEFUNCTION, resp.write)
        c.setopt(c.HEADERFUNCTION, headers.write)
        c.setopt(pycurl.CONNECTTIMEOUT, 20)
        c.setopt(pycurl.TIMEOUT, 20)
        try:
            c.perform()
            status = c.getinfo(c.RESPONSE_CODE)
        finally:
            # BUG FIX: close on every path; previously only the 200 path
            # closed the handle, leaking it on errors and other statuses.
            c.close()
        if status == 200:
            is_hit = handle_response(resp, headers)
            # BUG FIX: StringIO has no __len__, so len(resp) raised
            # TypeError, which the broad except silently converted into
            # (False, False, 0). Measure the body text instead.
            size = len(resp.getvalue())
            return True, is_hit, size
        return False, False, 0
    except Exception:
        return False, False, 0
def end_all_async_unsafe(self):
    """Send the end-recording request for every active rtmp stream.

    Not thread-safe (no locking); failures are logged per stream and do
    not stop the loop. Clears the active list when done.
    """
    if not Config.RECORDING_ACTIVATED:
        return
    for name in self._recording_rtmps:
        handle = pycurl.Curl()
        try:
            self._set_def_curl_opts(handle)
            handle.setopt(pycurl.URL, self._end_url(name))
            handle.setopt(pycurl.WRITEDATA, self._end_buffer)
            handle.perform()
        except pycurl.error as err:
            console.warning(
                'Pycurl error in end_all() for racer <{0}>: Tried to curl <{1}>. Error {2}.'.format(
                    name,
                    self._end_url(name),
                    err))
        finally:
            handle.close()
    self._recording_rtmps.clear()
def _end_record_nolock(self, rtmp_name):
    """Stop recording *rtmp_name* (case-insensitive); caller holds the lock.

    No-op when the stream is not being recorded. The name is removed from
    the active list only after the end request succeeds.
    """
    key = rtmp_name.lower()
    if key not in self._recording_rtmps:
        return
    handle = pycurl.Curl()
    try:
        self._set_def_curl_opts(handle)
        handle.setopt(pycurl.URL, self._end_url(key))
        handle.setopt(pycurl.WRITEDATA, self._end_buffer)
        handle.perform()
        # Success: drop the stream from the active list.
        self._recording_rtmps = [r for r in self._recording_rtmps if r != key]
    except pycurl.error as err:
        console.warning(
            'Pycurl error in end_record({0}): Tried to curl <{1}>. Error {2}.'.format(
                key,
                self._end_url(key),
                err))
    finally:
        handle.close()
def ccurl_setcookie(url):
    """GET *url*, storing cookies in /tmp/AnimeWatch/cookie.txt.

    Follows redirects and returns the body decoded via getContentUnicode().
    """
    agent = "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:45.0) Gecko/20100101 Firefox/45.0"
    handle = pycurl.Curl()
    handle.setopt(handle.FOLLOWLOCATION, True)
    handle.setopt(handle.USERAGENT, agent)
    handle.setopt(handle.COOKIEJAR, '/tmp/AnimeWatch/cookie.txt')
    handle.setopt(handle.URL, str(url))
    body = BytesIO()
    handle.setopt(handle.WRITEDATA, body)
    handle.perform()
    handle.close()
    return getContentUnicode(body.getvalue())