python类WRITEFUNCTION的实例源码

InstagramRegistration.py 文件源码 项目:Instagram-API 作者: danleyb2 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def request(self, endpoint, post=None):
        """Call ``Constants.API_URL + endpoint`` and return the parsed reply.

        :param endpoint: path appended to ``Constants.API_URL``.
        :param post: optional request body; when not ``None`` it is sent as
            the POST fields.
        :return: ``[header, decoded_body]`` where ``header`` is the raw
            response header block and ``decoded_body`` is
            ``json_decode(body)``.
        """
        buffer = BytesIO()
        # The same file is used both to load and to store cookies.
        cookie_file = os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat")

        ch = pycurl.Curl()
        try:
            ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
            ch.setopt(pycurl.USERAGENT, self.userAgent)
            ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
            ch.setopt(pycurl.FOLLOWLOCATION, True)
            # Headers are written into the same buffer as the body and are
            # split off again below using HEADER_SIZE.
            ch.setopt(pycurl.HEADER, True)
            ch.setopt(pycurl.VERBOSE, False)
            ch.setopt(pycurl.COOKIEFILE, cookie_file)
            ch.setopt(pycurl.COOKIEJAR, cookie_file)

            if post is not None:
                ch.setopt(pycurl.POST, True)
                ch.setopt(pycurl.POSTFIELDS, post)

            if self.proxy:
                ch.setopt(pycurl.PROXY, self.proxyHost)
                if self.proxyAuth:
                    ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

            ch.perform()
            resp = buffer.getvalue()
            header_len = ch.getinfo(pycurl.HEADER_SIZE)
        finally:
            # Release the handle even when a setopt/perform call raises.
            ch.close()

        header = resp[0: header_len]
        body = resp[header_len:]

        if self.debug:
            print("REQUEST: " + endpoint)
            if post is not None:
                if not isinstance(post, list):
                    print("DATA: " + str(post))
            # The buffer holds bytes; on Python 3 they cannot be concatenated
            # with str, so decode them for display only.
            body_text = body if isinstance(body, str) else body.decode("utf-8", "replace")
            print("RESPONSE: " + body_text)

        return [header, json_decode(body)]
tuling.py 文件源码 项目:QQBot 作者: springhack 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def CurlPOST(url, data, cookie):
    """POST ``data`` to ``url`` with a JSON content type and return the response body.

    :param url: target URL.
    :param data: request body handed to curl as the POST fields.
    :param cookie: cookie file path, used for both COOKIEFILE and COOKIEJAR.
    :return: everything curl wrote to the response buffer.
    """
    c = pycurl.Curl()
    b = StringIO.StringIO()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json'])
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.COOKIEFILE, cookie)
        c.setopt(pycurl.COOKIEJAR, cookie)
        c.setopt(pycurl.POSTFIELDS, data)
        c.perform()
        return b.getvalue()
    finally:
        # Free the buffer and the handle even when the transfer fails.
        b.close()
        c.close()
test_fetch.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_post(self):
        """fetch() with post=True returns the stub's body and records pycurl.POST."""
        stub = CurlStub(b"result")
        body = fetch("http://example.com", post=True, curl=stub)
        self.assertEqual(body, b"result")

        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.POST: True,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
