def test_water_use_efficiency():
    """Regression check for ``water_use_efficiency``.

    A pass does not guarantee that the function is correct. A failure means
    the computed values no longer match the recorded reference numbers, and
    the reason for the change should be understood.
    """
    stats = SimpleNamespace(
        rho_vapor=9.607e-3,
        rho_co2=658.8e-6,
        T=28.56 + 273.15,
        P=100.1e3,
        cov_w_q=0.1443e-3,
        cov_w_c=-1.059e-6,
        cov_w_T=0.1359,
        ustar=0.4179,
        rho_totair=1.150,
    )
    result = water_use_efficiency(
        stats,
        meas_ht=7.11,
        canopy_ht=4.42,
        ppath='C3',
        ci_mod='const_ppm',
        diff_ratio=(1 / 0.7),
    )
    # (attribute on the result, reference value, absolute tolerance)
    reference = (
        ('wue', -6.45e-3, 0.005e-3),
        ('ambient_h2o', 12.4e-3, 0.05e-3),
        ('ambient_co2', 638.e-6, 0.5e-6),
        ('inter_h2o', 28.3e-3, 0.05e-3),
        ('inter_co2', 492.e-6, 0.5e-6),
    )
    for attr, expected, tolerance in reference:
        npt.assert_allclose(getattr(result, attr), expected, atol=tolerance)
# Example source code using Python's SimpleNamespace() class
def test_package_set(self):
    """Read the test release area, write setup.ini and compare it with the expected file.

    One package file is deleted from the package set first, the set is
    expected to validate, and the volatile timestamp lines of the generated
    ini file are pinned to fixed values before the comparison.
    """
    self.maxDiff = None
    args = types.SimpleNamespace(
        arch='x86',
        dryrun=False,
        force=True,
        inifile='testdata/inifile/setup.ini',
        pkglist='testdata/pkglist/cygwin-pkg-maint',
        rel_area='testdata/relarea',
        release='testing',
        setup_version='4.321',
    )

    packages = package.read_packages(args.rel_area, args.arch)
    package.delete(packages, 'x86/release/nonexistent', 'nosuchfile-1.0.0.tar.xz')
    self.assertEqual(package.validate_packages(args, packages), True)

    package.write_setup_ini(args, packages, args.arch)
    with open(args.inifile) as inifile:
        results = inifile.read()

    # fix the timestamp to match expected (first occurrence only)
    pinned_lines = (
        ('setup-timestamp: .*', 'setup-timestamp: 1458221800'),
        ('generated at .*', 'generated at 2016-03-17 13:36:40 GMT'),
    )
    for pattern, replacement in pinned_lines:
        results = re.sub(pattern, replacement, results, count=1)

    compare_with_expected_file(self, 'testdata/inifile', (results,), 'setup.ini')
    # XXX: delete a needed package, and check validate fails
def _build_rdf(self, data=None):
    '''
    Rebuild the ``self.rdf`` container and parse the graph.

    ``self.rdf`` is replaced by a fresh namespace holding the raw payload
    (``data``), one rdflib ``Namespace`` per entry of ``self.repo.context``
    (under ``prefixes``) and an empty ``uris`` namespace. Graph parsing is
    then delegated to ``self._parse_graph()``.

    Args:
        data (): payload from GET request, expected RDF content in various serialization formats

    Returns:
        None
    '''
    # recreate rdf data
    prefixes = SimpleNamespace()
    self.rdf = SimpleNamespace(data=data, prefixes=prefixes, uris=SimpleNamespace())

    # populate prefixes
    for prefix_name, namespace_uri in self.repo.context.items():
        setattr(prefixes, prefix_name, rdflib.Namespace(namespace_uri))

    # graph
    self._parse_graph()
def internal_api_patcher():
    """
    Fixture that patches certain internal app functions for the entire selenium suite execution

    Yields:
        SimpleNamespace: ``patchers`` holds every patcher that was created and
        ``patcher_mocks`` holds the mock returned by each started patcher
        (each mock's ``name`` is set to the patched attribute name).
    """
    methods_to_patch = [
        'mail.api.MailgunClient._mailgun_request',
    ]
    patchers = [patch(method_name) for method_name in methods_to_patch]
    patcher_mocks = []
    # Track what was actually started so cleanup never calls stop() on a
    # patcher that failed to start.
    started = []
    try:
        for patcher in patchers:
            mock = patcher.start()
            started.append(patcher)
            mock.name = patcher.attribute
            patcher_mocks.append(mock)
        yield SimpleNamespace(
            patchers=patchers,
            patcher_mocks=patcher_mocks
        )
    finally:
        # Runs on normal teardown, when a later start() raises, and when the
        # generator is closed early, so no patch is left active.
        for patcher in started:
            patcher.stop()
