python类urlsplit()的实例源码

urllib2.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def reduce_uri(self, uri, default_port=True):
        """Accept authority or URI and extract only the authority and path."""
        # note HTTP URLs do not have a userinfo component
        parts = urlparse.urlsplit(uri)
        if parts[1]:
            # URI
            scheme = parts[0]
            authority = parts[1]
            path = parts[2] or '/'
        else:
            # host or host:port
            scheme = None
            authority = uri
            path = '/'
        host, port = splitport(authority)
        if default_port and port is None and scheme is not None:
            dport = {"http": 80,
                     "https": 443,
                     }.get(scheme)
            if dport is not None:
                authority = "%s:%d" % (host, dport)
        return authority, path
wsonoma.py 文件源码 项目:recipebook 作者: dpapathanasiou 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def getOtherRecipeLinks(self):
        """Return a list of other recipes found in the page: while single recipe
        pages do not have links, the various categories at
        http://www.williams-sonoma.com/recipe/ do.

        For example,
        http://www.williams-sonoma.com/search/results.html?activeTab=recipes&words=winter_weeknight_dinners
        has a collection of individual recipe links, and this method will find them.

        """
        data = []
        for link in self.tree.xpath('//ul[@class="recipe-list"]/li/a'):
            if 'href' in link.keys():
                href = urlsplit(link.get('href'))
                if 'cm_src=RECIPESEARCH' == href.query:
                    data.append(href.scheme + '://' + href.netloc + href.path)
        return data
recipe-578193.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run(self):
        ind=self.qu.get()
        url=self.url+str(ind)
        soup =bs.BeautifulSoup(''.join( ul.urlopen(url).readlines() ))
        bu = up.urlsplit(self.url)
        print 'started with the ' ,str(url).split('/')[-1],
        for i in  soup.find_all(attrs = { "class" : "recipe-title"}):
            sp = up.urlsplit(i.a.get('href'))
            path = sp.path
            print path
            if re.search(pat, path):
                path = bu.scheme+'://'+bu.netloc+path
                filename = str(path).split('/')[-2]
                filename = op.join(op.abspath(op.curdir),filename+'.py') # recipe will be stored in given location
#                filename = op.join(op.abspath(op.curdir),filename+'.html')
#uncomment the above line if downloading the web page for teh recipe
                print path
                self.q.put((path,filename))
        self.fetch_data()
        time.sleep(1)
        self.qu.task_done()
        self.q.join()
        print 'done with the ' ,str(url).split('/')[-1],
httpclient.py 文件源码 项目:trio2o 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_version_from_url(url):

    components = urlparse.urlsplit(url)

    path = components.path
    pos = path.find('/')

    ver = ''
    if pos == 0:
        path = path[1:]
        i = path.find('/')
        if i >= 0:
            ver = path[:i]
        else:
            ver = path
    elif pos > 0:
        ver = path[:pos]
    else:
        ver = path

    return ver
utils.py 文件源码 项目:trio2o 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.rsplit('/', 1)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    if not expression.match(url_parts.pop()):
        raise ValueError('URL %s does not contain version' % href)

    new_path = url_join(*url_parts)
    parsed_url = list(parsed_url)
    parsed_url[2] = new_path
    return urlparse.urlunsplit(parsed_url)
urlpath.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def click(self, st):
        """Return a path which is the URL where a browser would presumably take
        you if you clicked on a link with an HREF as given.
        """
        scheme, netloc, path, query, fragment = urlparse.urlsplit(st)
        if not scheme:
            scheme = self.scheme
        if not netloc:
            netloc = self.netloc
            if not path:
                path = self.path
                if not query:
                    query = self.query
            elif path[0] != '/':
                l = self.pathList()
                l[-1] = path
                path = '/'.join(l)

        return URLPath(scheme,
                        netloc,
                        path,
                        query,
                        fragment)
urllib2.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def reduce_uri(self, uri, default_port=True):
        """Accept authority or URI and extract only the authority and path."""
        # note HTTP URLs do not have a userinfo component
        parts = urlparse.urlsplit(uri)
        if parts[1]:
            # URI
            scheme = parts[0]
            authority = parts[1]
            path = parts[2] or '/'
        else:
            # host or host:port
            scheme = None
            authority = uri
            path = '/'
        host, port = splitport(authority)
        if default_port and port is None and scheme is not None:
            dport = {"http": 80,
                     "https": 443,
                     }.get(scheme)
            if dport is not None:
                authority = "%s:%d" % (host, dport)
        return authority, path