test_fetch.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_post_data(self):
        """fetch() with post data records a read callback and the payload size."""
        stub = CurlStub(b"result")
        body = fetch("http://example.com", post=True, data="data", curl=stub)
        self.assertEqual(body, b"result")

        # The recorded read callback must hand back the payload as bytes.
        read_callback = stub.options[pycurl.READFUNCTION]
        self.assertEqual(read_callback(), b"data")

        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.POST: True,
            pycurl.POSTFIELDSIZE: 4,
            pycurl.READFUNCTION: Any(),
            pycurl.WRITEFUNCTION: Any(),
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
test_fetch.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_cainfo(self):
        """fetch() with cainfo records it under pycurl.CAINFO as bytes."""
        stub = CurlStub(b"result")
        body = fetch("https://example.com", cainfo="cainfo", curl=stub)
        self.assertEqual(body, b"result")

        expected_options = {
            pycurl.URL: b"https://example.com",
            pycurl.CAINFO: b"cainfo",
            pycurl.WRITEFUNCTION: Any(),
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
test_fetch.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_timeouts(self):
        """connect_timeout and total_timeout end up in CONNECTTIMEOUT and LOW_SPEED_TIME."""
        stub = CurlStub(b"result")
        body = fetch(
            "http://example.com",
            connect_timeout=5,
            total_timeout=30,
            curl=stub,
        )
        self.assertEqual(body, b"result")

        expected_options = {
            pycurl.URL: b"http://example.com",
            pycurl.CONNECTTIMEOUT: 5,
            pycurl.LOW_SPEED_TIME: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.NOSIGNAL: 1,
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
test_fetch.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_pycurl_insecure(self):
        """fetch() with insecure=True records SSL_VERIFYPEER as False."""
        stub = CurlStub(b"result")
        body = fetch("http://example.com/get-ca-cert", curl=stub, insecure=True)
        self.assertEqual(body, b"result")

        expected_options = {
            pycurl.URL: b"http://example.com/get-ca-cert",
            pycurl.SSL_VERIFYPEER: False,
            pycurl.WRITEFUNCTION: Any(),
            pycurl.FOLLOWLOCATION: 1,
            pycurl.MAXREDIRS: 5,
            pycurl.CONNECTTIMEOUT: 30,
            pycurl.LOW_SPEED_LIMIT: 1,
            pycurl.LOW_SPEED_TIME: 600,
            pycurl.NOSIGNAL: 1,
            pycurl.DNS_CACHE_TIMEOUT: 0,
            pycurl.ENCODING: b"gzip,deflate",
        }
        self.assertEqual(stub.options, expected_options)
wzhifuSDK.py 文件源码 项目:GOKU 作者: bingweichen 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def postXmlSSL(self, xml, url, second=30, cert=True, post=True):
        """Send a request through ``self.curl``, optionally with a client certificate.

        :param xml: payload sent as the POST fields when ``post`` is true.
        :param url: request URL.
        :param second: value for curl's TIMEOUT option.
        :param cert: when true, configure the key and certificate from
            ``WxPayConf_pub.SSLKEY_PATH`` and ``WxPayConf_pub.SSLCERT_PATH``.
        :param post: when true, send ``xml`` as a POST body.
        :return: the response body collected from curl.
        """
        handle = self.curl
        handle.setopt(pycurl.URL, url)
        handle.setopt(pycurl.TIMEOUT, second)

        if cert:
            # Client key and certificate are both supplied in PEM format.
            handle.setopt(pycurl.SSLKEYTYPE, "PEM")
            handle.setopt(pycurl.SSLKEY, WxPayConf_pub.SSLKEY_PATH)
            handle.setopt(pycurl.SSLCERTTYPE, "PEM")
            handle.setopt(pycurl.SSLCERT, WxPayConf_pub.SSLCERT_PATH)

        if post:
            # Submit the payload as a POST request.
            handle.setopt(pycurl.POST, True)
            handle.setopt(pycurl.POSTFIELDS, xml)

        response = StringIO()
        handle.setopt(pycurl.WRITEFUNCTION, response.write)
        handle.perform()

        return response.getvalue()
zoomeye.py 文件源码 项目:ZPoc 作者: zidier215 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_url(self, url):
        """GET ``url`` with a JWT ``Authorization`` header and return the response body.

        :param url: URL to fetch.
        :return: the response body, or ``None`` when no API token is set or
            the request fails (failures are logged, not raised).
        """
        if self.API_TOKEN is None:
            logging.error('none token')
            return

        # Defined up front so the failure path returns None instead of
        # hitting an unbound local at the final return.
        result = None
        c = None
        try:
            c = pycurl.Curl()
            c.setopt(pycurl.CAINFO, certifi.where())
            c.setopt(pycurl.URL, url)
            b = StringIO.StringIO()
            c.setopt(pycurl.WRITEFUNCTION, b.write)
            c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
            c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
            c.setopt(pycurl.CUSTOMREQUEST, "GET")
            c.setopt(pycurl.FOLLOWLOCATION, 1)
            c.perform()
            result = b.getvalue()
            logging.debug('result')
        except Exception as e:
            # Exceptions have no ``message`` attribute on Python 3; log the
            # exception object itself.
            logging.error(e)
            logging.error('go error')
        finally:
            if c is not None:
                c.close()
        return result
request.py 文件源码 项目:tus-py-client 作者: tus 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, uploader):
        """Set up a curl handle that uploads ``uploader``'s file with a PATCH request.

        :param uploader: object supplying ``url``, ``offset``,
            ``request_length``, ``headers_as_list`` and ``get_file_stream()``.
        """
        self.handle = pycurl.Curl()
        self.response_headers = {}
        self.output = six.StringIO()
        self.status_code = None

        curl = self.handle
        curl.setopt(pycurl.CAINFO, certifi.where())
        curl.setopt(pycurl.URL, uploader.url)
        curl.setopt(pycurl.HEADERFUNCTION, self._prepare_response_header)
        curl.setopt(pycurl.UPLOAD, 1)
        curl.setopt(pycurl.CUSTOMREQUEST, 'PATCH')

        # Reading starts at the uploader's current offset.
        self.file = uploader.get_file_stream()
        self.file.seek(uploader.offset)
        curl.setopt(pycurl.READFUNCTION, self.file.read)
        curl.setopt(pycurl.WRITEFUNCTION, self.output.write)
        curl.setopt(pycurl.INFILESIZE, uploader.request_length)

        own_headers = [
            "upload-offset: {}".format(uploader.offset),
            "Content-Type: application/offset+octet-stream",
        ]
        curl.setopt(pycurl.HTTPHEADER, own_headers + uploader.headers_as_list)
WebCurl.py 文件源码 项目:python-Speech_Recognition 作者: zthxxx 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_page_data(url, head = None, curl = None):
    """Fetch ``url`` with a GET request and return the response body.

    :param url: URL to request (curl doesn't support unicode).
    :param head: optional list of header lines; must be a list, not a dict.
    :param curl: optional existing curl handle; a new one is created when falsy.
    :return: the data curl wrote to the response buffer.
    """
    sink = StringIO()
    curl = curl or pycurl.Curl()

    curl.setopt(pycurl.URL, url)
    if head:
        curl.setopt(pycurl.HTTPHEADER, head)

    # NOTE: peer and host certificate verification are switched off here.
    fixed_options = (
        (pycurl.WRITEFUNCTION, sink.write),
        (pycurl.CUSTOMREQUEST, "GET"),
        (pycurl.CONNECTTIMEOUT, 30),
        (pycurl.TIMEOUT, 30),
        (pycurl.SSL_VERIFYPEER, 0),
        (pycurl.SSL_VERIFYHOST, 0),
    )
    for option, value in fixed_options:
        curl.setopt(option, value)

    curl.perform()
    page_data = sink.getvalue()
    sink.close()
    return page_data
WebCurl.py 文件源码 项目:python-Speech_Recognition 作者: zthxxx 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def post_page_data(url, data = None, head = None, curl = None):
    """Send ``data`` to ``url`` with a POST request and return the response body.

    No connect or total timeout is configured in this function.

    :param url: URL to request (curl doesn't support unicode).
    :param data: value handed to curl as the POST fields.
    :param head: optional list of header lines; must be a list, not a dict.
    :param curl: optional existing curl handle; a new one is created when falsy.
    :return: the data curl wrote to the response buffer.
    """
    sink = StringIO()
    curl = curl or pycurl.Curl()

    curl.setopt(pycurl.URL, url)
    if head:
        curl.setopt(pycurl.HTTPHEADER, head)

    for option, value in (
        (pycurl.POSTFIELDS, data),
        (pycurl.WRITEFUNCTION, sink.write),
        (pycurl.CUSTOMREQUEST, "POST"),
    ):
        curl.setopt(option, value)

    curl.perform()
    page_data = sink.getvalue()
    sink.close()
    return page_data
check_wan_latency.py 文件源码 项目:check_wan_latency 作者: computingbee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getlat4city():
    """Ask super-ping.com for the average latency of a ping to WAN_IP from node CITY.

    :return: the text found between ``ping-avg'>`` and ``</div>`` in the
        response, converted to float; ``-1.0`` when the service answers with
        just ``-`` or ``super-ping.com``.
    :raises ValueError: when the markers are missing from the response or the
        text between them is not a number.
    """
    avglat = -1

    sp_url = "http://www.super-ping.com/ping.php?node=" + CITY + "&ping=" + WAN_IP
    sp_refer_url = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en"
    sp_http_headers = ['Referer: ' + sp_refer_url, 'X-Requested-With: XMLHttpRequest']

    crl = pyc.Curl()
    sio = StringIO()
    try:
        crl.setopt(pyc.URL, sp_url)
        crl.setopt(pyc.HTTPHEADER, sp_http_headers)
        crl.setopt(pyc.WRITEFUNCTION, sio.write)
        crl.perform()
    finally:
        # Release the handle even when the transfer fails.
        crl.close()

    # Only parse the reply when it is an HTML result page.
    lat_http_result = sio.getvalue()
    reply = lat_http_result.strip()
    if reply != "-" and reply != "super-ping.com":
        fstring = "ping-avg'>"
        lstring = "</div>"
        start = lat_http_result.index(fstring) + len(fstring)
        end = lat_http_result.index(lstring, start)
        avglat = lat_http_result[start:end]

    return float(avglat)
xiala_loop.py 文件源码 项目:hzlgithub 作者: hzlRises 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def curl(url, debug=False, **kwargs):
        """Fetch ``url`` with pycurl, retrying until a request succeeds.

        :param url: URL to fetch; it is also sent as the Referer.
        :param debug: when true, re-raise the first failure instead of retrying.
        :param kwargs: extra curl options keyed by pycurl constant name
            (for example ``TIMEOUT=10``), applied after the built-in ones.
        :return: the response body.
        """
        while 1:
                c = None
                try:
                        s = StringIO.StringIO()
                        c = pycurl.Curl()
                        c.setopt(pycurl.URL, url)
                        c.setopt(pycurl.REFERER, url)
                        c.setopt(pycurl.FOLLOWLOCATION, True)
                        c.setopt(pycurl.TIMEOUT, 60)
                        c.setopt(pycurl.ENCODING, 'gzip')
                        c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')
                        c.setopt(pycurl.NOSIGNAL, True)
                        c.setopt(pycurl.WRITEFUNCTION, s.write)
                        for k, v in kwargs.items():
                                c.setopt(getattr(pycurl, k), v)
                        c.perform()
                        return s.getvalue()
                except Exception:
                        # Only ordinary errors are retried; KeyboardInterrupt
                        # and SystemExit must be able to stop the loop.
                        if debug:
                                raise
                        continue
                finally:
                        # Every attempt creates a new handle, so free it on
                        # both success and failure.
                        if c is not None:
                                c.close()
request.py 文件源码 项目:pyload-requests 作者: pyload 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """Create the curl handle, run ``Request.__init__`` and register the write callbacks."""
        # The handle is created before the base initialiser runs.
        self.c = pycurl.Curl()
        Request.__init__(self, *args, **kwargs)

        # Per-request state: response buffer, URL bookkeeping, raw header text.
        self.rep = io.StringIO()
        self.last_url = self.last_effective_url = None
        self.header = ""

        # cookiejar defines the context
        self.cj = self.context

        callbacks = (
            (pycurl.WRITEFUNCTION, self.write),
            (pycurl.HEADERFUNCTION, self.write_header),
        )
        for option, callback in callbacks:
            self.setopt(option, callback)

    # TODO: Rename to curl
mzURL.py 文件源码 项目:multiplierz 作者: BlaisProteomics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def scan(self, time):
        """Download the scan whose time is closest to ``time`` and wrap it in an ``mzScan``.

        The request is performed up to five times, stopping as soon as the
        response buffer holds any data.

        :param time: target time, compared with the first field of each entry
            returned by ``self.scans()``.
        :return: ``mzScan`` built from one tuple of floats per response line.
        """
        closest = min(self.scans(), key=lambda si: abs(time - si[0]))
        scan_time, mz, scan_name, st, scan_mode = closest

        self.crl.setopt(pycurl.HTTPGET, True)
        self.crl.setopt(pycurl.URL, str(scan_name + '.txt'))

        response = cStringIO.StringIO()
        self.crl.setopt(pycurl.WRITEFUNCTION, response.write)

        attempts = 0
        while attempts < 5:
            self.crl.perform()
            attempts += 1
            if response.getvalue():
                break

        rows = response.getvalue().splitlines()
        points = [tuple(map(float, row.split())) for row in rows]

        # how to get charge for an mzURL scan?
        return mzScan(points, scan_time, mode=scan_mode, mz=mz)
mzURL.py 文件源码 项目:multiplierz 作者: BlaisProteomics 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None):
        """Download the ``/ric/`` trace for a time window and m/z window.

        The request is performed up to five times, stopping as soon as the
        response buffer holds any data.

        :param start_time: first value of the time range placed in the URL.
        :param stop_time: second value of the time range placed in the URL.
        :param start_mz: first value of the m/z range placed in the URL.
        :param stop_mz: second value of the m/z range placed in the URL.
        :param filter: not used by this method.
        :return: list with one tuple of floats per response line.
        """
        path = '/ric/%s-%s/%s-%s.txt' % (start_time, stop_time, start_mz, stop_mz)
        xic_url = str(self.data_file + path)

        self.crl.setopt(pycurl.HTTPGET, True)
        self.crl.setopt(pycurl.URL, xic_url)

        response = cStringIO.StringIO()
        self.crl.setopt(pycurl.WRITEFUNCTION, response.write)

        attempts = 0
        while attempts < 5:
            self.crl.perform()
            attempts += 1
            if response.getvalue():
                break

        rows = response.getvalue().splitlines()
        return [tuple(map(float, row.split())) for row in rows]
