python类urlsplit()的实例源码

urllib2.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def reduce_uri(self, uri, default_port=True):
        """Accept authority or URI and extract only the authority and path."""
        # note HTTP URLs do not have a userinfo component
        parts = urlparse.urlsplit(uri)
        if parts[1]:
            # URI
            scheme = parts[0]
            authority = parts[1]
            path = parts[2] or '/'
        else:
            # host or host:port
            scheme = None
            authority = uri
            path = '/'
        host, port = splitport(authority)
        if default_port and port is None and scheme is not None:
            dport = {"http": 80,
                     "https": 443,
                     }.get(scheme)
            if dport is not None:
                authority = "%s:%d" % (host, dport)
        return authority, path
throttle.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _CalculateRequestSize(self, req):
    """Calculates the request size.

    May be overriden to support different types of requests.

    Args:
      req: A urllib2.Request.

    Returns:
      the size of the request, in bytes.
    """
    (unused_scheme,
     unused_host_port, url_path,
     unused_query, unused_fragment) = urlparse.urlsplit(req.get_full_url())
    size = len('%s %s HTTP/1.1\n' % (req.get_method(), url_path))
    size += self._CalculateHeaderSize(req.headers)
    size += self._CalculateHeaderSize(req.unredirected_hdrs)


    data = req.get_data()
    if data:
      size += len(data)
    return size
taskqueue.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _parse_relative_url(relative_url):
  """Parses a relative URL and splits it into its path and query string.

  Args:
    relative_url: The relative URL, starting with a '/'.

  Returns:
    Tuple (path, query) where:
      path: The path in the relative URL.
      query: The query string in the URL without the '?' character.

  Raises:
    _RelativeUrlError if the relative_url is invalid for whatever reason.
  """
  if not relative_url:
    raise _RelativeUrlError('Relative URL is empty')
  (scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url)
  if scheme or netloc:
    raise _RelativeUrlError('Relative URL may not have a scheme or location')
  if fragment:
    raise _RelativeUrlError('Relative URL may not specify a fragment')
  if not path or path[0] != '/':
    raise _RelativeUrlError('Relative URL path must start with "/"')
  return path, query
utils.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def join_url(base_url, path):
    """Joins base url and path removing extra slashes.
    Removes trailing slashes. Joins queries.
    E.g.: See unit tests.
    :param base_url: Base url.
    :param path: Path.
    :return: Joined url.
    """
    # Example of usages see in unittests
    base_url = urlparse.urlsplit(base_url, allow_fragments=False)
    path = urlparse.urlsplit(path, allow_fragments=False)
    full_path = _join_paths(base_url.path, path.path)
    full_query = _join_queries(base_url.query, path.query)
    return urlparse.urlunsplit(
        (base_url.scheme, base_url.netloc, full_path, full_query,
         base_url.fragment))
images.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_registry_status(url=DEFAULT_IMAGES_URL, _v2=False):
    """
    Performs api check for registry health status.

    :params url: registry url
    :raises RegistryError: if registry is not available
    """
    url = urlsplit(url)._replace(path='/v2/' if _v2 else '/v1/_ping').geturl()

    with raise_registry_error(url):
        response = requests.get(url, timeout=PING_REQUEST_TIMEOUT,
                                verify=False)
        need_v2 = not _v2 and response.status_code == 404 and \
            response.headers.get(API_VERSION_HEADER) == 'registry/2.0'
        if need_v2:
            check_registry_status(url, _v2=True)
        elif response.status_code == 401:
            return  # user is not authorized, but registry is available
        else:
            response.raise_for_status()
helpers.py 文件源码 项目:webkit-crawler 作者: dozymoe 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def url_join(*parts, **kwargs):
    """
    Normalize url parts and join them with a slash.
    adapted from: http://codereview.stackexchange.com/q/13027
    """
    def concat_paths(sequence):
        result = []
        for path in sequence:
            result.append(path)
            if path.startswith('/'):
                break
        return '/'.join(reversed(result))

    schemes, netlocs, paths, queries, fragments = zip(*(urlsplit(part) for part in reversed(parts)))
    scheme = next((x for x in schemes if x), kwargs.get('scheme', 'http'))
    netloc = next((x for x in netlocs if x), '')
    path = concat_paths(paths)
    query = queries[0]
    fragment = fragments[0]
    return urlunsplit((scheme, netloc, path, query, fragment))
