python类SimpleNamespace()的实例源码

test_wue.py 文件源码 项目:fluxpart 作者: usda-ars-ussl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_water_use_efficiency():
    """Regression check for ``water_use_efficiency``.

    Passing does not guarantee that the wue function is correct, but a
    failure means the computed values have changed and the reason should
    be understood.
    """
    hf_stats = SimpleNamespace(**dict(
        rho_vapor=9.607e-3,
        rho_co2=658.8e-6,
        T=28.56 + 273.15,
        P=100.1e3,
        cov_w_q=0.1443e-3,
        cov_w_c=-1.059e-6,
        cov_w_T=0.1359,
        ustar=0.4179,
        rho_totair=1.150,
    ))

    result = water_use_efficiency(
        hf_stats,
        meas_ht=7.11,
        canopy_ht=4.42,
        ppath='C3',
        ci_mod='const_ppm',
        diff_ratio=(1 / 0.7),
    )

    # (result attribute, expected value, absolute tolerance)
    expectations = (
        ('wue', -6.45e-3, 0.005e-3),
        ('ambient_h2o', 12.4e-3, 0.05e-3),
        ('ambient_co2', 638.e-6, 0.5e-6),
        ('inter_h2o', 28.3e-3, 0.05e-3),
        ('inter_co2', 492.e-6, 0.5e-6),
    )
    for attr, expected, tolerance in expectations:
        npt.assert_allclose(getattr(result, attr), expected, atol=tolerance)
test_calm.py 文件源码 项目:calm 作者: cygwin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_package_set(self):
        """Read the test release area, validate it, write setup.ini and
        compare the written file (with its timestamps normalised) against
        the expected output."""
        self.maxDiff = None

        args = types.SimpleNamespace(
            arch='x86',
            dryrun=False,
            force=True,
            inifile='testdata/inifile/setup.ini',
            pkglist='testdata/pkglist/cygwin-pkg-maint',
            rel_area='testdata/relarea',
            release='testing',
            setup_version='4.321',
        )

        pkgs = package.read_packages(args.rel_area, args.arch)
        package.delete(pkgs, 'x86/release/nonexistent', 'nosuchfile-1.0.0.tar.xz')
        self.assertEqual(package.validate_packages(args, pkgs), True)
        package.write_setup_ini(args, pkgs, args.arch)

        # fix the timestamps to match expected (first occurrence only)
        fixed_stamps = (
            ('setup-timestamp: .*', 'setup-timestamp: 1458221800'),
            ('generated at .*', 'generated at 2016-03-17 13:36:40 GMT'),
        )
        with open(args.inifile) as inifile:
            results = inifile.read()
            for pattern, replacement in fixed_stamps:
                results = re.sub(pattern, replacement, results, 1)
            compare_with_expected_file(self, 'testdata/inifile', (results,), 'setup.ini')

        # XXX: delete a needed package, and check validate fails
models.py 文件源码 项目:pyfc4 作者: ghukill 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _build_rdf(self, data=None):

        '''
        Rebuild self.rdf from scratch: store the raw payload, create empty
        prefixes/uris namespaces, register one rdflib.Namespace per entry of
        self.repo.context, then hand over to self._parse_graph().

        Args:
            data (): payload from GET request, expected RDF content in various serialization formats

        Returns:
            None
        '''

        # fresh container for all rdf state
        rdf = SimpleNamespace(
            data=data,
            prefixes=SimpleNamespace(),
            uris=SimpleNamespace(),
        )
        self.rdf = rdf

        # one attribute per known prefix
        for prefix, uri in self.repo.context.items():
            setattr(rdf.prefixes, prefix, rdflib.Namespace(uri))

        # graph parsing is delegated
        self._parse_graph()
conftest.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def internal_api_patcher():
    """
    Fixture that patches certain internal app functions for the entire selenium suite execution.

    Yields a SimpleNamespace holding the patchers and the mocks they produced;
    every patcher is stopped once the generator is resumed.
    """
    targets = (
        'mail.api.MailgunClient._mailgun_request',
    )
    # build every patcher first, start them afterwards
    patchers = []
    for target in targets:
        patchers.append(patch(target))

    patcher_mocks = []
    for active in patchers:
        started = active.start()
        started.name = active.attribute
        patcher_mocks.append(started)

    yield SimpleNamespace(patchers=patchers, patcher_mocks=patcher_mocks)

    for active in patchers:
        active.stop()