sort_total_time.py 文件源码 项目:tools 作者: chenshiyang2015 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_gzip(url):
        """Fetch ``url`` with gzip encoding enabled and report how long the transfer took.

        The response body is passed to ``Test().callback``.

        :param url: URL to request.
        :return: curl's ``TOTAL_TIME`` for the transfer multiplied by 1000.
        """
        t = Test()
        c = pycurl.Curl()
        try:
                c.setopt(pycurl.WRITEFUNCTION, t.callback)
                c.setopt(pycurl.ENCODING, 'gzip')
                c.setopt(pycurl.URL, url)
                c.setopt(pycurl.USERAGENT, "User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)")
                c.perform()
                total_time = c.getinfo(c.TOTAL_TIME)
        finally:
                # The handle was never released before; close it on every path.
                c.close()

        return total_time * 1000
zoomeye.py 文件源码 项目:pymsf 作者: s0m30ne 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def searchIP(self, query, pages, queue, STOP_ME):
        """Query pages ``1..pages`` of ``self.API_URL`` and queue every matched IP.

        :param query: value placed in the ``query`` URL parameter as given.
        :param pages: number of result pages to request.
        :param queue: object whose ``put`` receives each host's ``"ip"`` value.
        :param STOP_ME: mutable sequence; element 0 is set to ``True`` once
            all pages have been processed.
        :raises SystemExit: when ``self.API_TOKEN`` is ``None``.
        """
        if self.API_TOKEN is None:
            # Call form of print parses on both Python 2 and Python 3.
            print("please config your API_TOKEN")
            sys.exit()
        for page in range(1, pages+1):
            b = StringIO.StringIO()
            c = pycurl.Curl()
            try:
                c.setopt(pycurl.URL, "%s?query=%s&page=%s" % (self.API_URL, query, page))
                c.setopt(pycurl.WRITEFUNCTION, b.write)
                c.setopt(pycurl.FOLLOWLOCATION, 1)
                c.setopt(pycurl.CUSTOMREQUEST, "GET")
                c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
                c.perform()
            finally:
                # One handle is created per page; release each one.
                c.close()
            hosts = json.loads(b.getvalue())
            for host in hosts['matches']:
                queue.put(host["ip"])
        STOP_ME[0] = True