def mocked_elasticsearch_module_patcher():
    """
    Fixture that patches all indexing API functions that communicate directly with ElasticSearch

    Yields:
        SimpleNamespace: ``patchers`` holds every patcher that was created and
        ``patcher_mocks`` holds the mock returned by each started patcher
        (each mock's ``name`` is set to the patched attribute name).
    """
    # This looks for functions starting with _ because those are the functions which are imported
    # from indexing_api. The _ lets it prevent name collisions.
    patchers = [
        patch('search.tasks.{0}'.format(name), autospec=True)
        for name, val in tasks.__dict__.items()
        if callable(val) and name.startswith("_")
    ]
    patcher_mocks = []
    # Track what was actually started so cleanup never calls stop() on a
    # patcher that failed to start.
    started = []
    try:
        for patcher in patchers:
            mock = patcher.start()
            started.append(patcher)
            mock.name = patcher.attribute
            patcher_mocks.append(mock)
        yield SimpleNamespace(
            patchers=patchers,
            patcher_mocks=patcher_mocks
        )
    finally:
        # Runs on normal teardown, when a later start() raises, and when the
        # generator is closed early, so no patch is left active.
        for patcher in started:
            patcher.stop()
def get_packages_from_lockfile():
    """
    Collect the packages listed in ``Pipfile.lock`` (read from the current directory).

    The lockfile's ``default`` section fills ``result.default`` and its
    ``develop`` section fills ``result.development``. Each entry is the
    package name immediately followed by that package's ``version`` value.

    Returns: SimpleNamespace(default=[...], development=[...])
    """
    result = SimpleNamespace(default=list(), development=list())
    lockfile_data = json.loads(Path('Pipfile.lock').read_text())

    # lockfile section name -> list on the result that receives its packages
    section_targets = (
        ('default', result.default),
        ('develop', result.development),
    )
    for section, bucket in section_targets:
        bucket.extend(
            name + info['version']
            for name, info in lockfile_data[section].items()
        )
    return result
def test_command_line_invokation(original_db, anonymized, monkeypatch):
    """Run the dump and anonymize entry points with ``parse_args`` replaced.

    ``argparse.ArgumentParser.parse_args`` is patched to return canned
    arguments, first for ``dump_main`` (original database settings) and then
    for ``anonymize_main`` (anonymized database settings).
    """
    connection_keys = ('dbname', 'user', 'host', 'port', 'password')

    def fake_dump_args(self):
        return SimpleNamespace(
            verbose=False,
            schema=SCHEMA_PATH,
            dump_file=DUMP_PATH,
            **{arg: ORIGINAL_DB_ARGS[arg] for arg in connection_keys}
        )

    def fake_anonymize_args(self):
        return SimpleNamespace(
            verbose=False,
            leave_dump=False,
            schema=SCHEMA_PATH,
            dump_file=DUMP_PATH,
            **{arg: ANONYMIZED_DB_ARGS[arg] for arg in connection_keys}
        )

    monkeypatch.setattr('argparse.ArgumentParser.parse_args', fake_dump_args)
    dump_main()
    assert os.path.getsize(DUMP_PATH) > 2000
    assert_db_empty(anonymized)

    monkeypatch.setattr('argparse.ArgumentParser.parse_args', fake_anonymize_args)
    anonymize_main()
    assert_db_anonymized(anonymized)
