def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.

    Args:
        infile: path of the source file.
        outfile: destination path; its parent directory is created first.
        check: when True, refuse to overwrite a symlink or non-regular file.

    Raises:
        ValueError: if *check* is True and *outfile* is a symlink or a
            non-regular file that would be overwritten.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if self.dry_run:
        return
    if check:
        problem = None
        if os.path.islink(outfile):
            problem = '%s is a symlink' % outfile
        elif os.path.exists(outfile) and not os.path.isfile(outfile):
            problem = '%s is a non-regular file' % outfile
        if problem is not None:
            raise ValueError(problem + ' which would be overwritten')
    shutil.copyfile(infile, outfile)
    self.record_as_written(outfile)
# Python examples of shutil.copyfile() usage
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory, using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % filename)
paths = {
filename: ('', extract_dir),
}
for base, dirs, files in os.walk(filename):
src, dst = paths[base]
for d in dirs:
paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
for f in files:
target = os.path.join(dst, f)
target = progress_filter(src + f, target)
if not target:
# skip non-files
continue
ensure_directory(target)
f = os.path.join(base, f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
def create_params_file(self, fname):
    """Offer to create a missing SpyKING CIRCUS parameter file.

    Shows a Yes/No dialog; on Yes, copies a ``config.params`` template
    (preferring a user-local copy under ``~/spyking-circus``) to *fname*,
    records it as the current parameter file, and reloads parameters.
    """
    msg = QMessageBox()
    msg.setIcon(QMessageBox.Question)
    msg.setText("Parameter file %r not found, do you want SpyKING CIRCUS to "
                "create it for you?" % fname)
    msg.setWindowTitle("Generate parameter file?")
    msg.setInformativeText("This will create a parameter file from a "
                           "template file and open it in your system's "
                           "standard text editor. Fill properly before "
                           "launching the code. See the documentation "
                           "for details")
    msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
    answer = msg.exec_()
    if answer == QMessageBox.Yes:
        user_path = os.path.join(os.path.expanduser('~'), 'spyking-circus')
        # BUG FIX: the original concatenated the directory and file name
        # without a separator ('~/spyking-circusconfig.params'), so the
        # user-local template was never found; join them properly.
        user_config = os.path.join(user_path, 'config.params')
        if os.path.exists(user_config):
            config_file = os.path.abspath(user_config)
        else:
            # Fall back to the template shipped with the circus package.
            config_file = os.path.abspath(
                pkg_resources.resource_filename('circus', 'config.params'))
        shutil.copyfile(config_file, fname)
        self.params = fname
        self.last_log_file = fname.replace('.params', '.log')
        self.update_params()
def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.

    Args:
        infile: path of the source file.
        outfile: destination path; its parent directory is created first.
        check: when True, refuse to overwrite a symlink or non-regular file.

    Raises:
        ValueError: if *check* is True and *outfile* is a symlink or a
            non-regular file that would be overwritten.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if self.dry_run:
        return
    if check:
        problem = None
        if os.path.islink(outfile):
            problem = '%s is a symlink' % outfile
        elif os.path.exists(outfile) and not os.path.isfile(outfile):
            problem = '%s is a non-regular file' % outfile
        if problem is not None:
            raise ValueError(problem + ' which would be overwritten')
    shutil.copyfile(infile, outfile)
    self.record_as_written(outfile)
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory, using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % filename)
paths = {
filename: ('', extract_dir),
}
for base, dirs, files in os.walk(filename):
src, dst = paths[base]
for d in dirs:
paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
for f in files:
target = os.path.join(dst, f)
target = progress_filter(src + f, target)
if not target:
# skip non-files
continue
ensure_directory(target)
f = os.path.join(base, f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
def merge(bed1, bed2, bedOut):
    """Write the union of the regions in two BED files to *bedOut*.

    If *bed2* is falsy, *bed1* is copied verbatim to *bedOut*.
    """
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as fh:
        regions_a = tk_io.get_target_regions(fh)
    with open(bed2) as fh:
        regions_b = tk_io.get_target_regions(fh)
    # Fold every region of bed2 into bed1's region map.
    for chrom in regions_b:
        if chrom not in regions_a:
            regions_a[chrom] = tk_regions.Regions([])
        for start, end in regions_b[chrom]:
            regions_a[chrom].add_region((start, end))
    writeOut(regions_a, bedOut)
def intersect(bed1, bed2, bedOut):
    """Write the intersection of the regions in two BED files to *bedOut*.

    If *bed2* is falsy, *bed1* is copied verbatim to *bedOut*.
    """
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as fh:
        regions_a = tk_io.get_target_regions(fh)
    with open(bed2) as fh:
        regions_b = tk_io.get_target_regions(fh)
    # Only chromosomes present in both files can contribute.
    result = {}
    for chrom in regions_a:
        if chrom in regions_b:
            result[chrom] = regions_a[chrom].intersect(regions_b[chrom])
    writeOut(result, bedOut)
def overlap(bed1, bed2, bedOut):
    """Write the regions of *bed1* that overlap some region of *bed2*.

    If *bed2* is falsy, *bed1* is copied verbatim to *bedOut*.
    """
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as fh:
        regions_a = tk_io.get_target_regions(fh)
    with open(bed2) as fh:
        regions_b = tk_io.get_target_regions(fh)
    result = {}
    for chrom in regions_a:
        # Every chromosome of bed1 gets an entry, even when empty.
        result.setdefault(chrom, tk_regions.Regions([]))
        for start, end in regions_a[chrom]:
            if chrom in regions_b and \
               regions_b[chrom].overlaps_region(start, end):
                result[chrom].add_region((start, end))
    writeOut(result, bedOut)
def no_overlap(bed1, bed2, bedOut):
    """Write the regions of *bed1* that overlap no region of *bed2*.

    If *bed2* is falsy, *bed1* is copied verbatim to *bedOut*.
    """
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as fh:
        regions_a = tk_io.get_target_regions(fh)
    with open(bed2) as fh:
        regions_b = tk_io.get_target_regions(fh)
    result = {}
    for chrom in regions_a:
        # Every chromosome of bed1 gets an entry, even when empty.
        result.setdefault(chrom, tk_regions.Regions([]))
        for start, end in regions_a[chrom]:
            if chrom not in regions_b or \
               not regions_b[chrom].overlaps_region(start, end):
                result[chrom].add_region((start, end))
    writeOut(result, bedOut)
def subtract(bed1, bed2, bedOut):
    """Write *bed1* with all regions of *bed2* subtracted to *bedOut*.

    If *bed2* is falsy, *bed1* is copied verbatim to *bedOut*.
    """
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as fh:
        regions_a = tk_io.get_target_regions(fh)
    with open(bed2) as fh:
        regions_b = tk_io.get_target_regions(fh)
    result = {}
    for chrom in regions_a:
        # Every chromosome of bed1 gets an entry, even when empty.
        result.setdefault(chrom, tk_regions.Regions([]))
        for start, end in regions_a[chrom]:
            if chrom in regions_b:
                hits = regions_b[chrom].overlapping_regions(start, end)
            else:
                hits = []
            # Keep whatever pieces of [start, end) survive the subtraction.
            for piece in interval_subtract(start, end, hits):
                result[chrom].add_region(piece)
    writeOut(result, bedOut)
def check():
    """Check composition.

    Backs up the current output file (if any), regenerates it via
    ``compose``, prints its hash, then rewrites it as canonically
    formatted JSON (sorted keys, 4-space indent).
    """
    env = _get_vars("$HOME/.config/epiphyte/env")
    if os.path.exists(FILE_NAME):
        # keep the previous version around for comparison
        shutil.copyfile(FILE_NAME, PREV_FILE)
    compose(env)
    if not os.path.exists(FILE_NAME):
        return
    print(get_file_hash(FILE_NAME))
    with open(FILE_NAME, 'r') as fh:
        parsed = json.loads(fh.read())
    pretty = json.dumps(parsed,
                        sort_keys=True,
                        indent=4,
                        separators=(',', ': '))
    with open(FILE_NAME, 'w') as fh:
        fh.write(pretty)
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Reformats sourcefile according to configfile and writes it to destfile.

    Copies the config and the source into a scratch directory, runs the
    formatter executable on the copy, and captures stdout as the result.
    This method is only used for testing.
    """
    workdir = tempfile.mkdtemp(prefix='whatstyle_')
    cfgpath = os.path.join(workdir, self.configfilename)
    copyfile(configfile, cfgpath)
    srccopy = os.path.join(workdir, os.path.basename(sourcefile))
    copyfile(sourcefile, srccopy)
    exeresult = run_executable(self.exe, [srccopy])
    writebinary(destfile, exeresult.stdout)
    # clean up the scratch directory
    os.remove(srccopy)
    os.remove(cfgpath)
    os.rmdir(workdir)
def process_extract(extract):
    """Create a single .mbtiles extract and update its metadata.

    Relies on enclosing-scope names (`target_dir`, `args`, `source_file`).
    A failing extract is reported to stderr but does not abort the batch.
    """
    extract_file = os.path.join(target_dir, extract.extract + '.mbtiles')
    print('Create extract {}'.format(extract_file))
    patch_src = args['--patch-from']
    if patch_src:
        # Instead of patching afterwards, seed the target with the patch
        # source and write directly to it (that works concurrently).
        print('Use patch from {} as base'.format(patch_src))
        shutil.copyfile(patch_src, extract_file)
    try:
        create_extract(extract, source_file, extract_file)
    except subprocess.CalledProcessError as e:
        # Failing extracts should not interrupt the entire process.
        print(e, file=sys.stderr)
        return
    print('Update metadata {}'.format(extract_file))
    update_metadata(extract_file, extract.metadata(extract_file))
def _new_group(self, id_group, nbClusters):
# generate filenames
fetfilename = os.path.join(self.filename,
self.basename + ('.fet.%d' % id_group))
clufilename = os.path.join(self.filename,
self.basename + ('.clu.%d' % id_group))
# back up before overwriting
if os.path.exists(fetfilename):
shutil.copyfile(fetfilename, fetfilename + '~')
if os.path.exists(clufilename):
shutil.copyfile(clufilename, clufilename + '~')
# create file handles
self._fetfilehandles[id_group] = file(fetfilename, 'w')
self._clufilehandles[id_group] = file(clufilename, 'w')
# write out first line
#self._fetfilehandles[id_group].write("0\n") # Number of features
self._clufilehandles[id_group].write("%d\n" % nbClusters)
def _new_group(self, id_group, nbClusters):
# generate filenames
fetfilename = os.path.join(self.filename,
self.basename + ('.fet.%d' % id_group))
clufilename = os.path.join(self.filename,
self.basename + ('.clu.%d' % id_group))
# back up before overwriting
if os.path.exists(fetfilename):
shutil.copyfile(fetfilename, fetfilename + '~')
if os.path.exists(clufilename):
shutil.copyfile(clufilename, clufilename + '~')
# create file handles
self._fetfilehandles[id_group] = file(fetfilename, 'w')
self._clufilehandles[id_group] = file(clufilename, 'w')
# write out first line
#self._fetfilehandles[id_group].write("0\n") # Number of features
self._clufilehandles[id_group].write("%d\n" % nbClusters)
def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.

    Args:
        infile: path of the source file.
        outfile: destination path; its parent directory is created first.
        check: when True, refuse to overwrite a symlink or non-regular file.

    Raises:
        ValueError: if *check* is True and *outfile* is a symlink or a
            non-regular file that would be overwritten.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if self.dry_run:
        return
    if check:
        problem = None
        if os.path.islink(outfile):
            problem = '%s is a symlink' % outfile
        elif os.path.exists(outfile) and not os.path.isfile(outfile):
            problem = '%s is a non-regular file' % outfile
        if problem is not None:
            raise ValueError(problem + ' which would be overwritten')
    shutil.copyfile(infile, outfile)
    self.record_as_written(outfile)
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory, using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % (filename,))
paths = {filename:('',extract_dir)}
for base, dirs, files in os.walk(filename):
src,dst = paths[base]
for d in dirs:
paths[os.path.join(base,d)] = src+d+'/', os.path.join(dst,d)
for f in files:
name = src+f
target = os.path.join(dst,f)
target = progress_filter(src+f, target)
if not target:
continue # skip non-files
ensure_directory(target)
f = os.path.join(base,f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
def init(self):
    """Check home directory and bootstrap the default configuration.

    Creates the home directory and copies the bundled default config file
    on first run, then ensures every file-backed catalog entry has an
    (empty) catalog file.
    """
    if not ConfigHandler.exists():
        ColorPrint.print_info(message="Default configuration initialized: " + str(StateHolder.config_file))
        if not os.path.exists(StateHolder.home_dir):
            os.mkdir(StateHolder.home_dir)
        if not os.path.exists(StateHolder.config_file):
            src_file = os.path.join(os.path.dirname(__file__), 'resources/config')
            shutil.copyfile(src=src_file, dst=StateHolder.config_file)
        StateHolder.config_parsed = False
    self.read()
    # Check file type catalog
    for config in self.config:
        conf = self.config[config]
        if type(conf) is not dict:
            continue
        # BUG FIX: the original used `is "file"`, which compares string
        # *identity* and only works by the accident of CPython interning;
        # use == for a correct equality test.
        if conf.get("repositoryType", "file") == "file":
            FileUtils.make_empty_file_with_empty_dict(directory=StateHolder.home_dir,
                                                      file=conf.get('file', 'poco-catalog.yml'))
def generate_test_list(self, verifier_repo_dir):
    """Produce the raw tempest test-case list according to ``self.MODE``.

    'defcore' and 'custom' copy a prepared list file; any other mode asks
    testr (run inside *verifier_repo_dir*) to enumerate the tests.
    """
    logger.debug("Generating test case list...")
    mode = self.MODE
    if mode == 'defcore':
        shutil.copyfile(
            conf_utils.TEMPEST_DEFCORE, conf_utils.TEMPEST_RAW_LIST)
    elif mode == 'custom':
        if not os.path.isfile(conf_utils.TEMPEST_CUSTOM):
            raise Exception("Tempest test list file %s NOT found."
                            % conf_utils.TEMPEST_CUSTOM)
        shutil.copyfile(
            conf_utils.TEMPEST_CUSTOM, conf_utils.TEMPEST_RAW_LIST)
    else:
        if mode == 'smoke':
            testr_mode = "smoke"
        elif mode == 'full':
            testr_mode = ""
        else:
            testr_mode = 'tempest.api.' + mode
        cmd = ("cd {0};"
               "testr list-tests {1} > {2};"
               "cd -;".format(verifier_repo_dir,
                              testr_mode,
                              conf_utils.TEMPEST_RAW_LIST))
        ft_utils.execute_command(cmd)
def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.

    Args:
        infile: path of the source file.
        outfile: destination path; its parent directory is created first.
        check: when True, refuse to overwrite a symlink or non-regular file.

    Raises:
        ValueError: if *check* is True and *outfile* is a symlink or a
            non-regular file that would be overwritten.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if self.dry_run:
        return
    if check:
        problem = None
        if os.path.islink(outfile):
            problem = '%s is a symlink' % outfile
        elif os.path.exists(outfile) and not os.path.isfile(outfile):
            problem = '%s is a non-regular file' % outfile
        if problem is not None:
            raise ValueError(problem + ' which would be overwritten')
    shutil.copyfile(infile, outfile)
    self.record_as_written(outfile)
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory, using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % filename)
paths = {
filename: ('', extract_dir),
}
for base, dirs, files in os.walk(filename):
src, dst = paths[base]
for d in dirs:
paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
for f in files:
target = os.path.join(dst, f)
target = progress_filter(src + f, target)
if not target:
# skip non-files
continue
ensure_directory(target)
f = os.path.join(base, f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
def copy_dir_contents_with_overwrite(input_dir_name, output_dir_name):
    """Recursively copy the contents of one directory into another,
    overwriting any files that already exist at the destination."""
    # Make sure the destination directory exists before copying into it.
    if not os.path.exists(output_dir_name):
        os.makedirs(output_dir_name)
    for entry in os.listdir(input_dir_name):
        src = os.path.join(input_dir_name, entry)
        dst = os.path.join(output_dir_name, entry)
        if os.path.isdir(src):
            # Recurse into subdirectories, mirroring the tree.
            copy_dir_contents_with_overwrite(src, dst)
        else:
            shutil.copyfile(src, dst)
def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.

    Args:
        infile: path of the source file.
        outfile: destination path; its parent directory is created first.
        check: when True, refuse to overwrite a symlink or non-regular file.

    Raises:
        ValueError: if *check* is True and *outfile* is a symlink or a
            non-regular file that would be overwritten.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if self.dry_run:
        return
    if check:
        problem = None
        if os.path.islink(outfile):
            problem = '%s is a symlink' % outfile
        elif os.path.exists(outfile) and not os.path.isfile(outfile):
            problem = '%s is a non-regular file' % outfile
        if problem is not None:
            raise ValueError(problem + ' which would be overwritten')
    shutil.copyfile(infile, outfile)
    self.record_as_written(outfile)
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory, using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % filename)
paths = {
filename: ('', extract_dir),
}
for base, dirs, files in os.walk(filename):
src, dst = paths[base]
for d in dirs:
paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
for f in files:
target = os.path.join(dst, f)
target = progress_filter(src + f, target)
if not target:
# skip non-files
continue
ensure_directory(target)
f = os.path.join(base, f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.

    Args:
        infile: path of the source file.
        outfile: destination path; its parent directory is created first.
        check: when True, refuse to overwrite a symlink or non-regular file.

    Raises:
        ValueError: if *check* is True and *outfile* is a symlink or a
            non-regular file that would be overwritten.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if self.dry_run:
        return
    if check:
        problem = None
        if os.path.islink(outfile):
            problem = '%s is a symlink' % outfile
        elif os.path.exists(outfile) and not os.path.isfile(outfile):
            problem = '%s is a non-regular file' % outfile
        if problem is not None:
            raise ValueError(problem + ' which would be overwritten')
    shutil.copyfile(infile, outfile)
    self.record_as_written(outfile)
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory, using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % filename)
paths = {
filename: ('', extract_dir),
}
for base, dirs, files in os.walk(filename):
src, dst = paths[base]
for d in dirs:
paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
for f in files:
target = os.path.join(dst, f)
target = progress_filter(src + f, target)
if not target:
# skip non-files
continue
ensure_directory(target)
f = os.path.join(base, f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.

    Args:
        infile: path of the source file.
        outfile: destination path; its parent directory is created first.
        check: when True, refuse to overwrite a symlink or non-regular file.

    Raises:
        ValueError: if *check* is True and *outfile* is a symlink or a
            non-regular file that would be overwritten.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if self.dry_run:
        return
    if check:
        problem = None
        if os.path.islink(outfile):
            problem = '%s is a symlink' % outfile
        elif os.path.exists(outfile) and not os.path.isfile(outfile):
            problem = '%s is a non-regular file' % outfile
        if problem is not None:
            raise ValueError(problem + ' which would be overwritten')
    shutil.copyfile(infile, outfile)
    self.record_as_written(outfile)
def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.

    Args:
        infile: path of the source file.
        outfile: destination path; its parent directory is created first.
        check: when True, refuse to overwrite a symlink or non-regular file.

    Raises:
        ValueError: if *check* is True and *outfile* is a symlink or a
            non-regular file that would be overwritten.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if self.dry_run:
        return
    if check:
        problem = None
        if os.path.islink(outfile):
            problem = '%s is a symlink' % outfile
        elif os.path.exists(outfile) and not os.path.isfile(outfile):
            problem = '%s is a non-regular file' % outfile
        if problem is not None:
            raise ValueError(problem + ' which would be overwritten')
    shutil.copyfile(infile, outfile)
    self.record_as_written(outfile)
def copy_images_for_classification():
    """Copy labeled coin images into per-date directories for classification.

    Reads the pickled ground-truth tuples, skips entries dated before 1900,
    and copies all 57 crops of each coin from crop_dir into
    classify_dir/<labeled_date>/.

    NOTE(review): relies on module-level globals (data_dir, crop_dir,
    classify_dir) defined elsewhere in the original project.
    """
    ground_truth_dates = pickle.load(open(data_dir + 'ground_truth_dates.pickle', "rb"))
    # Sort by labeled date (tuple index 3), oldest first.
    ground_truth_dates = sorted(ground_truth_dates, key=lambda x: x[3], reverse=False)
    if not os.path.exists(classify_dir):
        os.mkdir(classify_dir)
    for seed_id, coin_id, result, labeled_date, bad_angle, bad_image in ground_truth_dates:
        if labeled_date < 1900:
            # placeholder/invalid dates are excluded from the training set
            continue
        # NOTE(review): `coin_id / 100` looks like Python 2 integer division
        # (crops are bucketed by coin_id // 100); under Python 3 this would
        # produce a float in the path — confirm target interpreter.
        dir = crop_dir + str(coin_id / 100) + '/'
        new_dir = classify_dir + str(labeled_date) + '/'
        if not os.path.exists(new_dir):
            os.mkdir(new_dir)
        # Each coin has 57 crop images, numbered 00..56.
        for image_id in range(0,57):
            # filename layout: 5-digit coin id + 2-digit image id + .png
            filename = str(coin_id).zfill(5) + str(image_id).zfill(2) + '.png'
            old_filename = dir + filename
            new_filename = new_dir + filename
            shutil.copyfile(old_filename,new_filename)
def create_single_lmdb(seed_image_id, filedata, test_id, multi_image_training=False, images_per_angle=500,
                       retraining=False):
    # Build an LMDB training set for one seed image and stage the Caffe
    # weight file plus the training script next to it.
    # NOTE(review): Python 2 code (print statements). `test_id` is accepted
    # but never used in this body — confirm whether callers rely on it.
    start_time = time.time()
    print 'create_single_lmdb for ' + str(seed_image_id)
    if retraining:
        # resume from the snapshot produced by a previous run for this seed
        weight_filename = 'snapshot_iter_16880.caffemodel'
        shutil.copyfile(train_dir + str(seed_image_id) + '/' + weight_filename, train_dir + weight_filename)
    else:
        # fresh run: start from the shared initial weights
        weight_filename = 'starting-weights.caffemodel'
        shutil.copyfile(weight_filename, train_dir + weight_filename)
    lmdb_dir = train_dir + str(seed_image_id) + '/'
    create_lmdb_rotate_whole_image.create_lmdbs(filedata, lmdb_dir, images_per_angle, -1, True, False)
    copy_train_files(lmdb_dir, multi_image_training)
    create_train_script(lmdb_dir, train_dir + weight_filename, multi_image_training)
    print 'Done in %s seconds' % (time.time() - start_time,)