conftest.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def mocked_elasticsearch_module_patcher():
    """
    Fixture that patches all indexing API functions that communicate directly with ElasticSearch.

    Yields a SimpleNamespace holding the patchers and the mocks they produced;
    every patcher is stopped once the generator is resumed.
    """
    # Callables whose name starts with _ are the functions imported from
    # indexing_api; the leading _ is what prevents name collisions.
    patchers = [
        patch('search.tasks.{0}'.format(name), autospec=True)
        for name, val in tasks.__dict__.items()
        if callable(val) and name.startswith("_")
    ]

    patcher_mocks = []
    for active in patchers:
        started = active.start()
        started.name = active.attribute
        patcher_mocks.append(started)

    yield SimpleNamespace(patchers=patchers, patcher_mocks=patcher_mocks)

    for active in patchers:
        active.stop()
utils.py 文件源码 项目:hugo_jupyter 作者: knowsuchagency 项目源码 文件源码 阅读 91 收藏 0 点赞 0 评论 0
def get_packages_from_lockfile():
    """
    Collect the pinned packages listed in ./Pipfile.lock.

    Each entry is the package name immediately followed by its ``version``
    string from the lockfile.

    Returns: SimpleNamespace(default=[...], development=[...])

    """
    # lockfile section name -> attribute on the returned namespace
    section_to_attr = (
        ('default', 'default'),
        ('develop', 'development'),
    )

    result = SimpleNamespace(default=list(), development=list())
    lockfile_data = json.loads(Path('Pipfile.lock').read_text())

    for section, attr in section_to_attr:
        pinned = lockfile_data[section]
        bucket = getattr(result, attr)
        for package, version_info in pinned.items():
            bucket.append(package + version_info['version'])
    return result
test_pgantomizer.py 文件源码 项目:pgantomizer 作者: asgeirrr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_command_line_invokation(original_db, anonymized, monkeypatch):
    """Run the dump and anonymize entry points with a patched
    ``ArgumentParser.parse_args`` and check the dump file and target database."""
    connection_keys = ('dbname', 'user', 'host', 'port', 'password')

    def patch_parse_args(db_args, **extra):
        # A fresh namespace is built on every parse_args() call.
        monkeypatch.setattr(
            'argparse.ArgumentParser.parse_args',
            lambda self: SimpleNamespace(
                verbose=False,
                schema=SCHEMA_PATH,
                dump_file=DUMP_PATH,
                **extra,
                **{arg: db_args[arg] for arg in connection_keys}
            ),
        )

    patch_parse_args(ORIGINAL_DB_ARGS)
    dump_main()

    assert os.path.getsize(DUMP_PATH) > 2000
    assert_db_empty(anonymized)

    patch_parse_args(ANONYMIZED_DB_ARGS, leave_dump=False)
    anonymize_main()
    assert_db_anonymized(anonymized)
database.py 文件源码 项目:http-observatory 作者: mozilla 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _connect(self):
        """Try to open the PostgreSQL connection and record the outcome.

        On success the connection is stored in ``self._conn``. On any
        exception the error is printed to stderr and ``self._conn`` becomes a
        stand-in object whose ``closed`` attribute is 1. A message is printed
        only when ``self._connected`` changes state.
        """
        try:
            options = dict(
                database=DATABASE_DB,
                host=DATABASE_HOST,
                password=DATABASE_PASSWORD,
                port=DATABASE_PORT,
                sslmode=DATABASE_SSL_MODE,
                sslrootcert=DATABASE_CA_CERT,
                user=DATABASE_USER,
            )
            self._conn = psycopg2.connect(**options)

            if not self._connected:
                print('INFO: Connected to PostgreSQL', file=sys.stderr)
            self._connected = True

        except Exception as exc:
            print(exc, file=sys.stderr)
            # placeholder that reports itself as closed
            self._conn = SimpleNamespace(closed=1)

            if self._connected:
                print('WARNING: Disconnected from PostgreSQL', file=sys.stderr)
            self._connected = False