def _connect(self):
    """(Re)open the PostgreSQL connection and track the connected state.

    On success ``self._conn`` is the new psycopg2 connection and an INFO line
    is printed to stderr if the object was previously marked disconnected.
    On any exception the error is printed to stderr, ``self._conn`` becomes a
    stand-in object with ``closed=1`` and a WARNING line is printed if the
    object was previously marked connected.
    """
    try:
        connection_settings = dict(
            database=DATABASE_DB,
            host=DATABASE_HOST,
            password=DATABASE_PASSWORD,
            port=DATABASE_PORT,
            sslmode=DATABASE_SSL_MODE,
            sslrootcert=DATABASE_CA_CERT,
            user=DATABASE_USER,
        )
        self._conn = psycopg2.connect(**connection_settings)
        if not self._connected:
            # only announce the transition, not every successful reconnect
            print('INFO: Connected to PostgreSQL', file=sys.stderr)
        self._connected = True
    except Exception as e:
        print(e, file=sys.stderr)
        # placeholder exposing the ``closed`` attribute of a real connection
        self._conn = SimpleNamespace(closed=1)
        if self._connected:
            print('WARNING: Disconnected from PostgreSQL', file=sys.stderr)
        self._connected = False
def __generate_reference__(self, triple_map, **kwargs):
    """Build an RDF term from the source value named by the triple map.

    The value is looked up in ``self.source`` under ``str(triple_map.reference)``.

    Args:
        triple_map(SimpleNamespace): Triple Map

    Returns:
        None when the value is missing or empty; an ``rdflib.URIRef`` when the
        map's ``datatype`` equals ``NS_MGR.xsd.anyURI``; otherwise an
        ``rdflib.Literal`` (typed when the map has a ``datatype``).
    """
    raw_value = self.source.get(str(triple_map.reference))
    if raw_value is None or len(raw_value) < 1:
        return None
    if not hasattr(triple_map, "datatype"):
        # no datatype declared: plain literal
        return rdflib.Literal(raw_value)
    if triple_map.datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(raw_value)
    return rdflib.Literal(
        raw_value,
        datatype=triple_map.datatype)
def __reference_handler__(self, **kwargs):
    """Internal method for handling rr:reference in triples map

    Evaluates the map's ``reference`` as a JSONPath expression against
    ``obj`` and adds one ``(subject, predicate, Literal(value))`` triple to
    ``self.output`` per match.

    Keyword Args:
    -------------
        predicate_obj_map: SimpleNamespace
        obj: dict
        subject: rdflib.URIRef

    Returns:
        An empty list when the map has no reference; otherwise None (triples
        are written to ``self.output``).
    """
    subjects = []
    pred_obj_map = kwargs.get("predicate_obj_map")
    obj = kwargs.get("obj")
    subject = kwargs.get("subject")
    if pred_obj_map.reference is None:
        return subjects
    predicate = pred_obj_map.predicate
    # The attribute is ``reference``; the previous ``refernce`` spelling
    # raised AttributeError on every call that reached this line.
    ref_exp = jsonpath_ng.parse(str(pred_obj_map.reference))
    # Parsed expressions are evaluated with ``find()``, not by calling them.
    for match in ref_exp.find(obj):
        self.output.add((subject, predicate, rdflib.Literal(match.value)))
def __generate_reference__(self, triple_map, **kwargs):
    """Internal method takes a triple_map and returns the result of
    applying to XPath to the current DOM context

    Args:
    -----
        triple_map: SimpleNamespace
        element: etree.Element

    Returns:
        An ``rdflib.URIRef`` for the first matched element whose stripped
        text starts with "http", or None when there is no such element.
    """
    element = kwargs.get("element")
    found_elements = element.xpath(
        triple_map.reference,
        namespaces=self.xml_ns)
    for elem in found_elements:
        text = elem.text
        if text is None:
            # An element without text content has ``text`` set to None;
            # skip it instead of failing on ``None.strip()``.
            continue
        raw_text = text.strip()
        #! Quick and dirty test for valid URI
        if not raw_text.startswith("http"):
            continue
        return rdflib.URIRef(raw_text)
    return None
