python类Curl()的实例源码

request.py 文件源码 项目:tus-py-client 作者: tus 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, uploader):
        self.handle = pycurl.Curl()
        self.response_headers = {}
        self.output = six.StringIO()
        self.status_code = None

        self.handle.setopt(pycurl.CAINFO, certifi.where())
        self.handle.setopt(pycurl.URL, uploader.url)
        self.handle.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
        self.handle.setopt(pycurl.UPLOAD, 1)
        self.handle.setopt(pycurl.CUSTOMREQUEST, 'PATCH')

        self.file = uploader.get_file_stream()
        self.file.seek(uploader.offset)
        self.handle.setopt(pycurl.READFUNCTION, self.file.read)
        self.handle.setopt(pycurl.WRITEFUNCTION, self.output.write)
        self.handle.setopt(pycurl.INFILESIZE, uploader.request_length)

        headers = ["upload-offset: {}".format(uploader.offset),
                   "Content-Type: application/offset+octet-stream"] + uploader.headers_as_list
        self.handle.setopt(pycurl.HTTPHEADER, headers)
mxbackup.py 文件源码 项目:Mobotix-tools 作者: keptenkurk 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def transfer(ipaddr, username, password, commandfile):   
    #transfers commandfile to camera
    storage = StringIO()
    c = pycurl.Curl()
    c.setopt(c.URL, 'http://' + ipaddr + '/admin/remoteconfig')
    c.setopt(c.POST, 1)
    c.setopt(c.CONNECTTIMEOUT, 5)
    c.setopt(c.TIMEOUT, TIMEOUT)
    filesize = os.path.getsize(commandfile)
    f = open(commandfile, 'rb')
    c.setopt(c.FAILONERROR, True)
    c.setopt(pycurl.POSTFIELDSIZE, filesize)
    c.setopt(pycurl.READFUNCTION, FileReader(f).read_callback)
    c.setopt(c.WRITEFUNCTION, storage.write)
    c.setopt(pycurl.HTTPHEADER, ["application/x-www-form-urlencoded"])
    c.setopt(c.VERBOSE, VERBOSE)
    c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
    c.setopt(pycurl.USERPWD, username + ':' + password)
    try:
        c.perform()
    except pycurl.error, error:
        errno, errstr = error
        print 'An error occurred: ', errstr
        return False, ''
    c.close()
    content = storage.getvalue()
    f.close()
    return True, content


# ***************************************************************
# *** Main program ***
# ***************************************************************
WebCurl.py 文件源码 项目:python-Speech_Recognition 作者: zthxxx 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_page_data(url, head = None, curl = None):
    stream_buffer = StringIO()
    if not curl:
        curl = pycurl.Curl()
    curl.setopt(pycurl.URL, url)#curl doesn't support unicode
    if head:
        curl.setopt(pycurl.HTTPHEADER,  head)#must be list, not dict
    curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
    curl.setopt(pycurl.CUSTOMREQUEST,"GET")
    curl.setopt(pycurl.CONNECTTIMEOUT, 30)
    curl.setopt(pycurl.TIMEOUT, 30)
    curl.setopt(pycurl.SSL_VERIFYPEER, 0)
    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    curl.perform()
    page_data =stream_buffer.getvalue()
    stream_buffer.close()
    return page_data
WebCurl.py 文件源码 项目:python-Speech_Recognition 作者: zthxxx 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def post_page_data(url, data = None, head = None, curl = None):
    stream_buffer = StringIO()
    if not curl:
        curl = pycurl.Curl()
    curl.setopt(pycurl.URL, url)#curl doesn't support unicode
    if head:
        curl.setopt(pycurl.HTTPHEADER,  head)#must be list, not dict
    curl.setopt(pycurl.POSTFIELDS,  data)
    curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
    curl.setopt(pycurl.CUSTOMREQUEST,"POST")
    # curl.setopt(pycurl.CONNECTTIMEOUT, 30)
    # curl.setopt(pycurl.TIMEOUT, 30)
    curl.perform()
    page_data = stream_buffer.getvalue()
    stream_buffer.close()
    return page_data
