def setUp(self):
    """Create Suds clients from the local WSDL files and the user-services fixture.

    The WSDL paths are resolved against the current working directory and
    converted to ``file:`` URLs before being handed to the Suds ``Client``.
    """
    import os
    from urllib.parse import urljoin
    from urllib.request import pathname2url

    def wsdl_url(relative_path):
        # Turn the on-disk WSDL location into a file: URL.
        return urljoin('file:', pathname2url(os.path.abspath(relative_path)))

    def make_client(url):
        # The same certificate file is passed for both transport arguments
        # (presumably key and cert -- confirm against HTTPSClientCertTransport).
        return Client(url,
                      transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))

    query_wsdl = wsdl_url('../wsdl_files/vipuserservices-query-1.7.wsdl')
    auth_wsdl = wsdl_url('../wsdl_files/vipuserservices-auth-1.7.wsdl')

    # The query client is built but not stored on the test case.
    query_client = make_client(query_wsdl)
    auth_client = make_client(auth_wsdl)
    self.test_user_services_object = SymantecUserServices(auth_client)
python类pathname2url()的实例源码
def setUp(self):
    """Create the management-services Suds client and its test fixture.

    The WSDL path is resolved against the current working directory and
    converted to a ``file:`` URL before being handed to the Suds ``Client``.
    """
    import os
    from urllib.parse import urljoin
    from urllib.request import pathname2url

    wsdl_path = os.path.abspath('../wsdl_files/vipuserservices-mgmt-1.7.wsdl')
    wsdl_url = urljoin('file:', pathname2url(wsdl_path))

    # The same certificate file is passed for both transport arguments
    # (presumably key and cert -- confirm against HTTPSClientCertTransport).
    cert_transport = HTTPSClientCertTransport('vip_certificate.crt',
                                              'vip_certificate.crt')
    self.management_client = Client(wsdl_url, transport=cert_transport)
    self.test_management_services_object = SymantecManagementServices(self.management_client)
def normalize_cdmi_url(self, path):
    """Normalize URL path relative to current path and return.

    A relative ``path`` is resolved against ``self.pwd()``; ``.`` and ``..``
    segments are collapsed; a trailing separator on ``path`` is preserved.

    :arg path: URL path, absolute or relative to the current path
    :returns: absolute CDMI URL (``self.cdmi_url`` + normalized path)
    """
    # Turn URL path into OS path for manipulation
    local_path = url2pathname(path)
    if not os.path.isabs(local_path):
        local_path = os.path.join(url2pathname(self.pwd()), local_path)
    local_path = os.path.normpath(local_path)

    # normpath() strips a trailing separator, so put it back when the caller
    # supplied one.  ``path`` is a URL path, whose separator is always '/',
    # so testing only os.path.sep would miss it where os.path.sep is '\\'.
    # os.path.sep is still accepted to stay compatible with existing callers.
    had_trailing_sep = path.endswith(('/', os.path.sep))
    if had_trailing_sep and not local_path.endswith(os.path.sep):
        local_path += os.path.sep

    return self.cdmi_url + pathname2url(local_path)
def AddSensor(self, name, description, device, args):
    """Register a new sensor/device with the device manager.

    Only the truthy arguments are included in the request.  The name is
    URL-quoted with ``req.pathname2url`` before it is sent.

    :param name: sensor name
    :param description: sensor description
    :param device: device identifier
    :param args: device arguments
    :returns: True when the manager answers with status 200, otherwise False
              (including when any exception occurs)
    """
    info('AddSensor: {}, {}, {}, {}'.format(name, description, device, args))
    bVal = False
    try:
        sensorAdd = {}
        if name:
            sensorAdd['name'] = req.pathname2url(name)
        if device:
            sensorAdd['device'] = device
        if args:
            sensorAdd['args'] = args
        if description:
            sensorAdd['description'] = description
        with self.sensorMutex:
            retValue = manager.addDeviceJSON(sensorAdd)
        info('Add device returned: {}'.format(retValue))
        if retValue[0] == 200:
            bVal = True
            self.AddRefresh()
    except Exception as ex:
        # Still best-effort (never raises to the caller), but the failure is
        # no longer swallowed silently.
        info('Add sensor failed: {}'.format(ex))
        bVal = False
    return bVal
