python类HTTP的实例源码

test_httplib.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_response_headers(self):
        """Check that repeated Set-Cookie headers come back as one value.

        The response carries two Set-Cookie lines; getheader() is expected
        to return both values joined by ', '.
        """
        # test response with multiple message headers with the same field name.
        first_cookie = 'Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
        second_cookie = ('Part_Number="Rocket_Launcher_0001"; Version="1";'
                         ' Path="/acme"')
        raw_response = ''.join([
            'HTTP/1.1 200 OK\r\n',
            'Set-Cookie: ' + first_cookie + '\r\n',
            'Set-Cookie: ' + second_cookie + '\r\n',
            '\r\n',
            'No body\r\n',
        ])
        expected = first_cookie + ', ' + second_cookie

        response = httplib.HTTPResponse(FakeSocket(raw_response))
        response.begin()
        if response.getheader("Set-Cookie") != expected:
            self.fail("multiple headers not combined properly")
test_httplib.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_chunked_head(self):
        """Check a HEAD response whose socket still holds chunked data.

        Even though chunk data follows the headers, the test expects
        read() to return an empty string and the response to be closed.
        """
        wire_lines = [
            'HTTP/1.1 200 OK',
            'Transfer-Encoding: chunked',
            '',              # blank line terminating the header section
            'a',
            'hello world',
            '1',
            'd',
            '0',
        ]
        payload = '\r\n'.join(wire_lines) + '\r\n'

        resp = httplib.HTTPResponse(FakeSocket(payload), method="HEAD")
        resp.begin()
        self.assertEqual(resp.read(), '')
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
        self.assertTrue(resp.isclosed())
urllib.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
        """Follow the redirect named by the response headers.

        The target is taken from the 'location' header, falling back to
        'uri'.  When neither is present nothing is done and None is
        returned.  Otherwise ``fp`` is closed and the target is opened via
        ``self.open``.  ``data`` is accepted but not used here.

        Raises IOError when the resolved target does not start with
        http://, https:// or ftp://.
        """
        for field in ('location', 'uri'):
            if field in headers:
                target = headers[field]
                break
        else:
            # The server named no redirect target.
            return
        fp.close()
        # In case the server sent a relative URL, join with original:
        target = basejoin(self.type + ":" + url, target)

        # For security reasons we do not allow redirects to protocols
        # other than HTTP, HTTPS or FTP.
        permitted = ('http://', 'https://', 'ftp://')
        if not target.lower().startswith(permitted):
            raise IOError('redirect error', errcode,
                          errmsg + " - Redirection to url '%s' is not allowed" %
                          target,
                          headers)

        return self.open(target)
PluginMovieMovieMeter.py 文件源码 项目:griffith 作者: Strit 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_connection(self, host):
        """Return a connection for ``host``, going through a proxy if one is set.

        The proxy is looked up in ``urllib.getproxies()`` under 'http' and
        then 'all'.  ``self.realhost`` always records the requested host and
        ``self.proxy_is_used`` records which path was taken.
        """
        self.realhost = host
        proxies = urllib.getproxies()
        proxyurl = None
        for scheme in ('http', 'all'):
            if scheme in proxies:
                proxyurl = proxies[scheme]
                break

        if not proxyurl:
            # No usable proxy: defer to the stock Transport behaviour.
            self.proxy_is_used = False
            return Transport.make_connection(self, host)

        # Connect to the proxy's host instead of the requested one.
        _urltype, proxyhost = urllib.splittype(proxyurl)
        host, _selector = urllib.splithost(proxyhost)
        connection = httplib.HTTP(host)
        self.proxy_is_used = True
        return connection
urllib.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
        """Open the URL a redirect response points at.

        Looks for a 'location' header first and a 'uri' header second;
        returns None without touching ``fp`` when neither exists.  The
        ``data`` argument is not used by this method.

        Raises IOError if the resolved URL is not an http://, https:// or
        ftp:// URL.
        """
        if 'location' in headers:
            key = 'location'
        elif 'uri' in headers:
            key = 'uri'
        else:
            return
        destination = headers[key]
        fp.close()
        # In case the server sent a relative URL, join with original:
        destination = basejoin(self.type + ":" + url, destination)

        # For security reasons we do not allow redirects to protocols
        # other than HTTP, HTTPS or FTP.
        lowered = destination.lower()
        scheme_ok = any(lowered.startswith(prefix)
                        for prefix in ('http://', 'https://', 'ftp://'))
        if scheme_ok:
            return self.open(destination)

        raise IOError('redirect error', errcode,
                      errmsg + " - Redirection to url '%s' is not allowed" %
                      destination,
                      headers)