processor.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __generate_reference__(self, triple_map, **kwargs):
        """Generates a RDF entity based on triple map

        The value is looked up in ``self.source`` under
        ``str(triple_map.reference)``. A missing or empty value yields None.
        Otherwise the result is a URIRef when the map's datatype is
        xsd:anyURI, a typed Literal for any other datatype, and a plain
        Literal when the map has no datatype attribute.

        Args:
            triple_map(SimpleNamespace): Triple Map
        """
        value = self.source.get(str(triple_map.reference))
        if value is None or len(value) < 1:
            return None
        if not hasattr(triple_map, "datatype"):
            return rdflib.Literal(value)
        if triple_map.datatype == NS_MGR.xsd.anyURI:
            return rdflib.URIRef(value)
        return rdflib.Literal(
            value,
            datatype=triple_map.datatype)
processor.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __reference_handler__(self, **kwargs):
        """Internal method for handling rr:reference in triples map

        Evaluates the JSONPath expression held in the predicate-object map's
        ``reference`` against ``obj`` and adds one
        ``(subject, predicate, Literal(value))`` triple to ``self.output``
        for every matched value.

        Keyword Args:

        -------------
            predicate_obj_map: SimpleNamespace
            obj: dict
            subject: rdflib.URIRef

        Returns:
            An empty list when the map's reference is None; None otherwise.
        """
        subjects = []
        pred_obj_map = kwargs.get("predicate_obj_map")
        obj = kwargs.get("obj")
        subject = kwargs.get("subject")
        if pred_obj_map.reference is None:
            return subjects
        predicate = pred_obj_map.predicate
        # The attribute is ``reference`` (it is the one checked above); the
        # former ``refernce`` spelling raised AttributeError on this line.
        ref_exp = jsonpath_ng.parse(str(pred_obj_map.reference))
        # A parsed JSONPath expression is queried through .find().
        for match in ref_exp.find(obj):
            self.output.add((subject, predicate, rdflib.Literal(match.value)))
processor.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __generate_reference__(self, triple_map, **kwargs):
        """Internal method takes a triple_map and returns the result of
        applying to XPath to the current DOM context

        The first matched element whose stripped text starts with "http" is
        returned as a URIRef. Elements without text are skipped. None is
        returned when nothing qualifies.

        Args:
        -----
            triple_map: SimpleNamespace
            element: etree.Element
        """
        element = kwargs.get("element")
        found_elements = element.xpath(
            triple_map.reference,
            namespaces=self.xml_ns)
        for elem in found_elements:
            # An element with no text content reports ``text`` as None;
            # treat it as empty instead of failing on ``None.strip()``.
            raw_text = (elem.text or "").strip()
            #! Quick and dirty test for valid URI
            if not raw_text.startswith("http"):
                continue
            return rdflib.URIRef(raw_text)
        return None
senti_models.py 文件源码 项目:senti 作者: stevenxxiu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _fit_embedding_word(self, embedding_type, construct_docs, tokenize_, d=None):
        """Fit a tokenize -> embeddings pipeline on ``construct_docs`` and
        return the estimator of its 'embeddings' step.

        ``embedding_type`` selects the initial vectors: 'google' loads the
        pickled GoogleNews model, 'twitter' trains Word2Vec (vector size ``d``)
        on the train/unsup/val/test docs, anything else starts from an empty
        (0, d) matrix with an empty vocabulary.
        """
        def as_namespace(model):
            # keep only the vector matrix and a word -> row index mapping
            return SimpleNamespace(X=model.syn0, vocab={w: v.index for w, v in model.vocab.items()})

        if embedding_type == 'google':
            pretrained = joblib.load('data/google/GoogleNews-vectors-negative300.pickle')
            embeddings_ = as_namespace(pretrained)
        elif embedding_type == 'twitter':
            trainer = Pipeline([
                ('tokenize', MapCorporas(tokenize_)),
                ('word2vec', MergeSliceCorporas(CachedFitTransform(Word2Vec(
                    sg=1, size=d, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20, workers=16
                ), self.memory))),
            ]).fit([self.train_docs, self.unsup_docs[:10**6], self.val_docs, self.test_docs])
            embeddings_ = as_namespace(trainer.named_steps['word2vec'].estimator)
        else:
            embeddings_ = SimpleNamespace(X=np.empty((0, d)), vocab={})

        pipeline = Pipeline([
            ('tokenize', MapCorporas(tokenize_)),
            # 0.25 is chosen so the unknown vectors have approximately the same variance as google pre-trained ones
            ('embeddings', MapCorporas(Embeddings(
                embeddings_, rand=lambda shape: get_rng().uniform(-0.25, 0.25, shape).astype('float32'),
                include_zero=True
            ))),
        ])
        pipeline.fit(construct_docs)
        return pipeline.named_steps['embeddings'].estimator
