def _load_sklearn_default_classifier():
    """Load the bundled scikit-learn KNN classifier, fitting and caching it on first use.

    Returns the unpickled classifier when the gzipped pickle resource exists;
    otherwise fits a fresh ``KNeighborsClassifier`` via ``fit_combined_classifier``
    and writes the pickle back so subsequent calls hit the cache.
    """
    # Pickles are not portable across the Python 2/3 boundary, so each
    # major version gets its own cache file and pickle protocol.
    if sys.version_info[0] == 2:
        file_name = "sklearn_classifier_py2.pklz"
        protocol = 2
    else:
        file_name = "sklearn_classifier_py3.pklz"
        protocol = 3
    file_path = resource_filename('sudokuextract.data', file_name)
    if resource_exists('sudokuextract.data', file_name):
        # `with` guarantees the handle is closed even if unpickling fails
        # (the original leaked the file object on error).
        with gzip.open(file_path, 'rb') as f:
            classifier = pickle.load(f)
    else:
        classifier = KNeighborsClassifier(n_neighbors=10)
        classifier = fit_combined_classifier(classifier)
        with gzip.open(file_path, 'wb') as f:
            pickle.dump(classifier, f, protocol=protocol)
    return classifier
Python examples: sample source code using resource_exists()
def _load_sudokuextract_default_classifier():
    """Load the bundled SudokuExtract KNN classifier, fitting and caching it on first use.

    When the cached pickle resource exists, it holds a JSON-like dict of the
    classifier's parameters and training data, from which the classifier is
    rebuilt and refit. Otherwise a fresh classifier is fitted and its
    ``to_json()`` form is cached for future calls.
    """
    file_name = "sudokuextract_classifier.pklz"
    protocol = 2  # protocol 2 keeps the cached pickle readable from Python 2.
    file_path = resource_filename('sudokuextract.data', file_name)
    if resource_exists('sudokuextract.data', file_name):
        # `with` guarantees the handle is closed even if unpickling fails.
        with gzip.open(file_path, 'rb') as f:
            classifier_json = pickle.load(f)
        # Pass parameters by keyword: if this is sklearn's KNeighborsClassifier
        # the 3rd/4th positional parameters are `algorithm` and `leaf_size`,
        # so positional `metric`/`p` would land on the wrong parameters.
        classifier = KNeighborsClassifier(
            n_neighbors=classifier_json.get('n_neighbors'),
            weights=classifier_json.get('weights'),
            metric=classifier_json.get('metric'),
            p=classifier_json.get('p'))
        classifier.fit(np.array(classifier_json.get('data')),
                       np.array(classifier_json.get('labels')))
    else:
        classifier = KNeighborsClassifier(n_neighbors=10)
        classifier = fit_combined_classifier(classifier)
        with gzip.open(file_path, 'wb') as f:
            pickle.dump(classifier.to_json(), f, protocol=protocol)
    return classifier
def validate_absolute_path(self, root, absolute_path):
    """
    An override of
    :meth:`tornado.web.StaticFileHandler.validate_absolute_path`;
    resolves *absolute_path* through `pkg_resources` when ``self.use_pkg``
    is set, and falls back to the normal filesystem validation otherwise.
    """
    # The pkg-aware path resolution has to live here (not in
    # get_absolute_path()) because Tornado made get_absolute_path() a
    # classmethod and we need access to self.use_pkg.
    if not self.use_pkg:
        return super(
            StaticHandler, self).validate_absolute_path(root, absolute_path)
    if not resource_exists(self.use_pkg, absolute_path):
        raise HTTPError(404)
    return resource_filename(self.use_pkg, absolute_path)
def test_resource_api(easter_fixture):
    """Exercise resource_exists/resource_string/resource_stream against a stub egg."""
    tmp = NamedTemporaryFile(mode='wb+')
    dirname, file_name = os.path.split(tmp.name)
    easter_fixture.stub_egg.location = dirname
    easter_fixture.stub_egg.activate()

    requirement = easter_fixture.stub_egg.as_requirement()
    assert pkg_resources.resource_exists(requirement, file_name)
    assert not pkg_resources.resource_exists(requirement, 'IDoNotExist')

    contents = b'asdd '
    tmp.write(contents)
    tmp.flush()

    assert pkg_resources.resource_string(requirement, file_name) == contents
    stream = pkg_resources.resource_stream(requirement, file_name)
    assert stream.read() == contents
