def test_check_closed(self):
    """Check that a closed dumbdbm database rejects further use.

    After close(), each keyed operation and each whole-database operation
    is expected to raise dumbdbm.error with the "already been closed"
    message.
    """
    expected = "DBM object has already been closed"
    db = dumbdbm.open(_fname, 'c')
    db.close()

    # Operations invoked with a single argument.  setitem is pre-bound with
    # the key 'b', so for it 'test' is the value being stored.
    keyed_ops = (
        partial(operator.delitem, db),
        partial(operator.setitem, db, 'b'),
        partial(operator.getitem, db),
        partial(operator.contains, db),
    )
    for op in keyed_ops:
        with self.assertRaises(dumbdbm.error) as caught:
            op('test')
        self.assertEqual(str(caught.exception), expected)

    # Operations that take only the database object itself.
    whole_db_ops = (
        operator.methodcaller('keys'),
        operator.methodcaller('iterkeys'),
        operator.methodcaller('items'),
        len,
    )
    for op in whole_db_ops:
        with self.assertRaises(dumbdbm.error) as caught:
            op(db)
        self.assertEqual(str(caught.exception), expected)
Python 类 methodcaller() 的实例源码
def test_methodcaller(self):
    """Exercise operator.methodcaller with positional and keyword arguments."""

    class A:
        def foo(self, *args, **kwds):
            return args[0] + args[1]

        def bar(self, f=42):
            return f

    target = A()

    # Constructing a methodcaller without a method name is an error.
    self.assertRaises(TypeError, operator.methodcaller)

    # foo() indexes into its positional args, so calling it with none fails.
    call_foo_bare = operator.methodcaller('foo')
    self.assertRaises(IndexError, call_foo_bare, target)

    call_foo_args = operator.methodcaller('foo', 1, 2)
    self.assertEqual(call_foo_args(target), 3)

    # bar() falls back to its default; the caller accepts only one object.
    call_bar_default = operator.methodcaller('bar')
    self.assertEqual(call_bar_default(target), 42)
    self.assertRaises(TypeError, call_bar_default, target, target)

    # Keyword arguments given at construction are forwarded to the method.
    call_bar_kw = operator.methodcaller('bar', f=5)
    self.assertEqual(call_bar_kw(target), 5)
def get_vlan_graph_url(vlanid, family=4, timeframe="day"):
    """Returns a Graphite graph render URL for a VLAN.

    :param vlanid: Primary key used to look up the Vlan (404 if not found).
    :param family: IP family number; anything that cannot be converted to
                   an int falls back to 4.
    :param timeframe: Appended to "1" and handed to get_simple_graph_url.
    :returns: Whatever get_simple_graph_url returns, or None when the VLAN
              has no prefixes of the requested family.
    """
    vlan = get_object_or_404(Vlan, pk=vlanid)
    try:
        family = int(family)
    except (TypeError, ValueError):
        # int() raises TypeError (not ValueError) for None and other
        # non-string, non-numeric inputs; treat those like bad strings.
        family = 4

    # family is guaranteed to be an int here, so interpolating it into the
    # WHERE clause cannot inject arbitrary SQL.
    extra = {'where': ['family(netaddr) = %s' % family]}
    prefixes = sorted(vlan.prefix_set.all().extra(**extra),
                      key=methodcaller('get_prefix_size'),
                      reverse=True)
    if not prefixes:
        return None

    metrics = _vlan_metrics_from_prefixes(prefixes, family)
    return get_simple_graph_url(
        metrics, "1" + timeframe,
        title="Total IPv{0} addresses on VLAN {1}".format(family, vlan),
        width=597, height=251)
