def check_remote(url):
    """Return True if pinging the host named in *url* produces no stderr output.

    The host is taken from the parsed URL (user info and port are dropped).
    A single ``ping`` is sent; the result is judged solely by whether the
    command wrote anything to stderr, not by its exit status.

    Returns False when the URL has no host, when the host looks like a
    command-line option, or when ``ping`` cannot be started.
    """
    # TODO need a better solution
    host = urlparse.urlparse(url).hostname
    # Nothing to ping, or a host such as "-f" that ping would read as an
    # option rather than a destination.
    if not host or host.startswith("-"):
        return False

    if platform.system().lower().startswith("win"):
        cmd = ["ping", "-n", "1", "-w", "1000", host]
    else:
        cmd = ["ping", "-c1", "-t1", host]

    # Pass an argument list and avoid the shell: the host comes straight from
    # the URL and must never be interpreted as shell syntax.
    try:
        p = Popen(cmd, stdout=PIPE, stderr=PIPE)
    except OSError:
        # ping is missing or not executable; treat the remote as unreachable.
        return False
    _out, err = p.communicate()
    return len(err) == 0
# Example source code for Python's urlparse()
def configure(self):
    """Load every configuration listed in the ``use_config`` option.

    Does nothing when ``use_config`` is None. When ``use_config_dir`` parses
    with a URL scheme, download-related keyword arguments are forwarded to
    ``self.load`` for each comma-separated configuration name.
    """
    options = self.options
    requested = options.use_config
    if requested is None:
        return

    parsed = urlparse(options.use_config_dir)
    load_args = {}
    if parsed.scheme:
        # Search the exact url first, then fall back to DEFAULT_DIR.
        load_args.update(
            download=True,
            remote_url=parsed.geturl(),
            remote_locs=['', DEFAULT_DIR],
        )

    search_path = parsed.geturl() + ' ' + DEFAULT_DIR
    for name in requested.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % name)
        self.load(name, tooldir=search_path, **load_args)

    self.start_msg('Checking for configuration')
    self.end_msg(requested)
def configure(self):
    """Load the configurations named by the ``use_config`` option.

    Returns immediately when ``use_config`` is None. Each comma-separated
    name is passed to ``self.load`` together with a tool directory built
    from ``use_config_dir`` and ``DEFAULT_DIR``; extra download arguments
    are added when ``use_config_dir`` carries a URL scheme.
    """
    settings = self.options
    config_names = settings.use_config
    if config_names is None:
        return

    location = urlparse(settings.use_config_dir)
    extra = {}
    if location.scheme:
        extra['download'] = True
        extra['remote_url'] = location.geturl()
        # Try the exact url first, then the DEFAULT_DIR location.
        extra['remote_locs'] = ['', DEFAULT_DIR]

    tool_directory = location.geturl() + ' ' + DEFAULT_DIR
    for config_name in config_names.split(','):
        message = "Searching configuration '%s'..." % config_name
        Logs.pprint('NORMAL', message)
        self.load(config_name, tooldir=tool_directory, **extra)

    self.start_msg('Checking for configuration')
    self.end_msg(config_names)
def configure(self):
    """Search for and load each configuration given by ``use_config``.

    A None ``use_config`` is a no-op. ``use_config_dir`` is parsed as a URL;
    if it has a scheme, ``self.load`` is additionally told to download from
    that URL, looking in the URL itself and then in ``DEFAULT_DIR``.
    """
    wanted = self.options.use_config
    if wanted is None:
        return

    source = urlparse(self.options.use_config_dir)
    remote_options = {}
    if source.scheme:
        remote_options = {
            'download': True,
            'remote_url': source.geturl(),
            # '' means the exact url; DEFAULT_DIR is tried after it.
            'remote_locs': ['', DEFAULT_DIR],
        }

    dirs = source.geturl() + ' ' + DEFAULT_DIR
    for entry in wanted.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % entry)
        self.load(entry, tooldir=dirs, **remote_options)

    self.start_msg('Checking for configuration')
    self.end_msg(wanted)