urllib.py 文件源码 项目:empyrion-python-api 作者: huhlig 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
        """Resolve and open the target of a redirect response.

        The target comes from the 'location' header, or from 'uri' when
        'location' is absent.  With neither header the method returns None
        and leaves ``fp`` open.  ``data`` is unused.

        Raises IOError for targets outside http://, https:// and ftp://.
        """
        has_location = 'location' in headers
        if not has_location and 'uri' not in headers:
            return
        newurl = headers['location'] if has_location else headers['uri']
        fp.close()
        # In case the server sent a relative URL, join with original:
        base = self.type + ":" + url
        newurl = basejoin(base, newurl)

        # For security reasons we do not allow redirects to protocols
        # other than HTTP, HTTPS or FTP.
        probe = newurl.lower()
        if (not probe.startswith('http://') and
                not probe.startswith('https://') and
                not probe.startswith('ftp://')):
            raise IOError('redirect error', errcode,
                          errmsg + " - Redirection to url '%s' is not allowed" %
                          newurl,
                          headers)

        return self.open(newurl)
flickrUpload.py 文件源码 项目:flickrpy 作者: khaxis 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def post_multipart(host, selector, fields, files):
    """
    POST form fields and file uploads to ``host`` as multipart/form-data.

    fields -- sequence of (name, value) pairs for ordinary form fields.
    files  -- sequence of (name, filename, value) triples for uploads.

    Returns the body of the server's response.  The status code, message
    and headers of the reply are read but discarded.
    """
    content_type, body = encode_multipart_formdata(fields, files)
    request_headers = (
        ('content-type', content_type),
        ('content-length', str(len(body))),
    )

    conn = httplib.HTTP(host)
    conn.putrequest('POST', selector)
    for header_name, header_value in request_headers:
        conn.putheader(header_name, header_value)
    conn.endheaders()
    conn.send(body)

    # The reply must be consumed before the body can be read from conn.file.
    _errcode, _errmsg, _headers = conn.getreply()
    return conn.file.read()
XMLRPCTransport.py 文件源码 项目:GAMADV-X 作者: taers232c 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def make_connection(self, host):
        """Build a TLS connection for ``host`` wrapped in an httplib.HTTP object.

        The host descriptor is resolved with ``self.get_host_info``; the
        extra headers and x509 data it returns are not used here.
        """
        # create a HTTPS connection object from a host descriptor
        host, _extra_headers, _x509 = self.get_host_info(host)
        tls_args = (
            host, None,
            self.username, self.password,
            self.sharedKey,
            self.certChain, self.privateKey,
            self.checker.cryptoID,
            self.checker.protocol,
            self.checker.x509Fingerprint,
            self.checker.x509TrustList,
            self.checker.x509CommonName,
            self.settings,
        )
        tls_connection = HTTPTLSConnection(*tls_args)

        # Hand the TLS connection to the legacy HTTP wrapper class.
        wrapper = httplib.HTTP()
        wrapper._setup(tls_connection)
        return wrapper
urllib.py 文件源码 项目:pmatic 作者: LarsMichelsen 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
        """Handle a redirect by opening the URL the server pointed to.

        Uses the 'location' header, or 'uri' if 'location' is missing, and
        returns None when the response has neither.  ``fp`` is closed once
        a target has been found.  ``data`` is ignored.

        Raises IOError when the resolved URL is not http://, https:// or
        ftp://.
        """
        if 'location' in headers:
            redirect_to = headers['location']
        elif 'uri' in headers:
            redirect_to = headers['uri']
        else:
            return
        fp.close()

        # In case the server sent a relative URL, join with original:
        original = self.type + ":" + url
        redirect_to = basejoin(original, redirect_to)

        # For security reasons we do not allow redirects to protocols
        # other than HTTP, HTTPS or FTP.
        if redirect_to.lower().startswith(('http://', 'https://', 'ftp://')):
            return self.open(redirect_to)

        detail = " - Redirection to url '%s' is not allowed" % redirect_to
        raise IOError('redirect error', errcode, errmsg + detail, headers)