restClient.py 文件源码 项目:recipebook 作者: dpapathanasiou 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get (url, user_agent=UA, referrer=None):
    """Fetch ``url`` with a pycurl GET request.

    :param url: URL to request.
    :param user_agent: User-Agent string; skipped when falsy.
    :param referrer: Referer value; skipped when ``None``.
    :return: the response data, or ``None`` if the request raised.
    """
    databuffer = StringIO()
    curl = pycurl.Curl()

    base_options = (
        (pycurl.URL, url),
        (pycurl.FOLLOWLOCATION, 1),
        (pycurl.CONNECTTIMEOUT, 5),
        (pycurl.TIMEOUT, 8),
        (pycurl.WRITEFUNCTION, databuffer.write),
        (pycurl.COOKIEFILE, ''),
    )
    for option, value in base_options:
        curl.setopt(option, value)

    if user_agent:
        curl.setopt(pycurl.USERAGENT, user_agent)
    if referrer is not None:
        curl.setopt(pycurl.REFERER, referrer)

    # Best effort: any failure during the transfer yields None.
    try:
        curl.perform()
        data = databuffer.getvalue()
    except Exception:
        data = None
    curl.close()

    return data
Checkpoint.py 文件源码 项目:Instagram-API 作者: danleyb2 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def request(self, endpoint, headers=None, post=None, first=True):
        """Request ``endpoint`` and return the raw headers plus the decoded body.

        :param endpoint: full URL to request.
        :param headers: optional list of header lines.
        :param post: optional form data; when truthy it is URL-encoded and
            sent as a POST.
        :param first: not used by this method.
        :return: ``[header, decoded_body]`` where ``header`` is the raw
            response header block and ``decoded_body`` is
            ``json_decode(body)``.
        """
        # These helpers live in urllib.parse on Python 3 and urllib on Python 2.
        try:
            from urllib.parse import urlencode, unquote_plus
        except ImportError:
            from urllib import urlencode, unquote_plus

        buffer = BytesIO()
        # The same file is used both to load and to store cookies.
        cookie_file = self.settingsPath + self.username + '-cookies.dat'

        ch = pycurl.Curl()
        try:
            ch.setopt(pycurl.URL, endpoint)
            ch.setopt(pycurl.USERAGENT, self.userAgent)
            ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
            ch.setopt(pycurl.FOLLOWLOCATION, True)
            # Headers are written into the same buffer as the body and are
            # split off again below using HEADER_SIZE.
            ch.setopt(pycurl.HEADER, True)
            if headers:
                ch.setopt(pycurl.HTTPHEADER, headers)

            ch.setopt(pycurl.VERBOSE, self.debug)
            # NOTE(review): certificate verification is disabled here.
            ch.setopt(pycurl.SSL_VERIFYPEER, False)
            ch.setopt(pycurl.SSL_VERIFYHOST, False)
            ch.setopt(pycurl.COOKIEFILE, cookie_file)
            ch.setopt(pycurl.COOKIEJAR, cookie_file)

            if post:
                ch.setopt(pycurl.POST, True)
                ch.setopt(pycurl.POSTFIELDS, urlencode(post))

            ch.perform()
            resp = buffer.getvalue()
            header_len = ch.getinfo(pycurl.HEADER_SIZE)
        finally:
            # Release the handle even when a setopt/perform call raises.
            ch.close()

        header = resp[0: header_len]
        body = resp[header_len:]

        if self.debug:
            print("REQUEST: " + endpoint)
            if post is not None:
                if not isinstance(post, list):
                    print('DATA: ' + unquote_plus(json.dumps(post)))
            # The buffer holds bytes; on Python 3 they cannot be concatenated
            # with str, so decode them for display only.
            body_text = body if isinstance(body, str) else body.decode("utf-8", "replace")
            print("RESPONSE: " + body_text + "\n")

        return [header, json_decode(body)]
