def _split_file_list_raw(text):
    """Yield a ``RawFileInfo`` for every well-formed row of a raw file listing.

    ``text`` is split into rows on CRLF and each row into comma-separated
    fields.  Only rows with exactly six fields are used; every other row is
    skipped silently.  The first three fields are taken as directory, file
    name and size; the remaining three are ignored.

    Each record receives the directory, the file name, both joined into a
    POSIX path string, and the size converted with ``int`` (a non-numeric
    size therefore raises ``ValueError``).
    """
    for row in text.split("\r\n"):
        fields = row.split(",")
        if len(fields) != 6:
            continue
        directory, filename, size = fields[:3]
        full_path = str(PurePosixPath(directory, filename))
        yield RawFileInfo(directory, filename, full_path, int(size))
python类PurePosixPath()的实例源码
def _possible_names(cls, package: 'PackageInfo') -> List[str]:
    """Derive candidate ``owner/project`` names from a package's GitHub URLs.

    Every URL returned by ``package.get_urls()`` whose host is ``github.com``
    or one of its sub-domains contributes the first two components of its
    path, with a trailing ``.git`` removed from the project component.

    Args:
        package: object exposing ``get_urls()``, an iterable of URL strings.

    Returns:
        The derived names in URL order.  Duplicates are kept; URLs on other
        hosts or with fewer than two path components contribute nothing.
    """
    names = []
    for url_str in package.get_urls():
        url = urlparse(url_str)
        # `hostname` (unlike `netloc`) leaves out credentials and port and is
        # lower-cased, so "user@GitHub.com:443" is still recognised.
        host = url.hostname or ''
        # Require an exact match or a real sub-domain: a bare suffix test
        # would also accept look-alike hosts such as "notgithub.com".
        if host != 'github.com' and not host.endswith('.github.com'):
            continue

        path = PurePosixPath(url.path)
        parts = path.parts
        if path.is_absolute():
            # Drop the leading "/" component.
            parts = parts[1:]
        if len(parts) < 2:
            continue

        # Use the first 2 path components, which handles:
        # - owner/project
        # - owner/project.git
        # - owner/project/releases
        owner, project = parts[:2]
        # Strip off .git if the project name contains it; don't just strip
        # off any extension because "." is valid in project names.
        if project.endswith('.git'):
            project = project[:-len('.git')]
        names.append(str(PurePosixPath(owner).joinpath(project)))
    return names
def __init__(self, name: str) -> None:
    """Set up the bucket and work out where it lives on disk.

    The storage root is read from the ``STORAGE_DIR`` entry of the current
    application's config.  The bucket directory is nested below a directory
    named after the first two characters of ``name``.
    """
    super().__init__(name)
    self._log = logging.getLogger(f'{__name__}.LocalBucket')

    # For local storage, the name is actually a partial path, relative
    # to the local storage root.
    self.root = pathlib.Path(current_app.config['STORAGE_DIR'])
    self.bucket_path = pathlib.PurePosixPath(name[:2], name)
    self.abspath = self.root.joinpath(self.bucket_path)
def test_concrete_class(self):
p = self.cls('a')
self.assertIs(type(p),
pathlib.PureWindowsPath if os.name == 'nt' else pathlib.PurePosixPath)
def test_different_flavours_unequal(self):
p = pathlib.PurePosixPath('a')
q = pathlib.PureWindowsPath('a')
self.assertNotEqual(p, q)
def location_from_path_for_server(self, path):
    """Construct a location from a simple string path.

    The path is a reference into the bucket of the form:

        {bucket_name}/{object_path}

    Leading slashes are ignored.  When ``path`` is empty, ``self.bucket`` is
    used in its place.
    """
    components = pathlib.PurePosixPath((path or self.bucket).lstrip("/")).parts
    create_location = self.service_account.create_oauth_location
    return create_location(
        bucket=components[0],
        path=utils.join_path(*components[1:]))
def name_to_asserted_group_path(name):
    """Convert ``name`` to a ``PurePosixPath``, rejecting unsupported names.

    Args:
        name: a string or path-like object naming an item inside a group.

    Returns:
        ``pathlib.PurePosixPath(name)``.

    Raises:
        NotImplementedError: if the path is absolute, or if it has no
            components while ``str(name)`` is not ``"."`` (for example the
            empty string).
    """
    path = pathlib.PurePosixPath(name)
    if path.is_absolute():
        raise NotImplementedError(
            "Absolute paths are currently not supported and unlikely to be implemented."
        )

    if not path.parts and str(name) != ".":
        # str() keeps the message buildable when `name` is a path-like
        # object rather than a plain string; concatenating it directly
        # would raise TypeError and hide the intended error.
        raise NotImplementedError(
            "Getting an item on a group with path '" + str(name) + "' " +
            "is not supported and unlikely to be implemented."
        )

    return path