def _fit_embedding_word(self, embedding_type, construct_docs, tokenize_, d=None):
    """Fit and return the word-embedding estimator for ``construct_docs``.

    The initial vectors depend on ``embedding_type``:

    * ``'google'``: loaded from the pickled GoogleNews word2vec model;
    * ``'twitter'``: a Word2Vec model of size ``d`` fitted on the train,
      unsupervised (first 10**6), validation and test documents;
    * anything else: an empty ``(0, d)`` matrix with an empty vocabulary.

    Returns the fitted estimator wrapped by the pipeline's ``'embeddings'`` step.
    """
    def as_lookup(word2vec_model):
        # Keep only the vector matrix and a word -> row index mapping.
        return SimpleNamespace(
            X=word2vec_model.syn0,
            vocab={w: v.index for w, v in word2vec_model.vocab.items()})

    if embedding_type == 'google':
        embeddings_ = as_lookup(joblib.load('data/google/GoogleNews-vectors-negative300.pickle'))
    elif embedding_type == 'twitter':
        word2vec = Word2Vec(
            sg=1, size=d, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20, workers=16
        )
        corpus_pipeline = Pipeline([
            ('tokenize', MapCorporas(tokenize_)),
            ('word2vec', MergeSliceCorporas(CachedFitTransform(word2vec, self.memory))),
        ])
        fitted = corpus_pipeline.fit(
            [self.train_docs, self.unsup_docs[:10**6], self.val_docs, self.test_docs])
        embeddings_ = as_lookup(fitted.named_steps['word2vec'].estimator)
    else:
        embeddings_ = SimpleNamespace(X=np.empty((0, d)), vocab={})

    embedding_pipeline = Pipeline([
        ('tokenize', MapCorporas(tokenize_)),
        # 0.25 is chosen so the unknown vectors have approximately the same variance as google pre-trained ones
        ('embeddings', MapCorporas(Embeddings(
            embeddings_, rand=lambda shape: get_rng().uniform(-0.25, 0.25, shape).astype('float32'),
            include_zero=True
        ))),
    ])
    embedding_pipeline.fit(construct_docs)
    return embedding_pipeline.named_steps['embeddings'].estimator
def setUp(self):
    """Create a ``StatusAPI`` backed by fake RPC/torrent objects with canned data."""
    rpc = FakeTransmissionRPC()
    torrent = FakeTorrentAPI()
    self.rpc = rpc
    self.torrent = torrent

    server_api = SimpleNamespace(rpc=rpc, torrent=torrent, loop=self.loop)
    self.api = StatusAPI(server_api, interval=1)

    # canned session statistics served by the fake RPC
    rpc.fake_stats = {
        'downloadSpeed': 789,
        'uploadSpeed': 0,
        'activeTorrentCount': 1,
        'pausedTorrentCount': 2,
        'torrentCount': 3,
    }

    # canned torrent list: one isolated, one downloading, one doing both
    isolated = {'status': Status((Status.ISOLATED,)), 'rate-up': 0, 'rate-down': 0}
    downloading = {'status': Status((Status.DOWNLOAD,)), 'rate-up': 0, 'rate-down': 456}
    seeding_and_downloading = {
        'status': Status((Status.DOWNLOAD, Status.UPLOAD)), 'rate-up': 123, 'rate-down': 456}
    torrent.fake_tlist = SimpleNamespace(
        torrents=(isolated, downloading, seeding_and_downloading),
    )
def run(self, command, *args, **kws):
    """Stand-in command runner that records selected commands.

    * A command list ending in ``lb config`` is recorded and answered with
      ``returncode=1``.
    * A command list ending in ``lb build`` is recorded, a dummy top-level
      filesystem layout is created under ``<root_dir>/chroot``, and the
      answer is ``returncode=1``.
    * A command string starting with ``dpkg -L`` is recorded and executed
      through ``subprocess_run`` (output captured by default).
    """
    tail = command[-2:]
    if tail == ['lb', 'config']:
        self.call_args_list.append(command)
        return SimpleNamespace(returncode=1)
    if tail == ['lb', 'build']:
        self.call_args_list.append(command)
        # Create dummy top-level filesystem layout.
        chroot_dir = os.path.join(self.root_dir, 'chroot')
        for dir_name in DIRS_UNDER_ROOTFS:
            os.makedirs(os.path.join(chroot_dir, dir_name))
        return SimpleNamespace(returncode=1)
    if command.startswith('dpkg -L'):
        self.call_args_list.append(command)
        captured_out = kws.pop('stdout', PIPE)
        captured_err = kws.pop('stderr', PIPE)
        return subprocess_run(
            command,
            stdout=captured_out, stderr=captured_err,
            universal_newlines=True,
            **kws)