check_wan_latency.py 文件源码 项目:check_wan_latency 作者: computingbee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getlat4city():
    avglat = -1

    sp_url = "http://www.super-ping.com/ping.php?node=" + CITY + "&ping=" + WAN_IP
    sp_refer_url = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en"
        sp_http_headers = [ 'Referer: ' + sp_refer_url, 'X-Requested-With: XMLHttpRequest']

    crl = pyc.Curl()
    sio = StringIO()

    crl.setopt(pyc.URL, sp_url)
        crl.setopt(pyc.HTTPHEADER, sp_http_headers)
    crl.setopt(pyc.WRITEFUNCTION, sio.write)
    crl.perform()
    crl.close()

    lat_http_result = sio.getvalue() #process http result only if html
    if lat_http_result.strip() != "-" and lat_http_result.strip() != "super-ping.com":
        fstring="ping-avg'>"
        lstring="</div>"
        start = lat_http_result.index(fstring) + len(fstring)
        end = lat_http_result.index(lstring,start)
        avglat = lat_http_result[start:end]

    return float(avglat)
monitoring.py 文件源码 项目:hzlgithub 作者: hzlRises 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def getWant(line):#??????   
    keyword = keyword_list[line]
    url = url_list[line]
    try:        
        c = pycurl.Curl()
        c.setopt(c.URL,url)
        c.setopt(c.CONNECTTIMEOUT, 60)
        c.setopt(c.TIMEOUT,120) 
        b = StringIO.StringIO()
        c.setopt(c.WRITEFUNCTION,b.write)
        c.perform()
        html = b.getvalue()
        mutex.acquire()
        global match
        global all  
        global percentage
        if(getIfmatch(html)):
            match += 1
        else:
            pass
        all += 1    
        print 'all: '+str(all)+' match: '+str(match)+', percentage '+'%.1f'%((float(match)/all)*100)+'%'
        mutex.release()
    except:
        print '%s Empty reply from server' %keyword
weixin.py 文件源码 项目:hzlgithub 作者: hzlRises 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def Curl(url,headers):
    while 1:
        try:
            c = pycurl.Curl()
            c.setopt(pycurl.REFERER, 'http://weixin.sogou.com/')
            c.setopt(pycurl.FOLLOWLOCATION, True)
            c.setopt(pycurl.MAXREDIRS,5)
            c.setopt(pycurl.CONNECTTIMEOUT, 60)
            c.setopt(pycurl.TIMEOUT,120)
            c.setopt(pycurl.ENCODING, 'gzip,deflate')
            c.fp = StringIO.StringIO()  
            c.setopt(pycurl.URL, url)
            c.setopt(pycurl.HTTPHEADER,headers)
            c.setopt(c.WRITEFUNCTION, c.fp.write)
            c.perform()
            html = c.fp.getvalue()
            if '??????' in html:
                print u'??????,??10??'
                time.sleep(600)
            else:
                return html
        except Exception, e:
            print url,'curl(url)',e
            continue 
#????????
weixin.py 文件源码 项目:hzlgithub 作者: hzlRises 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def getArticleInfo(url):
    headers = [
    "User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36",
    "Cookie:RK=EPnrvyKs33; pgv_pvi=9085208576; pac_uid=1_2273358437; noticeLoginFlag=1; ts_uid=864776808; ptcz=5eb5cdef0bba881b5fdad31c18353d40fd0242f5a873f155197343eaec8b3730; pt2gguin=o2273358437; o_cookie=2273358437; tvfe_boss_uuid=1a889f93cefc98b9; pgv_pvid=2616135824",
]
    html = Curl(url,headers)
    title = search('<title>(.*?)</title>',html).decode('utf-8','ignore')
    datetime = search('<em id="post-date" class="rich_media_meta rich_media_meta_text">(.*?)</em>',html).decode('utf-8','ignore')
    num_url= url.replace('http://mp.weixin.qq.com/s','http://mp.weixin.qq.com/mp/getcomment') +'&&uin=&key=&pass_ticket=&wxtoken=&devicetype=&clientversion=0&x5=0'#?????/?????
    num_html = Curl(num_url,headers)
    dict_weixin  = eval(num_html)
    read_num = dict_weixin['read_num']
    like_num = dict_weixin['like_num']
    return title,read_num,like_num,datetime