api_status_test.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create a StatusAPI (interval=1) wired to a fake RPC and a fake
        torrent API, and preload both fakes with canned statistics."""
        self.rpc = FakeTransmissionRPC()
        self.torrent = FakeTorrentAPI()
        self.api = StatusAPI(
            SimpleNamespace(rpc=self.rpc, torrent=self.torrent, loop=self.loop),
            interval=1,
        )

        self.rpc.fake_stats = dict(
            downloadSpeed=789,
            uploadSpeed=0,
            activeTorrentCount=1,
            pausedTorrentCount=2,
            torrentCount=3,
        )

        def torrent(flags, up, down):
            return {'status': Status(flags), 'rate-up': up, 'rate-down': down}

        self.torrent.fake_tlist = SimpleNamespace(
            torrents=(
                torrent((Status.ISOLATED,), 0, 0),
                torrent((Status.DOWNLOAD,), 0, 456),
                torrent((Status.DOWNLOAD, Status.UPLOAD), 123, 456),
            ),
        )
helpers.py 文件源码 项目:ubuntu-image 作者: CanonicalLtd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self, command, *args, **kws):
        """Stand-in for a command runner.

        Commands ending in ``lb config`` or ``lb build`` are recorded in
        ``self.call_args_list`` and answered with a namespace whose
        returncode is 1; ``lb build`` additionally creates the
        DIRS_UNDER_ROOTFS directories under ``<root_dir>/chroot``. Commands
        starting with ``dpkg -L`` are recorded and really executed. Anything
        else returns None.
        """
        tail = command[-2:]
        is_config = tail == ['lb', 'config']
        if is_config or tail == ['lb', 'build']:
            self.call_args_list.append(command)
            if not is_config:
                # Create dummy top-level filesystem layout.
                chroot_dir = os.path.join(self.root_dir, 'chroot')
                for dir_name in DIRS_UNDER_ROOTFS:
                    os.makedirs(os.path.join(chroot_dir, dir_name))
            return SimpleNamespace(returncode=1)
        if command.startswith('dpkg -L'):
            self.call_args_list.append(command)
            # default both streams to PIPE unless the caller chose otherwise
            stdout = kws.pop('stdout', PIPE)
            stderr = kws.pop('stderr', PIPE)
            return subprocess_run(
                command,
                stdout=stdout, stderr=stderr,
                universal_newlines=True,
                **kws)
        return None
test_types.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_attrdel(self):
        """Deleting attributes from a SimpleNamespace: unknown names raise
        AttributeError, known names disappear from vars() and can be re-added."""
        empty = types.SimpleNamespace()
        filled = types.SimpleNamespace(x=1, y=2, w=3)

        # 'spam' exists on neither namespace
        for namespace in (empty, filled):
            with self.assertRaises(AttributeError):
                del namespace.spam

        without_y = {'w': 3, 'x': 1}

        del filled.y
        self.assertEqual(vars(filled), without_y)
        filled.y = 'spam'
        self.assertEqual(vars(filled), {'w': 3, 'x': 1, 'y': 'spam'})
        del filled.y
        self.assertEqual(vars(filled), without_y)

        empty.spam = 5
        self.assertEqual(vars(empty), {'spam': 5})
        del empty.spam
        self.assertEqual(vars(empty), {})
test_edit.py 文件源码 项目:twittback 作者: dmerejkowsky 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setup_edit_test(tweet_factory, repository, mock, *, nvim_returncode):
    """Store two tweets in ``repository`` and replace ``subprocess.run``
    (through ``mock.patch``) with a fake that records the command, writes
    "changed" to the file named by ``cmd[1]`` and reports ``nvim_returncode``.

    Returns a namespace whose ``cmd`` attribute holds the last command given
    to the fake (None until it is called).
    """
    tweets = [
        tweet_factory.make_tweet(42, "First tweet!", date="2017-07-07"),
        tweet_factory.make_tweet(57, "Second tweet", date="2017-08-02"),
    ]
    repository.add_tweets(tweets)

    spy = types.SimpleNamespace(cmd=None)

    def fake_run(cmd):
        process = mock.Mock()
        spy.cmd = cmd
        process.returncode = nvim_returncode
        # cmd[1] is the file handed to the editor
        with open(cmd[1], "w") as stream:
            stream.write("changed")
        return process

    mock.patch("subprocess.run", fake_run)
    return spy
test_mcmcmc.py 文件源码 项目:RD-MCL 作者: biologyguy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_walker_set_heat():
    """``_Walker.set_heat`` stores an accepted heat value and raises
    ValueError, with a fixed message, for -1, 0 and 1.01."""
    foo_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.1, name="foo", current_value=0.15)
    bar_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.5, name="bar", current_value=0.51)

    walker = mcmcmc._Walker(variables=[foo_var, bar_var], func=lambda *_: 1, heat=0.1, min_max=(1, 5), quiet=True)
    assert walker.heat == 0.1
    walker.set_heat(0.75)
    assert walker.heat == 0.75

    for bad_heat in (-1, 0, 1.01):
        with pytest.raises(ValueError) as err:
            walker.set_heat(bad_heat)
        # Check the message of the raised exception itself (err.value);
        # the string form of the ExceptionInfo wrapper is not the message.
        assert "heat values must be positive, between 0.000001 and 1.0." in str(err.value)
test_mcmcmc.py 文件源码 项目:RD-MCL 作者: biologyguy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_chain_init():
    """A new ``_Chain`` keeps its constructor arguments, starts with zeroed
    counters and writes a header line to its outfile."""
    def make_variable(name, draw_value, current_value):
        return SimpleNamespace(draw_random=lambda: True, draw_value=draw_value,
                               name=name, current_value=current_value)

    foo_var = make_variable("foo", 0.1, 0.15)
    bar_var = make_variable("bar", 0.5, 0.51)
    walker1 = SimpleNamespace(variables=[foo_var, bar_var])
    walker2 = SimpleNamespace(variables=[foo_var, bar_var])

    tmp_file = br.TempFile()

    chain = mcmcmc._Chain(walkers=[walker1, walker2], outfile=tmp_file.path, cold_heat=0.01, hot_heat=0.2)
    assert chain.walkers == [walker1, walker2]
    assert chain.outfile == tmp_file.path
    assert (chain.cold_heat, chain.hot_heat) == (0.01, 0.2)
    assert chain.step_counter == 0
    assert chain.best_score_ever_seen == 0
    assert tmp_file.read() == "Gen\tfoo\tbar\tresult\n"
test_mcmcmc.py 文件源码 项目:RD-MCL 作者: biologyguy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_chain_dump_obj():
    """``_Chain._dump_obj`` reports the walkers' dumps, the heats, the
    counters and the outfile contents under fixed keys."""
    walkers = [
        SimpleNamespace(_dump_obj=lambda *_: "walker1"),
        SimpleNamespace(_dump_obj=lambda *_: "walker2"),
    ]
    tmp_file = br.TempFile()
    tmp_file.write("outfile results")

    # _dump_obj is attached unbound, so the chain is passed in explicitly
    chain = SimpleNamespace(walkers=walkers, outfile=tmp_file.path, cold_heat=0.1, hot_heat=0.2,
                            step_counter=20, best_score_ever_seen=100, _dump_obj=mcmcmc._Chain._dump_obj)

    dump = chain._dump_obj(chain)
    expected = (
        ("walkers", ["walker1", "walker2"]),
        ("cold_heat", 0.1),
        ("hot_heat", 0.2),
        ("step_count", 20),
        ("best_score", 100),
        ("results", "outfile results"),
    )
    for key, value in expected:
        assert dump[key] == value
test_mcmcmc.py 文件源码 项目:RD-MCL 作者: biologyguy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_chain_apply_dump(capsys):
    """``_Chain._apply_dump`` forwards to every walker, restores heats and
    counters from the dump and writes the stored results to the outfile."""
    def make_walker(message):
        return SimpleNamespace(_apply_dump=lambda *_: print(message))

    walker1 = make_walker("Applying dump to walker1")
    walker2 = make_walker("Applying dump to walker2")

    tmp_file = br.TempFile()
    # _apply_dump is attached unbound, so the chain is passed in explicitly
    chain = SimpleNamespace(walkers=[walker1, walker2], outfile=tmp_file.path, cold_heat=None, hot_heat=None,
                            step_counter=None, best_score_ever_seen=None, _apply_dump=mcmcmc._Chain._apply_dump)

    var_dict = dict(walkers=[None, None], cold_heat=0.1, hot_heat=0.2,
                    step_count=20, best_score=100, results="Some results")
    chain._apply_dump(chain, var_dict)

    assert chain.walkers == [walker1, walker2]
    out, err = capsys.readouterr()
    assert out == "Applying dump to walker1\nApplying dump to walker2\n"
    restored = (chain.cold_heat, chain.hot_heat, chain.step_counter, chain.best_score_ever_seen)
    assert restored == (0.1, 0.2, 20, 100)
    assert tmp_file.read() == "Some results"
test_mcmcmc.py 文件源码 项目:RD-MCL 作者: biologyguy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_mcmcmc_resume(capsys):
    """``MCMCMC.resume`` returns False when the dump file is absent;
    otherwise it applies the dump to each chain, runs, and returns True."""
    mc_obj = SimpleNamespace(dumpfile="does_not_exist", resume=mcmcmc.MCMCMC.resume)
    assert mc_obj.resume(mc_obj) is False

    tmp_file = br.TempFile(byte_mode=True)
    dill.dump(["a", "b", "c"], tmp_file)
    mc_obj.dumpfile = tmp_file.path

    def make_chain(message):
        return SimpleNamespace(_apply_dump=lambda *_: print(message))

    mc_obj.chains = [
        make_chain("applying chain1"),
        make_chain("applying chain2"),
        make_chain("applying chain3"),
    ]
    mc_obj.run = lambda *_: print("Running")

    assert mc_obj.resume(mc_obj) is True
    out, err = capsys.readouterr()
    assert out == "applying chain1\napplying chain2\napplying chain3\nRunning\n", print(out)
test_mcmcmc.py 文件源码 项目:RD-MCL 作者: biologyguy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_mcmcmc_check_convergence(hf):
    """``MCMCMC._check_convergence`` against the three chain CSV resources:
    False below 100 steps, False at threshold 1.01, True at threshold 1.1."""
    csv_path = os.path.join(hf.resource_path, "mcmcmc", "chain")
    chains = [
        SimpleNamespace(step_counter=99, outfile=csv_path + suffix)
        for suffix in ("1.csv", "2.csv", "3.csv")
    ]

    mc_obj = SimpleNamespace(_check_convergence=mcmcmc.MCMCMC._check_convergence, chains=chains,
                             convergence=1.01)

    # Return False when step_counter < 100
    assert mc_obj._check_convergence(mc_obj) is False

    # Return False when convergence is not met
    for chain in chains:
        chain.step_counter = 100
    assert mc_obj._check_convergence(mc_obj) is False

    # Return True on convergence
    mc_obj.convergence = 1.1
    assert mc_obj._check_convergence(mc_obj) is True
test_mcmcmc.py 文件源码 项目:RD-MCL 作者: biologyguy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_mcmcmc_reset_params():
    """``MCMCMC.reset_params`` rejects a parameter list of the wrong length
    with AttributeError and otherwise updates the walkers' params."""
    walker1 = SimpleNamespace(params=[1, 2])
    walker2 = SimpleNamespace(params=[3, 4])
    walker3 = SimpleNamespace(params=[5, 6])
    walker4 = SimpleNamespace(params=[7, 8])

    chain1 = SimpleNamespace(walkers=[walker1, walker2])
    chain2 = SimpleNamespace(walkers=[walker3, walker4])

    mc_obj = SimpleNamespace(reset_params=mcmcmc.MCMCMC.reset_params, chains=[chain1, chain2])
    with pytest.raises(AttributeError) as e:
        mc_obj.reset_params(mc_obj, ['a', 'b', 'c'])
    # Check the message of the raised exception itself (e.value); the
    # string form of the ExceptionInfo wrapper is not the message.
    assert "Incorrect number of params supplied in reset_params(). 2 expected; 3 supplied; ['a', 'b', 'c']" in str(e.value)

    mc_obj.reset_params(mc_obj, ['a', 'b'])
    assert walker2.params == ['a', 'b']