def test_attrdel(self):
    """Deleting attributes: missing names raise, existing names disappear from ``vars()``."""
    empty = types.SimpleNamespace()
    filled = types.SimpleNamespace(x=1, y=2, w=3)

    # deleting a name that was never set fails on both namespaces
    for namespace in (empty, filled):
        with self.assertRaises(AttributeError):
            del namespace.spam

    without_y = {'w': 3, 'x': 1}
    del filled.y
    self.assertEqual(vars(filled), without_y)

    # an attribute can be re-added after deletion and deleted again
    filled.y = 'spam'
    self.assertEqual(vars(filled), {'w': 3, 'x': 1, 'y': 'spam'})
    del filled.y
    self.assertEqual(vars(filled), without_y)

    empty.spam = 5
    self.assertEqual(vars(empty), {'spam': 5})
    del empty.spam
    self.assertEqual(vars(empty), {})
def setup_edit_test(tweet_factory, repository, mock, *, nvim_returncode):
    """Store two tweets and replace ``subprocess.run`` with a fake editor run.

    The fake records the command it receives on the returned spy (``spy.cmd``),
    overwrites the file named by ``cmd[1]`` with the text "changed" and
    returns a stub process whose ``returncode`` is *nvim_returncode*.

    Returns:
        The spy namespace; ``spy.cmd`` is None until the fake has been called.
    """
    repository.add_tweets([
        tweet_factory.make_tweet(42, "First tweet!", date="2017-07-07"),
        tweet_factory.make_tweet(57, "Second tweet", date="2017-08-02"),
    ])

    spy = types.SimpleNamespace(cmd=None)

    def fake_run(cmd):
        process = mock.Mock()
        spy.cmd = cmd
        process.returncode = nvim_returncode
        # cmd[1] is the path of the file being edited
        with open(cmd[1], "w") as stream:
            stream.write("changed")
        return process

    mock.patch("subprocess.run", fake_run)
    return spy
def test_walker_set_heat():
    """``_Walker.set_heat`` stores an accepted heat and raises ValueError for -1, 0 and 1.01."""
    def make_variable(name, draw_value, current_value):
        return SimpleNamespace(
            draw_random=lambda: True, draw_value=draw_value, name=name, current_value=current_value)

    variables = [make_variable("foo", 0.1, 0.15), make_variable("bar", 0.5, 0.51)]
    walker = mcmcmc._Walker(variables=variables, func=lambda *_: 1, heat=0.1, min_max=(1, 5), quiet=True)
    assert walker.heat == 0.1

    walker.set_heat(0.75)
    assert walker.heat == 0.75

    for rejected_heat in (-1, 0, 1.01):
        with pytest.raises(ValueError) as err:
            walker.set_heat(rejected_heat)
        assert "heat values must be positive, between 0.000001 and 1.0." in str(err)
def test_chain_init():
    """A new ``_Chain`` keeps its constructor arguments, starts its counters at 0 and writes the header row."""
    foo_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.1, name="foo", current_value=0.15)
    bar_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.5, name="bar", current_value=0.51)
    walkers = [SimpleNamespace(variables=[foo_var, bar_var]) for _ in range(2)]
    tmp_file = br.TempFile()

    chain = mcmcmc._Chain(walkers=list(walkers), outfile=tmp_file.path, cold_heat=0.01, hot_heat=0.2)

    expected_attributes = (
        ("walkers", walkers),
        ("outfile", tmp_file.path),
        ("cold_heat", 0.01),
        ("hot_heat", 0.2),
        ("step_counter", 0),
        ("best_score_ever_seen", 0),
    )
    for attribute, expected in expected_attributes:
        assert getattr(chain, attribute) == expected

    # header row: generation column, one column per variable, then the result
    assert tmp_file.read() == "Gen\tfoo\tbar\tresult\n"
def test_chain_dump_obj():
    """``_Chain._dump_obj`` reports the walkers' dumps, the chain state and the outfile content."""
    walkers = [
        SimpleNamespace(_dump_obj=lambda *_: "walker1"),
        SimpleNamespace(_dump_obj=lambda *_: "walker2"),
    ]
    tmp_file = br.TempFile()
    tmp_file.write("outfile results")
    # _dump_obj is stored unbound on the namespace, so the chain is passed explicitly
    chain = SimpleNamespace(walkers=walkers, outfile=tmp_file.path, cold_heat=0.1, hot_heat=0.2,
                            step_counter=20, best_score_ever_seen=100, _dump_obj=mcmcmc._Chain._dump_obj)

    dump = chain._dump_obj(chain)

    expected_entries = (
        ("walkers", ["walker1", "walker2"]),
        ("cold_heat", 0.1),
        ("hot_heat", 0.2),
        ("step_count", 20),
        ("best_score", 100),
        ("results", "outfile results"),
    )
    for key, expected in expected_entries:
        assert dump[key] == expected