common.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def file_path_to_url(path):
    """
    Build a FILE URL from an absolute path given in native format.

    Parameters
    ----------
    path : a path in native format

    Returns
    -------
    a valid FILE URL
    """
    url_path = pathname2url(path)
    return urljoin('file:', url_path)
# ZipFile is not a context manager for <= 2.6
# must be tuple index here since 2.6 doesn't use namedtuple for version_info
def test_open_url(self):
    """resources.open_url() opens an existing file: URL and raises for a missing one."""
    # pathname2url is looked up on a different module depending on the
    # interpreter's major version (urllib2 is presumably aliased by the
    # file's imports on Python 3 -- confirm).
    p2url = urllib.pathname2url if sys.version_info[0] < 3 else urllib2.pathname2url

    res_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "resources"))

    existing_url = "file:%s" % p2url(os.path.join(res_dir, "rwopstest.txt"))
    opened = resources.open_url(existing_url)
    self.assertIsNotNone(opened)

    missing_url = "file:%s" % p2url(os.path.join(res_dir, "invalid"))
    self.assertRaises(urllib2.URLError, resources.open_url, missing_url)
def build_resources(self):
    """Return a ``percy.Resource`` for each file found under ``self.root_dir``.

    Files larger than ``MAX_FILESIZE_BYTES`` are skipped.  Each resource URL is
    ``self.base_url`` followed by the file's path relative to ``root_dir``
    (URL-quoted), avoiding a doubled slash at the join.  Returns an empty
    list when ``root_dir`` is not set.
    """
    if not self.root_dir:
        return []

    collected = []
    for dirpath, _subdirs, filenames in os.walk(self.root_dir, followlinks=True):
        for filename in filenames:
            local_file = os.path.join(dirpath, filename)
            if os.path.getsize(local_file) > MAX_FILESIZE_BYTES:
                continue

            with open(local_file, 'rb') as handle:
                payload = handle.read()

            # Strip the root prefix so only the relative part ends up in the URL.
            relative_url = pathname2url(local_file.replace(self.root_dir, '', 1))
            if self.base_url[-1] == '/' and relative_url[0] == '/':
                # base_url already ends with a slash; drop the leading one.
                relative_url = relative_url[1:]

            collected.append(
                percy.Resource(
                    resource_url="{0}{1}".format(self.base_url, relative_url),
                    sha=utils.sha256hash(payload),
                    local_path=os.path.abspath(local_file),
                )
            )
    return collected
sdl2ext_resources_test.py 文件源码
项目:inventwithpython_pysdl2
作者: rswinkle
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_open_url(self):
    """Exercise resources.open_url() with one valid and one invalid file: URL."""
    # Pick the module that provides pathname2url for this major version
    # (urllib2 is presumably aliased by the file's imports on Python 3 --
    # confirm).
    if sys.version_info[0] >= 3:
        to_url = urllib2.pathname2url
    else:
        to_url = urllib.pathname2url

    base_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "resources"))

    def file_url(leaf):
        # Build the file: URL for an entry of the resources directory.
        return "file:%s" % to_url(os.path.join(base_dir, leaf))

    self.assertIsNotNone(resources.open_url(file_url("rwopstest.txt")))
    self.assertRaises(urllib2.URLError, resources.open_url, file_url("invalid"))