tuling.py 文件源码 项目:QQBot 作者: springhack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def CurlGET(url, cookie):
    """Fetch ``url`` and return the response body.

    :param url: target URL.
    :param cookie: cookie file path, used for both COOKIEFILE and COOKIEJAR.
    :return: everything curl wrote to the response buffer.
    """
    c = pycurl.Curl()
    b = StringIO.StringIO()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.COOKIEFILE, cookie)
        c.setopt(pycurl.COOKIEJAR, cookie)
        c.perform()
        return b.getvalue()
    finally:
        # Free the buffer and the handle even when the transfer fails.
        b.close()
        c.close()
transport.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def request(self, url, method, body, headers):
            """Perform an HTTP request with pycurl.

            :param url: URL to request.
            :param method: HTTP method name; ``body`` is sent only for ``'POST'``.
            :param body: payload used as the POST fields.
            :param headers: mapping of header names to values; may be empty.
            :return: tuple of an empty dict and the response body.
            """
            handle = pycurl.Curl()
            handle.setopt(pycurl.URL, url)

            if 'proxy_host' in self.proxy:
                handle.setopt(pycurl.PROXY, self.proxy['proxy_host'])
            if 'proxy_port' in self.proxy:
                handle.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
            if 'proxy_user' in self.proxy:
                handle.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)

            self.buf = StringIO()
            handle.setopt(pycurl.WRITEFUNCTION, self.buf.write)

            # Certificate checks are enabled only when a CA bundle is configured.
            if self.cacert:
                handle.setopt(handle.CAINFO, self.cacert)
            handle.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
            handle.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)

            handle.setopt(pycurl.CONNECTTIMEOUT, self.timeout)
            handle.setopt(pycurl.TIMEOUT, self.timeout)

            if method == 'POST':
                handle.setopt(pycurl.POST, 1)
                handle.setopt(pycurl.POSTFIELDS, body)

            if headers:
                hdrs = ['%s: %s' % pair for pair in headers.items()]
                log.debug(hdrs)
                handle.setopt(pycurl.HTTPHEADER, hdrs)

            handle.perform()
            handle.close()
            return {}, self.buf.getvalue()