def resource_exists(package_or_requirement, resource_name):
    """Fallback stub (pkg_resources unavailable): report that no resource is packaged."""
    return False
def getResource(identifier, pkgname=__name__):
    """
    Acquire a readable (binary) file-like object for a package resource.

    Looks the resource up via pkg_resources first, then falls back to the
    package module's directory on disk. An IOError is raised when the
    resource cannot be found. Example:
        mydata = getResource('mypkgdata.jpg').read()
    The package name must be fully qualified so it can be found in
    sys.modules. Note that a real file object may be returned, whose
    `name` attribute gives the on-disk path (useful when handing data
    off to a C API).
    """
    if resource_exists(pkgname, identifier):
        return resource_stream(pkgname, identifier)
    module = sys.modules[pkgname]
    module_file = getattr(module, '__file__', None)
    if module_file is None:
        raise IOError("%s has no __file__!" % repr(module))
    path = os.path.join(os.path.dirname(module_file), identifier)
    # Before Python 3.3, a PEP 302 loader (e.g. zipimport) may be able to
    # serve the data even when the path does not exist on the filesystem.
    if sys.version_info < (3, 3):
        loader = getattr(module, '__loader__', None)
        if loader is not None:
            try:
                return BytesIO(loader.get_data(path))
            except IOError:
                pass
    return open(os.path.normpath(path), 'rb')
def _mnist_raw_data():
    """Return the raw MNIST training images as a list of 28x28 uint8 arrays.

    Reads the gzipped IDX image file from package data when present;
    otherwise downloads it, parses it, and best-effort caches it locally.

    Raises:
        ValueError: if the IDX magic number does not match.
    """
    fname = resource_filename('sudokuextract.data', "train-images-idx3-ubyte.gz")
    if resource_exists('sudokuextract.data', "train-images-idx3-ubyte.gz"):
        # `with` guarantees the handle is closed even if the read fails.
        with gzip.open(fname, mode='rb') as f:
            data = f.read()
    else:
        sio = StringIO(urlopen(_url_to_mnist_train_data).read())
        sio.seek(0)
        with gzip.GzipFile(fileobj=sio, mode='rb') as f:
            data = f.read()
        # Best-effort cache of the downloaded archive; a failed write
        # (e.g. read-only install) must not break parsing. Narrowed from
        # a bare `except:` so KeyboardInterrupt/SystemExit still propagate.
        try:
            sio.seek(0)
            with open(fname, 'wb') as f:
                f.write(sio.read())
        except Exception:
            pass
    correct_magic_number = 2051  # IDX magic number for image files.
    magic_number = _toS32(data[:4])
    if magic_number != correct_magic_number:
        raise ValueError("Error parsing images file. Read magic number {0} != {1}!".format(
            magic_number, correct_magic_number))
    n_images = _toS32(data[4:8])
    n_rows = _toS32(data[8:12])
    n_cols = _toS32(data[12:16])
    # np.fromstring is deprecated; frombuffer + copy() keeps arrays writable.
    images = np.frombuffer(data[16:], 'uint8').copy().reshape(n_images, n_rows * n_cols)
    return [imrow.reshape(28, 28) for imrow in images]