xmlrpctransport.py 文件源码 项目:sslxray 作者: portcullislabs 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def make_connection(self, host):
        """Return a TLS connection for ``host``, reusing the cached one if possible.

        The connection is cached in ``self._connection`` together with the
        host argument it was created for.  When ``self.conn_class_is_http``
        is set, the connection is wrapped in an ``httplib.HTTP`` object
        before being returned.
        """
        # return an existing connection if possible.  This allows
        # HTTP/1.1 keep-alive.
        cached = self._connection
        if cached and host == cached[0]:
            connection = cached[1]
        else:
            # create a HTTPS connection object from a host descriptor
            chost, _extra_headers, _x509 = self.get_host_info(host)

            connection = HTTPTLSConnection(
                chost, None,
                username=self.username, password=self.password,
                certChain=self.certChain, privateKey=self.privateKey,
                checker=self.checker,
                settings=self.settings,
                ignoreAbruptClose=self.ignoreAbruptClose)
            # store the host argument along with the connection object
            self._connection = host, connection

        if self.conn_class_is_http:
            legacy = httplib.HTTP()
            legacy._setup(connection)
            return legacy
        return connection
urllib.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
        """Open the redirect target advertised in ``headers``.

        'location' is preferred over 'uri'; if neither header is present
        the method returns None straight away.  ``fp`` is closed before the
        new URL is opened.  ``data`` is not used.

        Raises IOError if the resolved URL does not begin with http://,
        https:// or ftp://.
        """
        for header_name in ('location', 'uri'):
            if header_name in headers:
                next_url = headers[header_name]
                break
        else:
            return
        fp.close()
        # In case the server sent a relative URL, join with original:
        next_url = basejoin(self.type + ":" + url, next_url)

        # For security reasons we do not allow redirects to protocols
        # other than HTTP, HTTPS or FTP.
        folded = next_url.lower()
        allowed = any(folded.startswith(scheme)
                      for scheme in ('http://', 'https://', 'ftp://'))
        if not allowed:
            raise IOError('redirect error', errcode,
                          errmsg + " - Redirection to url '%s' is not allowed" %
                          next_url,
                          headers)

        return self.open(next_url)
WindowsPhoneNotify.py 文件源码 项目:pyload-plugins 作者: pyload 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def send(self, event, msg, key):
        """POST a toast notification built from ``event`` and ``msg``.

        ``key`` is a pair: the request path followed by the host to connect
        to.  The request body comes from ``self.format_request``.  The
        connection is closed without reading a reply.
        """
        path, host = key
        if msg:
            text = "%s: %s" % (event, msg)
        else:
            text = event
        request = self.format_request(text)

        header_pairs = [
            ("Host", host),
            ("Content-type", "text/xml"),
            ("X-NotificationClass", "2"),
            ("X-WindowsPhone-Target", "toast"),
            ("Content-length", "%d" % len(request)),
        ]

        webservice = httplib.HTTP(host)
        webservice.putrequest("POST", path)
        for header_name, header_value in header_pairs:
            webservice.putheader(header_name, header_value)
        webservice.endheaders()
        webservice.send(request)
        webservice.close()
exaleadsearch.py 文件源码 项目:pat 作者: GusKhawaja 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def do_search_files(self, files):
        """Run one file-type search request and accumulate the response.

        The response body is stored in ``self.results`` and appended to
        ``self.totalresults``.

        NOTE(review): the ``files`` argument is not read; the query is built
        from ``self.files`` instead -- confirm that this is intended.
        """
        query = ("search/web/results/?q=" +
                 self.word +
                 "filetype:" +
                 self.files +
                 "&elements_per_page=50&start_index=" +
                 self.counter)

        conn = httplib.HTTP(self.server)
        conn.putrequest('GET', query)
        conn.putheader('Host', self.hostname)
        conn.putheader('User-agent', self.userAgent)
        conn.endheaders()

        # Status code, message and headers are read but not inspected.
        _returncode, _returnmsg, _headers = conn.getreply()
        self.results = conn.getfile().read()
        self.totalresults += self.results