def remove_root(name):
    """Return ``name`` as a ``PurePosixPath`` with any leading root removed.

    Absolute paths are re-expressed relative to their own root; relative
    paths are returned unchanged (apart from the conversion to a path).
    """
    candidate = pathlib.PurePosixPath(name)
    if not candidate.is_absolute():
        return candidate
    return candidate.relative_to(candidate.root)
def test_raw_init(setup_teardown_folder):
    """A freshly constructed ``Raw`` exposes the values it was built with."""
    root_directory = setup_teardown_folder[2]
    no_parent = pathlib.PurePosixPath("")

    raw = Raw(root_directory, no_parent, "test_object", io_mode=None)

    assert raw.root_directory == root_directory
    assert raw.object_name == "test_object"
    assert raw.parent_path == no_parent
    assert raw.io_mode is None
    assert raw.relative_path == pathlib.PurePosixPath("test_object")
    assert raw.name == "/test_object"
def test_group_init(setup_teardown_folder):
    """A freshly constructed ``Group`` exposes the values it was built with."""
    root_directory = setup_teardown_folder[2]
    no_parent = pathlib.PurePosixPath("")

    group = Group(root_directory, no_parent, "test_object", io_mode=None)

    assert group.root_directory == root_directory
    assert group.object_name == "test_object"
    assert group.parent_path == no_parent
    assert group.io_mode is None
    assert group.relative_path == pathlib.PurePosixPath("test_object")
    assert group.name == "/test_object"
# New groups can be created via .create_group method
def provision_single(self, guest):
    """Provision ``guest`` using puppet.

    Steps, in order:

    1. Check that a puppet executable is reachable in the guest (inside
       ``binary_path`` when that option is set, otherwise via ``which``) and
       raise ``ProvisionFailed`` when the check exits non-zero.
    2. Copy the configured manifests, modules and environments directories
       and the hiera config file into the guest; unset options are skipped.
    3. Build the puppet command line and run it through ``sh -c``.
    """
    # Step 1: make sure puppet is installed.
    puppet_dir = self.options.get('binary_path')
    if puppet_dir is None:
        status = guest.run(['which', 'puppet'])
        error_text = (
            "puppet was not automatically installed for this guest. "
            "Please specify the command to install it in LXDock file using "
            "a shell provisioner and try `lxdock provision` again. You may "
            "also specify `binary_path` that contains the puppet executable "
            "in LXDock file.")
    else:
        status = guest.run(['test', '-x', str(PurePosixPath(puppet_dir) / 'puppet')])
        error_text = (
            "puppet executable is not found in the specified path {} in the "
            "guest container. ".format(puppet_dir)
        )
    if status != 0:
        raise ProvisionFailed(error_text)

    # Step 2: upload whichever host-side resources were configured.
    host_manifests = self.options.get('manifests_path')
    if host_manifests is not None:
        guest.copy_directory(
            Path(host_manifests), PurePosixPath(self._guest_manifests_path))

    host_modules = self.options.get('module_path')
    if host_modules is not None:
        guest.copy_directory(
            Path(host_modules), PurePosixPath(self._guest_module_path))

    host_environments = self.options.get('environment_path')
    if host_environments is not None:
        guest.copy_directory(
            Path(host_environments), PurePosixPath(self._guest_environment_path))

    host_hiera = self.options.get('hiera_config_path')
    if host_hiera is not None:
        guest.copy_file(Path(host_hiera), PurePosixPath(self._guest_hiera_file))

    # Step 3: run puppet.
    command = self._build_puppet_command()
    if host_environments:
        banner = "Running Puppet with environment {}...".format(self.options['environment'])
    else:
        banner = "Running Puppet with {}...".format(self.options['manifest_file'])
    logger.info(banner)

    # The command words are joined with spaces and handed to a shell.
    guest.run(['sh', '-c', ' '.join(command)])