def _mnist_raw_labels():
    """Return the raw MNIST training labels as a 1-D uint8 array.

    Reads the gzipped IDX label file from package data when present;
    otherwise downloads it, parses it, and best-effort caches it locally.

    Raises:
        ValueError: if the IDX magic number does not match.
    """
    fname = resource_filename('sudokuextract.data', "train-labels-idx1-ubyte.gz")
    if resource_exists('sudokuextract.data', "train-labels-idx1-ubyte.gz"):
        # `with` guarantees the handle is closed even if the read fails.
        with gzip.open(fname, mode='rb') as f:
            data = f.read()
    else:
        sio = StringIO(urlopen(_url_to_mnist_train_labels).read())
        sio.seek(0)
        with gzip.GzipFile(fileobj=sio, mode='rb') as f:
            data = f.read()
        # Best-effort cache of the downloaded archive; a failed write
        # (e.g. read-only install) must not break parsing. Narrowed from
        # a bare `except:` so KeyboardInterrupt/SystemExit still propagate.
        try:
            sio.seek(0)
            with open(fname, 'wb') as f:
                f.write(sio.read())
        except Exception:
            pass
    correct_magic_number = 2049  # IDX magic number for label files.
    magic_number = _toS32(data[:4])
    if magic_number != correct_magic_number:
        raise ValueError("Error parsing labels file. Read magic number {0} != {1}!".format(
            magic_number, correct_magic_number))
    n_labels = _toS32(data[4:8])  # label count; label bytes follow immediately.
    # np.fromstring is deprecated; frombuffer + copy() keeps the array writable.
    return np.frombuffer(data[8:], 'uint8').copy()
def _sudokuextract_data():
    """Load the bundled SudokuExtract training data array.

    Raises:
        IOError: if the packaged training data file is not present.
    """
    fname = resource_filename('sudokuextract.data', "se-train-data.gz")
    if not resource_exists('sudokuextract.data', "se-train-data.gz"):
        raise IOError("SudokuExtract Training data file was not present!")
    # `with` closes the handle even if np.load raises (original leaked it).
    with gzip.open(fname, mode='rb') as f:
        return np.load(f)
def _mnist_data():
    """Load the bundled MNIST training data array.

    Raises:
        IOError: if the packaged training data file is not present.
    """
    fname = resource_filename('sudokuextract.data', "mnist-train-data.gz")
    if not resource_exists('sudokuextract.data', "mnist-train-data.gz"):
        raise IOError("MNIST Training data file was not present!")
    # `with` closes the handle even if np.load raises (original leaked it).
    with gzip.open(fname, mode='rb') as f:
        return np.load(f)
def _mnist_labels():
    """Load the bundled MNIST training labels array.

    Raises:
        IOError: if the packaged training labels file is not present.
    """
    fname = resource_filename('sudokuextract.data', "mnist-train-labels.gz")
    if not resource_exists('sudokuextract.data', "mnist-train-labels.gz"):
        raise IOError("MNIST Training labels file was not present!")
    # `with` closes the handle even if np.load raises (original leaked it).
    with gzip.open(fname, mode='rb') as f:
        return np.load(f)
def _get_env(self, template):
    """Build a Jinja2 Environment for *template*.

    Raises FileNotFoundError when the template resource is not packaged.
    """
    template_path = os.path.join(_DEFAULT_RES_PATH,
                                 self._get_template_filename(template))
    if not resource_exists(__package__, template_path):
        raise FileNotFoundError('No such template')
    # The loader points at the whole resource directory, not the single file.
    return Environment(
        loader=FileSystemLoader(resource_filename(__package__, _DEFAULT_RES_PATH)),
        trim_blocks=True
    )
def getwords(wordlists):
    """return a list of all unique words in all specified files"""
    words = []
    for name in wordlists:
        # A real file on disk wins over a bundled word list of the same name.
        if os.path.exists(name):
            words.extend(slurp(name))
        else:
            resource_path = 'wordlists/' + name
            if pkg_resources.resource_exists(__name__, resource_path):
                text = pkg_resources.resource_string(__name__, resource_path)
                words.extend(text.decode('utf-8').splitlines())
            else:
                click.echo('cannot find word list "{}"'.format(name))
    return list(set(words))
