def _diff_graph(self):
'''
Uses rdflib.compare diff, https://github.com/RDFLib/rdflib/blob/master/rdflib/compare.py
When a resource is retrieved, the graph retrieved and parsed at that time is saved to self.rdf._orig_graph,
and all local modifications are made to self.rdf.graph. This method compares the two graphs and returns the diff
in the format of three graphs:
overlap - triples shared by both
removed - triples that exist ONLY in the original graph, self.rdf._orig_graph
added - triples that exist ONLY in the modified graph, self.rdf.graph
These are used for building a sparql update query for self.update.
Args:
None
Returns:
None: sets self.rdf.diffs and adds the three graphs mentioned, 'overlap', 'removed', and 'added'
'''
overlap, removed, added = graph_diff(
to_isomorphic(self.rdf._orig_graph),
to_isomorphic(self.rdf.graph))
diffs = SimpleNamespace()
diffs.overlap = overlap
diffs.removed = removed
diffs.added = added
self.rdf.diffs = diffs
python类SimpleNamespace()的实例源码
def model(self):
return types.SimpleNamespace()
def test_discovery_from_dotted_namespace_packages(self):
if not getattr(types, 'SimpleNamespace', None):
raise unittest.SkipTest('Namespaces not supported')
loader = unittest.TestLoader()
orig_import = __import__
package = types.ModuleType('package')
package.__path__ = ['/a', '/b']
package.__spec__ = types.SimpleNamespace(
loader=None,
submodule_search_locations=['/a', '/b']
)
def _import(packagename, *args, **kwargs):
sys.modules[packagename] = package
return package
def cleanup():
builtins.__import__ = orig_import
self.addCleanup(cleanup)
builtins.__import__ = _import
_find_tests_args = []
def _find_tests(start_dir, pattern, namespace=None):
_find_tests_args.append((start_dir, pattern))
return ['%s/tests' % start_dir]
loader._find_tests = _find_tests
loader.suiteClass = list
suite = loader.discover('package')
self.assertEqual(suite, ['/a/tests', '/b/tests'])
def get(self, url):
for matcher in self.url_matches:
if matcher[0].fullmatch(url):
return SimpleNamespace(data=json.load(
open(os.path.join(app.root_path, 'mock/fixtures/' + matcher[1]))
))
return SimpleNamespace(data={})
def __init__(self, numLed, hwBackend, port=6606):
self.ctx = zmq.Context()
self.numLed = numLed
self.port = port
self.loop = IOLoop.instance()
self.caller = PeriodicCallback(self._on_nextFrame, 1000/30, self.loop)
self.hwComm = hwBackend
self.hwComm.connect()
self.zmqCollector = GlinAppZmqCollector(self, self.ctx)
self.zmqPublisher = GlinAppZmqPublisher(self, self.ctx)
# server side configuration
self.config = SimpleNamespace()
self.config.maxFps = 60
# current state (somehow client side configuration)
self.state = SimpleNamespace()
self.state.animationClasses = []
self.state.activeSceneId = None
self.state.activeAnimation = None
self.state.scenes = {}
self.state.brightness = 1.0
self.state.sceneIdCtr = 0
self.state.mainswitch = True
self.state.targetFps = 0
self.state.lastFrameSent = None
def __init__(self, repo):
# Must have run git-annex init
if repo.lookup_branch('git-annex') is None:
fmt = 'Repository {} is not a git-annex repo.'
msg = fmt.format(repo)
raise NotAGitAnnexRepoError(msg)
self.repo = repo
self.processes = types.SimpleNamespace()
self.processes.metadata = \
GitAnnexMetadataBatchJsonProcess(self.repo.workdir)
self.processes.contentlocation = \
GitAnnexContentlocationBatchProcess(self.repo.workdir)
def base_test_data():
"""
Fixture for test data that should be available to any test case in the suite
"""
# Create a live program with valid prices and financial aid
program = ProgramFactory.create(
live=True,
financial_aid_availability=True,
price=1000,
)
CourseRunFactory.create(course__program=program)
TierProgramFactory.create_properly_configured_batch(2, program=program)
# Create users
staff_user, student_user = (create_user_for_login(is_staff=True), create_user_for_login(is_staff=False))
ProgramEnrollment.objects.create(program=program, user=staff_user)
ProgramEnrollment.objects.create(program=program, user=student_user)
Role.objects.create(
role=Staff.ROLE_ID,
user=staff_user,
program=program,
)
return SimpleNamespace(
staff_user=staff_user,
student_user=student_user,
program=program
)
def program_data():
"""
Fixture for program and tier_program test data
"""
program, tier_programs = create_program()
return SimpleNamespace(
program=program,
tier_programs=tier_programs,
)
def test_props(self, program_data):
"""
Fixture that provides test properties for FinancialAidDetailView test cases
"""
user = create_enrolled_profile(program_data.program).user
pending_fa = FinancialAidFactory.create(
user=user,
tier_program=program_data.tier_programs["25k"],
status=FinancialAidStatus.PENDING_DOCS
)
docs_sent_url = reverse(
"financial_aid",
kwargs={"financial_aid_id": pending_fa.id}
)
docs_sent_date = now_in_utc().date()
docs_sent_request_params = dict(
content_type="application/json",
data=json.dumps({
"date_documents_sent": docs_sent_date.strftime("%Y-%m-%d")
})
)
return SimpleNamespace(
user=user,
pending_fa=pending_fa,
docs_sent_url=docs_sent_url,
docs_sent_request_params=docs_sent_request_params,
docs_sent_date=docs_sent_date,
)
def __getattr__(self, name):
try:
return super(SimpleNamespace, self).__getitem__(name)
except KeyError:
raise AttributeError('{0} has no attribute {1}'
.format(self.__class__.__name__, name))
def __setattr__(self, name, value):
super(SimpleNamespace, self).__setitem__(name, value)
def record_to_dict(record: Record):
"""
Converts a record/namespace to a dictionary.
Notes:
If you want to convert a dict to a record, simply call ``record(**D)``.
"""
if isinstance(record, SimpleNamespace):
return dict(record.__dict__)
return dict(record._items())
def push_args():
args = types.SimpleNamespace()
args.accept = False
args.assignee = None
args.force = False
args.merge = False
args.mr_title = None
args.push_spec = None
args.ready = False
args.reviewers = None
args.target_branch = "master"
args.title = None
args.wip = False
return args
def load(name):
return SimpleNamespace(**_templates[name])
# autoload all templates
def read_config(cfgfile='settings.cfg', section='DEFAULT'):
"""Parse the Hydrus configuration file."""
cfg = SimpleNamespace()
parser = ConfigParser()
parser.read(cfgfile)
cfg.RAPIDCLUS = parser.getboolean(section, 'RAPIDCLUS')
cfg.QUADRATURE = parser.getboolean(section, 'QUADRATURE')
cfg.INFILE = parser.get(section, 'INFILE')
cfg.MEASURE_SETTINGS = parser.get(section, 'MEASURE_SETTINGS')
return cfg
def dispatch_custom_completer(self, text):
if not self.custom_completers:
return
line = self.line_buffer
if not line.strip():
return None
# Create a little structure to pass all the relevant information about
# the current completion to any custom completer.
event = SimpleNamespace()
event.line = line
event.symbol = text
cmd = line.split(None,1)[0]
event.command = cmd
event.text_until_cursor = self.text_until_cursor
# for foo etc, try also to find completer for %foo
if not cmd.startswith(self.magic_escape):
try_magic = self.custom_completers.s_matches(
self.magic_escape + cmd)
else:
try_magic = []
for c in itertools.chain(self.custom_completers.s_matches(cmd),
try_magic,
self.custom_completers.flat_matches(self.text_until_cursor)):
try:
res = c(event)
if res:
# first, try case sensitive match
withcase = [r for r in res if r.startswith(text)]
if withcase:
return withcase
# if none, then case insensitive ones are ok too
text_low = text.lower()
return [r for r in res if r.lower().startswith(text_low)]
except TryNext:
pass
return None
def get(self, edit_count):
request = types.SimpleNamespace()
request.user = UserFactory(edit_count=edit_count)
request.method = "GET"
return request
def post(self, edit_count):
request = types.SimpleNamespace()
request.user = UserFactory(edit_count=edit_count)
request.POST = {}
request.method = "POST"
return request
def get(self):
request = types.SimpleNamespace()
request.method = "GET"
return request
def post(self, post):
request = types.SimpleNamespace()
request.POST = post
request.method = "POST"
return request