def test_check_closed(self):
    """Check that a closed dumbdbm database rejects further use.

    After close(), each keyed operation and each whole-database operation
    is expected to raise dumbdbm.error with the "already been closed"
    message.
    """
    expected = "DBM object has already been closed"
    db = dumbdbm.open(_fname, 'c')
    db.close()

    # Operations invoked with a single argument.  setitem is pre-bound with
    # the key 'b', so for it 'test' is the value being stored.
    keyed_ops = (
        partial(operator.delitem, db),
        partial(operator.setitem, db, 'b'),
        partial(operator.getitem, db),
        partial(operator.contains, db),
    )
    for op in keyed_ops:
        with self.assertRaises(dumbdbm.error) as caught:
            op('test')
        self.assertEqual(str(caught.exception), expected)

    # Operations that take only the database object itself.
    whole_db_ops = (
        operator.methodcaller('keys'),
        operator.methodcaller('iterkeys'),
        operator.methodcaller('items'),
        len,
    )
    for op in whole_db_ops:
        with self.assertRaises(dumbdbm.error) as caught:
            op(db)
        self.assertEqual(str(caught.exception), expected)
def _AnyMessageToJsonObject(self, message):
    """Build the Proto3 JSON object for an Any message.

    Returns an empty dict when the message has no fields set.  Otherwise the
    packed payload is parsed into a message created from the type URL and
    converted by the wrapper, well-known-type, or regular converter.
    """
    if not message.ListFields():
        return {}

    type_url = message.type_url
    # '@type' has to be emitted first, hence OrderedDict rather than {}.
    result = OrderedDict()
    result['@type'] = type_url

    unpacked = _CreateMessageFromTypeUrl(type_url)
    unpacked.ParseFromString(message.value)
    descriptor = unpacked.DESCRIPTOR
    type_name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        result['value'] = self._WrapperMessageToJsonObject(unpacked)
    elif type_name in _WKTJSONMETHODS:
        # Entry [0] names the method on self that serialises this type.
        to_json = getattr(self, _WKTJSONMETHODS[type_name][0])
        result['value'] = to_json(unpacked)
    else:
        return self._RegularMessageToJsonObject(unpacked, result)
    return result