disk_cache.py 文件源码 项目:WebScraping 作者: liinnux 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def url_to_path(self, url):
        """Create file system path for this URL
        """
        components = urlparse.urlsplit(url)
        # when empty path set to /index.html
        path = components.path
        if not path:
            path = '/index.html'
        elif path.endswith('/'):
            path += 'index.html'
        filename = components.netloc + path + components.query
        # replace invalid characters
        filename = re.sub('[^/0-9a-zA-Z\-.,;_ ]', '_', filename)
        # restrict maximum number of characters
        filename = '/'.join(segment[:255] for segment in filename.split('/'))
        return os.path.join(self.cache_dir, filename)
model_dictize.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def resource_dictize(res, context):
    model = context['model']
    resource = d.table_dictize(res, context)
    extras = resource.pop("extras", None)
    if extras:
        resource.update(extras)
    # some urls do not have the protocol this adds http:// to these
    url = resource['url']
    ## for_edit is only called at the times when the dataset is to be edited
    ## in the frontend. Without for_edit the whole qualified url is returned.
    if resource.get('url_type') == 'upload' and not context.get('for_edit'):
        cleaned_name = munge.munge_filename(url)
        resource['url'] = h.url_for(controller='package',
                                    action='resource_download',
                                    id=resource['package_id'],
                                    resource_id=res.id,
                                    filename=cleaned_name,
                                    qualified=True)
    elif resource['url'] and not urlparse.urlsplit(url).scheme and not context.get('for_edit'):
        resource['url'] = u'http://' + url.lstrip('/')
    return resource
auth.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_GET(self):
        # /?oauth_token=72157630789362986-5405f8542b549e95&oauth_verifier=fe4eac402339100e

        qs = urllib_parse.urlsplit(self.path).query
        url_vars = urllib_parse.parse_qs(qs)

        oauth_token = url_vars['oauth_token'][0]
        oauth_verifier = url_vars['oauth_verifier'][0]

        if six.PY2:
            self.server.oauth_token = oauth_token.decode('utf-8')
            self.server.oauth_verifier = oauth_verifier.decode('utf-8')
        else:
            self.server.oauth_token = oauth_token
            self.server.oauth_verifier = oauth_verifier

        assert (isinstance(self.server.oauth_token, six.string_types))
        assert (isinstance(self.server.oauth_verifier, six.string_types))

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        self.wfile.write(html.auth_okay_html)
protocol_loop.py 文件源码 项目:gcodeplot 作者: arpruss 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
feedparser.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urlparse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = u''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urlparse.urlunsplit(parts)
    else:
        return url
feedparser.py 文件源码 项目:spc 作者: whbrewer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urlparse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = u''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urlparse.urlunsplit(parts)
    else:
        return url
upload2web.py 文件源码 项目:weevely3-stealth 作者: edibledinos 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_env_info(self, script_url):

        script_folder = ModuleExec('system_info', [ '-info', 'script_folder' ]).load_result_or_run('script_folder')
        if not script_folder: return

        script_url_splitted = urlparse.urlsplit(script_url)
        script_url_path_folder, script_url_path_filename = os.path.split(
            script_url_splitted.path)

        url_folder_pieces = script_url_path_folder.split(os.sep)
        folder_pieces = script_folder.split(os.sep)

        for pieceurl, piecefolder in zip(reversed(url_folder_pieces), reversed(folder_pieces)):
            if pieceurl == piecefolder:
                folder_pieces.pop()
                url_folder_pieces.pop()
            else:
                break

        base_url_path_folder = os.sep.join(url_folder_pieces)
        self.base_folder_url = urlparse.urlunsplit(
            script_url_splitted[:2] + (base_url_path_folder, ) + script_url_splitted[3:])
        self.base_folder_path = os.sep.join(folder_pieces)
