python类urlparse()的实例源码

abstract_repository.py 文件源码 项目:poco 作者: shiwaforce 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_remote(url):
        """Report whether the host named in ``url`` can be pinged.

        The host is taken from the URL's network location with any
        ``user[:password]@`` prefix and ``:port`` suffix removed. A single
        ping is sent, using the Windows or the POSIX-style flags depending on
        ``platform.system()``.

        Args:
            url: URL whose host should be probed.

        Returns:
            True when ping wrote nothing to stderr, otherwise False. The exit
            status of ping is not consulted. False is also returned when the
            URL has no usable host or the ping executable cannot be started.
        """
        # TODO need a better solution
        o = urlparse.urlparse(url)
        # Keep what follows the last "@" (drops credentials), then what
        # precedes the first ":" (drops the port).
        host = o.netloc.rpartition("@")[2].partition(":")[0]
        # An empty host cannot be pinged, and a leading "-" would be read by
        # ping as an option rather than as a destination.
        if not host or host.startswith("-"):
            return False

        cmd = ["ping"]
        if platform.system().lower().startswith("win"):
            cmd.extend(["-n", "1", "-w", "1000"])
        else:
            cmd.extend(["-c1", "-t1"])
        cmd.append(host)

        # Pass an argument list and no shell: the host comes from the URL and
        # must never be interpreted by a command interpreter.
        try:
            p = Popen(cmd, stdout=PIPE, stderr=PIPE)
        except OSError:
            # ping is not installed or not executable.
            return False
        out, err = p.communicate()

        return len(err) == 0
use_config.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def configure(self):
    """Load every configuration named in the ``use_config`` option.

    Does nothing when ``use_config`` is None. When ``use_config_dir`` parses
    with a URL scheme, download-related keyword arguments are passed to
    ``self.load``; the tool directory string is always the parsed URL followed
    by a space and ``DEFAULT_DIR``.
    """
    options = self.options
    requested = options.use_config
    if requested is None:
        return

    location = urlparse(options.use_config_dir)
    load_kwargs = {}
    if location.scheme:
        load_kwargs = {
            'download': True,
            'remote_url': location.geturl(),
            # search first with the exact url, else try with +'/wafcfg'
            'remote_locs': ['', DEFAULT_DIR],
        }

    tooldir = location.geturl() + ' ' + DEFAULT_DIR
    for name in requested.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % name)
        self.load(name, tooldir=tooldir, **load_kwargs)

    self.start_msg('Checking for configuration')
    self.end_msg(requested)
use_config.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def configure(self):
    """Load every configuration named in the ``use_config`` option.

    Does nothing when ``use_config`` is None. When ``use_config_dir`` parses
    with a URL scheme, download-related keyword arguments are passed to
    ``self.load``; the tool directory string is always the parsed URL followed
    by a space and ``DEFAULT_DIR``.
    """
    options = self.options
    requested = options.use_config
    if requested is None:
        return

    location = urlparse(options.use_config_dir)
    load_kwargs = {}
    if location.scheme:
        load_kwargs = {
            'download': True,
            'remote_url': location.geturl(),
            # search first with the exact url, else try with +'/wafcfg'
            'remote_locs': ['', DEFAULT_DIR],
        }

    tooldir = location.geturl() + ' ' + DEFAULT_DIR
    for name in requested.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % name)
        self.load(name, tooldir=tooldir, **load_kwargs)

    self.start_msg('Checking for configuration')
    self.end_msg(requested)
use_config.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def configure(self):
    """Load every configuration named in the ``use_config`` option.

    Does nothing when ``use_config`` is None. When ``use_config_dir`` parses
    with a URL scheme, download-related keyword arguments are passed to
    ``self.load``; the tool directory string is always the parsed URL followed
    by a space and ``DEFAULT_DIR``.
    """
    options = self.options
    requested = options.use_config
    if requested is None:
        return

    location = urlparse(options.use_config_dir)
    load_kwargs = {}
    if location.scheme:
        load_kwargs = {
            'download': True,
            'remote_url': location.geturl(),
            # search first with the exact url, else try with +'/wafcfg'
            'remote_locs': ['', DEFAULT_DIR],
        }

    tooldir = location.geturl() + ' ' + DEFAULT_DIR
    for name in requested.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % name)
        self.load(name, tooldir=tooldir, **load_kwargs)

    self.start_msg('Checking for configuration')
    self.end_msg(requested)
