def main(argv=None):
if len(sys.argv) <= 2:
print('Usage ./%s --exp=<exp_name> --ec2_settings=<relative_path_to_ec2_settings_file>'%sys.argv[0])
sys.exit(0)
import importlib
import os.path as osp
import shutil
module_name = 'sandbox.rocky.tf.launchers.%s'%(
osp.splitext(FLAGS.ec2_settings)[0].replace('/','.'))
mod = importlib.import_module(module_name)
dst_py = osp.join(osp.dirname(FLAGS.ec2_settings),FLAGS.exp+'.py')
try:
shutil.copy(FLAGS.ec2_settings, dst_py)
except shutil.SameFileError as e:
print(e)
if type(mod.params) != list: mod.params = [mod.params]
if hasattr(mod, 'base_params'): base_params = mod.base_params
else: base_params = dict()
N = 0
for params in mod.params:
ps = base_params.copy()
ps.update(params)
N += execute(params=ps, mode="ec2")
print('Launched %d jobs.'%N)
python类SameFileError()的实例源码
def setup_pfiles(tools):
"""
Copy the parameter files of the specified tools to the current
working directory, and setup the ``PFILES`` environment variable.
Parameters
----------
tools : list[str]
Name list of the tools to be set up
"""
for tool in tools:
pfile = subprocess.check_output([
"paccess", tool
]).decode("utf-8").strip()
subprocess.check_call(["punlearn", tool])
try:
shutil.copy(pfile, ".")
except shutil.SameFileError:
pass
# Setup the ``PFILES`` environment variable
os.environ["PFILES"] = "./:" + os.environ["PFILES"]
def test_dont_copy_file_onto_symlink_to_itself(self):
# bug 851123.
os.mkdir(TESTFN)
src = os.path.join(TESTFN, 'cheese')
dst = os.path.join(TESTFN, 'shop')
try:
with open(src, 'w') as f:
f.write('cheddar')
# Using `src` here would mean we end up with a symlink pointing
# to TESTFN/TESTFN/cheese, while it should point at
# TESTFN/cheese.
os.symlink('cheese', dst)
self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
def update_dict_files(path, shared_file_path):
dir_template = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'template'
)
try:
tei = next(os.path.join(path, f) for f in os.listdir(path))
os.remove(tei)
except (StopIteration, FileNotFoundError):
pass
def copy(files):
for file in files:
try:
shutil.copy2(file, path)
except shutil.SameFileError:
pass
copy(os.path.join(shared_file_path, f) for f in
('freedict-dictionary.css', 'freedict-P5.dtd', 'INSTALL',
'freedict-P5.rng', 'freedict-P5.xml'))
copy(os.path.join(dir_template, f) for f in os.listdir(dir_template))
def test_dont_copy_file_onto_symlink_to_itself(self):
# bug 851123.
os.mkdir(TESTFN)
src = os.path.join(TESTFN, 'cheese')
dst = os.path.join(TESTFN, 'shop')
try:
with open(src, 'w') as f:
f.write('cheddar')
# Using `src` here would mean we end up with a symlink pointing
# to TESTFN/TESTFN/cheese, while it should point at
# TESTFN/cheese.
os.symlink('cheese', dst)
self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
def test_dont_copy_file_onto_link_to_itself(self):
# bug 851123.
os.mkdir(TESTFN)
src = os.path.join(TESTFN, 'cheese')
dst = os.path.join(TESTFN, 'shop')
try:
with open(src, 'w') as f:
f.write('cheddar')
os.link(src, dst)
self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
def test_copyfile_same_file(self):
# copyfile() should raise SameFileError if the source and destination
# are the same.
src_dir = self.mkdtemp()
src_file = os.path.join(src_dir, 'foo')
write_file(src_file, 'foo')
self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
# But Error should work too, to stay backward compatible.
self.assertRaises(Error, shutil.copyfile, src_file, src_file)
def test_module_all_attribute(self):
self.assertTrue(hasattr(shutil, '__all__'))
target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
'SpecialFileError', 'ExecError', 'make_archive',
'get_archive_formats', 'register_archive_format',
'unregister_archive_format', 'get_unpack_formats',
'register_unpack_format', 'unregister_unpack_format',
'unpack_archive', 'ignore_patterns', 'chown', 'which',
'get_terminal_size', 'SameFileError']
if hasattr(os, 'statvfs') or os.name == 'nt':
target_api.append('disk_usage')
self.assertEqual(set(shutil.__all__), set(target_api))
def test_dont_copy_file_onto_link_to_itself(self):
# bug 851123.
os.mkdir(TESTFN)
src = os.path.join(TESTFN, 'cheese')
dst = os.path.join(TESTFN, 'shop')
try:
with open(src, 'w') as f:
f.write('cheddar')
os.link(src, dst)
self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
def test_copyfile_same_file(self):
# copyfile() should raise SameFileError if the source and destination
# are the same.
src_dir = self.mkdtemp()
src_file = os.path.join(src_dir, 'foo')
write_file(src_file, 'foo')
self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
# But Error should work too, to stay backward compatible.
self.assertRaises(Error, shutil.copyfile, src_file, src_file)
def merge_vcfs(input_dir, output_dir, project_name, raw_vcf_path_list=None, vcfs_gzipped=False):
"""Merge vcf files into single multisample vcf, bgzip and index merged vcf file."""
if raw_vcf_path_list is None:
vcf_file_extension = BGZIPPED_VCF_EXTENSION if vcfs_gzipped else VCF_EXTENSION
raw_vcf_path_list = _get_vcf_file_paths_list_in_directory(input_dir, vcf_file_extension)
if len(raw_vcf_path_list) == 0:
raise ValueError("No VCFs found with extension '{0}'.".format(vcf_file_extension))
elif len(raw_vcf_path_list) == 0:
raise ValueError("Input list of VCF files is empty.")
if len(raw_vcf_path_list) > 1:
bgzipped_vcf_path_list = set([bgzip_and_index_vcf(vcf_fp) for vcf_fp in raw_vcf_path_list])
single_vcf_path = os.path.join(output_dir, project_name + VCF_EXTENSION)
_merge_bgzipped_indexed_vcfs(bgzipped_vcf_path_list, single_vcf_path)
else:
file_name = os.path.basename(raw_vcf_path_list[0]) # w/o path
single_vcf_path = os.path.join(output_dir, file_name)
try:
# move to output dir with same file name
shutil.copyfile(raw_vcf_path_list[0], single_vcf_path)
except shutil.SameFileError:
# I ran into a case where there was a single input file, AND the input and output dirs were the same so it
# was already where it needed to be. In this case, an error is thrown because you can't copy a file to
# itself, but that's cool, so just ignore it.
pass
return single_vcf_path
def _parse_argument(self, scenario, key: str, value):
"""Some values of the scenario-file need to be changed upon writing,
such as the 'ta' (target algorithm), due to it's callback. Also,
the configspace, features, train_inst- and test-inst-lists are saved
to output_dir, if they exist.
Parameters:
-----------
scenario: Scenario
Scenario-file to be written
key: string
Name of the attribute in scenario-file
value: Any
Corresponding attribute
Returns:
--------
new value: string
The altered value, to be written to file
Sideeffects:
------------
- copies files pcs_fn, train_inst_fn, test_inst_fn and feature_fn to
output if possible, creates the files from attributes otherwise
"""
if key in ['pcs_fn', 'train_inst_fn', 'test_inst_fn', 'feature_fn']:
# Copy if file exists, else write to new file
if value is not None and os.path.isfile(value):
try:
return shutil.copy(value, scenario.output_dir_for_this_run)
except shutil.SameFileError:
return value # File is already in output_dir
elif key == 'pcs_fn' and scenario.cs is not None:
new_path = os.path.join(scenario.output_dir_for_this_run, "configspace.pcs")
self.write_pcs_file(scenario.cs, new_path)
elif key == 'train_inst_fn' and scenario.train_insts != [None]:
new_path = os.path.join(scenario.output_dir_for_this_run, 'train_insts.txt')
self.write_inst_file(scenario.train_insts, new_path)
elif key == 'test_inst_fn' and scenario.test_insts != [None]:
new_path = os.path.join(scenario.output_dir_for_this_run, 'test_insts.txt')
self.write_inst_file(scenario.test_insts, new_path)
elif key == 'feature_fn' and scenario.feature_dict != {}:
new_path = os.path.join(scenario.output_dir_for_this_run, 'features.txt')
self.write_inst_features_file(scenario.n_features,
scenario.feature_dict, new_path)
else:
return None
# New value -> new path
return new_path
elif key == 'ta' and value is not None:
# Reversing the callback on 'ta' (shlex.split)
return " ".join(value)
elif key in ['train_insts', 'test_insts', 'cs', 'feature_dict']:
# No need to log, recreated from files
return None
else:
return value