def check_headers(self, headers):
    """Warn about questionable ``etag`` and ``location`` header values.

    Emits an ``HTTPWarning`` when the weak-etag prefix is lowercase, when the
    etag (after removing any weak prefix) is not wrapped in double quotes,
    and when the location value has no network location component.
    """
    etag = headers.get('etag')
    if etag is not None:
        if etag.startswith('w/'):
            warn(HTTPWarning('weak etag indicator should be upcase.'),
                 stacklevel=4)
            etag = etag[2:]
        elif etag.startswith('W/'):
            etag = etag[2:]
        # Slices (not indexing) keep this safe for an empty etag.
        quoted = etag[:1] == '"' and etag[-1:] == '"'
        if not quoted:
            warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

    location = headers.get('location')
    if location is None:
        return
    if not urlparse(location).netloc:
        warn(HTTPWarning('absolute URLs required for location header'),
             stacklevel=4)
def downloadVideo(self):
    """Start a download for the video on the current tab.

    For a URL accepted by ``validYoutubeUrl`` the available videos are
    fetched through ``YouTube``, a dialog lets the user pick one, and the
    chosen video is requested. Otherwise ``self.video_URL`` is requested
    with ``self.video_page_url`` as the Referer header.
    """
    page_url = unicode(self.tabWidget.currentWidget().url().toString())

    if not validYoutubeUrl(page_url):
        # Embedded HTML5 video
        request = QNetworkRequest(self.video_URL)
        request.setRawHeader('Referer', self.video_page_url)
        self.handleUnsupportedContent(networkmanager.get(request))
        return

    # YouTube video: rebuild the URL from the 'v' query parameter.
    video_id = parse_qs(urlparse(page_url).query)['v'][0]
    tube = YouTube('https://m.youtube.com/watch?v=' + video_id)  # PyTube handles restricted videos
    videos = tube.get_videos()
    dialog = youtube_dialog.YoutubeDialog(videos, self)
    if dialog.exec_() != 1:
        return

    # The checked button id is mapped to a list index via abs(id) - 2.
    chosen = videos[abs(dialog.buttonGroup.checkedId()) - 2]
    reply = networkmanager.get(QNetworkRequest(QUrl.fromUserInput(chosen.url)))
    self.handleUnsupportedContent(reply, chosen.filename + '.' + chosen.extension)
def _add_query_parameter(url, name, value):
"""Adds a query parameter to a url.
Replaces the current value if it already exists in the URL.
Args:
url: string, url to add the query parameter to.
name: string, query parameter name.
value: string, query parameter value.
Returns:
Updated query parameter. Does not update the url if value is None.
"""
if value is None:
return url
else:
parsed = list(urlparse.urlparse(url))
q = dict(parse_qsl(parsed[4]))
q[name] = value
parsed[4] = urllib.urlencode(q)
return urlparse.urlunparse(parsed)
def neutron_settings():
    """Collect neutron-related settings when the neutron relation exists.

    Returns an empty dict unless the 'neutron-api'/'neutron-plugin' relation
    is made; otherwise returns the plugin, region, security-group and URL
    settings plus the host and port parsed out of the URL.
    """
    settings = {}
    if not is_relation_made('neutron-api', 'neutron-plugin'):
        return settings

    api_info = NeutronAPIContext()()
    # XXX: Rename these relations settings?
    settings['quantum_plugin'] = api_info['neutron_plugin']
    settings['region'] = config('region')
    settings['quantum_security_groups'] = api_info['neutron_security_groups']
    settings['quantum_url'] = api_info['neutron_url']

    parsed = urlparse(settings['quantum_url'])
    settings['quantum_host'] = parsed.hostname
    settings['quantum_port'] = parsed.port
    return settings
def _should_use_proxy(url, no_proxy=None):
    """Decide whether a proxy should be used to connect to the given URL.

    The URL's hostname is compared for exact equality against each
    comma-separated, whitespace-stripped entry of *no_proxy*; when
    *no_proxy* is None the ``no_proxy`` environment variable is used.

    @param url: URL
    @type url: basestring or urllib2.Request
    @return: False if the hostname matches a no_proxy entry, else True
    """
    exclusions = no_proxy
    if exclusions is None:
        exclusions = os.environ.get('no_proxy', '')

    hostname = urlparse_.urlparse(_url_as_string(url)).hostname
    excluded_hosts = [entry.strip() for entry in exclusions.split(',')]
    return hostname not in excluded_hosts