def setUp(self):
    """Create all three Suds clients and the combined ``SymantecServices`` fixture.

    The WSDL paths are resolved against the current working directory and
    converted to ``file:`` URLs before being handed to the Suds ``Client``.
    """
    import os
    from urllib.parse import urljoin
    from urllib.request import pathname2url

    def wsdl_url(relative_path):
        # Turn the on-disk WSDL location into a file: URL.
        return urljoin('file:', pathname2url(os.path.abspath(relative_path)))

    def make_client(url):
        # The same certificate file is passed for both transport arguments
        # (presumably key and cert -- confirm against HTTPSClientCertTransport).
        return Client(url,
                      transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))

    query_wsdl = wsdl_url('../wsdl_files/vipuserservices-query-1.7.wsdl')
    auth_wsdl = wsdl_url('../wsdl_files/vipuserservices-auth-1.7.wsdl')
    mgmt_wsdl = wsdl_url('../wsdl_files/vipuserservices-mgmt-1.7.wsdl')

    query_client = make_client(query_wsdl)
    auth_client = make_client(auth_wsdl)
    mgmt_client = make_client(mgmt_wsdl)

    # The individual wrappers are constructed but only the combined facade is
    # stored on the test case.
    user_wrapper = SymantecUserServices(auth_client)
    query_wrapper = SymantecQueryServices(query_client)
    mgmt_wrapper = SymantecManagementServices(mgmt_client)
    self.test_services = SymantecServices(query_client, mgmt_client, auth_client)
def setUp(self):
    """Create the query-services Suds client and its test fixture.

    The WSDL path is resolved against the current working directory and
    converted to a ``file:`` URL before being handed to the Suds ``Client``.
    """
    import os
    from urllib.parse import urljoin
    from urllib.request import pathname2url

    wsdl_path = os.path.abspath('../wsdl_files/vipuserservices-query-1.7.wsdl')
    wsdl_url = urljoin('file:', pathname2url(wsdl_path))

    # The same certificate file is passed for both transport arguments
    # (presumably key and cert -- confirm against HTTPSClientCertTransport).
    cert_transport = HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt')
    self.query_services_client = Client(wsdl_url, transport=cert_transport)
    self.test_query_services = SymantecQueryServices(self.query_services_client)
def path2url(path):
    """Return the ``file:`` URL for the local filesystem path *path*."""
    quoted_path = pathname2url(path)
    return urljoin('file:', quoted_path)
def path2url(path):
    """Return the ``file:`` URL for the local filesystem path *path*."""
    # Quote the path first, then attach the file: scheme.
    url_path = urllib.pathname2url(path)
    return urljoin('file:', url_path)
def filename_to_uri(self, path: str) -> str:
    """Convert the filesystem path *path* into a ``file:`` URI."""
    uri_path = pathname2url(path)
    return urljoin('file:', uri_path)
def path2url(path):
    """Return the ``file:`` URL for the local filesystem path *path*."""
    quoted_path = pathname2url(path)
    return urlparse.urljoin('file:', quoted_path)
def path2url(path):
    """Build a ``file:`` URL that points at the local path *path*."""
    # Quote the path first, then attach the file: scheme.
    as_url_path = pathname2url(path)
    file_url = urlparse.urljoin('file:', as_url_path)
    return file_url
def pathname2fileurl(pathname):
    """Return the file:// URL for *pathname*, applying OS-specific conversions."""
    url_path = pathname2url(pathname)
    return urljoin('file:', url_path)
def pathname2fileurl(pathname):
    """Build a file:// URL from *pathname*; pathname2url does the OS-specific part."""
    converted = pathname2url(pathname)
    file_url = urljoin('file:', converted)
    return file_url
def constructLocalFileUrl(self, filePath):
    """Return a ``file://`` URL for the absolute form of *filePath*.

    Raises ``unittest.SkipTest`` when the absolute path cannot be encoded
    as UTF-8.
    """
    absolute_path = os.path.abspath(filePath)
    try:
        # Only the encodability matters; the encoded bytes are discarded.
        absolute_path.encode("utf-8")
    except UnicodeEncodeError:
        raise unittest.SkipTest("filePath is not encodable to utf8")
    quoted_path = urllib_request.pathname2url(absolute_path)
    return "file://%s" % quoted_path