settings.py 文件源码 项目:porcupine 作者: Akuli 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def add_option(self, key, default, *, reset=True):
        """Add a new option without adding widgets to the setting dialog.

        ``section[key]`` will be *default* unless something else is
        specified.

        If *reset* is True, the setting dialog's reset button sets this
        option to *default*.

        .. note::
            Pass ``reset=False`` for settings that cannot be changed with
            the dialog, so that clicking the reset button resets only the
            settings that are shown in the dialog.
        """
        info = types.SimpleNamespace(
            default=default,        # not validated
            reset=reset,
            callbacks=[],
            errorvar=tkinter.BooleanVar(),  # true when the triangle is showing
        )
        self._infos[key] = info
dataset.py 文件源码 项目:echonet 作者: karoldvl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, data_dir, work_dir):
        """Initialise empty dataset state and validate both directories.

        ``data_dir`` and ``work_dir`` are stored as absolute paths with a
        trailing '/'.

        Raises:
            NotADirectoryError: if either path is not an existing directory.
        """
        self.RandomState = np.random.RandomState(seed=20161013)

        # statistics and metadata are filled in later
        for attr in ('data_mean', 'data_std', 'class_count', 'meta', 'train_meta'):
            setattr(self, attr, None)
        self.test_data = types.SimpleNamespace(X=None, y=None, meta=None)
        self.validation_data = types.SimpleNamespace(X=None, y=None, meta=None)

        self.data_dir = os.path.abspath(data_dir) + '/'
        self.work_dir = os.path.abspath(work_dir) + '/'

        checks = (
            (self.data_dir, "{} is not a proper dataset directory."),
            (self.work_dir, "{} is not a proper working directory."),
        )
        for path, template in checks:
            if not os.path.isdir(path):
                raise NotADirectoryError(template.format(path))
