def open(self, path, cwd=None, edit_check=True, allow_students=True):
if edit_check and hasattr(self, '_edit'):
self._do_edit()
if cwd == None:
work = self.root
else:
work = cwd
path = PurePosixPath(path)
for item in path.parts:
if item.find('/') >= 0:
continue
if not allow_students and work is self.root.students:
return False
work = work.access(item)
if not work.ready:
work.fetch()
return work
python类PurePosixPath()的实例源码
def print_file(self, output, path, recursive):
node = self.vfs.open(path)
if self.vfs.is_internal_link(node):
self.print_internal_link(output, path, node)
elif self.vfs.is_regular(node):
self.print_regular(output, path)
elif self.vfs.is_directory(node):
self.print_directory(output, path)
for child_name, child_node in node.list():
child_path = pathlib.PurePosixPath(path) / child_name
child_path = child_path.as_posix()
if not recursive and self.vfs.is_directory(child_node):
self.print_directory(output, child_path)
else:
self.print_file(output, child_path, recursive)
else:
assert False, '?????????'
def subindex(self, *path):
"""
Returns an `IndexFile` object listing only those files in or below the
directory given by ``*path``. The path keys in the resulting object
will be relative to ``*path``.
``*path`` may be any relative path specification accepted by
`PurePath.relative_to`, such as a string (e.g., ``"main"`` or
``"main/binary-amd64"``), a sequence of strings (e.g., ``"main",
"binary-amd64"``), or a `PurePosixPath` object.
"""
### TODO: Add an option for also updating the `filename` attributes of
### the IndexEntries?
### TODO: Add an option for controlling whether to keep `fields`?
subfiles = {}
for p, entry in self.files.items():
pathobj = PurePosixPath(p)
### TODO: Handle absolute paths and paths beginning with .. (or .?)
try:
subpath = pathobj.relative_to(*path)
except ValueError:
pass
else:
subfiles[str(subpath)] = entry
return type(self)(files=subfiles, fields=self.fields.copy())
def _get_entry(self, path_str: str) -> Entry:
"""
Gets an entry or raises ENOENT.
"""
path = PurePosixPath(path_str)
if len(path.parts) == 4:
# Synthesize a file!
_, table_name, row_ref, column_name = path.parts
return ColumnFile(table_name, row_ref, column_name)
elif len(path.parts) == 3:
# It's a row!
_, table_name, row_ref = path.parts
return RowDirectory(table_name, row_ref)
elif len(path.parts) == 2:
# Table directory
_, table_name = path.parts
return TableDirectory(table_name)
elif len(path.parts) == 1:
# Root directory
return RootDirectory()
raise FuseOSError(ENOENT)
def copy_directory(self, host_path, guest_path):
"""
Copies a directory from host_path (pathlib.Path) to guest_path (pathlib.PurePath).
This is natively supported since LXD 2.2 but we have to support 2.0+
Refs: https://github.com/lxc/lxd/issues/2401
Uses tar to pack/unpack the directory.
"""
guest_tar_path = self._guest_temporary_tar_path
self.run(['mkdir', '-p', str(guest_path)])
with tempfile.NamedTemporaryFile() as f:
logger.debug("Creating tar file from {}".format(host_path))
tar = tarfile.open(f.name, 'w')
tar.add(str(host_path), arcname='.')
tar.close()
self.copy_file(Path(f.name), PurePosixPath(guest_tar_path))
self.run(['tar', '-xf', guest_tar_path, '-C', str(guest_path)])
self.run(['rm', '-f', str(guest_tar_path)])
##################################
# PRIVATE METHODS AND PROPERTIES #
##################################
def test_can_run_puppet_manifest_mode(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'test_site.pp',
'manifests_path': 'test_manifests'})
provisioner.provision()
assert mock_copy_dir.call_count == 1
assert mock_copy_dir.call_args_list[0][0][0] == Path('test_manifests')
assert mock_copy_dir.call_args_list[0][0][1] == PurePosixPath(
provisioner._guest_manifests_path)
assert mock_run.call_count == 2
assert mock_run.call_args_list[0][0][0] == ['which', 'puppet']
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
'puppet apply --detailed-exitcodes --manifestdir {} {}'.format(
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'test_site.pp')]
def test_can_set_binary_path(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'test_site.pp',
'manifests_path': 'test_manifests',
'binary_path': '/test/path'})
provisioner.provision()
assert mock_run.call_count == 2
assert mock_run.call_args_list[0][0][0] == ['test', '-x', '/test/path/puppet']
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
'/test/path/puppet apply --detailed-exitcodes --manifestdir {} {}'.format(
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'test_site.pp')]
def test_can_set_facter(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'site.pp',
'manifests_path': 'test_manifests',
'facter': {'foo': 'bah', 'bar': 'baz baz'}})
provisioner.provision()
assert mock_run.call_count == 2
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
"FACTER_bar='baz baz' FACTER_foo=bah "
"puppet apply --detailed-exitcodes --manifestdir {} {}".format(
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'site.pp')]
def test_can_set_hiera_config_path(self, mock_run, mock_copy_dir, mock_copy_file):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'site.pp',
'manifests_path': 'test_manifests',
'hiera_config_path': 'hiera.yaml'})
provisioner.provision()
assert mock_copy_file.call_count == 1
assert mock_copy_file.call_args_list[0][0][0] == Path('hiera.yaml')
assert mock_copy_file.call_args_list[0][0][1] == PurePosixPath(
provisioner._guest_hiera_file)
assert mock_run.call_count == 2
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
"puppet apply --hiera_config={} --detailed-exitcodes --manifestdir {} {}".format(
PurePosixPath(provisioner._guest_hiera_file),
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'site.pp')]
def test_can_set_environment_variables(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'site.pp',
'manifests_path': 'test_manifests',
'environment_variables': {'FOO': 'bah', 'BAR': 'baz baz'}})
provisioner.provision()
assert mock_run.call_count == 2
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
"BAR='baz baz' FOO=bah "
"puppet apply --detailed-exitcodes --manifestdir {} {}".format(
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'site.pp')]
def test_different_flavours_unordered(self):
p = pathlib.PurePosixPath('a')
q = pathlib.PureWindowsPath('a')
with self.assertRaises(TypeError):
p < q
with self.assertRaises(TypeError):
p <= q
with self.assertRaises(TypeError):
p > q
with self.assertRaises(TypeError):
p >= q
#
# Tests for the concrete classes
#
# Make sure any symbolic links in the base test path are resolved
def test_different_flavours_unordered(self):
p = pathlib.PurePosixPath('a')
q = pathlib.PureWindowsPath('a')
with self.assertRaises(TypeError):
p < q
with self.assertRaises(TypeError):
p <= q
with self.assertRaises(TypeError):
p > q
with self.assertRaises(TypeError):
p >= q
#
# Tests for the concrete classes
#
# Make sure any symbolic links in the base test path are resolved
def getTextLinesForCommit( self, filepath, commit_id ):
assert isinstance( filepath, pathlib.Path ), 'expecting pathlib.Path got %r' % (filepath,)
# git show wants a posix path, it does not work with '\' path seperators
git_filepath = pathlib.PurePosixPath( filepath )
text = self.cmdShow( '%s:%s' % (commit_id, git_filepath) )
all_lines = text.split('\n')
if all_lines[-1] == '':
return all_lines[:-1]
else:
return all_lines
def getTextLinesForCommit( self, commit_id ):
git_filepath = pathlib.PurePosixPath( self.__filepath )
text = self.__project.cmdShow( '%s:%s' % (commit_id, git_filepath) )
all_lines = text.split('\n')
if all_lines[-1] == '':
return all_lines[:-1]
else:
return all_lines
def __str__(self):
s = super().__str__()
# we have to reinstate the second / of http:// as PurePosixPath removes it
return re.sub('(https?:)/([^/])', r'\1//\2', s)
def _do_edit(self):
s = self.strings
# add_courses
for semester, sn in self._edit['add_courses']:
path = PurePosixPath('/', s['dir_root_courses'], semester)
node = self.open(path, edit_check=False)
course = WebCourseDirectory(self, node, semester, sn)
course.fetch()
assert course.ready == True
node.add(course.name, course)
node.add(sn, InternalLink(self, node, course.name))
# add_unenrolled_courses
for semester, sn in self._edit['add_unenrolled_courses']:
path = PurePosixPath('/', s['dir_root_courses'], semester)
node = self.open(path, edit_check=False)
course = UnenrolledCourseDirectory(self, node, sn)
course.fetch()
assert course.ready == True
node.add(course.name, course)
node.add(sn, InternalLink(self, node, course.name))
# delete_files
for path in self._edit['delete_files']:
node = self.open(path, edit_check=False, allow_students=False)
if node:
if node is self.root:
raise ValueError('?????????')
node.parent.unlink(PurePosixPath(path).name)
else:
self.root.students.queue_deletion_request(path)
# ????? open ??????
del self._edit
# ? JSON ??????????
def add_student(self, account, sn=None, pwd=None):
s = self.vfs.strings
if sn and (not hasattr(self, '_last_sn') or sn != self._last_sn) and \
self._is_student_function_enabled(sn):
self._last_sn = sn
if hasattr(self, '_last_sn'):
if hasattr(self, '_queued_addition_requests'):
queued_addition_requests = self._queued_addition_requests.keys()
del self._queued_addition_requests
for request in queued_addition_requests:
self.add(request, StudentsStudentDirectory(
self.vfs, self, request))
if hasattr(self, '_queued_deletion_requests'):
queued_deletion_requests = self._queued_deletion_requests.keys()
del self._queued_deletion_requests
for request in queued_deletion_requests:
node = self.vfs.open(request, edit_check=False)
node.parent.unlink(PurePosixPath(request).name)
if account not in map(lambda x: x[0], self._children):
self.add(account, StudentsStudentDirectory(
self.vfs, self, account))
else:
if not hasattr(self, '_queued_addition_requests'):
self._queued_addition_requests = OrderedDict()
self._queued_addition_requests[account] = None
if pwd:
depth = 0
while pwd is not self.vfs.root:
pwd = pwd.parent
depth += 1
return PurePosixPath('../' * depth,
s['dir_root_students'], account).as_posix()
def queue_deletion_request(self, path):
if not hasattr(self, '_last_sn'):
if not hasattr(self, '_queued_deletion_requests'):
self._queued_deletion_requests = OrderedDict()
self._queued_deletion_requests[path] = None
else:
node = self.vfs.open(path, edit_check=False)
node.parent.unlink(PurePosixPath(path).name)
def add_teacher(self, account, pwd=None):
s = self.vfs.strings
if account not in map(lambda x: x[0], self._children):
self.add(account, TeachersTeacherDirectory(self.vfs, self, account))
if pwd:
depth = 0
while pwd is not self.vfs.root:
pwd = pwd.parent
depth += 1
return PurePosixPath('../' * depth,
s['dir_root_teachers'], account).as_posix()
def unprefix(suite, component):
"""
If ``component`` begins with one or more path components (excluding the
last) that also occur at the end of ``suite`` (excluding the first), remove
the prefix from ``component`` and return the remainder.
This function is used for removing prefixes that may occur in component
names listed in suite Release files when the suite name contains multiple
path components.
>>> unprefix('stable', 'main')
'main'
>>> unprefix('stable/updates', 'updates/main')
'main'
>>> unprefix('stable/updates', 'main')
'main'
>>> unprefix('trusty', 'trusty/main')
'trusty/main'
>>> unprefix('foo', 'foo')
'foo'
>>> unprefix('foo', 'foo/bar')
'foo/bar'
>>> unprefix('foo/bar', 'bar')
'bar'
"""
suite = PurePosixPath(suite)
component = PurePosixPath(component)
for i in range(min(len(suite.parts), len(component.parts))):
if suite.parts[-(i+1):] != component.parts[:i+1]:
break
return str(PurePosixPath(*component.parts[i:]))
def _fetch_sources(self):
self._runner.mkdir_p(self._paths.src)
self._runner.cd(self._paths.src)
for project in self._profile.projects.values():
source = project.source
src_path = None
if type(source) is vlttng.profile.HttpFtpSource:
src_path = project.name
# download
posix_path = PurePosixPath(source.url)
filename = posix_path.name
self._runner.wget(source.url, filename)
# extract
self._runner.mkdir_p(self._paths.project_src(project.name))
self._runner.tar_x(filename, project.name)
elif type(source) is vlttng.profile.GitSource:
src_path = project.name
# clone
self._runner.git_clone(source.clone_url, project.name)
# checkout
self._runner.cd(self._paths.project_src(project.name))
self._runner.git_checkout(source.checkout)
self._runner.cd(self._paths.src)
# keep where the source of this project is
if src_path is not None:
src_path = self._paths.project_src(src_path)
self._src_paths[project.name] = src_path
check_file.py 文件源码
项目:FrackinUniverse-sChinese-Project
作者: ProjectSky
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def getFileExt(f):
ext = pathlib.PurePosixPath(f).suffix
return ext
def relative_iterdir(self, relpath=''):
relpath = self._as_zip_path(relpath)
seen = set()
with zipfile.ZipFile(str(self.path), mode='r') as zf:
for name in zf.namelist():
if name.startswith(relpath):
parts = pathlib.PurePosixPath(name).parts
if len(parts) > 0:
result = parts[0]
if result not in seen:
seen.add(result)
yield result
def _as_zip_path(self, path):
path = str(pathlib.PurePosixPath(path))
# zip files don't work well with '.' which is the identity of a Path
# obj, so just convert to empty string which is basically the identity
# of a zip's entry
if path == '.':
path = ''
return path
def shell(self, username=None, cmd_args=[]):
""" Opens a new interactive shell in the container. """
# We run this in case our lxdock.yml config was modified since our last `lxdock up`.
self._setup_env()
# For now, it's much easier to call `lxc`, but eventually, we might want to contribute
# to pylxd so it supports `interactive = True` in `exec()`.
shellcfg = self.options.get('shell', {})
shelluser = username or shellcfg.get('user')
if shelluser:
# This part is the result of quite a bit of `su` args trial-and-error.
shellhome = shellcfg.get('home') if not username else None
homearg = '--env HOME={}'.format(shellhome) if shellhome else ''
cmd = 'lxc exec {} {} -- su -m {}'.format(self.lxd_name, homearg, shelluser)
else:
cmd = 'lxc exec {} -- su -m root'.format(self.lxd_name)
if cmd_args:
# Again, a bit of trial-and-error.
# 1. Using `su -s SCRIPT_FILE` instead of `su -c COMMAND` because `su -c` cannot
# receive SIGINT (Ctrl-C).
# Ref: //sethmiller.org/it/su-forking-and-the-incorrect-trapping-of-sigint-ctrl-c/
# See also: //github.com/lxdock/lxdock/pull/67#issuecomment-299755944
# 2. Applying shlex.quote to every argument to protect special characters.
# e.g.: lxdock shell container_name -c echo "he re\"s" '$PATH'
self._guest.run(['mkdir', '-p',
str(PurePosixPath(self._guest_shell_script_file).parent)])
self._container.files.put(self._guest_shell_script_file, textwrap.dedent(
"""\
#!/bin/sh
{}
""".format(' '.join(map(shlex.quote, cmd_args)))))
self._guest.run(['chmod', 'a+rx', self._guest_shell_script_file])
cmd += ' -s {}'.format(self._guest_shell_script_file)
subprocess.call(cmd, shell=True)
def test_can_set_module_path(self, mock_run, mock_copy_dir):
class DummyGuest(Guest):
name = 'dummy'
host = Host()
guest = DummyGuest(unittest.mock.Mock())
mock_run.return_value = 0
provisioner = PuppetProvisioner('./', host, [guest], {
'manifest_file': 'mani.pp',
'manifests_path': 'test_manifests',
'module_path': 'test-puppet-modules'})
provisioner.provision()
assert mock_copy_dir.call_count == 2
assert (Path('test-puppet-modules'),
PurePosixPath(provisioner._guest_module_path)) in {
mock_copy_dir.call_args_list[0][0],
mock_copy_dir.call_args_list[1][0]}
assert mock_run.call_count == 2
assert mock_run.call_args_list[1][0][0] == [
'sh', '-c',
"puppet apply --modulepath {}:{} --detailed-exitcodes --manifestdir {} {}".format(
PurePosixPath(provisioner._guest_module_path),
PurePosixPath(provisioner._guest_default_module_path),
PurePosixPath(provisioner._guest_manifests_path),
PurePosixPath(provisioner._guest_manifests_path) / 'mani.pp')]
def test_can_copy_directory(self, mock_close, mock_add):
class DummyGuest(Guest):
name = 'dummy'
guest = DummyGuest(FakeContainer())
guest.lxd_container.files.put.return_value = True
with tempfile.TemporaryDirectory() as d:
os.mkdir('{}/d1'.format(d))
os.mkdir('{}/d1/d2'.format(d))
with open('{}/d1/f1'.format(d), 'wb') as f1:
f1.write(b'dummy f1')
with open('{}/d1/d2/f2'.format(d), 'wb') as f2:
f2.write(b'dummy f2')
with open('{}/f3'.format(d), 'wb') as f3:
f3.write(b'dummy f3')
guest.copy_directory(pathlib.Path(d), pathlib.PurePosixPath('/a/b/c'))
assert mock_add.call_count == 1
assert mock_add.call_args[0][0] == str(pathlib.Path(d))
assert mock_add.call_args[1]['arcname'] == '.'
assert mock_close.call_count == 1
assert guest.lxd_container.execute.call_count == 4
assert guest.lxd_container.execute.call_args_list[0][0] == (['mkdir', '-p', '/a/b/c'], )
assert guest.lxd_container.execute.call_args_list[1][0] == ([
'mkdir', '-p', str(pathlib.PurePosixPath(guest._guest_temporary_tar_path).parent)], )
assert guest.lxd_container.execute.call_args_list[2][0] == ([
'tar', '-xf', guest._guest_temporary_tar_path, '-C', '/a/b/c'], )
assert guest.lxd_container.execute.call_args_list[3][0] == ([
'rm', '-f', guest._guest_temporary_tar_path], )
assert guest.lxd_container.files.put.call_count == 1
assert guest.lxd_container.files.put.call_args[0][0] == guest._guest_temporary_tar_path
def test_concrete_class(self):
p = self.cls('a')
self.assertIs(type(p),
pathlib.PureWindowsPath if os.name == 'nt' else pathlib.PurePosixPath)
def test_different_flavours_unequal(self):
p = pathlib.PurePosixPath('a')
q = pathlib.PureWindowsPath('a')
self.assertNotEqual(p, q)
def _split_file_list(text):
lines = text.split("\r\n")
for line in lines:
groups = line.split(",")
if len(groups) == 6:
directory, filename, *remaining = groups
remaining = map(int, remaining)
size, attr_val, date_val, time_val = remaining
timeinfo = _decode_time(date_val, time_val)
attribute = _decode_attribute(attr_val)
path = str(PurePosixPath(directory, filename))
yield FileInfo(directory, filename, path,
size, attribute, timeinfo)