#?????
xiala_loop.py 文件源码 项目:hzlgithub 作者: hzlRises 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def curl(url, debug=False, **kwargs):
        while 1:
                try:
                        s = StringIO.StringIO()
                        c = pycurl.Curl()
                        c.setopt(pycurl.URL, url)
                        c.setopt(pycurl.REFERER, url)
                        c.setopt(pycurl.FOLLOWLOCATION, True)
                        c.setopt(pycurl.TIMEOUT, 60)
                        c.setopt(pycurl.ENCODING, 'gzip')
                        c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')
                        c.setopt(pycurl.NOSIGNAL, True)
                        c.setopt(pycurl.WRITEFUNCTION, s.write)
                        for k, v in kwargs.iteritems():
                                c.setopt(vars(pycurl)[k], v)
                        c.perform()
                        c.close()
                        return s.getvalue()
                except:
                        if debug:
                                raise
                        continue
PayloadSender.py 文件源码 项目:Eagle 作者: magerx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def curl(source_url, is_post, cookie):
    """
    ??pycurl?????WEB???????
    :source_url:       ??URL
    :is_post:   ???POST
    :cookie:    cookie
    """
    buffer = BytesIO()
    c = pycurl.Curl()
    c.setopt(c.ENCODING, 'gzip,deflate')
    c.setopt(c.COOKIE, cookie)
    c.setopt(c.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:42.0) Gecko/20100101 Firefox/42.0')
    try:
        if is_post == 2:  # post
            url, query = source_url.split('?', 1)
            c.setopt(c.URL, url)
            c.setopt(c.POSTFIELDS, query)
        else:
            c.setopt(c.URL, source_url)
        c.setopt(c.WRITEDATA, buffer)
        c.perform()
        c.close()
        return buffer.getvalue()
    except:
        return ''
download.py 文件源码 项目:pyload-requests 作者: pyload 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def close(self):
        """
        Cleanup.
        """
        for chunk in self.chunks:
            self.close_chunk(chunk)

        # Workaround: pycurl segfaults when closing multi, that never had
        # any curl handles
        if hasattr(self, 'manager'):
            with closing(pycurl.Curl()) as c:
                self.__manager.add_handle(c)
                self.__manager.remove_handle(c)

        self.chunks = []
        if hasattr(self, 'manager'):
            self.__manager.close()
            del self.__manager
        if hasattr(self, "info"):
            del self.info
request.py 文件源码 项目:pyload-requests 作者: pyload 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        self.c = pycurl.Curl()
        Request.__init__(self, *args, **kwargs)

        self.rep = io.StringIO()
        self.last_url = None
        self.last_effective_url = None
        self.header = ""

        # cookiejar defines the context
        self.cj = self.context

        self.setopt(pycurl.WRITEFUNCTION, self.write)
        self.setopt(pycurl.HEADERFUNCTION, self.write_header)

    # TODO: Rename to curl
__init__.py 文件源码 项目:matlab_ctp_md_td 作者: xunquant 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _curl_a_link(self, target_url,post_target, commit_date=None):
        '''
        ?????????????????????json??????????curl get??????????????????????????????????elastic???????
        '''

        buffer = StringIO()
        c = Curl()
        c.setopt(c.URL,target_url)
        c.setopt(c.WRITEDATA, buffer)
        c.perform()
        c.close()

        load_target=json.loads(buffer.getvalue())

        return load_target
        pass