test_types.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_attrdel(self):
        """Attribute deletion on SimpleNamespace: missing names raise
        AttributeError; present names are removed from vars() and may be
        assigned again."""
        bare = types.SimpleNamespace()
        populated = types.SimpleNamespace(x=1, y=2, w=3)

        # neither namespace has a 'spam' attribute yet
        with self.assertRaises(AttributeError):
            del bare.spam
        with self.assertRaises(AttributeError):
            del populated.spam

        # remove, restore with a new value, remove again
        for replacement in ('spam',):
            del populated.y
            self.assertEqual(vars(populated), {'w': 3, 'x': 1})
            populated.y = replacement
            self.assertEqual(vars(populated), {'w': 3, 'x': 1, 'y': replacement})
        del populated.y
        self.assertEqual(vars(populated), {'w': 3, 'x': 1})

        bare.spam = 5
        self.assertEqual(vars(bare), {'spam': 5})
        del bare.spam
        self.assertEqual(vars(bare), {})
test_calm.py 文件源码 项目:calm 作者: cygwin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_html_writer(self):
        """Generate the package listing pages into testdata/htdocs and
        compare every produced file with its counterpart under
        testdata/htdocs.expected."""
        self.maxDiff = None

        htdocs = 'testdata/htdocs'
        args = types.SimpleNamespace(
            arch='x86',
            htdocs=htdocs,
            rel_area='testdata/relarea',
            dryrun=False,
            force=True,
            pkglist='testdata/pkglist/cygwin-pkg-maint',
        )

        pkgs = package.read_packages(args.rel_area, args.arch)
        package.validate_packages(args, pkgs)
        pkg2html.update_package_listings(args, pkgs, args.arch)

        # compare the output files with expected
        for dirpath, _subdirs, files in os.walk(htdocs):
            relpath = os.path.relpath(dirpath, htdocs)
            for f in files:
                results = os.path.join(htdocs, relpath, f)
                expected = os.path.join('testdata/htdocs.expected', relpath, f)
                if filecmp.cmp(results, expected, shallow=False):
                    logging.info("%s identical", os.path.join(relpath, f))
                    continue
                logging.info("%s different", os.path.join(relpath, f))
                # produce a readable diff for the mismatching file
                with open(results) as r, open(expected) as e:
                    self.assertMultiLineEqual(e.read(), r.read())