def get_query_tag_value(path, query_tag):
    """Return the value of an HTTP query parameter in a URI.

    The query string of *path* is split on '&' and each token on '='. The
    value of the first token whose key equals *query_tag* and which has an
    '=' is returned.

    Returns None if no such token exists. A key that appears without '='
    (for example ``?ids``) carries no value and is skipped rather than
    raising IndexError.
    """
    query = urlparse(path).query
    for token in query.split('&'):
        pieces = token.split('=')
        # A bare key yields a single piece; there is no value to return.
        if len(pieces) > 1 and pieces[0] == query_tag:
            return pieces[1]
    return None
# sign a message with revocation key. telling of verification problem
def get_query_tag_value(self, path, query_tag):
    """Return the value of an HTTP query parameter in ``self.path``.

    The query string is split on '&' and each token on '='. The value of the
    first token whose key equals *query_tag* and which has an '=' is
    returned.

    Returns None if no such token exists. A key that appears without '='
    (for example ``?ids``) carries no value and is skipped rather than
    raising IndexError.

    NOTE(review): the *path* argument is not used; the URI is always read
    from ``self.path``. Kept as-is so existing callers see no change --
    confirm whether *path* was meant to be parsed instead.
    """
    query = urlparse(self.path).query
    for token in query.split('&'):
        pieces = token.split('=')
        # A bare key yields a single piece; there is no value to return.
        if len(pieces) > 1 and pieces[0] == query_tag:
            return pieces[1]
    return None
def get_restful_params(urlstring):
    """Return a dictionary of paired RESTful URI parameters.

    The path must have at least two segments and begin with a two-character
    version segment starting with 'v' (e.g. ``v2``); otherwise None is
    returned. The character after 'v' is stored under "api_version".
    """
    segments = urlparse(urlstring.strip("/")).path.split('/')

    # Need at least /v#/opt
    if len(segments) < 2:
        return None

    # The first segment must be the API version
    version = segments[0]
    if len(version) != 2 or version[0] != 'v':
        return None

    result = list_to_dict(segments[1:])
    result["api_version"] = version[1]
    return result
def file_to_url(self, file_rel_path):
    """Return a ``file`` URL for the absolute form of a relative path."""
    absolute = os.path.abspath(file_rel_path)
    parsed = urlparse.urlparse(absolute, scheme='file')
    return urlparse.urlunparse(parsed)
