def prepare_zip():
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
        info = ZipInfo('config.json')
        # Unix permission bits live in the high 16 bits of external_attr.
        info.external_attr = 0o664 << 16
        zipf.writestr(info, dumps(config))
        zipf.write(resource('gimel', 'config.py'), 'config.py')
        zipf.write(resource('gimel', 'gimel.py'), 'gimel.py')
        zipf.write(resource('gimel', 'logger.py'), 'logger.py')
        for root, dirs, files in os.walk(resource('gimel', 'vendor')):
            for file in files:
                real_file = os.path.join(root, file)
                relative_file = os.path.relpath(real_file,
                                                resource('gimel', ''))
                zipf.write(real_file, relative_file)
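The permission line above works because the zip format stores the Unix mode bits in the high 16 bits of ZipInfo.external_attr. In isolation:

from zipfile import ZipFile, ZipInfo

# Shifting the mode left by 16 marks the entry rw-rw-r-- when it is
# extracted on a POSIX system.
info = ZipInfo('demo.txt')
info.external_attr = 0o664 << 16
with ZipFile('demo.zip', 'w') as zf:
    zf.writestr(info, 'hello')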
Python resource_filename() usage examples
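Every snippet below resolves package data through pkg_resources.resource_filename(package, path), which returns a real filesystem path for a resource bundled with an installed package, extracting it first if the package is zipped. A minimal sketch with placeholder names ('mypkg' and 'data/config.json' are not from any snippet on this page):

from pkg_resources import resource_filename

# Resolve a data file shipped inside an installed package to a real path.
path = resource_filename('mypkg', 'data/config.json')  # placeholder names
with open(path) as fh:
    print(fh.read())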
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels
    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
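The validation core of the test above, in isolation, with a toy inline schema:

import jsonschema

schema = {"type": "object", "required": ["name"],
          "properties": {"name": {"type": "string"}}}
jsonschema.validate({"name": "simple.dist"}, schema)  # passes silently
# jsonschema.validate({}, schema) would raise ValidationError ('name' is required)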
def create_params_file(self, fname):
    msg = QMessageBox()
    msg.setIcon(QMessageBox.Question)
    msg.setText("Parameter file %r not found, do you want SpyKING CIRCUS to "
                "create it for you?" % fname)
    msg.setWindowTitle("Generate parameter file?")
    msg.setInformativeText("This will create a parameter file from a "
                           "template file and open it in your system's "
                           "standard text editor. Fill it in properly before "
                           "launching the code. See the documentation "
                           "for details.")
    msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
    answer = msg.exec_()
    if answer == QMessageBox.Yes:
        user_path = os.path.join(os.path.expanduser('~'), 'spyking-circus')
        if os.path.exists(os.path.join(user_path, 'config.params')):
            config_file = os.path.abspath(os.path.join(user_path, 'config.params'))
        else:
            config_file = os.path.abspath(
                pkg_resources.resource_filename('circus', 'config.params'))
        shutil.copyfile(config_file, fname)
        self.params = fname
        self.last_log_file = fname.replace('.params', '.log')
        self.update_params()
def init_gui_layout(self):
    gui_fname = pkg_resources.resource_filename('circus',
                                                os.path.join('qt_GUI',
                                                             'qt_merge.ui'))
    if comm.rank == 0:
        self.ui = uic.loadUi(gui_fname, self)
        # print dir(self.ui)
        self.score_ax1 = self.ui.score_1.axes
        self.score_ax2 = self.ui.score_2.axes
        self.score_ax3 = self.ui.score_3.axes
        self.waveforms_ax = self.ui.waveforms.axes
        self.detail_ax = self.ui.detail.axes
        self.data_ax = self.ui.data_overview.axes
        self.current_order = self.ui.cmb_sorting.currentIndex()
        self.mpl_toolbar = NavigationToolbar(self.ui.waveforms, None)
        self.mpl_toolbar.pan()
        self.ui.show()
    else:
        self.ui = None
def init_gui_layout(self):
    gui_fname = pkg_resources.resource_filename('circus',
                                                os.path.join('qt_GUI',
                                                             'qt_preview.ui'))
    self.ui = uic.loadUi(gui_fname, self)
    self.electrode_ax = self.ui.electrodes.axes
    self.data_x = self.ui.raw_data.axes
    if self.show_fit:
        self.ui.btn_next.clicked.connect(self.increase_time)
        self.ui.btn_prev.clicked.connect(self.decrease_time)
    else:
        self.ui.btn_next.setVisible(False)
        self.ui.btn_prev.setVisible(False)
    # Toolbar will not be displayed
    self.mpl_toolbar = NavigationToolbar(self.ui.raw_data, None)
    self.mpl_toolbar.pan()
    self.ui.show()
def generate_pdf(card):
    """
    Make a PDF from a card
    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)
    pdf.add_page()
    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)
    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')
    qrcode = generate_qr_code(card.url)
    qrcode_file = mktemp(suffix='.png', prefix='trello_qr_')
    qrcode.save(qrcode_file)
    pdf.image(qrcode_file, 118, 35, 20, 20)
    os.unlink(qrcode_file)
    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')
    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')
    return pdf.output(dest='S')
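A hedged usage sketch: any object exposing name, url, and a labels list of named labels will do. The stand-ins and output handling below are assumptions, not the project's fetcher objects:

from collections import namedtuple

# Hypothetical stand-ins for the Trello objects the function expects.
Label = namedtuple('Label', 'name')
Card = namedtuple('Card', 'name url labels')

card = Card(name='Fix the build', url='https://trello.com/c/abc123',
            labels=[Label('urgent')])
buf = generate_pdf(card)
# Depending on the fpdf version, output(dest='S') returns str or bytes.
with open('card.pdf', 'wb') as fh:
    fh.write(buf if isinstance(buf, bytes) else buf.encode('latin-1'))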
def test_installation():
    '''
    Test the installation
    '''
    import pkg_resources
    PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './')
    DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/')
    import os.path
    print('Does pynufft.py exist? ', os.path.isfile(PYNUFFT_PATH + 'pynufft.py'))
    print('Does om1D.npz exist?', os.path.isfile(DATA_PATH + 'om1D.npz'))
    print('Does om2D.npz exist?', os.path.isfile(DATA_PATH + 'om2D.npz'))
    print('Does om3D.npz exist?', os.path.isfile(DATA_PATH + 'om3D.npz'))
    print('Does phantom_3D_128_128_128.npz exist?', os.path.isfile(DATA_PATH + 'phantom_3D_128_128_128.npz'))
    print('Does phantom_256_256.npz exist?', os.path.isfile(DATA_PATH + 'phantom_256_256.npz'))
    print('Does example_1D.py exist?', os.path.isfile(PYNUFFT_PATH + './tests/example_1D.py'))
    print('Does example_2D.py exist?', os.path.isfile(PYNUFFT_PATH + './tests/example_2D.py'))
    for pkgname in ('reikna', 'pyopencl', 'pycuda'):
        error_code = test_pkg(pkgname)
        if 1 == error_code:
            break
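test_pkg is a project helper not shown on this page; from the call site it evidently returns a nonzero code when an optional GPU backend is missing. A hypothetical reconstruction (name and return convention assumed):

def test_pkg(pkgname):
    # Hypothetical helper: probe an optional backend by importing it.
    # Returns 0 if the import succeeds, 1 otherwise (the caller stops at 1).
    try:
        __import__(pkgname)
        print(pkgname, 'is available')
        return 0
    except ImportError:
        print(pkgname, 'is not installed')
        return 1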
def test_installation():
    '''
    Test the installation
    '''
    import pkg_resources
    PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './')
    DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/')
    import os.path
    print('Does pynufft.py exist? ', os.path.isfile(PYNUFFT_PATH + 'pynufft.py'))
    print('Does om1D.npz exist?', os.path.isfile(DATA_PATH + 'om1D.npz'))
    print('Does om2D.npz exist?', os.path.isfile(DATA_PATH + 'om2D.npz'))
    print('Does om3D.npz exist?', os.path.isfile(DATA_PATH + 'om3D.npz'))
    print('Does phantom_3D_128_128_128.npz exist?', os.path.isfile(DATA_PATH + 'phantom_3D_128_128_128.npz'))
    print('Does phantom_256_256.npz exist?', os.path.isfile(DATA_PATH + 'phantom_256_256.npz'))
    print('Does 1D_example.py exist?', os.path.isfile(PYNUFFT_PATH + 'example/1D_example.py'))
    print('Does 2D_example.py exist?', os.path.isfile(PYNUFFT_PATH + 'example/2D_example.py'))
def _transfer_ping_script(self, ssh):
    """
    Transfer the vping script to the VM.
    Uses SCP to copy the ping script via the SSH client
    :param ssh: the SSH client
    :return:
    """
    self.logger.info("Trying to transfer ping.sh")
    scp = SCPClient(ssh.get_transport())
    ping_script = pkg_resources.resource_filename(
        'functest.opnfv_tests.openstack.vping', 'ping.sh')
    try:
        scp.put(ping_script, "~/")
    except Exception:  # pylint: disable=broad-except
        self.logger.error("Cannot SCP the file '%s'", ping_script)
        return False
    cmd = 'chmod 755 ~/ping.sh'
    # pylint: disable=unused-variable
    (stdin, stdout, stderr) = ssh.exec_command(cmd)
    for line in stdout.readlines():
        print(line)
    return True
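A sketch of driving the helper with paramiko; the host, credentials, and the vping_case object are placeholders, not functest's real wiring:

import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Hypothetical connection details; functest gets these from the deployed VM.
ssh.connect('192.0.2.10', username='cirros', password='gocubsgo')
if vping_case._transfer_ping_script(ssh):  # vping_case: the test-case object
    stdin, stdout, stderr = ssh.exec_command('~/ping.sh 192.0.2.11')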
def __init__(self):
    """Initialize helper object."""
    self.functest_test = pkg_resources.resource_filename(
        'functest', 'opnfv_tests')
    self.conf_path = pkg_resources.resource_filename(
        'functest',
        'opnfv_tests/openstack/refstack_client/refstack_tempest.conf')
    self.defcore_list = pkg_resources.resource_filename(
        'functest', 'opnfv_tests/openstack/refstack_client/defcore.txt')
    self.confpath = os.path.join(self.functest_test,
                                 self.conf_path)
    self.defcorelist = os.path.join(self.functest_test,
                                    self.defcore_list)
    self.parser = argparse.ArgumentParser()
    self.parser.add_argument(
        '-c', '--config',
        help='the file path of refstack_tempest.conf',
        default=self.confpath)
    self.parser.add_argument(
        '-t', '--testlist',
        help='Specify the file path or URL of a test list text file. '
             'This test list will contain specific test cases that '
             'should be tested.',
        default=self.defcorelist)
def test_resources():
    # loading by resource
    cls = bob.bio.base.load_resource("pca", "algorithm")
    assert isinstance(cls, bob.bio.base.algorithm.PCA)
    # loading by configuration file
    cls = bob.bio.base.load_resource(pkg_resources.resource_filename("bob.bio.base.config.algorithm", "pca.py"), "algorithm")
    assert isinstance(cls, bob.bio.base.algorithm.PCA)
    # loading by instantiation
    cls = bob.bio.base.load_resource("bob.bio.base.algorithm.PCA(10, distance_function=scipy.spatial.distance.euclidean)", "algorithm", imports=['bob.bio.base', 'scipy.spatial'])
    assert isinstance(cls, bob.bio.base.algorithm.PCA)
    # get list of extensions
    extensions = bob.bio.base.extensions()
    assert isinstance(extensions, list)
    assert 'bob.bio.base' in extensions
def setUp(self):
    file_path = resource_filename(Requirement.parse('search_google'), 'search_google/config.json')
    with open(file_path, 'r') as in_file:
        defaults = json.load(in_file)
    buildargs = {
        'serviceName': 'customsearch',
        'version': 'v1',
        'developerKey': defaults['build_developerKey']
    }
    cseargs = {
        'q': 'google',
        'num': 1,
        'fileType': 'png',
        'cx': defaults['cx']
    }
    self.results = search_google.api.results(buildargs, cseargs)
    tempfile = TemporaryFile()
    self.tempfile = str(tempfile.name)
    tempfile.close()
    self.tempdir = str(TemporaryDirectory().name)
def register_adapters():
    global adapters_registered
    if adapters_registered is True:
        return
    try:
        import pkg_resources
        packageDir = pkg_resources.resource_filename('pyamf', 'adapters')
    except:
        packageDir = os.path.dirname(__file__)
    for f in glob.glob(os.path.join(packageDir, '*.py')):
        mod = os.path.basename(f).split(os.path.extsep, 1)[0]
        if mod == '__init__' or not mod.startswith('_'):
            continue
        try:
            register_adapter(mod[1:].replace('_', '.'), PackageImporter(mod))
        except ImportError:
            pass
    adapters_registered = True
def _instantiate(self, rsc):
    # First, load the pump
    with open(resource_filename(__name__,
                                os.path.join(rsc, 'pump.pkl')),
              'rb') as fd:
        self.pump = pickle.load(fd)
    # Now load the model
    with open(resource_filename(__name__,
                                os.path.join(rsc, 'model_spec.pkl')),
              'rb') as fd:
        spec = pickle.load(fd)
        self.model = keras.models.model_from_config(spec)
    # And the model weights
    self.model.load_weights(resource_filename(__name__,
                                              os.path.join(rsc,
                                                           'model.h5')))
    # And the version number
    with open(resource_filename(__name__,
                                os.path.join(rsc, 'version.txt')),
              'r') as fd:
        self.version = fd.read().strip()
def populate_table(table, data_file, print_msg):
    """Method to populate a table with records stored in a dict loaded from a json file

    Args:
        table(api.aws.DynamoTable): the table to write to
        data_file(str): name of the JSON file to load. Its contents should be
            an array in an object named 'data'

    Returns:
        None
    """
    with open(os.path.join(resource_filename("manage", "data"), data_file), 'rt') as df:
        data = json.load(df)
    if len(data["data"]) == 1:
        # Assume this is a campaign file for now.
        print(" - Example campaign loaded: https://<your_stack>/campaign.html?id={}".format(data["data"][0]["campaign_id"]))
    for item in data["data"]:
        table.put_item(item)
    print(print_msg)
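A hedged usage sketch; the DynamoTable constructor and file name below are assumptions, and the JSON file must sit under the package's manage/data directory:

# Hypothetical call: load manage/data/campaigns.json into a campaigns table.
table = DynamoTable('campaigns')  # assumed wrapper constructor
populate_table(table, 'campaigns.json', 'Campaign data loaded.')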
def get_code(self):
    """Zip up the code and return bytes

    Returns:
        bytes
    """
    with open(tempfile.NamedTemporaryFile().name, 'w') as zf:
        zfh = zipfile.ZipFile(zf.name, mode='w')
        # chdir so the entry is stored under a bare filename, not a full path
        old_path = os.getcwd()
        os.chdir(os.path.join(resource_filename("manage", "configs")))
        zfh.write("url_rewrite.js")
        zfh.close()
        zf.close()
        os.chdir(old_path)
    with open(zf.name, "rb") as zfr:
        return zfr.read()
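The temp-file-plus-chdir dance can be avoided; a sketch (not the project's code) of an equivalent in-memory variant using io.BytesIO and arcname:

import io
import os
import zipfile
from pkg_resources import resource_filename

def get_code_inmemory():
    # Build the zip in memory and control the stored name via arcname,
    # so no temp file or working-directory change is needed.
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, mode='w') as zfh:
        src = os.path.join(resource_filename("manage", "configs"),
                           "url_rewrite.js")
        zfh.write(src, arcname="url_rewrite.js")
    return buf.getvalue()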
# upload_documents_to_discovery_collection.py (project: retrieve-and-rank-tuning, author: rchaks)
def main():
    insurance_lib_data_dir = resource_filename('resources', 'insurance_lib_v2')
    print('Using data from {}'.format(insurance_lib_data_dir))
    # Either re-use an existing collection id by overriding the below, or leave as is to create one
    collection_id = "TestCollection-InsLibV2"
    discovery = DiscoveryProxy()
    collection_id = discovery.setup_collection(collection_id=collection_id,
                                               config_id="889a08c9-cad9-4287-a87d-2f0380363bff")
    discovery.print_collection_stats(collection_id)
    # This thing seems to misbehave when run from python notebooks due to its use of
    # multiprocessing, so just running it in a script
    discovery.upload_documents(collection_id=collection_id,
                               corpus=document_corpus_as_iterable(
                                   path.join(insurance_lib_data_dir, 'document_corpus.solr.xml')))
    discovery.print_collection_stats(collection_id)
def test_cached_phrases_no_files(self,
                                 corpus_base_path,
                                 doc_content_stream):
    from eea.corpus.processing.phrases.process import cached_phrases
    from pkg_resources import resource_filename

    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path
    # we want the B.phras.* files in fixtures
    env = {'phash_id': 'X', 'file_name': 'ignore'}
    settings = {}
    stream = cached_phrases(doc_content_stream, env, settings)
    with pytest.raises(StopIteration):
        next(stream)
def test_cached_phrases_cached_files(self,
                                     corpus_base_path,
                                     doc_content_stream):
    # TODO: this test should be improved. Text quality should be tested
    from eea.corpus.processing.phrases.process import cached_phrases
    from pkg_resources import resource_filename

    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path
    # we want the B.phras.* files in fixtures
    env = {'phash_id': 'B', 'file_name': 'ignore'}
    settings = {}
    stream = cached_phrases(doc_content_stream, env, settings)
    doc = next(stream)
    assert 'water_stress_conditions' in doc.text
    assert 'positive_development' in doc.text
def test_preview_phrases_with_cache_files(self, corpus_base_path):
    from eea.corpus.processing.phrases.process import preview_phrases
    from pkg_resources import resource_filename

    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path
    content = ['hello', 'world']
    env = {
        'file_name': 'x.csv',
        'text_column': 'text',
        'phash_id': 'B',
    }
    stream = preview_phrases(content, env, {})
    assert list(stream) == []
def test_preview_phrases_nocache_files_with_job(self,
                                                corpus_base_path,
                                                get_assigned_job):
    from eea.corpus.processing.phrases.process import preview_phrases
    from pkg_resources import resource_filename

    get_assigned_job.return_value = Mock(id='job1')
    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path
    content = ['hello', 'world']
    env = {
        'file_name': 'x.csv',
        'text_column': 'text',
        'phash_id': 'X',
    }
    stream = preview_phrases(content, env, {})
    assert list(stream) == ['hello', 'world']
def test_produce_phrases_with_no_job(self,
                                     cached_phrases,
                                     corpus_base_path,
                                     get_pipeline_for_component,
                                     build_phrases):
    from eea.corpus.processing.phrases.process import produce_phrases
    from pkg_resources import resource_filename

    content = ['hello', 'world']
    env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'}
    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path
    cached_phrases.return_value = ['something', 'else']
    stream = produce_phrases(content, env, {})
    assert list(stream) == ['something', 'else']
    assert corpus_base_path.call_count == 1
    assert get_pipeline_for_component.call_count == 1
    assert build_phrases.call_count == 1
    assert cached_phrases.call_count == 1
def test_produce_phrases_with_ok_job(self,
                                     cached_phrases,
                                     corpus_base_path,
                                     get_pipeline_for_component,
                                     build_phrases,
                                     get_job_finish_status):
    from eea.corpus.processing.phrases.process import produce_phrases
    from pkg_resources import resource_filename

    content = ['hello', 'world']
    env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'}
    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path
    cached_phrases.return_value = ['something', 'else']
    get_job_finish_status.return_value = True
    stream = produce_phrases(content, env, {})
    assert list(stream) == ['something', 'else']
    assert corpus_base_path.call_count == 1
    assert get_pipeline_for_component.call_count == 0
    assert build_phrases.call_count == 0
    assert cached_phrases.call_count == 1