test_calm.py 文件源码 项目:calm 作者: cygwin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_scan_uploads(self):
        """Touch the ``!ready`` marker files in a maintainer's upload area,
        scan it, and compare the scan result with the expected moves, vault
        entries, removals and package list."""
        self.maxDiff = None

        args = types.SimpleNamespace(arch='x86', rel_area='testdata/relarea', dryrun=False)

        pkglist = ['after-ready', 'not-ready', 'testpackage', 'testpackage2']

        mlist = maintainers.Maintainer.add_directories({}, 'testdata/homes')
        m = mlist['Blooey McFooey']
        m.pkgs.extend(pkglist + ['not-on-package-list'])

        def ready(*subdirs):
            return os.path.join(m.homedir(), 'x86', 'release', *subdirs, '!ready')

        # (marker file, extra options for touch)
        ready_fns = [(ready('testpackage'), ''),
                     (ready('testpackage2', 'testpackage2-subpackage'), ''),
                     (ready('after-ready'), '-t 198709011700'),
                     (ready('corrupt'), '')]
        for marker, touch_opts in ready_fns:
            os.system('touch %s "%s"' % (touch_opts, marker))

        scan_result = uploads.scan(m, pkglist + ['not-on-maintainer-list'], args.arch, args)

        self.assertEqual(scan_result.error, False)
        compare_with_expected_file(self, 'testdata/uploads', dict(scan_result.to_relarea), 'move')
        self.assertCountEqual(scan_result.to_vault, {'x86/release/testpackage': ['x86/release/testpackage/testpackage-0.1-1.tar.bz2']})
        self.assertCountEqual(scan_result.remove_always, [marker for marker, _ in ready_fns])
        self.assertEqual(scan_result.remove_success, ['testdata/homes/Blooey McFooey/x86/release/testpackage/-testpackage-0.1-1-src.tar.bz2', 'testdata/homes/Blooey McFooey/x86/release/testpackage/-testpackage-0.1-1.tar.bz2'])
        with pprint_patch():
            compare_with_expected_file(self, 'testdata/uploads', dict(scan_result.packages), 'pkglist')