youtubeprovider.py 文件源码 项目:xmusic-crawler 作者: rockers7414 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def __search_video_id(self, query):
        """Search ``/youtube/v3/search`` for ``query`` and return the first hit's video id.

        :param query: search text sent as the ``q`` parameter.
        :return: ``videoId`` of the first item in the response.
        :raises Exception: when the response has an ``error`` key or no items.
        """
        params = {
            "key": self.key,
            "part": "id,snippet",
            "order": "relevance",
            "type": "video",
            "videoSyndicated": "true",
            "maxResults": 3,
            "q": query
        }
        sink = BytesIO()

        curl = pycurl.Curl()
        search_url = self.SERVICE_HOST + "/youtube/v3/search?" + urlencode(params)
        curl.setopt(pycurl.URL, search_url)
        curl.setopt(pycurl.WRITEFUNCTION, sink.write)
        curl.perform()
        curl.close()

        raw = sink.getvalue().decode("utf-8")
        body = json.loads(raw)
        sink.close()

        if "error" in body:
            raise Exception("query error: {0}".format(body))

        if len(body["items"]) == 0:
            raise Exception("result not found")

        first_hit = body["items"][0]
        return first_hit["id"]["videoId"]
youtubeprovider.py 文件源码 项目:xmusic-crawler 作者: rockers7414 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __get_video_duration(self, video_id):
        """Look up a video through ``/youtube/v3/videos`` and return its length in seconds.

        :param video_id: id sent as the ``id`` parameter.
        :return: the parsed ``contentDetails.duration`` of the first item, as
            a whole number of seconds.
        :raises Exception: when the response has an ``error`` key or no items.
        """
        params = {
            "key": self.key,
            "part": "contentDetails",
            "id": video_id
        }

        buf = BytesIO()

        client = pycurl.Curl()
        try:
            client.setopt(pycurl.URL, self.SERVICE_HOST + "/youtube/v3/videos?" +
                          urlencode(params))
            client.setopt(pycurl.WRITEFUNCTION, buf.write)
            client.perform()
        finally:
            # Release the handle even when the transfer fails.
            client.close()

        body = json.loads(buf.getvalue().decode("utf-8"))
        buf.close()

        if "error" in body:
            raise Exception("query error: {0}".format(body))

        if len(body["items"]) == 0:
            raise Exception("result not found")

        duration = body["items"][0]["contentDetails"]["duration"]
        # ``timedelta.seconds`` excludes whole days, so a duration of 24 hours
        # or more would be reported too short; total_seconds() covers the
        # full span. int() keeps the integer return type.
        duration_seconds = int(isodate.parse_duration(duration).total_seconds())

        return duration_seconds