address.py 文件源码 项目:vspheretools 作者: devopshq 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _checkFrom(self, pyobj):
        '''WS-Address From,
        XXX currently not checking the hostname, not forwarding messages.
        pyobj  -- From server returned.
        '''
        if pyobj is None: return
        value = pyobj._Address
        if value != self._addressTo:
            scheme,netloc,path,query,fragment = urlparse.urlsplit(value)
            schemeF,netlocF,pathF,queryF,fragmentF = urlparse.urlsplit(self._addressTo)
            if scheme==schemeF and path==pathF and query==queryF and fragment==fragmentF:
                netloc = netloc.split(':') + ['80']
                netlocF = netlocF.split(':') + ['80']
                if netloc[1]==netlocF[1] and (socket.gethostbyname(netlocF[0]) in
                    ('127.0.0.1', socket.gethostbyname(netloc[0]))):
                    return

            raise WSActionException('wrong WS-Address From(%s), expecting %s'%(value,self._addressTo))
views.py 文件源码 项目:pyconjp-website 作者: pyconjp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def change_locale(request):
    """
    Redirect to a given url while changing the locale in the path
    The url and the locale code need to be specified in the
    request parameters.
    """
    next = request.REQUEST.get('next', None)
    if not next:
        referrer = request.META.get('HTTP_REFERER', None)
        if referrer:
            next = urlsplit(referrer)[2]
    if not next:
        next = '/'
    _, path = utils.strip_path(next)
    if request.method == 'POST':
        locale = request.POST.get('locale', None)
        if locale and check_for_language(locale):
            if localeurl_settings.USE_SESSION:
                request.session['django_language'] = locale
            path = utils.locale_path(path, locale)

    response = http.HttpResponseRedirect(path)
    return response
protocol_alt.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
protocol_socket.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
protocol_loop.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
protocol_spy.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
devtools_parser.py 文件源码 项目:devtools-parser 作者: WPO-Foundation 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_base_page_info(self, page_data):
        """Find the reverse-ip info for the base page"""
        domain = urlparse.urlsplit(page_data['final_url']).hostname
        try:
            import socket
            addr = socket.gethostbyname(domain)
            host = str(socket.gethostbyaddr(addr)[0])
            page_data['base_page_ip_ptr'] = host
        except Exception:
            pass
        # keep moving up the domain until we can get a NS record
        while domain is not None and 'base_page_dns_soa' not in page_data:
            try:
                import dns.resolver
                dns_servers = dns.resolver.query(domain, "NS")
                dns_server = str(dns_servers[0].target).strip('. ')
                page_data['base_page_dns_ns'] = dns_server
            except Exception:
                pass
            pos = domain.find('.')
            if pos > 0:
                domain = domain[pos + 1:]
            else:
                domain = None
contextmenu.py 文件源码 项目:linkchecker-gui 作者: linkcheck 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def can_view_parent_source (self, url_data):
        """Determine if parent URL source can be retrieved."""
        if not url_data.valid:
            return False
        parent = url_data.parent_url
        if not parent:
            return False
        # Directory contents are dynamically generated, so it makes
        # no sense in viewing/editing them.
        if parent.startswith(u"file:"):
            path = urlparse.urlsplit(parent)[2]
            return not os.path.isdir(get_os_filename(path))
        if parent.startswith((u"ftp:", u"ftps:")):
            path = urlparse.urlsplit(parent)[2]
            return bool(path) and not path.endswith(u'/')
        # Only HTTP left
        return parent.startswith((u"http:", u"https:"))
protocol_alt.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
protocol_socket.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
protocol_loop.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
protocol_spy.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
feedparser.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urlparse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = u''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urlparse.urlunsplit(parts)
    else:
        return url
pipeline_functions.py 文件源码 项目:reportIT 作者: stevekm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2

    import urllib2
    import urlparse
    import os

    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
pipeline_functions.py 文件源码 项目:reportIT 作者: stevekm 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2

    import urllib2
    import urlparse
    import os

    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
pipeline_functions.py 文件源码 项目:reportIT 作者: stevekm 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2

    import urllib2
    import urlparse
    import os

    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
