def declaration_path(name):
    """Return the path to an included declaration."""
    from os.path import dirname, join, exists

    import metatabdecl
    from metatab.exc import IncludeError

    d = dirname(metatabdecl.__file__)

    path = join(d, name)

    if not exists(path):
        path = join(d, name + '.csv')

    if not exists(path):
        raise IncludeError("No local declaration file for name '{}'".format(name))

    return path
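# A minimal usage sketch; 'metatab-latest' is a hypothetical declaration name,
# not something this snippet guarantees metatabdecl ships:
#
#     path = declaration_path('metatab-latest')
#     # resolves to '<metatabdecl dir>/metatab-latest' or '.../metatab-latest.csv'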
def make_dir_structure(base_dir):
    """Make the build directory structure."""

    def maybe_makedir(*args):
        p = join(base_dir, *args)

        if exists(p) and not isdir(p):
            raise IOError("File '{}' exists but is not a directory".format(p))

        if not exists(p):
            makedirs(p)

    maybe_makedir(DOWNLOAD_DIR)
    maybe_makedir(PACKAGE_DIR)
    maybe_makedir(OLD_DIR)
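# make_dir_structure relies on module-level names not shown in the snippet;
# a plausible context looks like this (the values are illustrative assumptions):
#
#     from os import makedirs
#     from os.path import join, exists, isdir
#     DOWNLOAD_DIR = 'downloads'
#     PACKAGE_DIR = 'packages'
#     OLD_DIR = 'old'
#
#     make_dir_structure('/tmp/build')  # creates the three subdirectories idempotently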
def check_atlas(atlas):
    """Validation of atlas input."""

    if isinstance(atlas, str):
        # when it is a name for a pre-defined atlas
        if not pexists(atlas):  # just a name
            atlas = atlas.lower()
            if atlas not in parcellate.atlas_list:
                raise ValueError('Invalid choice of atlas. Accepted: {}'.format(parcellate.atlas_list))
        elif os.path.isdir(atlas):  # cortical atlas in Freesurfer organization
            if not parcellate.check_atlas_annot_exist(atlas):
                raise ValueError('Given atlas folder does not contain Freesurfer label annot files. '
                                 'Needed: given_atlas_dir/label/?h.aparc.annot')
        elif pexists(atlas):  # may be a volumetric atlas?
            try:
                atlas = nibabel.load(atlas)
            except Exception:  # a bare except would also swallow KeyboardInterrupt
                traceback.print_exc()
                raise ValueError('Unable to read the provided image volume. '
                                 'Must be a nifti volume readable by nibabel.')
        else:
            raise ValueError('Unable to decipher or use the given atlas.')
    else:
        raise NotImplementedError('Atlas must be a string, providing a name, '
                                  'a path to a Freesurfer folder, or a 3D nifti volume.')

    return atlas
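# Illustrative inputs check_atlas accepts; 'fsaverage' and the paths are
# assumptions for the example, not values confirmed by this snippet:
#
#     atlas = check_atlas('fsaverage')            # pre-defined atlas name
#     atlas = check_atlas('/data/my_atlas_dir')   # Freesurfer-organized folder
#     atlas = check_atlas('/data/atlas.nii.gz')   # volumetric atlas, loaded via nibabel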
def make_output_path_graph(out_dir, subject, str_prefixes):
    """Constructs the path to save a multigraph to disk."""

    if out_dir is not None:
        # choose the output path based on dist name and all other parameters
        out_subject_dir = pjoin(out_dir, subject)
        if not pexists(out_subject_dir):
            os.mkdir(out_subject_dir)

        if isinstance(str_prefixes, str):
            str_prefixes = [str_prefixes, ]

        out_file_name = '{}_graynet.graphml'.format('_'.join(str_prefixes))
        out_weights_path = pjoin(out_subject_dir, out_file_name)
    else:
        out_weights_path = None

    return out_weights_path
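# Usage sketch (directory and prefixes are illustrative):
#
#     out_path = make_output_path_graph('/tmp/graynet', 'sub-01',
#                                       ['thickness', 'fwhm10'])
#     # -> '/tmp/graynet/sub-01/thickness_fwhm10_graynet.graphml'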
def check_subjects(subjects_info):
    """Ensure subjects are provided and their data exist."""

    if isinstance(subjects_info, str):
        if not pexists(subjects_info):
            raise IOError('path to subject list does not exist: {}'.format(subjects_info))
        subjects_list = np.genfromtxt(subjects_info, dtype=str)
    elif isinstance(subjects_info, collections.abc.Iterable):
        # collections.Iterable was removed in Python 3.10; use collections.abc
        if len(subjects_info) < 1:
            raise ValueError('Empty subject list.')
        subjects_list = subjects_info
    else:
        raise ValueError('Invalid value provided for subject list.\n'
                         'Must be a list of paths, or a path to a file containing '
                         'one path per subject.')

    subject_id_list = np.atleast_1d(subjects_list)
    num_subjects = subject_id_list.size
    if num_subjects < 1:
        raise ValueError('Input subject list is empty.')

    num_digits_id_size = len(str(num_subjects))
    max_id_width = max(map(len, subject_id_list))

    return subject_id_list, num_subjects, max_id_width, num_digits_id_size
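# check_subjects accepts either a path to a list file or an iterable of IDs;
# both calls below are illustrative:
#
#     ids, n, max_w, n_digits = check_subjects('/data/subject_list.txt')
#     ids, n, max_w, n_digits = check_subjects(['sub-01', 'sub-02', 'sub-03'])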
def check_params_single_edge(base_features, in_dir, atlas, smoothing_param,
                             node_size, out_dir, return_results):
    """Validation of parameters and appropriate type casting if necessary."""

    check_features(base_features)

    if atlas.lower() not in parcellate.atlas_list:
        raise ValueError('Invalid atlas choice. Use one of {}'.format(parcellate.atlas_list))

    if not pexists(in_dir):
        raise IOError('Input directory at {} does not exist.'.format(in_dir))

    if out_dir is None and return_results is False:
        raise ValueError('Results would neither be saved to disk nor returned to the caller.\n'
                         'Specify out_dir (not None) or set return_results=True.')

    if out_dir is not None and not pexists(out_dir):
        os.mkdir(out_dir)

    # no checks on subdivision size yet, as it is not implemented
    return
def check_params_multiedge(base_feature_list, input_dir, atlas, smoothing_param,
                           node_size, out_dir, return_results):
    """Validation of parameters and appropriate type casting if necessary."""

    check_features(base_feature_list)

    if atlas.lower() not in parcellate.atlas_list:
        raise ValueError('Invalid atlas choice. Use one of {}'.format(parcellate.atlas_list))

    if not pexists(input_dir):
        raise IOError('Input directory at {} does not exist.'.format(input_dir))

    if out_dir is None and return_results is False:
        raise ValueError('Results would neither be saved to disk nor returned to the caller.\n'
                         'Specify out_dir (not None) or set return_results=True.')

    if out_dir is not None and not pexists(out_dir):
        os.mkdir(out_dir)

    # no checks on subdivision size yet, as it is not implemented
    return
def add_image(self, base_path, file):
    if not os.path.exists(file):
        file = os.path.join(base_path, file)
    print('loading', file)
    texture = Image(source=file).texture
    # check for a missing texture before touching its attributes
    if texture is None:
        sys.exit('failed to locate image file %r' % file)
    texture.mag_filter = 'nearest'
    id = self.firstgid
    th = self.tile_height + self.spacing
    tw = self.tile_width + self.spacing
    for j in range(texture.height // th):
        for i in range(texture.width // tw):
            x = (i * tw) + self.margin
            # convert the y coordinate to OpenGL (0 at bottom of texture)
            y = texture.height - ((j + 1) * th)
            tile = texture.get_region(x, y, self.tile_width, self.tile_height)
            self.tiles.append(Tile(id, tile, self))
            id += 1
def save(config, is_system=False):
    """
    Save configuration ``config``.
    """
    ConfigurationManager.update(config)
    location = SYSTEM_CONFIG if is_system else RUNTIME_CONFIG
    try:
        LOG.info("Saving config: " + location)
        dir = location.replace("/jarbas_runtime.conf", "").replace(
            "/jarbas.conf", "")
        if not exists(dir):
            mkdir(dir)

        try:
            loc_config = load_commented_json(location)
        except Exception:
            loc_config = {}

        with open(location, 'w') as f:
            # dict.update() mutates in place and returns None,
            # so dump the merged dict rather than its return value
            loc_config.update(config)
            json.dump(loc_config, f)
    except Exception as e:
        LOG.error(e)
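# A sketch of saving user-level vs system-level config; the key and value
# are illustrative, not part of the snippet:
#
#     save({'lang': 'en-us'})                  # merges into RUNTIME_CONFIG
#     save({'lang': 'en-us'}, is_system=True)  # merges into SYSTEM_CONFIG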
def new_metatab_file(mt_file, template):
    template = template if template else 'metatab'

    if not exists(mt_file):
        doc = make_metatab_file(template)
        doc.write_csv(mt_file)
        return True
    else:
        return False
def ensure_dir(path):
    if path and not exists(path):
        makedirs(path)
def get_eval_dir(self):
    answer_dir = join(self.dir, "answers")
    if not exists(answer_dir):
        os.mkdir(answer_dir)
    return answer_dir
def get_best_weights(self):
    if exists(self.best_weight_dir):
        return tf.train.latest_checkpoint(self.best_weight_dir)
    return None
def restore_checkpoint(self, sess, var_list=None, load_ema=True):
    """
    Restores either the best weights or the most recent checkpoint, assuming the correct
    variables have already been added to the tf default graph, e.g., `.get_prediction()`
    has been called on the model stored in `self`.
    Automatically detects if EMA weights exist, and if they do, loads them instead.
    """
    checkpoint = self.get_best_weights()
    if checkpoint is None:
        print("Loading most recent checkpoint")
        checkpoint = self.get_latest_checkpoint()
    else:
        print("Loading best weights")

    if load_ema:
        if var_list is None:
            # Same default used by `Saver`
            var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES) + \
                       tf.get_collection(tf.GraphKeys.SAVEABLE_OBJECTS)

        # Automatically check if there are EMA variables, and if so use those
        reader = tf.train.NewCheckpointReader(checkpoint)
        ema = tf.train.ExponentialMovingAverage(0)
        ema_names = {ema.average_name(x): x for x in var_list
                     if reader.has_tensor(ema.average_name(x))}
        if len(ema_names) > 0:
            print("Found EMA weights, loading them")
            ema_vars = set(x for x in ema_names.values())
            var_list = {v.op.name: v for v in var_list if v not in ema_vars}
            var_list.update(ema_names)

    saver = tf.train.Saver(var_list)
    saver.restore(sess, checkpoint)
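# A minimal TF1-style restore sketch; `model` is assumed to be an instance of
# the class defining restore_checkpoint, with its graph already built:
#
#     with tf.Session() as sess:
#         model.restore_checkpoint(sess, load_ema=True)
#         # ... sess.run(...) against the restored weights ...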
def reseed(self):
    # This will only work if the app has permission to modify seed.db
    # (for example, NOT on iOS).
    # Note: this only affects new games based off of the new seed.db.
    # all_tmx_files_to_db() would just update seed.db if it exists, but for now,
    # wipe the slate clean to be sure.
    if exists('seed.db'):
        print('deleting existing seed.db...')
        os.remove('seed.db')
    all_tmx_files_to_db()
    Notification(message='Reseed complete!').open()
def fromxml(cls, tag, tilemap, firstgid=None):
    if 'source' in tag.attrib:
        firstgid = int(tag.attrib['firstgid'])
        path = tag.attrib['source']
        if not os.path.exists(path):
            path = os.path.join(tilemap.file_path, path)
        with open(path) as f:
            tileset = ElementTree.fromstring(f.read())
        # pass tilemap through so the recursive call matches the signature
        return cls.fromxml(tileset, tilemap, firstgid)

    name = tag.attrib['name']
    if firstgid is None:
        firstgid = int(tag.attrib['firstgid'])
    tile_width = int(tag.attrib['tilewidth'])
    tile_height = int(tag.attrib['tileheight'])
    spacing = int(tag.get('spacing', 0))
    margin = int(tag.get('margin', 0))
    tileset = cls(name, tile_width, tile_height, firstgid, spacing, margin)
    for c in tag:  # Element.getchildren() was removed in Python 3.9
        if c.tag == "image":
            # create tiles from the tileset image
            tileset.add_image(tilemap.file_path, c.attrib['source'])
        elif c.tag == 'tile':
            gid = tileset.firstgid + int(c.attrib['id'])
            tileset.get_tile(gid).loadxml(c)
    return tileset
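# Usage sketch: fromxml appears to be a classmethod on a tileset class; the
# class name 'Tileset' and file name below are assumptions for illustration:
#
#     tree = ElementTree.parse('level1.tmx')
#     for tag in tree.getroot().findall('tileset'):
#         tileset = Tileset.fromxml(tag, tilemap)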
def _create_if_not_exists(self):
    if exists(self._file_path):
        return
    dist_file = join(dirname(__file__), '..', 'resources', 'config-dist.json')
    copy(dist_file, self._file_path)
def __load(config, location):
    if exists(location) and isfile(location):
        try:
            ConfigurationLoader.merge_conf(
                config, load_commented_json(location))
            LOG.debug("Configuration '%s' loaded" % location)
        except Exception as e:  # Python 2 "except Exception, e" syntax updated
            LOG.error("Error loading configuration '%s'" % location)
            LOG.error(repr(e))
    else:
        LOG.debug("Configuration '%s' not found" % location)
    return config
def download(self, url):
    working_dir = self.working_dir if self.working_dir else ''

    r = Resource()

    # For local files, don't download, just reference in place.
    if url.scheme == 'file':
        r.cache_path = Url(url.resource_url).path
        r.download_time = None

        # Many places the file may exist
        locations = {  # What a mess ...
            abspath(r.cache_path),
            abspath(r.cache_path.lstrip('/')),
            abspath(join(working_dir, r.cache_path)),
        }

        for l in locations:
            if exists(l):
                r.sys_path = l
                break
        else:
            raise DownloadError(("File resource does not exist. Found none of:"
                                 "\n{}\n\nWorking dir = {}\ncache_path={}\nspec_path={}")
                                .format('\n'.join(locations), working_dir, r.cache_path, url.path))

    else:
        # Not a local file, so we actually need to download it.
        try:
            r.cache_path, r.download_time = self._download_with_lock(url.resource_url)
        except AccessError as e:
            # Try again, using a URL that we may have configured an account for. This is
            # primarily for S3 urls, with Boto or AWS credentials.
            try:
                r.cache_path, r.download_time = self._download_with_lock(url.auth_resource_url)
            except AttributeError:
                raise e

        r.sys_path = self.cache.getsyspath(r.cache_path)

    return r
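# Usage sketch; `downloader` stands in for an instance of the class defining
# download(), and the URL is hypothetical:
#
#     r = downloader.download(Url('http://example.com/data.csv'))
#     print(r.sys_path, r.cache_path, r.download_time)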
def _download(self, url, cache_path):
    import requests

    def copy_callback(read, total):
        if self.callback:
            self.callback('copy_file', read, total)

    if self.callback:
        self.callback('download', url, 0)

    if url.startswith('s3:'):
        from appurl.url import Url
        s3url = Url(url)

        try:
            with self.cache.open(cache_path, 'wb') as f:
                s3url.object.download_fileobj(f)
        except Exception as e:
            raise DownloadError("Failed to fetch S3 url '{}': {}".format(url, e))

    elif url.startswith('ftp:'):
        from contextlib import closing

        with closing(urlopen(url)) as fin:
            with self.cache.open(cache_path, 'wb') as fout:
                read_len = 16 * 1024
                total_len = 0
                while True:
                    buf = fin.read(read_len)
                    if not buf:
                        break
                    fout.write(buf)
                    total_len += len(buf)
                    if self.callback:
                        copy_callback(len(buf), total_len)

    else:
        try:
            r = requests.get(url, stream=True)
            r.raise_for_status()
        except SSLError as e:
            raise DownloadError("Failed to GET {}: {}".format(url, e))

        # Requests will auto decode gzip responses, but not when streaming. The following
        # monkey patch is recommended by a core developer at
        # https://github.com/kennethreitz/requests/issues/2155
        if r.headers.get('content-encoding') == 'gzip':
            r.raw.read = functools.partial(r.raw.read, decode_content=True)

        with self.cache.open(cache_path, 'wb') as f:
            copy_file_or_flo(r.raw, f, cb=copy_callback)

    assert self.cache.exists(cache_path)