Python urlunparse() usage examples (source code)
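urlunparse() (from urllib.parse in Python 3, or six.moves.urllib.parse in compatibility code) rebuilds a URL string from the six-part tuple produced by urlparse(): (scheme, netloc, path, params, query, fragment). A minimal round-trip sketch with the standard library, using a made-up URL:

from urllib.parse import urlparse, urlunparse

parsed = urlparse("https://example.com/v2/tasks?limit=10#top")
# urlunparse() takes a 6-tuple: (scheme, netloc, path, params, query, fragment)
rebuilt = urlunparse((parsed.scheme, parsed.netloc, parsed.path,
                      parsed.params, parsed.query, parsed.fragment))
assert rebuilt == "https://example.com/v2/tasks?limit=10#top"

The examples below show how real projects use the same call to rewrite individual URL components.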

utils.py (project: python-bileanclient, author: openstack)
def sort_url_by_query_keys(url):
    """A helper function which sorts the keys of the query string of a url.

       For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
       returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
       prevent non-deterministic ordering of the query string causing
       problems with unit tests.
    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)
    queries = urlparse.parse_qsl(parsed.query, True)
    sorted_query = sorted(queries, key=lambda x: x[0])

    encoded_sorted_query = urlparse.urlencode(sorted_query, True)

    url_parts = (parsed.scheme, parsed.netloc, parsed.path,
                 parsed.params, encoded_sorted_query,
                 parsed.fragment)

    return urlparse.urlunparse(url_parts)
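
A standalone sketch of the same idea using the Python 3 standard library directly (the snippet above goes through a urlparse compatibility module); the example URL is the one from the docstring:

from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse

def sort_query_keys(url):
    # Re-emit the query string with its keys in sorted order.
    parsed = urlparse(url)
    sorted_query = sorted(parse_qsl(parsed.query, keep_blank_values=True))
    return urlunparse(parsed._replace(query=urlencode(sorted_query)))

print(sort_query_keys('/v2/tasks?sort_key=id&sort_dir=asc&limit=10'))
# /v2/tasks?limit=10&sort_dir=asc&sort_key=id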
base.py (project: zun, author: openstack)
def validate_link(self, link, bookmark=False):
        """Checks if the given link can get correct data."""
        # removes the scheme and net location parts of the link
        url_parts = list(urlparse.urlparse(link))
        url_parts[0] = url_parts[1] = ''

        # bookmark link should not have the version in the URL
        if bookmark and url_parts[2].startswith(PATH_PREFIX):
            return False

        full_path = urlparse.urlunparse(url_parts)
        try:
            self.get_json(full_path, path_prefix='')
            return True
        except Exception:
            return False
processors.py (project: portia2code, author: scrapinghub)
def extract_image_url(text):
    text = _strip_url(text)
    imgurl = None
    if text:
        # check if the text is style content
        match = _CSS_IMAGERE.search(text)
        text = match.groups()[0] if match else text
        parsed = urlparse(text)
        path = None
        match = _IMAGE_PATH_RE.search(parsed.path)
        if match:
            path = match.group()
        elif parsed.query:
            match = _GENERIC_PATH_RE.search(parsed.path)
            if match:
                path = match.group()
        if path is not None:
            parsed = list(parsed)
            parsed[2] = path
            imgurl = urlunparse(parsed)
        if not imgurl:
            imgurl = text
    return imgurl
rw_handles.py (project: deb-oslo.vmware, author: openstack)
def _fix_esx_url(url, host, port):
        """Fix netloc in the case of an ESX host.

        In the case of an ESX host, the netloc is set to '*' in the URL
        returned in HttpNfcLeaseInfo. It should be replaced with host name
        or IP address.
        """
        urlp = urlparse.urlparse(url)
        if urlp.netloc == '*':
            scheme, netloc, path, params, query, fragment = urlp
            if netutils.is_valid_ipv6(host):
                netloc = '[%s]:%d' % (host, port)
            else:
                netloc = "%s:%d" % (host, port)
            url = urlparse.urlunparse((scheme,
                                       netloc,
                                       path,
                                       params,
                                       query,
                                       fragment))
        return url
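
A minimal sketch of the same netloc substitution, assuming a hypothetical lease URL and host:port:

from urllib.parse import urlparse, urlunparse

lease_url = "https://*/nfc/disk-0.vmdk?dcPath=ha-datacenter"  # '*' placeholder netloc
parsed = urlparse(lease_url)
if parsed.netloc == '*':
    # Substitute a real host:port for the placeholder (illustrative values).
    lease_url = urlunparse(parsed._replace(netloc="esx01.example.com:443"))
print(lease_url)  # https://esx01.example.com:443/nfc/disk-0.vmdk?dcPath=ha-datacenter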
auth.py (project: myautotest, author: auuppp)
def _decorate_request(self, filters, method, url, headers=None, body=None,
                          auth_data=None):
        if auth_data is None:
            auth_data = self.auth_data
        token, _ = auth_data
        #base_url = self.base_url(filters=filters, auth_data=auth_data)
        base_url = url
        # build authenticated request
        # returns new request, it does not touch the original values
        _headers = copy.deepcopy(headers) if headers is not None else {}
        _headers['X-Auth-Token'] = str(token)
        if url is None or url == "":
            _url = base_url
        else:
            # Join base URL and url, and remove multiple contiguous slashes
            _url = "/".join([base_url, url])
            parts = [x for x in urlparse.urlparse(_url)]
            parts[2] = re.sub("/{2,}", "/", parts[2])
            _url = urlparse.urlunparse(parts)
        # no change to method or body
        return str(_url), _headers, body
command.py (project: quilt, author: quiltdata)
def config():
    answer = input("Please enter the URL for your custom Quilt registry (ask your administrator),\n" +
                   "or leave this line blank to use the default registry: ")
    if answer:
        url = urlparse(answer.rstrip('/'))
        if (url.scheme not in ['http', 'https'] or not url.netloc or
            url.path or url.params or url.query or url.fragment):
            raise CommandException("Invalid URL: %s" % answer)
        canonical_url = urlunparse(url)
    else:
        # When saving the config, store '' instead of the actual URL in case we ever change it.
        canonical_url = ''

    cfg = _load_config()
    cfg['registry_url'] = canonical_url
    _save_config(cfg)

    # Clear the cached URL.
    global _registry_url
    _registry_url = None
protocol.py (project: htsget, author: jeromekelleher)
def ticket_request_url(
        url, fmt=None, reference_name=None, reference_md5=None,
        start=None, end=None, fields=None, tags=None, notags=None,
        data_format=None):
    parsed_url = urlparse(url)
    get_vars = parse_qs(parsed_url.query)
    # TODO error checking
    if reference_name is not None:
        get_vars["referenceName"] = reference_name
    if reference_md5 is not None:
        get_vars["referenceMD5"] = reference_md5
    if start is not None:
        get_vars["start"] = int(start)
    if end is not None:
        get_vars["end"] = int(end)
    if data_format is not None:
        get_vars["format"] = data_format.upper()
    # if fields is not None:
    #     get_vars["fields"] = ",".join(fields)
    # if tags is not None:
    #     get_vars["tags"] = ",".join(tags)
    # if notags is not None:
    #     get_vars["notags"] = ",".join(notags)
    new_url = list(parsed_url)
    new_url[4] = urlencode(get_vars, doseq=True)
    return urlunparse(new_url)
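
The pattern above replaces only the query component (index 4 of the 6-tuple); a sketch against a hypothetical htsget endpoint:

from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

url = "https://htsget.example.org/reads/NA12878"
parsed = urlparse(url)
get_vars = parse_qs(parsed.query)
get_vars["referenceName"] = "chr1"
get_vars["start"] = 100000
get_vars["end"] = 200000
new_url = list(parsed)
new_url[4] = urlencode(get_vars, doseq=True)  # rebuild just the query string
print(urlunparse(new_url))
# https://htsget.example.org/reads/NA12878?referenceName=chr1&start=100000&end=200000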
protocol.py (project: htsget, author: jeromekelleher)
def run(self):
        self.__retry(self._handle_ticket_request)
        self.data_format = self.ticket.get("format", "BAM")
        self.md5 = self.ticket.get("md5", None)
        for url_object in self.ticket["urls"]:
            url = urlparse(url_object["url"])
            if url.scheme.startswith("http"):
                headers = url_object.get("headers", "")
                self.__retry(self._handle_http_url, urlunparse(url), headers)
            elif url.scheme == "data":
                self._handle_data_uri(url)
            else:
                raise ValueError("Unsupported URL scheme:{}".format(url.scheme))
db.py (project: panko, author: openstack)
def __init__(self, url, conf):
        db_name = 'panko_%s' % uuidutils.generate_uuid(dashed=False)
        engine = sqlalchemy.create_engine(url)
        conn = engine.connect()
        self._create_database(conn, db_name)
        conn.close()
        engine.dispose()
        parsed = list(urlparse.urlparse(url))
        parsed[2] = '/' + db_name
        self.url = urlparse.urlunparse(parsed)
        self.conf = conf
push.py (project: microcosm-flask, author: globality-corp)
def iter_json_batches(inputs, base_url, batch_size, keep_instance_path):
    parsed_base_url = urlparse(base_url)

    current_uri = None
    current_batch = []

    for href, resource in inputs:
        # Skip over links-only (discovery) resources
        if list(resource.keys()) == ["_links"]:
            continue

        # Inject the base URL's scheme and netloc; `urljoin` should do exactly this operation,
        # but actually won't if the right-hand-side term defines its own netloc
        parsed_href = urlparse(href)
        uri = urlunparse(parsed_href._replace(
            scheme=parsed_base_url.scheme,
            netloc=parsed_base_url.netloc,
        ))

        if batch_size == 1:
            yield (uri, [resource])
        else:
            # batch handling
            if keep_instance_path:
                collection_uri = uri.rsplit("?", 1)[0]
            else:
                collection_uri = uri.rsplit("/", 1)[0]

            if any((
                    current_uri is not None and current_uri != collection_uri,
                    len(current_batch) >= batch_size,
            )):
                yield (current_uri, current_batch)
                current_batch = []

            current_uri = collection_uri
            current_batch.append(resource)

    if current_batch:
        yield (current_uri, current_batch)
ws_client.py (project: python-base, author: kubernetes-client)
def get_websocket_url(url):
    parsed_url = urlparse(url)
    parts = list(parsed_url)
    if parsed_url.scheme == 'http':
        parts[0] = 'ws'
    elif parsed_url.scheme == 'https':
        parts[0] = 'wss'
    return urlunparse(parts)
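
A self-contained sketch of the same scheme rewrite (http becomes ws, https becomes wss), with an invented URL:

from urllib.parse import urlparse, urlunparse

_WS_SCHEMES = {'http': 'ws', 'https': 'wss'}

def to_websocket_url(url):
    parsed = urlparse(url)
    # Swap only the scheme; every other component is kept as-is.
    return urlunparse(parsed._replace(scheme=_WS_SCHEMES.get(parsed.scheme, parsed.scheme)))

print(to_websocket_url("https://kube.example.com/api/v1/pods?watch=true"))
# wss://kube.example.com/api/v1/pods?watch=true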
cluster.py (project: vmware-nsxlib, author: openstack)
def _build_conf_providers(self):

        def _schemed_url(uri):
            uri = uri.strip('/')
            return urlparse.urlparse(
                uri if uri.startswith('http') else
                "%s://%s" % (self._http_provider.default_scheme, uri))

        conf_urls = self.nsxlib_config.nsx_api_managers[:]
        urls = []
        providers = []
        provider_index = -1
        for conf_url in conf_urls:
            provider_index += 1
            conf_url = _schemed_url(conf_url)
            if conf_url in urls:
                LOG.warning("'%s' already defined in configuration file. "
                            "Skipping.", urlparse.urlunparse(conf_url))
                continue
            urls.append(conf_url)
            providers.append(
                Provider(
                    conf_url.netloc,
                    urlparse.urlunparse(conf_url),
                    self.nsxlib_config.username(provider_index),
                    self.nsxlib_config.password(provider_index),
                    self.nsxlib_config.ca_file(provider_index)))
        return providers
client.py (project: microservices, author: viatoriche)
def get_endpoint_from_parsed_url(parsed_url):
        url_list = [x if e < 2 else '' for e, x in enumerate(parsed_url)]
        return urlparse.urlunparse(url_list)
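
The effect of blanking everything after the netloc, shown with a hypothetical service URL:

from urllib.parse import urlparse, urlunparse

parsed = urlparse("http://api.example.com:5000/users/1?verbose=1#frag")
# Keep only scheme and netloc; blank out path, params, query and fragment.
endpoint = urlunparse((parsed.scheme, parsed.netloc, '', '', '', ''))
print(endpoint)  # http://api.example.com:5000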
url.py (project: Url, author: beiruan)
def _build_url(url, _params):
    """Build the actual URL to use."""

    # Support for unicode domain names and paths.
    scheme, netloc, path, params, query, fragment = urlparse(url)
    netloc = netloc.encode('idna').decode('utf-8')
    if not path:
        path = '/'

    if six.PY2:
        if isinstance(scheme, six.text_type):
            scheme = scheme.encode('utf-8')
        if isinstance(netloc, six.text_type):
            netloc = netloc.encode('utf-8')
        if isinstance(path, six.text_type):
            path = path.encode('utf-8')
        if isinstance(params, six.text_type):
            params = params.encode('utf-8')
        if isinstance(query, six.text_type):
            query = query.encode('utf-8')
        if isinstance(fragment, six.text_type):
            fragment = fragment.encode('utf-8')

    enc_params = _encode_params(_params)
    if enc_params:
        if query:
            query = '%s&%s' % (query, enc_params)
        else:
            query = enc_params
    url = (urlunparse([scheme, netloc, path, params, query, fragment]))
    return url
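
A sketch of just the IDNA step from the function above, assuming a made-up non-ASCII host name:

from urllib.parse import urlparse, urlunparse

scheme, netloc, path, params, query, fragment = urlparse("http://bücher.example/katalog")
# Encode the host with IDNA so a non-ASCII domain becomes punycode.
netloc = netloc.encode('idna').decode('utf-8')
print(urlunparse((scheme, netloc, path or '/', params, query, fragment)))
# http://xn--bcher-kva.example/katalog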
auth_plugin.py (project: eclcli, author: nttcom)
def create_plugin(self, session, version, url, raw_status=None):
        """Handle default Keystone endpoint configuration

        Build the actual API endpoint from the scheme, host and port of the
        original auth URL and the rest from the returned version URL.
        """

        ver_u = urlparse.urlparse(url)

        # Only hack this if it is the default setting
        if ver_u.netloc.startswith('localhost'):
            auth_u = urlparse.urlparse(self.auth_url)
            # from original auth_url: scheme, netloc
            # from api_url: path, query (basically, the rest)
            url = urlparse.urlunparse((
                auth_u.scheme,
                auth_u.netloc,
                ver_u.path,
                ver_u.params,
                ver_u.query,
                ver_u.fragment,
            ))
            LOG.debug('Version URL updated: %s', url)

        return super(OSCGenericPassword, self).create_plugin(
            session=session,
            version=version,
            url=url,
            raw_status=raw_status,
        )
tools.py (project: reahl, author: reahl)
def relative(self, url_string):
        url_bits = urllib_parse.urlparse(url_string)
        return urllib_parse.urlunparse(('', '', url_bits.path, url_bits.params, url_bits.query, url_bits.fragment))
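
Dropping scheme and netloc this way turns an absolute URL into a relative reference; a quick sketch with an invented URL:

from urllib.parse import urlparse, urlunparse

url_bits = urlparse("https://example.com/app/page?x=1#top")
relative = urlunparse(('', '', url_bits.path, url_bits.params, url_bits.query, url_bits.fragment))
print(relative)  # /app/page?x=1#top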
ajaxdetail_sign.py (project: Spider, author: poluo)
def start_requests(self):
        """
        In the scrapy docs there are two ways to tell scrapy where to begin
        crawling from. One is start_requests, the other is start_urls, which is
        a shortcut to start_requests.

        In my experience it is better to use start_requests instead of
        start_urls because in this method you can see how the request objects
        are created and how each request is yielded. Keep it simple and
        avoid magic that might confuse you.

        In this project there is no need to change code in this method; just
        modify the code in parse_entry_page.

        If you fully understand how scrapy works, you are free to choose
        between start_requests and start_urls.
        """
        prefix = self.settings["WEB_APP_PREFIX"]
        result = parse.urlparse(prefix)
        base_url = parse.urlunparse(
            (result.scheme, result.netloc, "", "", "", "")
        )
        # Generate start url from config and self.entry, when you paste the code
        # to another spider you can just change self.entry and self.taskid
        url = parse.urljoin(base_url, self.entry)
        print(url)
        request = Request(url=url, callback=self.parse_entry_page)
        request.headers[
            'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
        request.headers[
            'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
        request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
        request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
        request.headers['Connection'] = 'keep-alive'
        request.headers['Host'] = '115.28.36.253:8000'
        request.headers['DNT'] = 1
        # request.cookies['token'] = '4TO4N49X81'

        yield request
list_extract_pagination.py (project: Spider, author: poluo)
def start_requests(self):
        """
        In the scrapy docs there are two ways to tell scrapy where to begin
        crawling from. One is start_requests, the other is start_urls, which is
        a shortcut to start_requests.

        In my experience it is better to use start_requests instead of
        start_urls because in this method you can see how the request objects
        are created and how each request is yielded. Keep it simple and
        avoid magic that might confuse you.

        In this project there is no need to change code in this method; just
        modify the code in parse_entry_page.

        If you fully understand how scrapy works, you are free to choose
        between start_requests and start_urls.
        """
        prefix = self.settings["WEB_APP_PREFIX"]
        result = parse.urlparse(prefix)
        base_url = parse.urlunparse(
            (result.scheme, result.netloc, "", "", "", "")
        )
        # Generate start url from config and self.entry, when you paste the code
        # to another spider you can just change self.entry and self.taskid
        url = parse.urljoin(base_url, self.entry)
        request = Request(url=url, callback=self.parse_entry_page)
        request.headers[
            'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
        request.headers[
            'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
        request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
        request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
        request.headers['Connection'] = 'keep-alive'
        request.headers['Host'] = '115.28.36.253:8000'
        request.headers['DNT'] = 1
        yield request
regex_extract.py (project: Spider, author: poluo)
def start_requests(self):
        """
        In the scrapy docs there are two ways to tell scrapy where to begin
        crawling from. One is start_requests, the other is start_urls, which is
        a shortcut to start_requests.

        In my experience it is better to use start_requests instead of
        start_urls because in this method you can see how the request objects
        are created and how each request is yielded. Keep it simple and
        avoid magic that might confuse you.

        In this project there is no need to change code in this method; just
        modify the code in parse_entry_page.

        If you fully understand how scrapy works, you are free to choose
        between start_requests and start_urls.
        """
        prefix = self.settings["WEB_APP_PREFIX"]
        result = parse.urlparse(prefix)
        base_url = parse.urlunparse(
            (result.scheme, result.netloc, "", "", "", "")
        )
        # Generate start url from config and self.entry, when you paste the code
        # to another spider you can just change self.entry and self.taskid
        url = parse.urljoin(base_url, self.entry)
        print(url)
        request = Request(url=url, callback=self.parse_entry_page)
        request.headers[
            'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
        request.headers[
            'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
        request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
        request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
        request.headers['Connection'] = 'keep-alive'
        request.headers['Host'] = '115.28.36.253:8000'
        request.headers['DNT'] = 1
        yield request
utils.py (project: fulmar, author: tylderen)
def build_url(url, _params):
    """Build the actual URL to use."""

    # Support for unicode domain names and paths.
    scheme, netloc, path, params, query, fragment = urlparse(url)
    netloc = netloc.encode('idna').decode('utf-8')
    if not path:
        path = '/'

    if six.PY2:
        if isinstance(scheme, six.text_type):
            scheme = scheme.encode('utf-8')
        if isinstance(netloc, six.text_type):
            netloc = netloc.encode('utf-8')
        if isinstance(path, six.text_type):
            path = path.encode('utf-8')
        if isinstance(params, six.text_type):
            params = params.encode('utf-8')
        if isinstance(query, six.text_type):
            query = query.encode('utf-8')
        if isinstance(fragment, six.text_type):
            fragment = fragment.encode('utf-8')

    enc_params = encode_params(_params)
    if enc_params:
        if query:
            query = '%s&%s' % (query, enc_params)
        else:
            query = enc_params
    url = (urlunparse([scheme, netloc, path, params, query, fragment]))
    return url
studios.py (project: dancedeets-monorepo, author: mikelambert)
def _parse_contents(self, response):
        # Wix pages aren't really parseable, so anytime we see them,
        # let's re-run it (depth-1) with an escaped-fragment to get the real html source
        if 'https://static.wixstatic.com/' in response.body and '_escaped_fragment_' not in response.url:
            parsed_url = urlparse(response.url)
            qs = parse_qs(parsed_url.query)
            qs['_escaped_fragment_'] = ''
            wix_scrapeable_url = urlunparse(
                (parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, urlencode(qs), parsed_url.fragment)
            )
            response.meta['depth'] -= 1
            return [scrapy.Request(wix_scrapeable_url, self.parse)]

        return
        if not hasattr(response, 'selector'):
            logging.info('Skipping unknown file from: %s', response.url)
            return
        # Get all text contents of tags (unless they are script or style tags)
        text_contents = ' '.join(response.selector.xpath('//*[not(self::script|self::style)]/text()').extract()).lower()

        processed_text = event_classifier.StringProcessor(text_contents, regex_keywords.WORD_BOUNDARIES)
        wrong = processed_text.get_tokens(keywords.DANCE_WRONG_STYLE)
        good = processed_text.get_tokens(rules.STREET_STYLE)
        if (wrong or good):
            #print response.url, set(wrong), set(good)
            pass
resource_collector.py (project: python-percy, author: teeberg)
def strip_fragment(self, path):
        result = urlparse(path)
        result = result._replace(fragment='')
        return six.text_type(urlunparse(result))
utils.py (project: python-zeep, author: mvantellingen)
def url_http_to_https(value):
    parts = urlparse(value)
    if parts.scheme != 'http':
        return value

    # Check if the url contains ':80' and remove it if that is the case
    netloc_parts = parts.netloc.rsplit(':', 1)
    if len(netloc_parts) == 2 and netloc_parts[1] == '80':
        netloc = netloc_parts[0]
    else:
        netloc = parts.netloc
    return urlunparse(('https', netloc) + parts[2:])
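
A simplified standalone variant of the same upgrade, assuming a hypothetical SOAP endpoint (the ':80' handling here uses endswith rather than rsplit, but the effect is the same for this input):

from urllib.parse import urlparse, urlunparse

def http_to_https(value):
    parts = urlparse(value)
    if parts.scheme != 'http':
        return value
    # Drop an explicit ':80' port, then rebuild the URL with the https scheme.
    netloc = parts.netloc[:-3] if parts.netloc.endswith(':80') else parts.netloc
    return urlunparse(('https', netloc) + parts[2:])

print(http_to_https('http://soap.example.com:80/service?wsdl'))
# https://soap.example.com/service?wsdl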
s3_listener.py (project: localstack, author: localstack)
def expand_redirect_url(starting_url, key, bucket):
    """ Add key and bucket parameters to starting URL query string. """
    parsed = urlparse.urlparse(starting_url)
    query = collections.OrderedDict(urlparse.parse_qsl(parsed.query))
    query.update([('key', key), ('bucket', bucket)])

    redirect_url = urlparse.urlunparse((
        parsed.scheme, parsed.netloc, parsed.path,
        parsed.params, urlparse.urlencode(query), None))

    return redirect_url
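
Roughly what the helper produces for a made-up redirect URL, key, and bucket (Python 3 stdlib form):

from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse

parsed = urlparse("https://example.com/done?session=abc")
query = dict(parse_qsl(parsed.query))
query.update(key="uploads/file.txt", bucket="my-bucket")
# Rebuild the redirect URL with the extra key/bucket parameters appended.
print(urlunparse(parsed._replace(query=urlencode(query))))
# https://example.com/done?session=abc&key=uploads%2Ffile.txt&bucket=my-bucket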
depotcontroller.py (project: solaris-ips, author: oracle)
def get_repo_url(self):
                return urlunparse(("file", "", pathname2url(
                    self.__dir), "", "", ""))
transaction.py (project: solaris-ips, author: oracle)
def __new__(cls, origin_url, create_repo=False, pkg_name=None,
            repo_props=EmptyDict, trans_id=None, noexecute=False, xport=None,
            pub=None, progtrack=None):

                scheme, netloc, path, params, query, fragment = \
                    urlparse(origin_url, "http", allow_fragments=0)
                scheme = scheme.lower()

                if noexecute:
                        scheme = "null"
                if scheme != "null" and (not xport or not pub):
                        raise TransactionError("Caller must supply transport "
                            "and publisher.")
                if scheme not in cls.__schemes:
                        raise TransactionRepositoryURLError(origin_url,
                            scheme=scheme)
                if scheme.startswith("http") and not netloc:
                        raise TransactionRepositoryURLError(origin_url,
                            netloc=None)
                if scheme.startswith("file"):
                        if netloc:
                                raise TransactionRepositoryURLError(origin_url,
                                    msg="'{0}' contains host information, which "
                                    "is not supported for filesystem "
                                    "operations.".format(netloc))
                        # as we're urlunparsing below, we need to ensure that
                        # the path starts with only one '/' character, if any
                        # are present
                        if path.startswith("/"):
                                path = "/" + path.lstrip("/")
                        elif not path:
                                raise TransactionRepositoryURLError(origin_url)

                # Rebuild the url with the sanitized components.
                origin_url = urlunparse((scheme, netloc, path, params,
                    query, fragment))

                return cls.__schemes[scheme](origin_url,
                    create_repo=create_repo, pkg_name=pkg_name,
                    repo_props=repo_props, trans_id=trans_id, xport=xport,
                    pub=pub, progtrack=progtrack)
misc.py (project: solaris-ips, author: oracle)
def parse_uri(uri, cwd=None):
        """Parse the repository location provided and attempt to transform it
        into a valid repository URI.

        'cwd' is the working directory to use to turn paths into an absolute
        path.  If not provided, the current working directory is used.
        """

        if uri.find("://") == -1 and not uri.startswith("file:/"):
                # Convert the file path to a URI.
                if not cwd:
                        uri = os.path.abspath(uri)
                elif not os.path.isabs(uri):
                        uri = os.path.normpath(os.path.join(cwd, uri))

                uri = urlunparse(("file", "",
                    pathname2url(uri), "", "", ""))

        scheme, netloc, path, params, query, fragment = \
            urlparse(uri, "file", allow_fragments=0)
        scheme = scheme.lower()

        if scheme == "file":
                # During urlunparsing below, ensure that the path starts with
                # only one '/' character, if any are present.
                if path.startswith("/"):
                        path = "/" + path.lstrip("/")

        # Rebuild the URI with the sanitized components.
        return urlunparse((scheme, netloc, path, params,
            query, fragment))
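
The core file-path-to-URI step, sketched for a POSIX path (the printed result is illustrative):

import os
from urllib.parse import urlunparse
from urllib.request import pathname2url

path = os.path.abspath("repo")          # e.g. /home/user/repo
# Build a file:// URI from a local filesystem path.
uri = urlunparse(("file", "", pathname2url(path), "", "", ""))
print(uri)                              # file:///home/user/repo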
http.py (project: oscars2016, author: 0x0ece)
def execute(self, http=None, num_retries=0):
    """Execute the request.

    Args:
      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry 500's with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

    Returns:
      A deserialized object model of the response body as determined
      by the postproc.

    Raises:
      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occurred.
    """
    if http is None:
      http = self.http

    if self.resumable:
      body = None
      while body is None:
        _, body = self.next_chunk(http=http, num_retries=num_retries)
      return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
      self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
      self.method = 'POST'
      self.headers['x-http-method-override'] = 'GET'
      self.headers['content-type'] = 'application/x-www-form-urlencoded'
      parsed = urlparse(self.uri)
      self.uri = urlunparse(
          (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
           None)
          )
      self.body = parsed.query
      self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
          http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
          method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
      callback(resp)
    if resp.status >= 300:
      raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
http.py (project: oscars2016, author: 0x0ece)
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

    Args:
      request: HttpRequest, the request to serialize.

    Returns:
      The request as a string in application/http format.
    """
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
        )
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None and hasattr(request.http.request,
        'credentials'):
      request.http.request.credentials.apply(headers)

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
      del headers['content-type']

    for key, value in six.iteritems(headers):
      msg[key] = value
    msg['Host'] = parsed.netloc
    msg.set_unixfrom(None)

    if request.body is not None:
      msg.set_payload(request.body)
      msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
http.py (project: GAMADV-XTD, author: taers232c)
def execute(self, http=None, num_retries=0):
    """Execute the request.

    Args:
      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

    Returns:
      A deserialized object model of the response body as determined
      by the postproc.

    Raises:
      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occurred.
    """
    if http is None:
      http = self.http

    if self.resumable:
      body = None
      while body is None:
        _, body = self.next_chunk(http=http, num_retries=num_retries)
      return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
      self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    # Assume that a GET request never contains a request body.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
      self.method = 'POST'
      self.headers['x-http-method-override'] = 'GET'
      self.headers['content-type'] = 'application/x-www-form-urlencoded'
      parsed = urlparse(self.uri)
      self.uri = urlunparse(
          (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
           None)
          )
      self.body = parsed.query
      self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
          http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
          method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
      callback(resp)
    if resp.status >= 300:
      raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)