def download(self, source, dest):
    """
    Download an archive file.

    For http/https sources that embed ``user:password@`` credentials, the
    credentials are removed from the URL and a basic-auth opener is
    installed globally before the request is made. If writing *dest* fails,
    any partially written file is removed and the error is re-raised.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # propagate all exceptions
    # URLError, OSError, etc
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            source = urlunparse((proto, barehost, path, params, query, fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            # Realm is set to None in add_password to force the username and password
            # to be used whatever the realm
            passman.add_password(None, source, username, password)
            authhandler = HTTPBasicAuthHandler(passman)
            opener = build_opener(authhandler)
            install_opener(opener)
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception:
        if os.path.isfile(dest):
            os.unlink(dest)
        # Bare raise keeps the original traceback.
        raise
    finally:
        # Always release the connection, whether or not the write succeeded.
        response.close()
# Mandatory file validation via Sha1 or MD5 hashing.
def parse_url(self, url):
    """Return the result of parsing *url* with ``urlparse``."""
    components = urlparse(url)
    return components
def load_logging_config_uri(orb, uri, binding=None):
    """Load a log4py configuration from a ``file`` or ``sca`` URI.

    ``file`` URIs are passed straight to ``fileConfig`` with *binding*.
    ``sca`` URIs must carry an ``fs`` query parameter, which is resolved
    through ``orb.string_to_object``; the file at the URI path is copied from
    that file system into a local temporary file, which is then loaded and
    removed. Any other scheme, a missing ``fs`` parameter, or a file system
    that resolves to None only logs a warning.

    NOTE(review): *binding* is not forwarded on the ``sca`` path, matching
    the previous behavior -- confirm whether that is intentional.
    """
    scheme, _netloc, path, _params, query, _fragment = urlparse.urlparse(uri)
    if scheme == "file":
        ossie.utils.log4py.config.fileConfig(path, binding)
        return
    if scheme != "sca":
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")
        return

    # Tolerate empty queries and tokens without '=' instead of raising.
    q = {}
    for token in query.split("&"):
        pieces = token.split("=", 1)
        if len(pieces) == 2:
            q[pieces[0]] = pieces[1]

    if "fs" not in q:
        logging.warning("sca URI missing fs query parameter")
        return

    fileSys = orb.string_to_object(q["fs"])
    if fileSys is None:
        logging.warning("Failed to lookup file system")
        return

    # mkstemp creates the file atomically, unlike the race-prone mktemp, and
    # guarantees the path exists by the time the cleanup below runs.
    fd, t = tempfile.mkstemp()
    try:
        tf = os.fdopen(fd, "w+")
        try:
            scaFile = fileSys.open(path, True)
            try:
                fileSize = scaFile.sizeOf()
                tf.write(scaFile.read(fileSize))
            finally:
                scaFile.close()
        finally:
            tf.close()
        ossie.utils.log4py.config.fileConfig(t)
    finally:
        os.remove(t)
def GetConfigFileContents( url ):
    """Return the logging configuration text referenced by *url*, or None.

    Supported schemes:
      file -- contents of the local file at the URL path (None if it cannot
              be read)
      sca  -- delegated to GetSCAFileContents
      http -- delegated to GetHTTPFileContents
      str  -- the URL path itself, minus one leading "/" if present

    Any other scheme logs a warning and yields None.
    """
    fc = None
    scheme, _netloc, path, _params, _query, _fragment = urlparse.urlparse(url)
    if scheme == "file":
        # Best effort: an unreadable file yields None, but interpreter-exit
        # signals (KeyboardInterrupt, SystemExit) are no longer swallowed.
        try:
            f = open(path, 'r')
            try:
                fc = f.read()
            finally:
                f.close()
        except Exception:
            fc = None
    elif scheme == "sca":
        fc = GetSCAFileContents(url)
    elif scheme == "http":
        fc = GetHTTPFileContents(url)
    elif scheme == "str":
        ## RESOLVE
        if path.startswith("/"):
            fc = path[1:]
        else:
            fc = path
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")
    return fc
def authenticate_keystone_user(self, keystone, user, password, tenant):
    """Authenticate a regular user against the keystone public endpoint.

    Looks up the public identity endpoint in the service catalog, takes the
    hostname from it, and returns the result of
    ``self.authenticate_keystone`` with *tenant* as the project name.
    """
    self.log.debug('Authenticating keystone user ({})...'.format(user))
    endpoint = keystone.service_catalog.url_for(
        service_type='identity', interface='publicURL')
    host = urlparse.urlparse(endpoint).hostname
    session = self.authenticate_keystone(
        host, user, password, project_name=tenant)
    return session
def file_to_url(self, file_rel_path):
    """Build a ``file`` URL from the absolute version of the given path."""
    location = urlparse.urlparse(
        os.path.abspath(file_rel_path), scheme='file')
    url = location.geturl()
    return url
def download(self, source, dest):
    """
    Download an archive file.

    For http/https sources that embed ``user:password@`` credentials, the
    credentials are removed from the URL and a basic-auth opener is
    installed globally before the request is made. If writing *dest* fails,
    any partially written file is removed and the error is re-raised.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # propagate all exceptions
    # URLError, OSError, etc
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            source = urlunparse((proto, barehost, path, params, query, fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            # Realm is set to None in add_password to force the username and password
            # to be used whatever the realm
            passman.add_password(None, source, username, password)
            authhandler = HTTPBasicAuthHandler(passman)
            opener = build_opener(authhandler)
            install_opener(opener)
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception:
        if os.path.isfile(dest):
            os.unlink(dest)
        # Bare raise keeps the original traceback.
        raise
    finally:
        # Always release the connection, whether or not the write succeeded.
        response.close()
# Mandatory file validation via Sha1 or MD5 hashing.