models.py 文件源码 项目:pyfc4 作者: ghukill 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self,
        repo,
        uri=None,
        response=None,
        rdf_prefixes_mixins=None):

        '''
        Bind the resource to a repository, record the state of an optional
        HTTP response, build self.rdf, and prepare an empty versions namespace.

        Args:
            repo: repository handle; must provide parse_uri()
            uri: resource uri, normalized through repo.parse_uri()
            response: response object exposing content, headers and status_code
            rdf_prefixes_mixins: accepted but not used in this method

        Returns:
            None
        '''

        # repository handle is pinned to resource instance here
        self.repo = repo

        # parse uri with parse_uri() from repo instance
        self.uri = self.repo.parse_uri(uri)

        # if response provided, parse and set to attributes
        # NOTE(review): this is a truthiness test, not an `is not None` test;
        # presumably intentional for the response type in use - confirm.
        if response:
            self.response = response
            self.data = self.response.content
            self.headers = self.response.headers
            self.status_code = self.response.status_code
            # always define exists: True only for a 200 response
            # (it was previously left unset for any other status code)
            self.exists = self.status_code == 200

        # if not response, set all blank
        else:
            self.response = None
            self.data = None
            self.headers = {}
            self.status_code = None
            self.exists = False

        # RDF
        self._build_rdf(data=self.data)

        # versions
        self.versions = SimpleNamespace()
models.py 文件源码 项目:pyfc4 作者: ghukill 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def parse_object_like_triples(self):

        '''
        Index the objects of self.rdf.graph by namespace prefix and predicate
        so they can be read as self.rdf.triples.<prefix>.<predicate>, each a
        list of objects.

        Args:
            None

        Returns:
            None: sets self.rdf.triples
        '''

        missing = object()  # marks "attribute not present yet"
        triples = SimpleNamespace()
        self.rdf.triples = triples

        for _s, p, o in self.rdf.graph:

            # split the predicate into prefix / namespace uri / local name
            ns_prefix, _ns_uri, predicate = self.rdf.graph.compute_qname(p)

            # first time this prefix is seen: create its namespace
            if getattr(triples, ns_prefix, missing) is missing:
                setattr(triples, ns_prefix, SimpleNamespace())
            prefix_ns = getattr(triples, ns_prefix)

            # first time this predicate is seen: create its list
            if getattr(prefix_ns, predicate, missing) is missing:
                setattr(prefix_ns, predicate, [])

            # record the object
            getattr(prefix_ns, predicate).append(o)


问题


面经


文章

微信
公众号

扫码关注公众号