spotifyprovider.py 文件源码 项目:xmusic-crawler 作者: rockers7414 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __search(self, keyword, search_type):
        """Collect the search results for ``keyword`` by paging through ``/v1/search``.

        :param keyword: query text sent as ``q``.
        :param search_type: value of the ``type`` parameter; results are read
            from the ``search_type + "s"`` key of each response.
        :return: list with the ``items`` entries of every page fetched.
        """
        page_size = 50
        result_key = search_type + "s"
        result = []
        offset = 0

        while True:
            self.logger.info("search " + keyword + ", offset = " + str(offset))
            params = {
                "q": keyword,
                "type": search_type,
                "offset": offset,
                "limit": page_size
            }

            buf = BytesIO()

            client = pycurl.Curl()
            try:
                client.setopt(pycurl.URL, self.service_host +
                              "/v1/search" + "?" + urlencode(params))
                client.setopt(pycurl.WRITEFUNCTION, buf.write)
                client.perform()
            finally:
                # Release the handle even when the transfer fails.
                client.close()

            body = json.loads(buf.getvalue().decode("utf-8"))
            buf.close()

            if result and result_key not in body:
                # A follow-up page came back without a result section (for
                # example because the API rejected the offset); keep what
                # was collected so far instead of failing the whole search.
                self.logger.info("search stopped at offset " + str(offset))
                break

            page = body[result_key]
            items = page["items"]
            result.extend(items)

            # ``offset`` is the index of the first item to return, so it has
            # to advance by a whole page; advancing by one re-fetched nearly
            # the same page and skipped the rest of the results.
            offset += page["limit"]
            if not items or offset >= page["total"]:
                break

        self.logger.info("Done")
        return result