##################################
# PRIVATE METHODS AND PROPERTIES #
##################################
def _build_puppet_command(self):
    """
    Build the ``puppet apply`` invocation as a list of command words.

    The list is laid out as::

        <environment variables> <FACTER_* facts> <puppet binary> apply <options>

    The words are meant to be joined with spaces and run by a shell, so
    user-supplied option words and all fact / environment-variable values
    are shell-quoted.  Fact and environment-variable values that are not
    strings (e.g. numbers read from a config file) are converted with
    ``str()`` before quoting.

    Refs:
    https://github.com/mitchellh/vagrant/blob/9c299a2a357fcf87f356bb9d56e18a037a53d138/
    plugins/provisioners/puppet/provisioner/puppet.rb#L173
    """
    def _shell_assignments(prefix, mapping):
        # Render a mapping as sorted, shell-quoted "<prefix>KEY=VALUE" words.
        if mapping is None:
            return []
        return [
            "{}{}={}".format(prefix, key, shlex.quote(str(value)))
            for key, value in sorted(mapping.items())
        ]

    # Re-quote each user-supplied word so it survives the later shell parse.
    extra_options = self.options.get('options', '')
    options = [shlex.quote(word) for word in shlex.split(extra_options)]

    if self.options.get('module_path') is not None:
        options += ['--modulepath',
                    '{}:{}'.format(self._guest_module_path,
                                   self._guest_default_module_path)]

    if self.options.get('hiera_config_path') is not None:
        options += ['--hiera_config={}'.format(self._guest_hiera_file)]

    # TODO: we are not detecting console color support now, but please contribute if you need!
    options += ['--detailed-exitcodes']

    if self.options.get('environment_path') is not None:
        options += ['--environmentpath', str(self._guest_environment_path),
                    '--environment', self.options['environment']]
    else:
        options += ['--manifestdir', str(self._guest_manifests_path)]

    manifest_file = self.options.get('manifest_file')
    if manifest_file is not None:
        options += [str(
            PurePosixPath(self._guest_manifests_path) / manifest_file)]

    # Build up the custom facts if we have any
    facter = _shell_assignments("FACTER_", self.options.get('facter'))

    binary_path = self.options.get('binary_path')
    if binary_path is not None:
        puppet_bin = str(PurePosixPath(binary_path) / 'puppet')
    else:
        puppet_bin = 'puppet'

    # TODO: working_directory for hiera. Please contribute!

    env = _shell_assignments("", self.options.get('environment_variables'))

    return env + facter + [puppet_bin, 'apply'] + options
def __getitem__(self, name):
    """Return the object stored under ``name`` inside this group.

    ``name`` may consist of several ``/``-separated components; in that
    case the first component is looked up in this group and the remaining
    components are resolved recursively on the result.

    Returns:
        A ``raw.Raw``, ``ds.Dataset`` or ``Group`` wrapper, selected from
        the directory layout and the type recorded in the object's meta
        file.

    Raises:
        KeyError: if ``name`` is not contained in this group.
        IOError: if the directory is neither a raw nor a non-raw exdir
            object directory.
        NotImplementedError: if the meta file declares a type other than
            dataset or group.
    """
    path = utils.path.name_to_asserted_group_path(name)
    if len(path.parts) > 1:
        # Nested name: descend one level and let the child resolve the rest.
        top_directory = path.parts[0]
        sub_name = pathlib.PurePosixPath(*path.parts[1:])
        return self[top_directory][sub_name]

    if name not in self:
        raise KeyError("No such object: '" + str(name) + "'")

    directory = self.directory / path

    if exob.is_raw_object_directory(directory):  # TODO create one function that handles all Raw creation
        return raw.Raw(
            root_directory=self.root_directory,
            parent_path=self.relative_path,
            object_name=name,
            io_mode=self.io_mode  # TODO validate name?
            # TODO plugin manager?
        )

    if not exob.is_nonraw_object_directory(directory):
        # `directory` is a path object, so it must be converted explicitly;
        # adding it to a str would raise TypeError instead of this IOError.
        raise IOError(
            "Directory '" + str(directory) +
            "' is not a valid exdir object."
        )

    meta_filename = directory / exob.META_FILENAME
    with meta_filename.open("r", encoding="utf-8") as meta_file:
        meta_data = yaml.safe_load(meta_file)

    object_type = meta_data[exob.EXDIR_METANAME][exob.TYPE_METANAME]
    if object_type == exob.DATASET_TYPENAME:
        return ds.Dataset(
            root_directory=self.root_directory,
            parent_path=self.relative_path,
            object_name=name,
            io_mode=self.io_mode,
            validate_name=self.validate_name,
            plugin_manager=self.plugin_manager
        )
    elif object_type == exob.GROUP_TYPENAME:
        return Group(
            root_directory=self.root_directory,
            parent_path=self.relative_path,
            object_name=name,
            io_mode=self.io_mode,
            validate_name=self.validate_name,
            plugin_manager=self.plugin_manager
        )
    else:
        print(
            "Object", name, "has data type",
            object_type
        )
        raise NotImplementedError("Cannot open objects of this type")