def test_basic(self):
# Make sure simple tests pass
expected_path = os.path.join("parts", "of", "a", "path")
expected_url = "parts/of/a/path"
result = urllib_request.pathname2url(expected_path)
self.assertEqual(expected_url, result,
"pathname2url() failed; %s != %s" %
(result, expected_url))
result = urllib_request.url2pathname(expected_url)
self.assertEqual(expected_path, result,
"url2pathame() failed; %s != %s" %
(result, expected_path))
def test_quoting(self):
# Test automatic quoting and unquoting works for pathnam2url() and
# url2pathname() respectively
given = os.path.join("needs", "quot=ing", "here")
expect = "needs/%s/here" % urllib_parse.quote("quot=ing")
result = urllib_request.pathname2url(given)
self.assertEqual(expect, result,
"pathname2url() failed; %s != %s" %
(expect, result))
expect = given
result = urllib_request.url2pathname(result)
self.assertEqual(expect, result,
"url2pathname() failed; %s != %s" %
(expect, result))
given = os.path.join("make sure", "using_quote")
expect = "%s/using_quote" % urllib_parse.quote("make sure")
result = urllib_request.pathname2url(given)
self.assertEqual(expect, result,
"pathname2url() failed; %s != %s" %
(expect, result))
given = "make+sure/using_unquote"
expect = os.path.join("make+sure", "using_unquote")
result = urllib_request.url2pathname(given)
self.assertEqual(expect, result,
"url2pathname() failed; %s != %s" %
(expect, result))
def test_roundtrip_url2pathname(self):
    """pathname2url() followed by url2pathname() must give back the original path."""
    sample_paths = (
        'C:',
        r'\\\C\test\\',
        r'C:\foo\bar\spam.foo',
    )
    for original in sample_paths:
        round_tripped = url2pathname(pathname2url(original))
        self.assertEqual(round_tripped, original)
def test_converting_drive_letter(self):
    """A bare drive letter, with or without a trailing backslash, maps to '///C:'."""
    for drive_path in ("C:", "C:\\"):
        converted = pathname2url(drive_path)
        self.assertEqual(converted, '///C:')
def test_simple_compare(self):
    """A drive-letter path with backslashes converts to the expected URL path."""
    converted = pathname2url(r'C:\foo\bar\spam.foo')
    expected_url = "///C:/foo/bar/spam.foo"
    self.assertEqual(converted, expected_url)
def test_long_drive_letter(self):
    """pathname2url() must raise IOError for a drive part longer than one letter."""
    with self.assertRaises(IOError):
        pathname2url("XX:\\")
def test_roundtrip_pathname2url(self):
    """url2pathname() followed by pathname2url() must give back the original URL path."""
    sample_urls = (
        '///C:',
        '/////folder/test/',
        '///C:/foo/bar/spam.foo',
    )
    for original in sample_urls:
        round_tripped = pathname2url(url2pathname(original))
        self.assertEqual(round_tripped, original)
def _browser(file_path):
    """Open *file_path* in the default web browser through a ``file://`` URL.

    :param file_path: path to a local file; it is made absolute before the
                      URL is built
    """
    import webbrowser
    try:
        from urllib import pathname2url  # Python 2
    except ImportError:
        # Only a missing name should trigger the fallback; a bare ``except``
        # would also hide unrelated errors such as KeyboardInterrupt.
        from urllib.request import pathname2url  # Python 3
    webbrowser.open("file://" + pathname2url(os.path.abspath(file_path)))