def ConvertMessage(self, value, message):
    """Populate a message from a JSON object.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    descriptor = message.DESCRIPTOR
    type_name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        self._ConvertWrapperMessage(value, message)
        return
    if type_name in _WKTJSONMETHODS:
        # Entry [1] names the method on self that parses this type.
        from_json = getattr(self, _WKTJSONMETHODS[type_name][1])
        from_json(value, message)
        return
    self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
    """Convert a JSON representation into Any message.

    Args:
      value: The JSON object for the Any message.  An empty dict leaves
        ``message`` untouched.  The object is not modified.
      message: The Any message whose ``value`` and ``type_url`` are set.

    Raises:
      ParseError: If the '@type' key is missing from ``value``.
    """
    if isinstance(value, dict) and not value:
        return
    try:
        type_url = value['@type']
    except KeyError:
        raise ParseError('@type is missing when parsing any message.')

    sub_message = _CreateMessageFromTypeUrl(type_url)
    message_descriptor = sub_message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value['value'], sub_message)
    elif full_name in _WKTJSONMETHODS:
        methodcaller(
            _WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
    else:
        # '@type' is not a field of the packed message, so it must be
        # dropped before field conversion.  Drop it from a shallow copy so
        # the caller's JSON object is left intact.
        fields = dict(value)
        del fields['@type']
        self._ConvertFieldValuePair(fields, sub_message)

    # Sets Any message
    message.value = sub_message.SerializeToString()
    message.type_url = type_url
def consensus(self):
    """
    Pick the consensus MEI out of self.MEIlist.

    Preference order:
        1) Lowest CIPOS (read from infoDict["CIPOS"] and compared as int)
        2) Highest total number of supporting paired-ends (nbPE())

    Implemented as two stable sorts: the secondary criterion is applied
    first so that it survives as the tie-breaker of the primary one.
    """
    candidates = list(self.MEIlist)

    ## Secondary criterion: most supporting paired-ends first
    candidates.sort(key=lambda mei: mei.nbPE(), reverse=True)

    ## Primary criterion: smallest CIPOS first (stable, so ties keep the
    ## paired-end ordering established above)
    candidates.sort(key=lambda mei: int(mei.infoDict["CIPOS"]))

    ## The head of the list is the consensus MEI
    return candidates[0]
def _AnyMessageToJsonObject(self, message):
    """Build the Proto3 JSON object for an Any message.

    Returns an empty dict when the message has no fields set.  Otherwise the
    packed payload is parsed into a message created from the type URL and
    converted by the wrapper, well-known-type, or regular converter.
    """
    if not message.ListFields():
        return {}

    type_url = message.type_url
    # '@type' has to be emitted first, hence OrderedDict rather than {}.
    result = OrderedDict()
    result['@type'] = type_url

    unpacked = _CreateMessageFromTypeUrl(type_url)
    unpacked.ParseFromString(message.value)
    descriptor = unpacked.DESCRIPTOR
    type_name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        result['value'] = self._WrapperMessageToJsonObject(unpacked)
    elif type_name in _WKTJSONMETHODS:
        # Entry [0] names the method on self that serialises this type.
        to_json = getattr(self, _WKTJSONMETHODS[type_name][0])
        result['value'] = to_json(unpacked)
    else:
        return self._RegularMessageToJsonObject(unpacked, result)
    return result
def ConvertMessage(self, value, message):
    """Populate a message from a JSON object.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    descriptor = message.DESCRIPTOR
    type_name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        self._ConvertWrapperMessage(value, message)
        return
    if type_name in _WKTJSONMETHODS:
        # Entry [1] names the method on self that parses this type.
        from_json = getattr(self, _WKTJSONMETHODS[type_name][1])
        from_json(value, message)
        return
    self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
    """Convert a JSON representation into Any message.

    Args:
      value: The JSON object for the Any message.  An empty dict leaves
        ``message`` untouched.  The object is not modified.
      message: The Any message whose ``value`` and ``type_url`` are set.

    Raises:
      ParseError: If the '@type' key is missing from ``value``.
    """
    if isinstance(value, dict) and not value:
        return
    try:
        type_url = value['@type']
    except KeyError:
        raise ParseError('@type is missing when parsing any message.')

    sub_message = _CreateMessageFromTypeUrl(type_url)
    message_descriptor = sub_message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value['value'], sub_message)
    elif full_name in _WKTJSONMETHODS:
        methodcaller(
            _WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
    else:
        # '@type' is not a field of the packed message, so it must be
        # dropped before field conversion.  Drop it from a shallow copy so
        # the caller's JSON object is left intact.
        fields = dict(value)
        del fields['@type']
        self._ConvertFieldValuePair(fields, sub_message)

    # Sets Any message
    message.value = sub_message.SerializeToString()
    message.type_url = type_url
def _AnyMessageToJsonObject(self, message):
    """Build the Proto3 JSON object for an Any message.

    Returns an empty dict when the message has no fields set.  Otherwise the
    packed payload is parsed into a message created from the type URL and
    converted by the wrapper, well-known-type, or regular converter.
    """
    if not message.ListFields():
        return {}

    type_url = message.type_url
    # '@type' has to be emitted first, hence OrderedDict rather than {}.
    result = OrderedDict()
    result['@type'] = type_url

    unpacked = _CreateMessageFromTypeUrl(type_url)
    unpacked.ParseFromString(message.value)
    descriptor = unpacked.DESCRIPTOR
    type_name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        result['value'] = self._WrapperMessageToJsonObject(unpacked)
    elif type_name in _WKTJSONMETHODS:
        # Entry [0] names the method on self that serialises this type.
        to_json = getattr(self, _WKTJSONMETHODS[type_name][0])
        result['value'] = to_json(unpacked)
    else:
        return self._RegularMessageToJsonObject(unpacked, result)
    return result
def ConvertMessage(self, value, message):
    """Populate a message from a JSON object.

    Args:
      value: A JSON object.
      message: A WKT or regular protocol message to record the data.

    Raises:
      ParseError: In case of convert problems.
    """
    descriptor = message.DESCRIPTOR
    type_name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        self._ConvertWrapperMessage(value, message)
        return
    if type_name in _WKTJSONMETHODS:
        # Entry [1] names the method on self that parses this type.
        from_json = getattr(self, _WKTJSONMETHODS[type_name][1])
        from_json(value, message)
        return
    self._ConvertFieldValuePair(value, message)
def _ConvertAnyMessage(self, value, message):
    """Convert a JSON representation into Any message.

    Args:
      value: The JSON object for the Any message.  An empty dict leaves
        ``message`` untouched.  The object is not modified.
      message: The Any message whose ``value`` and ``type_url`` are set.

    Raises:
      ParseError: If the '@type' key is missing from ``value``.
    """
    if isinstance(value, dict) and not value:
        return
    try:
        type_url = value['@type']
    except KeyError:
        raise ParseError('@type is missing when parsing any message.')

    sub_message = _CreateMessageFromTypeUrl(type_url)
    message_descriptor = sub_message.DESCRIPTOR
    full_name = message_descriptor.full_name
    if _IsWrapperMessage(message_descriptor):
        self._ConvertWrapperMessage(value['value'], sub_message)
    elif full_name in _WKTJSONMETHODS:
        methodcaller(
            _WKTJSONMETHODS[full_name][1], value['value'], sub_message)(self)
    else:
        # '@type' is not a field of the packed message, so it must be
        # dropped before field conversion.  Drop it from a shallow copy so
        # the caller's JSON object is left intact.
        fields = dict(value)
        del fields['@type']
        self._ConvertFieldValuePair(fields, sub_message)

    # Sets Any message
    message.value = sub_message.SerializeToString()
    message.type_url = type_url
def __init__(self, **kwargs):
    """Select and instantiate fixes from keyword arguments, then set up the bot.

    With ``all=True`` every fix in ``all_fixes`` is enabled except those
    named in ``kwargs``; otherwise only fixes passed with a truthy value are
    enabled.  Options declared by an enabled fix class are moved out of
    ``kwargs`` into that fix's constructor; whatever is left goes to the
    superclass initializer.
    """
    do_all = kwargs.pop('all', False) is True
    self.fixes = []
    for fix, cls in all_fixes.items():
        named_by_caller = fix in kwargs
        flag = kwargs.pop(fix, False)
        # In "all" mode merely naming a fix opts it out; otherwise the
        # truthiness of the supplied value decides.
        wanted = (not named_by_caller) if do_all else bool(flag)
        if not wanted:
            continue
        options = {opt: kwargs.pop(opt)
                   for opt in cls.options.keys() if opt in kwargs}
        self.fixes.append(cls(**options))

    self.fixes.sort(key=lambda fix: fix.order)

    super(WikitextFixingBot, self).__init__(**kwargs)

    # self.site is read only after the superclass initializer has run.
    for fix in self.fixes:
        fix.site = self.site

    if self.generator:
        return
    pywikibot.output('No generator provided, making own generator...')
    per_fix_generators = map(methodcaller('generator'), self.fixes)
    self.generator = pagegenerators.PreloadingGenerator(
        chain.from_iterable(per_fix_generators))
def formatter(self, textvalue):
    """Add a P1630 claim to the current page for each new match in textvalue.

    Matches come from ``self.get_formatter_regex()``.  Nothing is added when
    the page's datatype is not one of 'commonsMedia', 'external-id' or
    'string'.  Always returns True.
    """
    prop = self.current_page
    supported_types = ['commonsMedia', 'external-id', 'string']
    if prop.type not in supported_types:
        pywikibot.output(
            "\"%s\" datatype doesn't make use of formatter" % prop.type)
        return True

    for match in self.get_formatter_regex().findall(textvalue):
        existing = prop.claims.get('P1630', [])
        if any(claim.target_equals(match) for claim in existing):
            pywikibot.output(
                '"%s" already has "%s" as the formatter URL'
                % (prop.title(), match))
            continue
        # A bare scheme with nothing after it is skipped (the original
        # author marked this case with "???").
        if match.strip() in ['http://', 'https://']:
            continue

        claim = pywikibot.Claim(self.repo, 'P1630')
        claim.setTarget(match)
        payload = {'claims': [claim.toJSON()]}
        self.user_edit_entity(prop, payload,
                              summary=self.make_summary('P1630', match),
                              asynchronous=True)
    return True
def from_mapping(self, *mapping, **kwargs) -> bool:
    """Copy upper-case keys into ``self.data``.

    Accepts at most one positional dict plus arbitrary keyword arguments;
    items whose key is not upper-case are ignored.  Keyword arguments are
    applied after the positional dict.

    Raises:
        TypeError: If more than one positional argument is given, or the
            positional argument is not a dict.

    Returns:
        Always True.
    """
    if len(mapping) > 1:
        raise TypeError(
            'expected at most 1 positional argument, got %d' % len(mapping)
        )

    positional: dict = {}
    if mapping:
        positional = mapping[0]
        if not isinstance(positional, dict):
            raise TypeError(
                'expected dict type argument, got %s' % positional.__class__
            )

    for source in (positional, kwargs):
        for key, value in source.items():
            if key.isupper():
                self.data[key] = value
    return True
json_format.py 文件源码
项目:ios-xr-grpc-python
作者: cisco-grpc-connection-libs
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def _MessageToJsonObject(self, message):
    """Convert a message to an object following the Proto3 JSON Specification.

    Wrapper messages and the types listed in _WKTJSONMETHODS go to their
    dedicated converters; every other message goes through the regular
    converter, starting from an empty dict.
    """
    descriptor = message.DESCRIPTOR
    type_name = descriptor.full_name

    if _IsWrapperMessage(descriptor):
        return self._WrapperMessageToJsonObject(message)
    if type_name in _WKTJSONMETHODS:
        # Entry [0] names the method on self that serialises this type.
        to_json = getattr(self, _WKTJSONMETHODS[type_name][0])
        return to_json(message)
    return self._RegularMessageToJsonObject(message, {})
def WithErrorTypeAndMessage(error_type, message):
    """
    Build a matcher checking that a Twisted failure was caused by a certain
    error type with a certain message.

    The failure's ``value`` must be an instance of ``error_type`` and the
    result of its ``getErrorMessage()`` must equal ``message``.
    """
    has_expected_type = MatchesStructure(value=IsInstance(error_type))
    has_expected_message = After(
        methodcaller('getErrorMessage'), Equals(message))
    return MatchesAll(has_expected_type, has_expected_message)
def request(content, endpoint, api, language=None, uri=False, **kwargs):
    """Ask the Rosette API to process content at the given endpoint.

    The results come back from the Rosette API as JSON.  If api's output
    parameter has been set to "rosette", the JSON is an A(nnotated) D(ata)
    M(odel), or ADM: a Python dict representing document content,
    annotations of that content, and document metadata.

    content: path or URI of a document for the Rosette API to process
    endpoint: a Rosette API endpoint string (e.g., 'entities'); it is used
        as the name of the method called on api
        (see https://developer.rosette.com/features-and-functions)
    api: a rosette.api.API instance
        (e.g., API(user_key=<key>, service_url=<url>))
    language: an optional ISO 639-2 T language code
        (the Rosette API will automatically detect the language of the
        content by default)
    uri: treat content as a URI from which the document content is to be
        extracted
    kwargs: additional keyword arguments forwarded to the endpoint method
        (e.g., if endpoint is 'morphology' you can specify facet='lemmas';
        see https://developer.rosette.com/features-and-functions for
        complete documentation)
    """
    parameters = DocumentParameters()
    # The same value is stored under a different key depending on whether
    # it is the content itself or a URI pointing at it.
    content_key = 'contentUri' if uri else 'content'
    parameters[content_key] = content
    parameters['language'] = language

    call_endpoint = getattr(api, endpoint)
    return call_endpoint(parameters, **kwargs)