sort_total_time.py 文件源码 项目:tools 作者: chenshiyang2015 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_gzip(url): 



        t = Test() 
        c = pycurl.Curl()  
        c.setopt(pycurl.WRITEFUNCTION,t.callback)
        c.setopt(pycurl.ENCODING, 'gzip')
        c.setopt(pycurl.URL,url) 
        c.setopt(pycurl.USERAGENT,"User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)")
        c.perform()   

        TOTAL_TIME = c.getinfo(c.TOTAL_TIME)


        #print "????????%.2f ms" %(TOTAL_TIME*1000) 
        return TOTAL_TIME * 1000
runner_wiki_trace.py 文件源码 项目:hyperbolic-caching 作者: kantai 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def curl_read(url):
    try:
        c = pycurl.Curl()
        c.setopt(c.URL, url)
        resp = StringIO()
        headers = StringIO()
        c.setopt(c.WRITEFUNCTION, resp.write)
        c.setopt(c.HEADERFUNCTION, headers.write)
        c.setopt(pycurl.CONNECTTIMEOUT, 20)
        c.setopt(pycurl.TIMEOUT, 20)
        c.perform()
        if c.getinfo(c.RESPONSE_CODE) == 200:
            c.close()
            is_hit = handle_response(resp, headers)
            size = len(resp)
            return True, is_hit, size

        return False, False, 0
    except:
        return False, False, 0
vodrecord.py 文件源码 项目:necrobot 作者: incnone 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def end_all_async_unsafe(self):
        if not Config.RECORDING_ACTIVATED:
            return

        for rtmp_name in self._recording_rtmps:
            curl = pycurl.Curl()
            try:
                self._set_def_curl_opts(curl)

                curl.setopt(pycurl.URL, self._end_url(rtmp_name))
                curl.setopt(pycurl.WRITEDATA, self._end_buffer)

                curl.perform()
            except pycurl.error as e:
                console.warning(
                    'Pycurl error in end_all() for racer <{0}>: Tried to curl <{1}>. Error {2}.'.format(
                        rtmp_name,
                        self._end_url(rtmp_name),
                        e))
            finally:
                curl.close()

        self._recording_rtmps.clear()
vodrecord.py 文件源码 项目:necrobot 作者: incnone 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _end_record_nolock(self, rtmp_name):
        rtmp_name = rtmp_name.lower()
        if rtmp_name not in self._recording_rtmps:
            return

        curl = pycurl.Curl()
        try:
            self._set_def_curl_opts(curl)
            curl.setopt(pycurl.URL, self._end_url(rtmp_name))
            curl.setopt(pycurl.WRITEDATA, self._end_buffer)

            curl.perform()
            self._recording_rtmps = [r for r in self._recording_rtmps if r != rtmp_name]
        except pycurl.error as e:
            console.warning(
                'Pycurl error in end_record({0}): Tried to curl <{1}>. Error {2}.'.format(
                    rtmp_name,
                    self._end_url(rtmp_name),
                    e))
        finally:
            curl.close()
SubbedAnime.py 文件源码 项目:AnimeWatch 作者: kanishka-linux 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def ccurl_setcookie(url):

    hdr = "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:45.0) Gecko/20100101 Firefox/45.0"
    c = pycurl.Curl()
    c.setopt(c.FOLLOWLOCATION, True)
    c.setopt(c.USERAGENT, hdr)
    c.setopt(c.COOKIEJAR, '/tmp/AnimeWatch/cookie.txt')
    url = str(url)
    c.setopt(c.URL, url)
    storage = BytesIO()
    c.setopt(c.WRITEDATA, storage)
    c.perform()
    c.close()
    content = storage.getvalue()
    content = getContentUnicode(content)

    return (content)


问题


面经


文章

微信
公众号

扫码关注公众号