def test_chain_apply_dump(capsys):
    """``_Chain._apply_dump`` forwards to every walker and restores the chain state and outfile."""
    walker1 = SimpleNamespace(_apply_dump=lambda *_: print("Applying dump to walker1"))
    walker2 = SimpleNamespace(_apply_dump=lambda *_: print("Applying dump to walker2"))
    tmp_file = br.TempFile()
    # _apply_dump is stored unbound on the namespace, so the chain is passed explicitly
    chain = SimpleNamespace(walkers=[walker1, walker2], outfile=tmp_file.path, cold_heat=None, hot_heat=None,
                            step_counter=None, best_score_ever_seen=None,
                            _apply_dump=mcmcmc._Chain._apply_dump)
    saved_state = {
        "walkers": [None, None],
        "cold_heat": 0.1,
        "hot_heat": 0.2,
        "step_count": 20,
        "best_score": 100,
        "results": "Some results",
    }

    chain._apply_dump(chain, saved_state)

    assert chain.walkers == [walker1, walker2]
    out, err = capsys.readouterr()
    assert out == "Applying dump to walker1\nApplying dump to walker2\n"
    restored = (
        ("cold_heat", 0.1),
        ("hot_heat", 0.2),
        ("step_counter", 20),
        ("best_score_ever_seen", 100),
    )
    for attribute, expected in restored:
        assert getattr(chain, attribute) == expected
    assert tmp_file.read() == "Some results"
def test_mcmcmc_resume(capsys):
    """``MCMCMC.resume`` returns False without a dump file; otherwise it applies the dump to each chain and runs."""
    def make_chain(message):
        return SimpleNamespace(_apply_dump=lambda *_: print(message))

    # resume is stored unbound on the namespace, so the object is passed explicitly
    mc_obj = SimpleNamespace(dumpfile="does_not_exist", resume=mcmcmc.MCMCMC.resume)
    assert mc_obj.resume(mc_obj) is False

    tmp_file = br.TempFile(byte_mode=True)
    dill.dump(["a", "b", "c"], tmp_file)
    mc_obj.dumpfile = tmp_file.path
    mc_obj.chains = [
        make_chain("applying chain1"),
        make_chain("applying chain2"),
        make_chain("applying chain3"),
    ]
    mc_obj.run = lambda *_: print("Running")

    assert mc_obj.resume(mc_obj) is True
    out, err = capsys.readouterr()
    assert out == "applying chain1\napplying chain2\napplying chain3\nRunning\n", print(out)
def test_mcmcmc_check_convergence(hf):
    """``MCMCMC._check_convergence`` against three recorded chain files at different thresholds."""
    csv_path = os.path.join(hf.resource_path, "mcmcmc", "chain")
    chains = [
        SimpleNamespace(step_counter=99, outfile=csv_path + suffix)
        for suffix in ("1.csv", "2.csv", "3.csv")
    ]
    mc_obj = SimpleNamespace(_check_convergence=mcmcmc.MCMCMC._check_convergence, chains=chains,
                             convergence=1.01)

    # Return False when step_counter < 100
    assert mc_obj._check_convergence(mc_obj) is False

    # Return False when convergence is not met
    for chain in chains:
        chain.step_counter = 100
    assert mc_obj._check_convergence(mc_obj) is False

    # Return True on convergence
    mc_obj.convergence = 1.1
    assert mc_obj._check_convergence(mc_obj) is True
def test_mcmcmc_reset_params():
    """``MCMCMC.reset_params`` rejects a wrong-length list and otherwise replaces the walkers' params."""
    walker1, walker2, walker3, walker4 = (
        SimpleNamespace(params=params) for params in ([1, 2], [3, 4], [5, 6], [7, 8])
    )
    chains = [
        SimpleNamespace(walkers=[walker1, walker2]),
        SimpleNamespace(walkers=[walker3, walker4]),
    ]
    # reset_params is stored unbound on the namespace, so the object is passed explicitly
    mc_obj = SimpleNamespace(reset_params=mcmcmc.MCMCMC.reset_params, chains=chains)

    with pytest.raises(AttributeError) as e:
        mc_obj.reset_params(mc_obj, ['a', 'b', 'c'])
    assert "Incorrect number of params supplied in reset_params(). 2 expected; 3 supplied; ['a', 'b', 'c']" in str(e)

    mc_obj.reset_params(mc_obj, ['a', 'b'])
    assert walker2.params == ['a', 'b']