spotifyprovider.py 文件源码 项目:xmusic-crawler 作者: rockers7414 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_tracks_by_album_id(self, album_id):
        """Fetch every track of an album from ``/v1/albums/<album_id>/tracks``.

        :param album_id: album identifier placed in the URL.
        :return: list of ``Track`` objects built from each item's ``name``
            and ``track_number``, or ``None`` when ``album_id`` is empty.
        """
        # Check first: the log line below concatenates album_id and would
        # raise TypeError for None.
        if not album_id:
            return None

        self.logger.info("Get tracks of the album(" +
                         album_id + ") from Spotify.")

        page_size = 50
        offset = 0
        tracks = []
        while True:
            params = {
                "offset": offset,
                "limit": page_size
            }

            buf = BytesIO()

            client = pycurl.Curl()
            try:
                client.setopt(pycurl.URL, self.service_host + "/v1/albums/" +
                              album_id + "/tracks" + "?" + urlencode(params))
                client.setopt(pycurl.WRITEFUNCTION, buf.write)
                client.perform()
            finally:
                # Release the handle even when the transfer fails.
                client.close()

            body = json.loads(buf.getvalue().decode("utf-8"))
            buf.close()

            items = body["items"]
            for data in items:
                tracks.append(
                    Track(data["name"], None, data["track_number"]))

            # ``offset`` is the index of the first item to return, so it has
            # to advance by a whole page; advancing by one re-fetched nearly
            # the same page and skipped the remaining tracks.
            offset += body["limit"]
            if not items or offset >= body["total"]:
                break

        return tracks
myhttp.py 文件源码 项目:defcon-workshop 作者: devsecops 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def process(self, prio, obj):
    """Prepare a curl handle for ``obj`` and add it to the multi handle.

    :param prio: not used by this method.
    :param obj: request object; it builds the handle via ``to_http_object``
        and is stored as the third element of ``response_queue``.
    """
    self.pause.wait()

    handle = obj.to_http_object(self.freelist.get())
    if self._proxies:
        handle = self._set_proxy(handle, obj)

    # Body and headers are captured in separate buffers kept on the handle.
    body_buf = StringIO()
    header_buf = StringIO()
    handle.response_queue = (body_buf, header_buf, obj)
    handle.setopt(pycurl.WRITEFUNCTION, body_buf.write)
    handle.setopt(pycurl.HEADERFUNCTION, header_buf.write)

    with self.mutex_multi:
        self.m.add_handle(handle)
Request.py 文件源码 项目:defcon-workshop 作者: devsecops 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def head(self):
        """Send a HEAD request to ``self.completeUrl`` and store the parsed response.

        The result is a ``Response`` parsed from ``self.__performHead`` and
        saved as ``self.response``.
        """
        handle = pycurl.Curl()
        options = (
            # NOTE: peer and host certificate verification are switched off.
            (pycurl.SSL_VERIFYPEER, False),
            (pycurl.SSL_VERIFYHOST, 0),
            (pycurl.URL, self.completeUrl),
            # NOBODY makes curl issue a HEAD request.
            (pycurl.NOBODY, True),
            (pycurl.WRITEFUNCTION, self.header_callback),
        )
        for option, value in options:
            handle.setopt(option, value)
        handle.perform()

        parsed = Response()
        parsed.parseResponse(self.__performHead)
        self.response = parsed
transport.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def request(self, url, method, body, headers):
            """Perform an HTTP request with pycurl.

            :param url: URL to request.
            :param method: HTTP method name; ``body`` is sent only for ``'POST'``.
            :param body: payload used as the POST fields.
            :param headers: mapping of header names to values; may be empty.
            :return: tuple of an empty dict and the response body.
            """
            c = pycurl.Curl()
            try:
                c.setopt(pycurl.URL, url)
                if 'proxy_host' in self.proxy:
                    c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
                if 'proxy_port' in self.proxy:
                    c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
                if 'proxy_user' in self.proxy:
                    c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
                self.buf = StringIO()
                c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
                # Certificate checks are enabled only when a CA bundle is configured.
                if self.cacert:
                    c.setopt(c.CAINFO, self.cacert)
                c.setopt(pycurl.SSL_VERIFYPEER, 1 if self.cacert else 0)
                c.setopt(pycurl.SSL_VERIFYHOST, 2 if self.cacert else 0)
                # The connect timeout is a sixth of the total timeout. The
                # curl option takes an integer, and ``/`` gives a float on
                # Python 3, so truncate explicitly.
                c.setopt(pycurl.CONNECTTIMEOUT, int(self.timeout / 6))
                c.setopt(pycurl.TIMEOUT, self.timeout)
                if method == 'POST':
                    c.setopt(pycurl.POST, 1)
                    c.setopt(pycurl.POSTFIELDS, body)
                if headers:
                    hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
                    log.debug(hdrs)
                    c.setopt(pycurl.HTTPHEADER, hdrs)
                c.perform()
            finally:
                # Release the handle even when a setopt/perform call raises.
                c.close()
            return {}, self.buf.getvalue()


问题


面经


文章

微信
公众号

扫码关注公众号