lint.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_headers(self, headers):
        """Emit ``HTTPWarning`` warnings for questionable response headers.

        Two headers are inspected, when present:

        * ``etag``: warns when the weak indicator is written as ``w/`` rather
          than ``W/``, and when the tag (after removing a weak indicator) is
          not wrapped in double quotes.
        * ``location``: warns when the value has no network location.
        """
        etag_value = headers.get('etag')
        if etag_value is not None:
            lowercase_weak = etag_value.startswith('w/')
            if lowercase_weak or etag_value.startswith('W/'):
                if lowercase_weak:
                    warn(HTTPWarning('weak etag indicator should be upcase.'),
                         stacklevel=4)
                # Drop the two-character weak indicator before the quote check.
                etag_value = etag_value[2:]
            if etag_value[:1] != '"' or etag_value[-1:] != '"':
                warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

        redirect_target = headers.get('location')
        if redirect_target is None:
            return
        if not urlparse(redirect_target).netloc:
            warn(HTTPWarning('absolute URLs required for location header'),
                 stacklevel=4)
main.py 文件源码 项目:quartz-browser 作者: ksharindam 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def downloadVideo(self):
        """Start a download of the video shown in the current tab.

        When the tab URL passes ``validYoutubeUrl``, the available videos are
        listed in a ``YoutubeDialog`` and the selected one is requested;
        otherwise ``self.video_URL`` is requested with ``self.video_page_url``
        as the Referer. In both cases the network reply is handed to
        ``self.handleUnsupportedContent``.
        """
        page_url = unicode(self.tabWidget.currentWidget().url().toString())

        if not validYoutubeUrl(page_url):
            # For embedded HTML5 videos
            request = QNetworkRequest(self.video_URL)
            request.setRawHeader('Referer', self.video_page_url)
            self.handleUnsupportedContent(networkmanager.get(request))
            return

        # For youtube videos: rebuild the URL from the "v" query parameter
        query = parse_qs(urlparse(page_url).query)
        mobile_url = 'https://m.youtube.com/watch?v=' + query['v'][0]
        # Use PyTube module for restricted videos
        available = YouTube(mobile_url).get_videos()
        dialog = youtube_dialog.YoutubeDialog(available, self)
        if dialog.exec_() != 1:
            return

        # Map the checked button id onto an index into the video list.
        chosen = available[abs(dialog.buttonGroup.checkedId()) - 2]
        reply = networkmanager.get(QNetworkRequest(QUrl.fromUserInput(chosen.url)))
        self.handleUnsupportedContent(reply, chosen.filename + '.' + chosen.extension)
util.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _add_query_parameter(url, name, value):
  """Set a single query parameter on a URL.

  An existing parameter with the same name has its value replaced.

  Args:
    url: string, URL to modify.
    name: string, name of the query parameter.
    value: string, value for the query parameter; None leaves the URL as is.

  Returns:
    The URL with the query parameter set, or the unchanged url when value is
    None.
  """
  if value is None:
    return url

  scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
  pairs = dict(parse_qsl(query))
  pairs[name] = value
  rebuilt = (scheme, netloc, path, params, urllib.urlencode(pairs), fragment)
  return urlparse.urlunparse(rebuilt)
nova_cc_hooks.py 文件源码 项目:charm-nova-cloud-controller 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def neutron_settings():
    """Build the quantum_* relation settings from the neutron-api context.

    Returns an empty dict unless the 'neutron-api' relation has been made
    with a 'neutron-plugin' key. Otherwise the dict carries the plugin,
    region, security-groups flag and URL, plus the host and port parsed out
    of that URL.
    """
    settings = {}
    if not is_relation_made('neutron-api', 'neutron-plugin'):
        return settings

    api_info = NeutronAPIContext()()
    # XXX: Rename these relations settings?
    settings['quantum_plugin'] = api_info['neutron_plugin']
    settings['region'] = config('region')
    settings['quantum_security_groups'] = api_info['neutron_security_groups']
    settings['quantum_url'] = api_info['neutron_url']

    endpoint = urlparse(settings['quantum_url'])
    settings['quantum_host'] = endpoint.hostname
    settings['quantum_port'] = endpoint.port
    return settings
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _should_use_proxy(url, no_proxy=None):
    """Decide whether a connection to ``url`` should go through a proxy.

    The URL's hostname is compared for exact equality against each
    comma-separated, whitespace-stripped entry of the no-proxy list.

    @param url: URL
    @type url: basestring or urllib2.Request
    @param no_proxy: comma-separated host list; when None, the value of the
        ``no_proxy`` environment variable is used (empty if unset)
    @return: False when the hostname equals one of the entries, else True
    """
    if no_proxy is not None:
        exclusions = no_proxy
    else:
        exclusions = os.environ.get('no_proxy', '')

    hostname = urlparse_.urlparse(_url_as_string(url)).hostname
    excluded_hosts = [entry.strip() for entry in exclusions.split(',')]
    return hostname not in excluded_hosts
