def src_proc_dispatcher(pkg_name, src_tbl_name, src_loc):
tobj = tempfile.mkdtemp(dir='/var/cache/acbs/build/', prefix='acbs.')
src_tbl_loc = os.path.join(src_loc, src_tbl_name)
shadow_ark_loc = os.path.join(tobj, src_tbl_name)
if os.path.isdir(src_tbl_loc):
print('[I] Making a copy of the source directory...', end='')
try:
shutil.copytree(src=src_tbl_loc, dst=shadow_ark_loc)
except:
print('Failed!')
return False
print('Done!')
return True, tobj
else:
os.symlink(src_tbl_loc, shadow_ark_loc)
# print('[D] Source location: {}, Shadow link: {}'.format(src_tbl_loc, shadow_ark_loc))
return decomp_file(shadow_ark_loc, tobj), tobj
python类symlink()的实例源码
def makelink(self, tarinfo, targetpath):
"""Make a (symbolic) link called targetpath. If it cannot be created
(platform limitation), we try to make a copy of the referenced file
instead of a link.
"""
try:
# For systems that support symbolic and hard links.
if tarinfo.issym():
os.symlink(tarinfo.linkname, targetpath)
else:
# See extract().
if os.path.exists(tarinfo._link_target):
os.link(tarinfo._link_target, targetpath)
else:
self._extract_member(self._find_link_target(tarinfo),
targetpath)
except symlink_exception:
try:
self._extract_member(self._find_link_target(tarinfo),
targetpath)
except KeyError:
raise ExtractError("unable to resolve link inside archive")
def makelink(self, tarinfo, targetpath):
"""Make a (symbolic) link called targetpath. If it cannot be created
(platform limitation), we try to make a copy of the referenced file
instead of a link.
"""
if hasattr(os, "symlink") and hasattr(os, "link"):
# For systems that support symbolic and hard links.
if tarinfo.issym():
if os.path.lexists(targetpath):
os.unlink(targetpath)
os.symlink(tarinfo.linkname, targetpath)
else:
# See extract().
if os.path.exists(tarinfo._link_target):
if os.path.lexists(targetpath):
os.unlink(targetpath)
os.link(tarinfo._link_target, targetpath)
else:
self._extract_member(self._find_link_target(tarinfo), targetpath)
else:
try:
self._extract_member(self._find_link_target(tarinfo), targetpath)
except KeyError:
raise ExtractError("unable to resolve link inside archive")
def execute(self):
from ranger.container.file import File
new_path = self.rest(1)
cf = self.fm.thisfile
if not new_path:
return self.fm.notify('Syntax: relink <newpath>', bad=True)
if not cf.is_link:
return self.fm.notify('%s is not a symlink!' % cf.relative_path, bad=True)
if new_path == os.readlink(cf.path):
return
try:
os.remove(cf.path)
os.symlink(new_path, cf.path)
except OSError as err:
self.fm.notify(err)
self.fm.reset()
self.fm.thisdir.pointed_obj = cf
self.fm.thisfile = cf
def copytree(src, dst, symlinks = False, ignore = None):
if not os.path.exists(dst):
os.makedirs(dst)
shutil.copystat(src, dst)
lst = os.listdir(src)
if ignore:
excl = ignore(src, lst)
lst = [x for x in lst if x not in excl]
for item in lst:
s = os.path.join(src, item)
d = os.path.join(dst, item)
if symlinks and os.path.islink(s):
if os.path.lexists(d):
os.remove(d)
os.symlink(os.readlink(s), d)
try:
st = os.lstat(s)
mode = stat.S_IMODE(st.st_mode)
os.lchmod(d, mode)
except:
pass # lchmod not available
elif os.path.isdir(s):
copytree(s, d, symlinks, ignore)
else:
shutil.copy2(s, d)
def setUp(self):
self.template_env = jinja2.Environment(
loader=jinja2.FileSystemLoader("config")
)
# create working dir
self.working_dir = os.path.join(build_path + "run", self.id())
if os.path.exists(self.working_dir):
shutil.rmtree(self.working_dir)
os.makedirs(self.working_dir)
try:
# update the last_run link
if os.path.islink(build_path + "last_run"):
os.unlink(build_path + "last_run")
os.symlink(build_path + "run/{}".format(self.id()),
build_path + "last_run")
except:
# symlink is best effort and can fail when
# running tests in parallel
pass
def clone(self, name, user):
"""
create a clone of self with a given name, owned by user
"""
self.expiration = None
# create the branch on the database
new_branch = Branch(name, self.project, self, user)
db.session.add(new_branch)
db.session.commit()
# clone repository in file system
branch_path = os.path.abspath(join(os.getcwd(), 'repos',
self.project.name,
name, 'source'))
self.get_repo().clone(branch_path, branch=self.name)
os.symlink(os.path.abspath(join('repos', self.project.name,
'_resources/low_resolution')),
join(branch_path, '_resources'))
branch_repo = git.Repo(branch_path)
branch_repo.git.checkout('HEAD', b=name)
config_repo(branch_repo, user.username, user.email)
# build the source
new_branch.build(timeout=60)
return new_branch
def mk_tar_dir(issuer, test_profile):
wd = os.getcwd()
# Make sure there is a tar directory
tardirname = wd
for part in ["tar", issuer, test_profile]:
tardirname = os.path.join(tardirname, part)
if not os.path.isdir(tardirname):
os.mkdir(tardirname)
# Now walk through the log directory and make symlinks from
# the log files to links in the tar directory
logdirname = os.path.join(wd, "log", issuer, test_profile)
for item in os.listdir(logdirname):
if item.startswith("."):
continue
ln = os.path.join(logdirname, item)
tn = os.path.join(tardirname, "{}.txt".format(item))
if os.path.isfile(tn):
os.unlink(tn)
if not os.path.islink(tn):
os.symlink(ln, tn)
def close_compressed(filename, hdf5_file, compression_type='bz2', create_link=False):
"""Closes the compressed hdf5_file that was opened with open_compressed.
When the file was opened for writing (using the 'w' flag in open_compressed), the created HDF5 file is compressed into the given file name.
To be able to read the data using the real tools, a link with the correct extension might is created, when create_link is set to True.
"""
hdf5_file_name = hdf5_file.filename
is_writable = hdf5_file.writable
hdf5_file.close()
if is_writable:
# create compressed tar file
tar = tarfile.open(filename, mode="w:" + compression_type)
tar.add(hdf5_file_name, os.path.basename(filename))
tar.close()
if create_link:
extension = {'': '.tar', 'bz2': '.tar.bz2',
'gz': 'tar.gz'}[compression_type]
link_file = filename + extension
if not os.path.exists(link_file):
os.symlink(os.path.basename(filename), link_file)
# clean up locally generated files
os.remove(hdf5_file_name)
def reorganize(dataset_dir):
dirs = {}
dirs['trainA'] = os.path.join(dataset_dir, 'link_trainA')
dirs['trainB'] = os.path.join(dataset_dir, 'link_trainB')
dirs['testA'] = os.path.join(dataset_dir, 'link_testA')
dirs['testB'] = os.path.join(dataset_dir, 'link_testB')
mkdir(dirs.values())
for key in dirs:
try:
os.remove(os.path.join(dirs[key], '0'))
except:
pass
os.symlink(os.path.abspath(os.path.join(dataset_dir, key)),
os.path.join(dirs[key], '0'))
return dirs
test_aws_encryption_sdk_cli.py 文件源码
项目:aws-encryption-sdk-cli
作者: awslabs
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def build_same_files_and_dirs(tmpdir, source_is_symlink, dest_is_symlink, use_files):
"""Build temporary files or directories to test indication of same source and destination.
:param bool source_is_symlink: Should the source be a symlink to the destination (both cannot be True)
:param bool dest is symlink: Should the destination be a symlink to the source (both cannot be True)
:param bool use_files: Should files be created (if False, directories are used instead)
"""
if use_files:
real = tmpdir.join('real')
real.write('some data')
else:
real = tmpdir.mkdir('real')
link = tmpdir.join('link')
if source_is_symlink or dest_is_symlink:
os.symlink(str(real), str(link))
if source_is_symlink:
return str(link), str(real)
elif dest_is_symlink:
return str(real), str(link)
return str(real), str(real)
def test_process_single_file_destination_is_symlink_to_source(
tmpdir,
patch_process_single_operation,
standard_handler
):
source = tmpdir.join('source')
source.write('some data')
destination = str(tmpdir.join('destination'))
os.symlink(str(source), destination)
with patch('aws_encryption_sdk_cli.internal.io_handling.open', create=True) as mock_open:
standard_handler.process_single_file(
stream_args=sentinel.stream_args,
source=str(source),
destination=destination
)
assert not mock_open.called
assert not patch_process_single_operation.called
test_i_aws_encryption_sdk_cli.py 文件源码
项目:aws-encryption-sdk-cli
作者: awslabs
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_file_to_file_cycle_target_through_symlink(tmpdir):
plaintext = tmpdir.join('source_plaintext')
output_dir = tmpdir.mkdir('output')
os.symlink(str(output_dir), str(tmpdir.join('output_link')))
ciphertext = tmpdir.join('output_link', 'ciphertext')
decrypted = tmpdir.join('decrypted')
with open(str(plaintext), 'wb') as f:
f.write(os.urandom(1024))
encrypt_args = encrypt_args_template().format(
source=str(plaintext),
target=str(ciphertext)
)
decrypt_args = decrypt_args_template().format(
source=str(ciphertext),
target=str(decrypted)
)
aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))
assert filecmp.cmp(str(plaintext), str(decrypted))
def render(self):
# Render all instances
for instance in self.instances:
instance.render()
# Render human readable links to each instance
for instance in self.instances:
try:
os.symlink(instance.node_root, os.path.join(
self.fixture_root, instance.instance_name))
except Exception:
log.exception('Failed to create symlink to %s',
instance.instance_name)
# Render the master control script
self.write_file(self.fixture_control,
self.render_control(), perm=0o755)
# Render the fixture specification file
self.write_file(self.fixture_spec,
self.render_spec())
def run(self):
venv_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'venv', 'cthulhu')
venv_cmd = [
'virtualenv',
venv_path
]
print('Creating virtual environment in ', venv_path)
subprocess.check_call(venv_cmd)
print('Linking `activate` to top level of project.\n')
print('To activate, simply run `source activate`.')
try:
os.symlink(
os.path.join(venv_path, 'bin', 'activate'),
os.path.join(os.path.dirname(os.path.abspath(__file__)), 'activate')
)
except OSError:
print('Unable to create symlink, you may have a stale symlink from a previous invocation.')
def symlink_ms(source, linkname):
"""Python 2 doesn't have os.symlink on windows so we do it ourselfs
:param source: sourceFile
:type source: str
:param linkname: symlink path
:type linkname: str
:raises: WindowsError, raises when it fails to create the symlink if the user permissions
are incorrect
"""
import ctypes
csl = ctypes.windll.kernel32.CreateSymbolicLinkW
csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
csl.restype = ctypes.c_ubyte
flags = 1 if os.path.isdir(source) else 0
try:
if csl(linkname, source.replace('/', '\\'), flags) == 0:
raise ctypes.WinError()
except WindowsError:
raise WindowsError("Failed to create symbolicLink due to user permissions")
def symlinks_supported():
"""
A function to check if creating symlinks are supported in the
host platform and/or if they are allowed to be created (e.g.
on Windows it requires admin permissions).
"""
tmpdir = tempfile.mkdtemp()
original_path = os.path.join(tmpdir, 'original')
symlink_path = os.path.join(tmpdir, 'symlink')
os.makedirs(original_path)
try:
os.symlink(original_path, symlink_path)
supported = True
except (OSError, NotImplementedError, AttributeError):
supported = False
else:
os.remove(symlink_path)
finally:
os.rmdir(original_path)
os.rmdir(tmpdir)
return supported
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def enable_sv(sv, sv_dir, runsvdir):
"""
Enable the specified service, 'sv'.
Assemble 'sv_path' and 'service_path' from 'sv', 'sv_dir', and 'runsvdir'.
Make a symlink from 'sv_path' to 'service_path' and bail if we get one
of a couple different exceptions.
"""
sv_path = os.path.join(sv_dir, sv)
service_path = os.path.join(runsvdir, sv)
try:
check_sv_path(sv_path)
os.symlink(sv_path, service_path)
return True
except NoSuchSvError:
raise NoSuchSvError
except PermissionError:
raise NeedSudoError
except FileExistsError:
raise SvAlreadyEnabledError
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def makelink(self, tarinfo, targetpath):
"""Make a (symbolic) link called targetpath. If it cannot be created
(platform limitation), we try to make a copy of the referenced file
instead of a link.
"""
if hasattr(os, "symlink") and hasattr(os, "link"):
# For systems that support symbolic and hard links.
if tarinfo.issym():
if os.path.lexists(targetpath):
os.unlink(targetpath)
os.symlink(tarinfo.linkname, targetpath)
else:
# See extract().
if os.path.exists(tarinfo._link_target):
if os.path.lexists(targetpath):
os.unlink(targetpath)
os.link(tarinfo._link_target, targetpath)
else:
self._extract_member(self._find_link_target(tarinfo), targetpath)
else:
try:
self._extract_member(self._find_link_target(tarinfo), targetpath)
except KeyError:
raise ExtractError("unable to resolve link inside archive")
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname)))
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def link(repo):
cheat_dir = get_cheat_path()
sheet_dir = get_sheet_path(repo)
if not os.path.isdir(sheet_dir):
raise CheatExtException(
"%s hadn't been installed yet at %s" % (repo, sheet_dir))
sheets = get_available_sheets_at(sheet_dir)
state_sheets = get_sheets_with_state(cheat_dir, sheet_dir, sheets)
_check_sheets_availability(state_sheets)
for sheet, _ in filter_by_state(STATE_UNLINK, state_sheets):
os.symlink(
os.path.join(sheet_dir, sheet),
os.path.join(cheat_dir, sheet))
def gen_stream(self, i):
cwd = os.getcwd()+'/'
res, prob_id = self.gen_data(i)
if res == None or prob_id == None:
return
path = os.path.join('stream', prob_id)
path2 = os.path.join('json', 'todo')
path2 = os.path.join(d[prob_id], path2)
data = concat_data(res)
md5 = hashlib.md5(data).hexdigest()
outfname = os.path.join(path, self.basename + '_' + md5 + '.json')
if (md5 + '.json') in fset[prob_id]:
logging.info('Skip %s due to same' % md5)
return
logging.info('Save Packet Stream: %s' % outfname)
fset[prob_id].add(md5 + '.json')
f = file(outfname, 'w')
json.dump(res, f)
outfname2 = os.path.join(path2, self.basename + '_' + md5 + '.json')
outfname = cwd+outfname
outfname2 = cwd+outfname2
os.symlink(outfname,outfname2)
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def link_to_tutorials():
# Linking to the directory does not work well with
# nbsphinx. We link to the files themselves
from glob import glob
from plotnine_examples.tutorials import TUTPATH
dest_dir = os.path.join(CUR_PATH, 'tutorials')
# Unlink files from previous build
for old_file in glob(dest_dir + '/*.ipynb'):
os.unlink(old_file)
# Link files for this build
for file in glob(TUTPATH + '/*.ipynb'):
basename = os.path.basename(file)
dest = os.path.join(dest_dir, basename)
os.symlink(file, dest)
def config_mv(args):
"""[Not Supported Yet]
Relocate config DB from its current location
:return: None for success, string for error
"""
if not args.force:
return err_out(DB_REF + " move to {} ".format(args.to) +
"requires '--force' flag to execute the request.")
# TODO:
# this is pure convenience code, so it is very low priority; still, here are the steps:
# checks if target exists upfront, and fail if it does
# cp the DB instance 'to' , and flip the symlink.
# refresh service (not really needed as next vmci_command handlers will pick it up)
# need --dryrun or --confirm
# issue: works really with discovery only , as others need to find it out
printMessage(args.output_format, "Sorry, configuration move ('config mv' command) is not supported yet")
return None
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member