pipeline_functions.py 文件源码 项目:reportIT 作者: stevekm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2

    import urllib2
    import urlparse
    import os

    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
pipeline_functions.py 文件源码 项目:reportIT 作者: stevekm 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def download_file(my_URL, my_outfile = ''):
    # function to download a file from a URL
    # !! This will overwrite the output file
    # https://gist.github.com/hughdbrown/c145b8385a2afa6570e2

    import urllib2
    import urlparse
    import os

    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096) # download in chunks
            if data:
                output.write(data)
            else:
                break
proxylib.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_http_request(self, method, url, headers, body, timeout, **kwargs):
        scheme, netloc, path, query, _ = urlparse.urlsplit(url)
        if netloc.rfind(':') <= netloc.rfind(']'):
            # no port number
            host = netloc
            port = 443 if scheme == 'https' else 80
        else:
            host, _, port = netloc.rpartition(':')
            port = int(port)
        if query:
            path += '?' + query
        if 'Host' not in headers:
            headers['Host'] = host
        if body and 'Content-Length' not in headers:
            headers['Content-Length'] = str(len(body))
        ConnectionType = httplib.HTTPSConnection if scheme == 'https' else httplib.HTTPConnection
        connection = ConnectionType(netloc, timeout=timeout)
        connection.request(method, path, body=body, headers=headers)
        response = connection.getresponse()
        return response
proxylib.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def filter(self, handler):
        path = urlparse.urlsplit(handler.path).path
        if path.startswith('/'):
            path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
            if os.path.isdir(path):
                index_file = os.path.join(path, self.index_file)
                if not os.path.isfile(index_file):
                    content = self.format_index_html(path).encode('UTF-8')
                    headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
                else:
                    path = index_file
            if os.path.isfile(path):
                content_type = 'application/octet-stream'
                try:
                    import mimetypes
                    content_type = mimetypes.types_map.get(os.path.splitext(path)[1])
                    if os.path.splitext(path)[1].endswith(('crt', 'pem')):
                        content_type = 'application/x-x509-ca-cert'
                except StandardError as e:
                    logging.error('import mimetypes failed: %r', e)
                with open(path, 'rb') as fp:
                    content = fp.read()
                    headers = {'Connection': 'close', 'Content-Type': content_type}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
urllib2.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def reduce_uri(self, uri, default_port=True):
        """Accept authority or URI and extract only the authority and path."""
        # note HTTP URLs do not have a userinfo component
        parts = urlparse.urlsplit(uri)
        if parts[1]:
            # URI
            scheme = parts[0]
            authority = parts[1]
            path = parts[2] or '/'
        else:
            # host or host:port
            scheme = None
            authority = uri
            path = '/'
        host, port = splitport(authority)
        if default_port and port is None and scheme is not None:
            dport = {"http": 80,
                     "https": 443,
                     }.get(scheme)
            if dport is not None:
                authority = "%s:%d" % (host, dport)
        return authority, path
throttle.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _CalculateRequestSize(self, req):
    """Calculates the request size.

    Args:
      req: A tuple of (uri, method name, request body, header map)
    Returns:
      the size of the request, in bytes.
    """
    uri, method, body, headers = req
    (unused_scheme,
     unused_host_port, url_path,
     unused_query, unused_fragment) = urlparse.urlsplit(uri)
    size = len('%s %s HTTP/1.1\n' % (method, url_path))
    size += self._CalculateHeaderSize(headers)


    if body:
      size += len(body)
    return size
taskqueue.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _parse_relative_url(relative_url):
  """Parses a relative URL and splits it into its path and query string.

  Args:
    relative_url: The relative URL, starting with a '/'.

  Returns:
    Tuple (path, query) where:
      path: The path in the relative URL.
      query: The query string in the URL without the '?' character.

  Raises:
    _RelativeUrlError if the relative_url is invalid for whatever reason.
  """
  if not relative_url:
    raise _RelativeUrlError('Relative URL is empty')
  (scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url)
  if scheme or netloc:
    raise _RelativeUrlError('Relative URL may not have a scheme or location')
  if fragment:
    raise _RelativeUrlError('Relative URL may not specify a fragment')
  if not path or path[0] != '/':
    raise _RelativeUrlError('Relative URL path must start with "/"')
  return path, query


问题


面经


文章

微信
公众号

扫码关注公众号