cloud_verifier_common.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_query_tag_value(path, query_tag):
    """Look up one query parameter in a URI.

    The query string of ``path`` is split on ``&`` and each token on its
    first ``=``; no percent-decoding is applied. The first token whose key
    equals ``query_tag`` wins.

    Args:
        path: URI (or path with query string) to inspect.
        query_tag: name of the query parameter to find.

    Returns:
        The raw value of the parameter, an empty string when the key is
        present without a value (``?ids`` or ``?ids=``), or None if the key
        is not found.
    """
    parsed_path = urlparse(path)
    for tok in parsed_path.query.split('&'):
        # partition() keeps everything after the first '=' (values may
        # themselves contain '=', e.g. base64 padding) and does not fail on
        # a bare key that has no '=' at all.
        query_key, _, query_value = tok.partition('=')
        if query_key == query_tag:
            return query_value
    return None

# Sign a message with the revocation key, telling of a verification problem.
cloud_node.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_query_tag_value(self, path, query_tag):
        """Look up one query parameter in this object's request path.

        The query string of ``self.path`` is split on ``&`` and each token on
        its first ``=``; no percent-decoding is applied. The first token whose
        key equals ``query_tag`` wins.

        Args:
            path: not used; the URI that is parsed is ``self.path``.
            query_tag: name of the query parameter to find.

        Returns:
            The raw value of the parameter, an empty string when the key is
            present without a value (``?ids`` or ``?ids=``), or None if the
            key is not found.
        """
        # NOTE(review): the ``path`` argument is ignored in favour of
        # self.path; kept as-is for compatibility -- confirm whether callers
        # ever pass a different path.
        parsed_path = urlparse(self.path)
        for tok in parsed_path.query.split('&'):
            # partition() keeps everything after the first '=' (values may
            # themselves contain '=', e.g. base64 padding) and does not fail
            # on a bare key that has no '=' at all.
            query_key, _, query_value = tok.partition('=')
            if query_key == query_tag:
                return query_value
        return None
common.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_restful_params(urlstring):
    """Turn a RESTful URI path into a dictionary of parameters.

    Leading and trailing slashes are stripped and the path is split on "/".
    The first segment must be a two-character API version of the form "v#";
    the remaining segments are passed to ``list_to_dict`` and the version
    character is stored under "api_version".

    Returns None when there are fewer than two segments or the first segment
    is not a valid version token.
    """
    segments = urlparse(urlstring.strip("/")).path.split('/')

    # Be sure we at least have /v#/opt
    if len(segments) < 2:
        return None

    # Be sure first token is API version
    version = segments[0]
    if len(version) != 2 or version[0] != 'v':
        return None

    params = list_to_dict(segments[1:])
    params["api_version"] = version[1]
    return params
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def file_to_url(self, file_rel_path):
        """Return a ``file`` URL for the absolute form of ``file_rel_path``."""
        absolute_path = os.path.abspath(file_rel_path)
        # Parse with 'file' as the default scheme, then serialise back out.
        as_file_url = urlparse.urlparse(absolute_path, scheme='file')
        return as_file_url.geturl()