altfuncs.py 文件源码 项目:Crunchyroll-XML-Decoder 作者: jaw20 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def gethtml(url):
    with open('cookies') as f:
        cookies = requests.utils.cookiejar_from_dict(pickle.load(f))
        session = requests.session()
        session.cookies = cookies
        del session.cookies['c_visitor']
        if not forceusa and localizecookies:
            session.cookies['c_locale']={u'Español (Espana)' : 'esES', u'Français (France)' : 'frFR', u'Português (Brasil)' : 'ptBR',
                                        u'English' : 'enUS', u'Español' : 'esLA', u'Türkçe' : 'enUS', u'Italiano' : 'itIT',
                                        u'???????' : 'arME' , u'Deutsch' : 'deDE'}[lang]
        if forceusa:
            try:
                session.cookies['sess_id'] = requests.get('http://www.crunblocker.com/sess_id.php').text
            except:
                sleep(10)  # sleep so we don't overload crunblocker
                session.cookies['sess_id'] = requests.get('http://www.crunblocker.com/sess_id.php').text
    parts = urlparse.urlsplit(url)
    if not parts.scheme or not parts.netloc:
        print 'Apparently not a URL'
        sys.exit()
    data = {'Referer': 'http://crunchyroll.com/', 'Host': 'www.crunchyroll.com',
            'User-Agent': 'Mozilla/5.0  Windows NT 6.1; rv:26.0 Gecko/20100101 Firefox/26.0'}
    res = session.get(url, params=data)
    res.encoding = 'UTF-8'
    return res.text
protocol_loop.py 文件源码 项目:bitio 作者: whaleygeek 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
strack.py 文件源码 项目:strack_python_api 作者: cine-use 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, base_url, login, api_key):
        if not base_url.endswith("/"):
            base_url += "/"
        self.__base_url = base_url
        self.__api_key = api_key
        self.__login = login
        self._api_version = "api/v1/"
        self.__unique_code = self.get_unique_code()
        self._scheme, self._server, self._api_base, _, _ = urlparse.urlsplit(base_url)
        self.__sign_code = None
        self.__entity_list = []
        self.__general_doc_dict = None
        self.__logger = None

        # self.function_list = Command(self, "console/FunctionList", [])
        entity_list_params = [
            {"attr": "entity",
             "type": "list",
             "need": False}
        ]
        self._entities_detail = Command(self, "console/entity", entity_list_params)
        self.__init_entities()
ir_qweb.py 文件源码 项目:gooderp_org 作者: osbzr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def from_html(self, cr, uid, model, field, element, context=None):
        url = element.find('img').get('src')

        url_object = urlparse.urlsplit(url)
        if url_object.path.startswith('/website/image'):
            # url might be /website/image/<model>/<id>[_<checksum>]/<field>[/<width>x<height>]
            fragments = url_object.path.split('/')
            query = dict(urlparse.parse_qsl(url_object.query))
            model = query.get('model', fragments[3])
            oid = query.get('id', fragments[4].split('_')[0])
            field = query.get('field', fragments[5])
            item = self.pool[model].browse(cr, uid, int(oid), context=context)
            return item[field]

        if self.local_url_re.match(url_object.path):
            return self.load_local_url(url)

        return self.load_remote_url(url)
ir_qweb.py 文件源码 项目:gooderp_org 作者: osbzr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load_local_url(self, url):
        match = self.local_url_re.match(urlparse.urlsplit(url).path)

        rest = match.group('rest')
        for sep in os.sep, os.altsep:
            if sep and sep != '/':
                rest.replace(sep, '/')

        path = openerp.modules.get_module_resource(
            match.group('module'), 'static', *(rest.split('/')))

        if not path:
            return None

        try:
            with open(path, 'rb') as f:
                # force complete image load to ensure it's valid image data
                image = I.open(f)
                image.load()
                f.seek(0)
                return f.read().encode('base64')
        except Exception:
            logger.exception("Failed to load local image %r", url)
            return None
protocol_alt.py 文件源码 项目:microbit-serial 作者: martinohanlon 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
protocol_socket.py 文件源码 项目:microbit-serial 作者: martinohanlon 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -


问题


面经


文章

微信
公众号

扫码关注公众号