def test_phrase_model_status(self, get_assigned_job):
from eea.corpus.processing.phrases.views import phrase_model_status
from pkg_resources import resource_filename
base_path = resource_filename('eea.corpus', 'tests/fixtures/')
o_st = phrase_model_status.__globals__['CORPUS_STORAGE']
phrase_model_status.__globals__['CORPUS_STORAGE'] = base_path
req = Mock(matchdict={'phash_id': 'A'})
assert phrase_model_status(req) == {'status': 'OK'}
get_assigned_job.return_value = None
req = Mock(matchdict={'phash_id': 'X'})
assert phrase_model_status(req) == {'status': 'unavailable'}
job = Mock()
get_assigned_job.return_value = job
job.get_status.return_value = '_job_status_here_'
assert phrase_model_status(req) == {'status':
'preview__job_status_here_'}
phrase_model_status.__globals__['CORPUS_STORAGE'] = o_st
python类resource_filename()的实例源码
def test_build_pipeline_for_preview(self, upload_location):
from eea.corpus.processing import build_pipeline
from pkg_resources import resource_filename
file_name = 'test.csv'
upload_location.return_value = resource_filename(
'eea.corpus', 'tests/fixtures/test.csv')
text_column = 'text'
pipeline = [
('eea_corpus_processing_limit_process', 'ABC', {'max_count': 2})
]
stream = build_pipeline(file_name, text_column, pipeline,
preview_mode=True)
docs = list(stream)
assert len(docs) == 2
def dashboard(global_config, **settings):
""" WSGI entry point for the Flask app RQ Dashboard
"""
redis_uri = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
p = parse.urlparse(redis_uri)
host, port = p.netloc.split(':')
db = len(p.path) > 1 and p.path[1:] or '0'
redis_settings = {
'REDIS_URL': redis_uri,
'REDIS_DB': db,
'REDIS_HOST': host,
'REDIS_PORT': port,
}
app = Flask(__name__,
static_url_path="/static",
static_folder=resource_filename("rq_dashboard", "static")
)
app.config.from_object(rq_dashboard.default_settings)
app.config.update(redis_settings)
app.register_blueprint(rq_dashboard.blueprint)
return app.wsgi_app
def example_audio_file():
"""Get path of audio file.
Returns:
str: Path of the example audio file.
See also:
:func:`example_label_file`
Examples:
>>> from nnmnkwii.util import example_audio_file
>>> from scipy.io import wavfile
>>> fs, x = wavfile.read(example_audio_file())
"""
name = "arctic_a0009"
wav_path = pkg_resources.resource_filename(
__name__, '_example_data/{}.wav'.format(name))
return wav_path
def example_question_file():
"""Get path of example question file.
The question file was taken from Merlin_.
.. _Merlin: https://github.com/CSTR-Edinburgh/merlin
Returns:
str: Path of the example audio file.
Examples:
>>> from nnmnkwii.util import example_question_file
>>> from nnmnkwii.io import hts
>>> binary_dict, continuous_dict = hts.load_question_set(example_question_file())
"""
return pkg_resources.resource_filename(
__name__, '_example_data/questions-radio_dnn_416.hed')
def test_pydist():
"""Make sure pydist.json exists and validates against our schema."""
# XXX this test may need manual cleanup of older wheels
import jsonschema
def open_json(filename):
with open(filename, 'rb') as json_file:
return json.loads(json_file.read().decode('utf-8'))
pymeta_schema = open_json(resource_filename('wheel.test',
'pydist-schema.json'))
valid = 0
for dist in ("simple.dist", "complex-dist"):
basedir = pkg_resources.resource_filename('wheel.test', dist)
for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
if entry.filename.endswith('/metadata.json'):
pymeta = json.loads(whl.read(entry).decode('utf-8'))
jsonschema.validate(pymeta, pymeta_schema)
valid += 1
assert valid > 0, "No metadata.json found"
def __init__(self, weights_path=None,
vocab_path=None):
if weights_path is None:
weights_path = resource_filename(__name__,
'reactionrnn_weights.hdf5')
if vocab_path is None:
vocab_path = resource_filename(__name__,
'reactionrnn_vocab.json')
with open(vocab_path, 'r') as json_file:
self.vocab = json.load(json_file)
self.tokenizer = Tokenizer(filters='', char_level=True)
self.tokenizer.word_index = self.vocab
self.num_classes = len(self.vocab) + 1
self.model = reactionrnn_model(weights_path, self.num_classes)
self.model_enc = Model(inputs=self.model.input,
outputs=self.model.get_layer('rnn').output)
def main(argv=None):
args = docopt(__doc__, argv=argv, version='1.0.3')
assert 'config.toml' in (p.name for p in Path().iterdir()), "config.toml not found in directory. Are you sure you're in the project's root?"
if args['--init']:
notebooks_dir = Path('./notebooks/')
notebooks_dir.mkdir(exist_ok=True)
with open(resource_filename('hugo_jupyter', '__fabfile.py')) as fp:
fabfile = Path('fabfile.py')
fabfile.write_text(fp.read())
print(dedent("""
Successfully initialized. From this directory, the following commands are available.
Just remember to prepend them with `fab`
"""))
run(('fab', '-l'))
def test_copy_config(self):
tempdir = tempfile.mkdtemp()
server_root = pkg_resources.resource_filename(__name__, "testdata")
letshelp_le_apache.copy_config(server_root, tempdir)
temp_testdata = os.path.join(tempdir, "testdata")
self.assertFalse(os.path.exists(os.path.join(
temp_testdata, os.path.basename(_PASSWD_FILE))))
self.assertFalse(os.path.exists(os.path.join(
temp_testdata, os.path.basename(_KEY_FILE))))
self.assertFalse(os.path.exists(os.path.join(
temp_testdata, os.path.basename(_SECRET_FILE))))
self.assertTrue(os.path.exists(os.path.join(
temp_testdata, _PARTIAL_CONF_PATH)))
self.assertTrue(os.path.exists(os.path.join(
temp_testdata, _PARTIAL_LINK_PATH)))
def pkg_file(path):
return pkg_resources.resource_filename(
pkg_resources.Requirement.parse("docker-utils-aba"),
os.path.join("docker_utils_aba", path))
def __init__(self, *args):
self.log = logging.getLogger(resource_filename(__name__, __file__))
self.cluster = Cluster(args[0])
self.connection = self.cluster.connect()
self.connection.row_factory = tuple_factory
self.connection.execute("CREATE KEYSPACE IF NOT EXISTS public \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")
self.connection.execute("CREATE KEYSPACE IF NOT EXISTS internal \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")
self.connection.execute("CREATE TABLE IF NOT EXISTS public.users ( \
name text PRIMARY KEY, \
n text, \
e text, \
secret text);")
self.connection.execute("CREATE TABLE IF NOT EXISTS public.contracts ( \
id uuid PRIMARY KEY, \
owner text, \
package text, \
template blob, \
example blob);")
def __init__(self, *args):
self.log = logging.getLogger(resource_filename(__name__, __file__))
self.engine = create_engine(args[0])
self.connection = self.engine.connect()
def __init__(self, round_number):
self.round_number = round_number
self.train_file_name = 'r' + str(round_number) + '_numerai_training_data.csv'
self.test_file_name = 'r' + str(round_number) + '_numerai_tournament_data.csv'
self.sorted_file_name = 'r' + str(round_number) + '_numerai_sorted_training_data.csv'
if not os.path.exists(resource_filename('numerai.data', self.train_file_name)):
raise IOError('File {} not found.'.format(self.train_file_name))
if not os.path.exists(resource_filename('numerai.data', self.test_file_name)):
raise IOError('File {} not found.'.format(self.test_file_name))
def training_set(self):
return pd.read_csv(resource_filename('numerai.data', self.train_file_name))
def test_set(self):
return pd.read_csv(resource_filename('numerai.data', self.test_file_name))
def sorted_training_set(self):
return pd.read_csv(resource_filename('numerai.data', self.sorted_file_name))
def teardown_module():
"""Delete eggs/wheels created by tests."""
base = pkg_resources.resource_filename('wheel.test', '')
for dist in test_distributions:
for subdir in ('build', 'dist'):
try:
rmtree(os.path.join(base, dist, subdir))
except OSError:
pass
def build_wheel():
"""Build wheels from test distributions."""
for dist in test_distributions:
pwd = os.path.abspath(os.curdir)
distdir = pkg_resources.resource_filename('wheel.test', dist)
os.chdir(distdir)
try:
sys.argv = ['', 'bdist_wheel']
exec(compile(open('setup.py').read(), 'setup.py', 'exec'))
finally:
os.chdir(pwd)
def build_egg():
"""Build eggs from test distributions."""
for dist in test_distributions:
pwd = os.path.abspath(os.curdir)
distdir = pkg_resources.resource_filename('wheel.test', dist)
os.chdir(distdir)
try:
sys.argv = ['', 'bdist_egg']
exec(compile(open('setup.py').read(), 'setup.py', 'exec'))
finally:
os.chdir(pwd)
def test_egg_re():
"""Make sure egg_info_re matches."""
egg_names = open(pkg_resources.resource_filename('wheel', 'eggnames.txt'))
for line in egg_names:
line = line.strip()
if not line:
continue
assert egg2wheel.egg_info_re.match(line), line