def send_extra(self):
    """
    Sends any extra JS/CSS files placed in Gate One's 'static/extra'
    directory. Can be useful if you want to use Gate One's file
    synchronization and caching capabilities in your app.
    .. note::
        You may have to create the 'static/extra' directory before putting
        files in there.
    """
    # Resolved from settings instead of pkg_resources: this build serves
    # static files from BASE_DIR rather than the installed package.
    extra_path = os.path.join(getsettings('BASE_DIR'), 'static/extra')
    if not os.path.exists(extra_path):
        return  # Nothing to do
    for f in os.listdir(extra_path):
        filepath = os.path.join(extra_path, f)
        if filepath.endswith('.js'):
            self.send_js(filepath, force=True)
        elif filepath.endswith('.css'):
            self.send_css(filepath, force=True)
def resource_exists(package_or_requirement, resource_name):
    """Stand-in for pkg_resources.resource_exists: nothing exists without pkg_resources."""
    return False
def getResource(identifier, pkgname=__name__):
    """
    Acquire a readable (binary) file-like object for a package resource.

    Resolution order: pkg_resources, then the module's PEP 302 loader
    (e.g. zipimport), then the package directory on disk. An IOError is
    raised when the resource cannot be found. Example:
        mydata = getResource('mypkgdata.jpg').read()
    The package name must be fully qualified so it can be found in
    sys.modules. A real file object may be returned, whose `name`
    attribute gives the on-disk path (useful when handing data off to a
    C API).
    """
    if resource_exists(pkgname, identifier):
        return resource_stream(pkgname, identifier)
    module = sys.modules[pkgname]
    module_file = getattr(module, '__file__', None)
    if module_file is None:
        raise IOError("%s has no __file__!" % repr(module))
    path = os.path.join(os.path.dirname(module_file), identifier)
    loader = getattr(module, '__loader__', None)
    if loader is not None:
        try:
            return BytesIO(loader.get_data(path))
        except IOError:
            pass
    return open(os.path.normpath(path), 'rb')
def test_resource_api_from_module_name(easter_fixture):
    """A bare module name (not a Requirement) also resolves resources."""
    tmp = NamedTemporaryFile(mode='wb+', suffix='.py')
    dirname, file_name = os.path.split(tmp.name)
    easter_fixture.stub_egg.location = dirname
    easter_fixture.stub_egg.activate()
    module_name, _, _ = file_name.partition('.')
    assert pkg_resources.resource_exists(module_name, '')
    assert pkg_resources.resource_filename(module_name, '') == dirname
def exists(self):
    """Return whether this resource exists in its containing package.

    An ImportError caused by the containing package itself being absent is
    treated as "does not exist"; any other ImportError is re-raised.
    """
    resource = self.relative_path.replace(os.sep, '/')
    try:
        return pkg_resources.resource_exists(self.containing_package, resource)
    except ImportError as ex:
        # Python 2 only exposes the missing name via the message text;
        # Python 3 carries it on ex.name.
        missing_on_py2 = six.PY2 and str(ex).endswith(self.name)
        missing_on_py3 = six.PY3 and ex.name == '.'.join([self.containing_package, self.name])
        if missing_on_py2 or missing_on_py3:
            return False
        raise
def asset_exists(name: str) -> bool:
    """
    Return True only if the desired asset is known to exist.
    """
    # A falsy name would address the package directory itself, so deny it
    # outright; short-circuiting also skips the pkg_resources lookup.
    return bool(name) and pkg_resources.resource_exists(__name__, name)
def get_release_info() -> str:
    """
    Read the release file and return all lines bunched into one comma-separated value.
    Life's exciting. And short. But mostly exciting.
    """
    if not pkg_resources.resource_exists(__name__, "release.txt"):
        return ("Release information: "
                "Not available. Hint: release info will be created by upload_env.sh")
    raw = pkg_resources.resource_string(__name__, "release.txt")
    # Ignore undecodable bytes: release info is informational only.
    stripped = [line.strip() for line in raw.decode(errors='ignore').strip().split('\n')]
    return "Release information: " + ", ".join(stripped)