archiveurl.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def download(self, source, dest):
        """
        Download an archive file.

        For http/https sources that embed ``user:password@`` credentials, the
        credentials are removed from the URL and a basic-auth opener carrying
        them is installed process-wide with ``install_opener`` before the
        fetch. If writing ``dest`` fails, a partially written ``dest`` is
        removed and the original exception is re-raised.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propagate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception:
            if os.path.isfile(dest):
                os.unlink(dest)
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            # Always release the connection, whether or not the copy worked.
            response.close()

    # Mandatory file validation via Sha1 or MD5 hashing.
__init__.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse_url(self, url):
        """Return the result of ``urlparse`` applied to ``url``."""
        components = urlparse(url)
        return components
resource.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def load_logging_config_uri(orb, uri, binding=None):
    """Configure log4py from a logging configuration URI.

    Supported schemes:

    * ``file``: the URI path is passed to ``fileConfig`` together with
      ``binding``.
    * ``sca``: the ``fs`` query parameter is resolved with
      ``orb.string_to_object``; the file at the URI path is read from that
      file system into a temporary local file, which is passed to
      ``fileConfig`` and then removed.

    Any other scheme, a missing ``fs`` parameter, or a failed file system
    lookup only produces a warning through ``logging``.

    orb     -- object providing ``string_to_object``
    uri     -- logging configuration URI
    binding -- forwarded to ``fileConfig`` for ``file`` URIs only
    """
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
    if scheme == "file":
        ossie.utils.log4py.config.fileConfig(path, binding)
    elif scheme == "sca":
        # Split each token on its first "=" only and ignore tokens without
        # one, so an empty or malformed query reaches the "missing fs"
        # warning below instead of failing while the dict is built.
        q = dict(tok.split("=", 1) for tok in query.split("&") if "=" in tok)
        try:
            fileSys = orb.string_to_object(q["fs"])
        except KeyError:
            logging.warning("sca URI missing fs query parameter")
        else:
            if fileSys is None:
                logging.warning("Failed to lookup file system")
            else:
                # mkstemp creates the file atomically (mktemp only picks a
                # name), so the path is guaranteed to exist for the cleanup
                # in the finally clause.
                fd, t = tempfile.mkstemp()
                try:
                    tf = os.fdopen(fd, "w+")
                    try:
                        scaFile = fileSys.open(path, True)
                        try:
                            fileSize = scaFile.sizeOf()
                            buf = scaFile.read(fileSize)
                        finally:
                            scaFile.close()
                        tf.write(buf)
                    finally:
                        tf.close()

                    ossie.utils.log4py.config.fileConfig(t)
                finally:
                    os.remove(t)
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")
__init__.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def GetConfigFileContents( url ):
    """Return the text of the logging configuration referenced by ``url``.

    Supported schemes:

    * ``file``: the file at the URL path is read; None if it cannot be read.
    * ``sca``:  delegated to ``GetSCAFileContents``.
    * ``http``: delegated to ``GetHTTPFileContents``.
    * ``str``:  the configuration is the URL path itself, without a single
      leading "/".

    Any other scheme logs a warning and yields None.
    """
    fc=None
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    if scheme == "file":
        try:
            f = open(path,'r')
            try:
                fc = f.read()
            finally:
                # Close the handle even when reading fails.
                f.close()
        except (IOError, OSError, UnicodeError):
            # Best effort: an unreadable file means "no configuration".
            fc=None
    elif scheme == "sca":
        fc=GetSCAFileContents(url)

    elif scheme == "http":
        fc=GetHTTPFileContents(url)

    elif scheme == "str":
        ## RESOLVE
        if path.startswith("/"):
            fc=path[1:]
        else:
            fc=path
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")

    return fc
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def authenticate_keystone_user(self, keystone, user, password, tenant):
        """Authenticate a regular user against the keystone public endpoint.

        The identity service's publicURL is looked up in the service catalog
        of ``keystone``; its hostname, together with the credentials and
        ``tenant`` (as the project name), is handed to
        ``self.authenticate_keystone``, whose result is returned.
        """
        self.log.debug('Authenticating keystone user ({})...'.format(user))
        public_endpoint = keystone.service_catalog.url_for(
            service_type='identity', interface='publicURL')
        endpoint_host = urlparse.urlparse(public_endpoint).hostname

        return self.authenticate_keystone(
            endpoint_host, user, password, project_name=tenant)
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def file_to_url(self, file_rel_path):
        """Return a ``file`` URL for the absolute form of ``file_rel_path``."""
        absolute_path = os.path.abspath(file_rel_path)
        # Parse with 'file' as the default scheme, then serialise back out.
        as_file_url = urlparse.urlparse(absolute_path, scheme='file')
        return as_file_url.geturl()
archiveurl.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def download(self, source, dest):
        """
        Download an archive file.

        For http/https sources that embed ``user:password@`` credentials, the
        credentials are removed from the URL and a basic-auth opener carrying
        them is installed process-wide with ``install_opener`` before the
        fetch. If writing ``dest`` fails, a partially written ``dest`` is
        removed and the original exception is re-raised.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propagate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception:
            if os.path.isfile(dest):
                os.unlink(dest)
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            # Always release the connection, whether or not the copy worked.
            response.close()

    # Mandatory file validation via Sha1 or MD5 hashing.


问题


面经


文章

微信
公众号

扫码关注公众号