## -- cookiecutter -- ##
def EditSensor(self, name, description, device, args):
    """Update an existing sensor in the device manager and publish the change.

    The name is URL-quoted with ``req.pathname2url`` before use.  After the
    manager update, the cached entry in ``self.currentSensorsInfo`` (looked
    up by the hash of name+device) gets the new args/description and is
    passed to ``self.onDataChanged`` when a callback is set; any failure in
    that step is ignored.

    :returns: True when the manager answers with status 200, otherwise False
              (including when an exception occurs)
    """
    info('EditSensor: {}, {}, {}, {}'.format(name, description, device, args))
    succeeded = False
    try:
        name = req.pathname2url(name)
        sensorEdit = {
            'name': name,
            'device': device,
            'description': description,
            'args': args,
        }
        with self.sensorMutex:
            retValue = manager.updateDevice(name, sensorEdit)
        info('Edit device returned: {}'.format(retValue))

        # Best-effort refresh of the cached sensor info; errors are ignored.
        try:
            hashKey = self.SHA_Calc_str(name + device)
            with self.sensorMutex:
                if self.currentSensorsInfo:
                    sensorsByHash = dict(
                        (entry['sensor'], entry) for entry in self.currentSensorsInfo)
                    sensor = sensorsByHash[hashKey][hashKey]
                    sensor['args'] = args
                    sensor['description'] = description
                    raspberryValue = {'SensorsInfo': [sensor]}
                    if self.onDataChanged != None:
                        self.onDataChanged(raspberryValue)
        except:
            pass

        if retValue[0] == 200:
            succeeded = True
            self.AddRefresh()
    except:
        exception ("Edit sensor failed")
        succeeded = False
    return succeeded
def DeleteSensor(self, name):
    """Remove the sensor called *name* from the device manager.

    The name is URL-quoted with ``req.pathname2url`` before it is sent.

    :returns: True when the manager answers with status 200, otherwise False
              (including when an exception occurs)
    """
    removed = False
    try:
        quotedName = req.pathname2url(name)
        with self.sensorMutex:
            retValue = manager.removeDevice(quotedName)
        info('Remove device returned: {}'.format(retValue))
        if retValue[0] == 200:
            removed = True
            self.AddRefresh()
    except:
        exception ("Remove sensor failed")
        removed = False
    return removed
def build_cdx_request(self, request):
    """Build the CDX lookup request that corresponds to *request*.

    The original request's URL is quoted with ``pathname2url`` and substituted
    into ``self.cdx_url_template``.  The original request and a marker flag
    are stored in the new request's ``meta``.
    """
    quoted_url = pathname2url(request.url)
    cdx_request = Request(self.cdx_url_template.format(url=quoted_url))
    meta = cdx_request.meta
    meta['wayback_machine_original_request'] = request
    meta['wayback_machine_cdx_request'] = True
    return cdx_request
def show(self, wait=1.2, scale=10, module_color=(0, 0, 0, 255),
         background=(255, 255, 255, 255), quiet_zone=4):
    """Displays this QR code.

    This method is mainly intended for debugging purposes.

    This method saves the output of the :py:meth:`png` method (with a default
    scaling factor of 10) to a temporary file and opens it with the
    standard PNG viewer application or within the standard webbrowser. The
    temporary file is deleted afterwards, even when rendering or opening
    the viewer fails.

    If this method does not show any result, try to increase the `wait`
    parameter. This parameter specifies the time in seconds to wait till
    the temporary file is deleted. Note, that this method does not return
    until the provided amount of seconds (default: 1.2) has passed.

    The other parameters are simply passed on to the `png` method.
    """
    import os
    import time
    import tempfile
    import webbrowser

    try:  # Python 2
        from urlparse import urljoin
        from urllib import pathname2url
    except ImportError:  # Python 3
        from urllib.parse import urljoin
        from urllib.request import pathname2url

    # delete=False: the file must outlive close() so the viewer can read it;
    # it is removed explicitly in the ``finally`` below.
    f = tempfile.NamedTemporaryFile('wb', suffix='.png', delete=False)
    try:
        # ``with`` guarantees the handle is closed (and flushed) before the
        # viewer opens the file, even if png() raises.
        with f:
            self.png(f, scale=scale, module_color=module_color,
                     background=background, quiet_zone=quiet_zone)
        webbrowser.open_new_tab(urljoin('file:', pathname2url(f.name)))
        # Give the viewer time to load the image before it disappears.
        time.sleep(wait)
    finally:
        os.unlink(f.name)