goofile.py 文件源码 项目:pat 作者: GusKhawaja 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(dmn,file):
    """Search Google for files of one type on a domain and return their names.

    dmn  -- domain passed to the ``site:`` operator of the query.
    file -- file extension passed to ``filetype:`` and used, unescaped, as
            the suffix of the regular expression applied to the result page.

    Returns the list of strings in the result page that match
    ``<name>.<file>``.  The status code, message and headers of the reply
    are read but not checked.
    """
    h = httplib.HTTP('www.google.com')
    h.putrequest('GET', "/search?num=500&q=site:" + dmn + "+filetype:" + file)
    h.putheader('Host', 'www.google.com')
    h.putheader('User-agent', 'Internet Explorer 6.0 ')
    h.putheader('Referrer', 'www.g13net.com')
    h.endheaders()
    returncode, returnmsg, headers = h.getreply()
    data = h.getfile().read()

    # Drop bold markers, then blank out markup and URL punctuation so that
    # file names end up separated by spaces.
    data = re.sub('<b>', '', data)
    for e in ('>', '=', '<', '\\', '(', ')', '"', 'http', ':', '//'):
        data = data.replace(e, ' ')

    # NOTE(review): inside the character class, ".-_" parses as a range
    # from "." to "_", not as three literal characters -- confirm intent.
    r1 = re.compile('[-_.a-zA-Z0-9.-_]*' + r'\.' + file)
    res = r1.findall(data)
    return res
bingsearch.py 文件源码 项目:infoga 作者: cys3c 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def run_search(self):
        """Fetch one page of search results for ``self.keyword``.

        The response body is stored in ``self.results`` and appended to
        ``self.tresult``.  Any exception raised along the way is swallowed
        and reported on stdout as "Server not found!!".
        """
        request_headers = (
            ('Host', self.host),
            ('Cookie', 'SRCHHPGUSR=ADLT=DEMOTE&NRSLT=50'),
            ('Accept-Language', 'en-us,en'),
            ('User-agent', self.u_agent),
        )
        try:
            conn = httplib.HTTP(self.server)
            conn.putrequest('GET', '/search?q=%40'+self.keyword)
            for header_name, header_value in request_headers:
                conn.putheader(header_name, header_value)
            conn.endheaders()
            # status code, message and headers of the reply (not inspected)
            _returncode, _returnmsg, _header = conn.getreply()
            self.results = conn.getfile().read()
            self.tresult += self.results
        except Exception:
            print("\t\t|")
            print("\t\t|__"+self.r+" Server not found!!\n"+self.t)
client.py 文件源码 项目:ZServer 作者: zopefoundation 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getreply(self):
        """Read the whole response from the socket and parse its status line.

        The complete response is buffered in memory and exposed afterwards
        as ``self.file``, positioned just past what has been parsed.

        Returns ``(ver, code, msg, headers)`` on success.  When the status
        line cannot be split into at least a version and a code, or the
        version does not start with 'HTTP/', the three-element tuple
        ``(-1, line, None)`` is returned instead.
        """
        raw = self.sock.makefile('rb')
        payload = ''.join(raw.readlines())
        raw.close()
        self.file = StringIO(payload)

        line = self.file.readline()
        try:
            ver, code, msg = line.split(None, 2)
        except ValueError:
            # No reason phrase: accept "version code" with an empty message.
            try:
                ver, code = line.split(None, 1)
            except ValueError:
                return -1, line, None
            msg = ""
        if not ver.startswith('HTTP/'):
            return -1, line, None

        status = int(code)
        reason = msg.strip()
        headers = mimetools.Message(self.file, 0)
        return ver, status, reason, headers
client.py 文件源码 项目:ZServer 作者: zopefoundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __snd_request(self, method, uri, headers={}, body='', eh=1):
        """Send one HTTP request to ``self.host``/``self.port`` and wrap the reply.

        method  -- HTTP method name passed to ``putrequest``.
        uri     -- request URI.
        headers -- mapping of header names to values (only read, not modified).
        body    -- request body; sent only when non-empty.
        eh      -- when true, the header section is terminated with
                   ``endheaders()`` before the body is sent.

        Returns ``http_response(ver, code, msg, hdrs, data)``.  Any exception
        raised while talking to the server is re-raised as ``NotAvailable``.
        """
        try:
            conn = HTTP()
            conn.connect(self.host, self.port)
            conn.putrequest(method, uri)
            for header_name, header_value in headers.items():
                conn.putheader(header_name, header_value)
            if eh:
                conn.endheaders()
            if body:
                conn.send(body)
            ver, code, msg, hdrs = conn.getreply()
            data = conn.getfile().read()
            conn.close()
        except Exception:
            raise NotAvailable(sys.exc_value)
        else:
            return http_response(ver, code, msg, hdrs, data)

    # HTTP methods


问题


面经


文章

微信
公众号

扫码关注公众号