def parameters_from_yaml(name, input_key=None, expected_key=None):
    """Collect test parameters from the yaml resource(s) addressed by *name*.

    *name* has the form ``'<package>.<resource>'``. The resource part may be
    a directory (every ``.yml``/``.yaml`` file inside it is loaded) or a
    single file named without its extension. When both *input_key* and
    *expected_key* are given, each document contributes one
    ``(expected, input)`` pair; otherwise the documents' top-level mappings
    are flattened into 2- or 3-tuples (see inline comments).

    Raises:
        RuntimeError: if no matching yaml resource can be found.
    """
    # Split only on the first dot: the resource name itself may contain dots.
    package_name, resource_name = name.split('.', 1)
    resources = []
    if resource_isdir(package_name, resource_name):
        resources.extend([resource_name + '/' + r
                          for r in resource_listdir(package_name, resource_name) if r.endswith(('.yml', '.yaml'))])
    elif resource_exists(package_name, resource_name + '.yml'):
        resources.append(resource_name + '.yml')
    elif resource_exists(package_name, resource_name + '.yaml'):
        resources.append(resource_name + '.yaml')
    if not resources:
        raise RuntimeError('Not able to load any yaml file for {0}'.format(name))
    parameters = []
    for resource_name in resources:
        with resource_stream(package_name, resource_name) as stream:
            data = yaml.load(stream, Loader=serializer.YAMLLoader)
        if input_key and expected_key:
            # Explicit keys: the whole document yields a single pair,
            # expected first.
            parameters.append((data[expected_key], data[input_key]))
            continue
        for root_key, root_value in data.items():
            if isinstance(root_value, Mapping):
                # Two-level mapping: (root_key, expected, properties) triples;
                # list/tuple leaves expand into one entry per element.
                for expected, data_input in root_value.items():
                    for properties in data_input if isinstance(data_input, (tuple, list)) else [data_input]:
                        parameters.append((root_key, expected, properties))
            else:
                # One-level mapping: (root_key, properties) pairs, with the
                # same list/tuple expansion.
                for properties in root_value if isinstance(root_value, (tuple, list)) else [root_value]:
                    parameters.append((root_key, properties))
    return parameters
def _validate_config_files(self):
    """
    Validates the configuration files necessary for the application. An exception is thrown
    if any of the required files are inaccessible.
    """
    # Determine the module of the derived class
    mod = self.__class__.__module__
    # If the configuration directory exists in the library, create config files as necessary
    # This check also provides backwards compatibility for projects that don't have the
    # configuration files in the library.
    if pkg_resources.resource_exists(mod, self.LIB_CONFIG_DIR):
        # Create configuration directory if not found
        # (os.access with R_OK is used as the existence/readability probe).
        if not os.access(self._config_dir, os.R_OK):
            logger.info("Configuration directory '{0}' not found, creating...".format(self._config_dir))
            os.makedirs(self._config_dir)
        # Count of current configuration files, taken BEFORE any copying so
        # "directory was empty" below reflects the pre-existing state.
        config_files_count = len([name for name in os.listdir(self._config_dir)
                                  if os.path.isfile(os.path.join(self._config_dir, name))])
        # Create configuration files if not found
        files = pkg_resources.resource_listdir(mod, self.LIB_APP_CONFIG_DIR)
        for f in files:
            config_path = os.path.join(self._config_dir, f)
            if not os.access(config_path, os.R_OK):
                f_lower = f.lower()
                # Copy configuration file. Only copy logging file if the directory was empty;
                # compiled/source Python files bundled alongside the configs are skipped.
                if not(f_lower.endswith(".py") or f_lower.endswith(".pyc")) and \
                        (f_lower != Application.LOGGING_CONFIG_FILE or config_files_count == 0):
                    logger.info("Configuration file '{0}' not found, creating...".format(f))
                    shutil.copyfile(pkg_resources.resource_filename(
                        mod, self.LIB_APP_CONFIG_DIR + "/" + f), config_path)
    # The two required files are validated unconditionally, whether they were
    # just copied from the library or provided by the project directly.
    if not os.access(self._dxlclient_config_path, os.R_OK):
        raise Exception(
            "Unable to access client configuration file: {0}".format(
                self._dxlclient_config_path))
    if not os.access(self._app_config_path, os.R_OK):
        raise Exception(
            "Unable to access application configuration file: {0}".format(
                self._app_config_path))