python类resource_filename()的实例源码

deploy.py 文件源码 项目:gimel 作者: Alephbet 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def prepare_zip():
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
        info = ZipInfo('config.json')
        info.external_attr = 0o664 << 16
        zipf.writestr(info, dumps(config))
        zipf.write(resource('gimel', 'config.py'), 'config.py')
        zipf.write(resource('gimel', 'gimel.py'), 'gimel.py')
        zipf.write(resource('gimel', 'logger.py'), 'logger.py')
        for root, dirs, files in os.walk(resource('gimel', 'vendor')):
            for file in files:
                real_file = os.path.join(root, file)
                relative_file = os.path.relpath(real_file,
                                                resource('gimel', ''))
                zipf.write(real_file, relative_file)
test_basic.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
launch_gui.py 文件源码 项目:spyking-circus 作者: spyking-circus 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def create_params_file(self, fname):
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Question)
        msg.setText("Parameter file %r not found, do you want SpyKING CIRCUS to "
                    "create it for you?" % fname)
        msg.setWindowTitle("Generate parameter file?")
        msg.setInformativeText("This will create a parameter file from a "
                               "template file and open it in your system's "
                               "standard text editor. Fill properly before "
                               "launching the code. See the documentation "
                               "for details")
        msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
        answer = msg.exec_()
        if answer == QMessageBox.Yes:
            user_path = os.path.join(os.path.expanduser('~'), 'spyking-circus')
            if os.path.exists(user_path + 'config.params'):
                config_file = os.path.abspath(user_path + 'config.params')
            else:
                config_file = os.path.abspath(
                    pkg_resources.resource_filename('circus', 'config.params'))
            shutil.copyfile(config_file, fname)
            self.params = fname
            self.last_log_file = fname.replace('.params', '.log')
            self.update_params()
gui.py 文件源码 项目:spyking-circus 作者: spyking-circus 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def init_gui_layout(self):
        gui_fname = pkg_resources.resource_filename('circus',
                                                    os.path.join('qt_GUI',
                                                                 'qt_merge.ui'))
        if comm.rank == 0:
            self.ui = uic.loadUi(gui_fname, self)
            # print dir(self.ui)
            self.score_ax1 = self.ui.score_1.axes
            self.score_ax2 = self.ui.score_2.axes
            self.score_ax3 = self.ui.score_3.axes
            self.waveforms_ax  = self.ui.waveforms.axes
            self.detail_ax     = self.ui.detail.axes
            self.data_ax       = self.ui.data_overview.axes
            self.current_order = self.ui.cmb_sorting.currentIndex()
            self.mpl_toolbar = NavigationToolbar(self.ui.waveforms, None)
            self.mpl_toolbar.pan()
            self.ui.show()
        else:
            self.ui = None
gui.py 文件源码 项目:spyking-circus 作者: spyking-circus 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def init_gui_layout(self):
        gui_fname = pkg_resources.resource_filename('circus',
                                                    os.path.join('qt_GUI',
                                                                 'qt_preview.ui'))
        self.ui = uic.loadUi(gui_fname, self)
        self.electrode_ax = self.ui.electrodes.axes
        self.data_x = self.ui.raw_data.axes
        if self.show_fit:
            self.ui.btn_next.clicked.connect(self.increase_time)
            self.ui.btn_prev.clicked.connect(self.decrease_time)
        else:
            self.ui.btn_next.setVisible(False)
            self.ui.btn_prev.setVisible(False)
        # Toolbar will not be displayed
        self.mpl_toolbar = NavigationToolbar(self.ui.raw_data, None)
        self.mpl_toolbar.pan()
        self.ui.show()