def add_option(self, key, default, *, reset=True):
    """Register a new option without adding widgets to the setting dialog.

    ``section[key]`` will be *default* unless something else is
    specified.

    If *reset* is True, the setting dialog's reset button sets this
    option to *default*.

    .. note::
        The *reset* argument should be False for settings that
        cannot be changed with the dialog. That way, clicking the
        reset button resets only the settings that are shown in the
        dialog.
    """
    # true when the triangle is showing
    errorvar = tkinter.BooleanVar()
    info = types.SimpleNamespace(
        default=default,        # not validated
        reset=reset,
        callbacks=[],
        errorvar=errorvar,
    )
    self._infos[key] = info
def __init__(self, data_dir, work_dir):
    """Initialise empty dataset state and validate the two directories.

    Both paths are stored as absolute paths with a trailing ``/``.

    Raises:
        NotADirectoryError: if either path is not an existing directory.
    """
    self.RandomState = np.random.RandomState(seed=20161013)

    # values that are filled in later; all start out unset
    for unset_attribute in ('data_mean', 'data_std', 'class_count', 'meta', 'train_meta'):
        setattr(self, unset_attribute, None)

    self.test_data = types.SimpleNamespace(X=None, y=None, meta=None)
    self.validation_data = types.SimpleNamespace(X=None, y=None, meta=None)

    self.data_dir = os.path.abspath(data_dir) + '/'
    self.work_dir = os.path.abspath(work_dir) + '/'

    directory_checks = (
        (self.data_dir, "{} is not a proper dataset directory."),
        (self.work_dir, "{} is not a proper working directory."),
    )
    for directory, message in directory_checks:
        if not os.path.isdir(directory):
            raise NotADirectoryError(message.format(directory))
def test_attrdel(self):
    """Attribute deletion on ``SimpleNamespace``: unknown names raise, known names are removed."""
    blank = types.SimpleNamespace()
    populated = types.SimpleNamespace(x=1, y=2, w=3)

    # a name that does not exist cannot be deleted
    with self.assertRaises(AttributeError):
        delattr(blank, 'spam')
    with self.assertRaises(AttributeError):
        delattr(populated, 'spam')

    # delete, re-add with another value, delete again
    delattr(populated, 'y')
    self.assertEqual(vars(populated), {'w': 3, 'x': 1})
    populated.y = 'spam'
    self.assertEqual(vars(populated), {'w': 3, 'x': 1, 'y': 'spam'})
    delattr(populated, 'y')
    self.assertEqual(vars(populated), {'w': 3, 'x': 1})

    # an attribute added after construction can be deleted as well
    blank.spam = 5
    self.assertEqual(vars(blank), {'spam': 5})
    delattr(blank, 'spam')
    self.assertEqual(vars(blank), {})
def test_html_writer(self):
    """Generate the package listings and compare every written file with its expected copy."""
    self.maxDiff = None
    htdocs = 'testdata/htdocs'
    args = types.SimpleNamespace(
        arch='x86',
        htdocs=htdocs,
        rel_area='testdata/relarea',
        dryrun=False,
        force=True,
        pkglist='testdata/pkglist/cygwin-pkg-maint',
    )

    packages = package.read_packages(args.rel_area, args.arch)
    package.validate_packages(args, packages)
    pkg2html.update_package_listings(args, packages, args.arch)

    # compare the output files with expected
    for dirpath, _subdirs, filenames in os.walk(htdocs):
        relpath = os.path.relpath(dirpath, htdocs)
        for filename in filenames:
            results = os.path.join(htdocs, relpath, filename)
            expected = os.path.join('testdata/htdocs.expected', relpath, filename)
            if filecmp.cmp(results, expected, shallow=False):
                logging.info("%s identical", os.path.join(relpath, filename))
                continue
            logging.info("%s different", os.path.join(relpath, filename))
            # the files differ: fail with a readable multi-line diff
            with open(results) as r, open(expected) as e:
                self.assertMultiLineEqual(e.read(), r.read())
def test_scan_uploads(self):
    """Scan one maintainer's upload area and check the resulting move/vault/remove sets."""
    self.maxDiff = None
    args = types.SimpleNamespace(arch='x86', rel_area='testdata/relarea', dryrun=False)

    pkglist = ['after-ready', 'not-ready', 'testpackage', 'testpackage2']

    mlist = maintainers.Maintainer.add_directories({}, 'testdata/homes')
    m = mlist['Blooey McFooey']
    m.pkgs.extend(pkglist + ['not-on-package-list'])

    def ready_marker(*package_path):
        # path of the '!ready' marker below the maintainer's x86 release area
        return os.path.join(m.homedir(), 'x86', 'release', *package_path, '!ready')

    # (marker path, extra arguments passed to touch)
    ready_fns = [
        (ready_marker('testpackage'), ''),
        (ready_marker('testpackage2', 'testpackage2-subpackage'), ''),
        (ready_marker('after-ready'), '-t 198709011700'),
        (ready_marker('corrupt'), ''),
    ]
    for (marker, touch_args) in ready_fns:
        os.system('touch %s "%s"' % (touch_args, marker))

    scan_result = uploads.scan(m, pkglist + ['not-on-maintainer-list'], args.arch, args)

    self.assertEqual(scan_result.error, False)
    compare_with_expected_file(self, 'testdata/uploads', dict(scan_result.to_relarea), 'move')
    self.assertCountEqual(scan_result.to_vault, {'x86/release/testpackage': ['x86/release/testpackage/testpackage-0.1-1.tar.bz2']})
    self.assertCountEqual(scan_result.remove_always, [marker for (marker, _touch_args) in ready_fns])
    self.assertEqual(scan_result.remove_success, [
        'testdata/homes/Blooey McFooey/x86/release/testpackage/-testpackage-0.1-1-src.tar.bz2',
        'testdata/homes/Blooey McFooey/x86/release/testpackage/-testpackage-0.1-1.tar.bz2',
    ])
    with pprint_patch():
        compare_with_expected_file(self, 'testdata/uploads', dict(scan_result.packages), 'pkglist')
def __init__(self,
    repo,
    uri=None,
    response=None,
    rdf_prefixes_mixins=None):
    '''
    Initialise a resource bound to a repository, optionally from an HTTP response.

    Args:
        repo: repository handle; its parse_uri() is used to normalise `uri`
        uri: resource URI, passed through repo.parse_uri()
        response: optional response object exposing `content`, `headers` and `status_code`
        rdf_prefixes_mixins: accepted for interface compatibility; not used in this method

    Returns:
        None
    '''

    # repository handle is pinned to resource instance here
    self.repo = repo

    # parse uri with parse_uri() from repo instance
    self.uri = self.repo.parse_uri(uri)

    # parse response
    # if response provided, parse and set to attributes
    if response:
        self.response = response
        self.data = self.response.content
        self.headers = self.response.headers
        self.status_code = self.response.status_code
        # exists is True only for status 200; it is now always assigned, so a
        # truthy non-200 response no longer leaves the attribute undefined
        self.exists = self.status_code == 200
    # if not response, set all blank
    else:
        self.response = None
        self.data = None
        self.headers = {}
        self.status_code = None
        self.exists = False

    # RDF
    self._build_rdf(data=self.data)

    # versions
    self.versions = SimpleNamespace()
def parse_object_like_triples(self):
    '''
    Expose the triples of self.rdf.graph as nested attributes.

    After the call, self.rdf.triples.<prefix>.<predicate> is the list of
    objects found for that predicate, where prefix and predicate come from
    the graph's compute_qname() of each triple's predicate.

    Args:
        None

    Returns:
        None: sets self.rdf.triples
    '''
    # parse triples as object-like attributes in self.rdf.triples
    triples = SimpleNamespace()  # prepare triples
    self.rdf.triples = triples

    for _subject, predicate_uri, obj in self.rdf.graph:
        # get ns info
        ns_prefix, _ns_uri, predicate = self.rdf.graph.compute_qname(predicate_uri)

        # if prefix as namespace not yet added, add
        if not hasattr(triples, ns_prefix):
            setattr(triples, ns_prefix, SimpleNamespace())
        prefix_bucket = getattr(triples, ns_prefix)

        # same for predicate
        if not hasattr(prefix_bucket, predicate):
            setattr(prefix_bucket, predicate, [])

        # append object for this prefix
        getattr(prefix_bucket, predicate).append(obj)