render.py 文件源码 项目:eClaire 作者: kogan 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def generate_pdf(card):
    """
    Make a PDF from a card

    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)

    pdf.add_page()

    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)

    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    qrcode_file = mktemp(suffix='.png', prefix='trello_qr_')
    qrcode.save(qrcode_file)
    pdf.image(qrcode_file, 118, 35, 20, 20)
    os.unlink(qrcode_file)

    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')

    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')

    return pdf.output(dest='S')
test_installation.py 文件源码 项目:pynufft 作者: jyhmiinlin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_installation():
    '''
    Test the installation
    '''
    import pkg_resources
    PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './')
    DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/')
    import os.path


    print('Does pynufft.py exist? ',os.path.isfile(PYNUFFT_PATH+'pynufft.py'))
    print('Does om1D.npz exist?',os.path.isfile(DATA_PATH+'om1D.npz'))
    print('Does om2D.npz exist?',os.path.isfile(DATA_PATH+'om2D.npz'))
    print('Does om3D.npz exist?',os.path.isfile(DATA_PATH+'om3D.npz'))
    print('Does phantom_3D_128_128_128.npz exist?', os.path.isfile(DATA_PATH+'phantom_3D_128_128_128.npz'))
    print('Does phantom_256_256.npz exist?', os.path.isfile(DATA_PATH+'phantom_256_256.npz'))
    print('Does example_1D.py exist?', os.path.isfile(PYNUFFT_PATH+'./tests/example_1D.py'))
    print('Does example_2D.py exist?', os.path.isfile(PYNUFFT_PATH+'./tests/example_2D.py'))


    for pkgname in ('reikna', 'pyopencl', 'pycuda'):
        error_code = test_pkg(pkgname)
        if 1 == error_code:
            break
.pynufft_cpu.py 文件源码 项目:pynufft 作者: jyhmiinlin 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_installation():
    '''
    Test the installation
    '''
    import pkg_resources
    PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './')
    DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/')
    import os.path


    print('Does pynufft.py exist? ',os.path.isfile(PYNUFFT_PATH+'pynufft.py'))
    print('Does om1D.npz exist?',os.path.isfile(DATA_PATH+'om1D.npz'))
    print('Does om2D.npz exist?',os.path.isfile(DATA_PATH+'om2D.npz'))
    print('Does om3D.npz exist?',os.path.isfile(DATA_PATH+'om3D.npz'))
    print('Does phantom_3D_128_128_128.npz exist?', os.path.isfile(DATA_PATH+'phantom_3D_128_128_128.npz'))
    print('Does phantom_256_256.npz exist?', os.path.isfile(DATA_PATH+'phantom_256_256.npz'))
    print('Does 1D_example.py exist?', os.path.isfile(PYNUFFT_PATH+'example/1D_example.py'))
    print('Does 2D_example.py exist?', os.path.isfile(PYNUFFT_PATH+'example/1D_example.py'))
vping_ssh.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _transfer_ping_script(self, ssh):
        """
        Transfert vping script to VM.

        Uses SCP to copy the ping script via the SSH client
        :param ssh: the SSH client
        :return:
        """
        self.logger.info("Trying to transfer ping.sh")
        scp = SCPClient(ssh.get_transport())
        ping_script = pkg_resources.resource_filename(
            'functest.opnfv_tests.openstack.vping', 'ping.sh')
        try:
            scp.put(ping_script, "~/")
        except Exception:  # pylint: disable=broad-except
            self.logger.error("Cannot SCP the file '%s'", ping_script)
            return False

        cmd = 'chmod 755 ~/ping.sh'
        # pylint: disable=unused-variable
        (stdin, stdout, stderr) = ssh.exec_command(cmd)
        for line in stdout.readlines():
            print line

        return True
refstack_client.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self):
        """Initialize helper object."""
        self.functest_test = pkg_resources.resource_filename(
            'functest', 'opnfv_tests')
        self.conf_path = pkg_resources.resource_filename(
            'functest',
            'opnfv_tests/openstack/refstack_client/refstack_tempest.conf')
        self.defcore_list = pkg_resources.resource_filename(
            'functest', 'opnfv_tests/openstack/refstack_client/defcore.txt')
        self.confpath = os.path.join(self.functest_test,
                                     self.conf_path)
        self.defcorelist = os.path.join(self.functest_test,
                                        self.defcore_list)
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument(
            '-c', '--config',
            help='the file path of refstack_tempest.conf',
            default=self.confpath)
        self.parser.add_argument(
            '-t', '--testlist',
            help='Specify the file path or URL of a test list text file. '
                 'This test list will contain specific test cases that '
                 'should be tested.',
            default=self.defcorelist)
test_utils.py 文件源码 项目:bob.bio.base 作者: bioidiap 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_resources():
  # loading by resource
  cls = bob.bio.base.load_resource("pca", "algorithm")
  assert isinstance (cls, bob.bio.base.algorithm.PCA)

  # loading by configuration file
  cls = bob.bio.base.load_resource(pkg_resources.resource_filename("bob.bio.base.config.algorithm", "pca.py"), "algorithm")
  assert isinstance (cls, bob.bio.base.algorithm.PCA)

  # loading by instatiation
  cls = bob.bio.base.load_resource("bob.bio.base.algorithm.PCA(10, distance_function=scipy.spatial.distance.euclidean)", "algorithm", imports=['bob.bio.base', 'scipy.spatial'])
  assert isinstance (cls, bob.bio.base.algorithm.PCA)

  # get list of extensions
  extensions = bob.bio.base.extensions()
  assert isinstance(extensions, list)
  assert 'bob.bio.base' in extensions
test_basic.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 97 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
test_basic.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
test_basic.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
test_basic.py 文件源码 项目:jira_worklog_scanner 作者: pgarneau 项目源码 文件源码 阅读 60 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
test_api_results.py 文件源码 项目:search_google 作者: rrwen 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setUp(self):
    file_path = resource_filename(Requirement.parse('search_google'), 'search_google/config.json')
    with open(file_path, 'r') as in_file:
      defaults = json.load(in_file)
    buildargs = {
      'serviceName': 'customsearch',
      'version': 'v1',
      'developerKey': defaults['build_developerKey']
    }
    cseargs = {
      'q': 'google',
      'num': 1,
      'fileType': 'png',
      'cx': defaults['cx']
    }
    self.results = search_google.api.results(buildargs, cseargs)
    tempfile = TemporaryFile()
    self.tempfile = str(tempfile.name)
    tempfile.close()
    self.tempdir = str(TemporaryDirectory().name)
__init__.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def register_adapters():
    global adapters_registered

    if adapters_registered is True:
        return

    try:
        import pkg_resources
        packageDir = pkg_resources.resource_filename('pyamf', 'adapters')
    except:
        packageDir = os.path.dirname(__file__)

    for f in glob.glob(os.path.join(packageDir, '*.py')):
        mod = os.path.basename(f).split(os.path.extsep, 1)[0]

        if mod == '__init__' or not mod.startswith('_'):
            continue

        try:
            register_adapter(mod[1:].replace('_', '.'), PackageImporter(mod))
        except ImportError:
            pass

    adapters_registered = True
base.py 文件源码 项目:crema 作者: bmcfee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _instantiate(self, rsc):

        # First, load the pump
        with open(resource_filename(__name__,
                                    os.path.join(rsc, 'pump.pkl')),
                  'rb') as fd:
            self.pump = pickle.load(fd)

        # Now load the model
        with open(resource_filename(__name__,
                                    os.path.join(rsc, 'model_spec.pkl')),
                  'rb') as fd:
            spec = pickle.load(fd)
            self.model = keras.models.model_from_config(spec)

        # And the model weights
        self.model.load_weights(resource_filename(__name__,
                                                  os.path.join(rsc,
                                                               'model.h5')))

        # And the version number
        with open(resource_filename(__name__,
                                    os.path.join(rsc, 'version.txt')),
                  'r') as fd:
            self.version = fd.read().strip()
test_basic.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
manage.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def populate_table(table, data_file, print_msg):
    """Method to populate a table with records stored in a dict loaded from a json file

    Args:
        table(api.aws.DynamoTable): the table to write to
        data_json(dict): the data to write. Should be an array in an object named 'data'

    Returns:
        None
    """
    with open(os.path.join(resource_filename("manage", "data"), data_file), 'rt') as df:
        data = json.load(df)
        if len(data["data"]) == 1:
            # Assume this is a campaign file for now.
            print(" - Example campaign loaded: https://<your_stack>/campaign.html?id={}".format(data["data"][0]["campaign_id"]))
        for item in data["data"]:
            table.put_item(item)
            print(print_msg)
cloudfront.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_code(self):
        """Zip up the code and return bytes

        Returns:
            bytes
        """
        with open(tempfile.NamedTemporaryFile().name, 'w') as zf:
            zfh = zipfile.ZipFile(zf.name, mode='w')

            old_path = os.getcwd()
            os.chdir(os.path.join(resource_filename("manage", "configs")))

            zfh.write("url_rewrite.js")
            zfh.close()
            zf.close()

            os.chdir(old_path)
            with open(zf.name, "rb") as zfr:
                return zfr.read()
upload_documents_to_discovery_collection.py 文件源码 项目:retrieve-and-rank-tuning 作者: rchaks 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    insurance_lib_data_dir = resource_filename('resources', 'insurance_lib_v2')
    print('Using data from {}'.format(insurance_lib_data_dir))

    # Either re-use an existing collection id by over riding the below, or leave as is to create one
    collection_id = "TestCollection-InsLibV2"

    discovery = DiscoveryProxy()

    collection_id = discovery.setup_collection(collection_id=collection_id,
                                               config_id="889a08c9-cad9-4287-a87d-2f0380363bff")
    discovery.print_collection_stats(collection_id)

    # This thing seems to misbehave when run from python notebooks due to its use of multiprocessing, so just
    # running in a script
    discovery.upload_documents(collection_id=collection_id,
                               corpus=document_corpus_as_iterable(
                                   path.join(insurance_lib_data_dir, 'document_corpus.solr.xml')))

    discovery.print_collection_stats(collection_id)
test_basic.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
test_basic.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels

    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
test_phrases.py 文件源码 项目:eea.corpus 作者: eea 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_cached_phrases_no_files(self,
                                     corpus_base_path,
                                     doc_content_stream):
        from eea.corpus.processing.phrases.process import cached_phrases
        from pkg_resources import resource_filename

        base_path = resource_filename('eea.corpus', 'tests/fixtures/')
        corpus_base_path.return_value = base_path

        # we want the B.phras.* files in fixtures
        env = {'phash_id': 'X', 'file_name': 'ignore'}
        settings = {}

        stream = cached_phrases(doc_content_stream, env, settings)
        with pytest.raises(StopIteration):
            next(stream)
test_phrases.py 文件源码 项目:eea.corpus 作者: eea 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_cached_phrases_cached_files(self,
                                         corpus_base_path,
                                         doc_content_stream):

        # TODO: this test should be improved. Text quality should be tested
        from eea.corpus.processing.phrases.process import cached_phrases
        from pkg_resources import resource_filename

        base_path = resource_filename('eea.corpus', 'tests/fixtures/')
        corpus_base_path.return_value = base_path

        # we want the B.phras.* files in fixtures
        env = {'phash_id': 'B', 'file_name': 'ignore'}
        settings = {}

        stream = cached_phrases(doc_content_stream, env, settings)
        doc = next(stream)
        assert 'water_stress_conditions' in doc.text
        assert 'positive_development' in doc.text
test_phrases.py 文件源码 项目:eea.corpus 作者: eea 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_preview_phrases_with_cache_files(self, corpus_base_path):
        from eea.corpus.processing.phrases.process import preview_phrases
        from pkg_resources import resource_filename

        base_path = resource_filename('eea.corpus', 'tests/fixtures/')
        corpus_base_path.return_value = base_path

        content = ['hello', 'world']
        env = {
            'file_name': 'x.csv',
            'text_column': 'text',
            'phash_id': 'B',
        }

        stream = preview_phrases(content, env, {})
        assert list(stream) == []
test_phrases.py 文件源码 项目:eea.corpus 作者: eea 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_preview_phrases_nocache_files_with_job(self,
                                                    corpus_base_path,
                                                    get_assigned_job):
        from eea.corpus.processing.phrases.process import preview_phrases
        from pkg_resources import resource_filename

        get_assigned_job.return_value = Mock(id='job1')
        base_path = resource_filename('eea.corpus', 'tests/fixtures/')
        corpus_base_path.return_value = base_path

        content = ['hello', 'world']
        env = {
            'file_name': 'x.csv',
            'text_column': 'text',
            'phash_id': 'X',
        }

        stream = preview_phrases(content, env, {})
        assert list(stream) == ['hello', 'world']
test_phrases.py 文件源码 项目:eea.corpus 作者: eea 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_produce_phrases_with_no_job(self,
                                         cached_phrases,
                                         corpus_base_path,
                                         get_pipeline_for_component,
                                         build_phrases
                                         ):
        from eea.corpus.processing.phrases.process import produce_phrases
        from pkg_resources import resource_filename

        content = ['hello', 'world']
        env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'}
        base_path = resource_filename('eea.corpus', 'tests/fixtures/')

        corpus_base_path.return_value = base_path
        cached_phrases.return_value = ['something', 'else']

        stream = produce_phrases(content, env, {})

        assert list(stream) == ['something', 'else']
        assert corpus_base_path.call_count == 1
        assert get_pipeline_for_component.call_count == 1
        assert build_phrases.call_count == 1
        assert cached_phrases.call_count == 1
test_phrases.py 文件源码 项目:eea.corpus 作者: eea 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_produce_phrases_with_ok_job(self,
                                         cached_phrases,
                                         corpus_base_path,
                                         get_pipeline_for_component,
                                         build_phrases,
                                         get_job_finish_status
                                         ):
        from eea.corpus.processing.phrases.process import produce_phrases
        from pkg_resources import resource_filename

        content = ['hello', 'world']
        env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'}
        base_path = resource_filename('eea.corpus', 'tests/fixtures/')

        corpus_base_path.return_value = base_path
        cached_phrases.return_value = ['something', 'else']
        get_job_finish_status.return_value = True

        stream = produce_phrases(content, env, {})

        assert list(stream) == ['something', 'else']
        assert corpus_base_path.call_count == 1
        assert get_pipeline_for_component.call_count == 0
        assert build_phrases.call_count == 0
        assert cached_phrases.call_count == 1


问题


面经


文章

微信
公众号

扫码关注公众号