def _get_function_source(func):
    """Locate the source of the function underlying ``func``.

    Returns a ``(filename, first_line_number)`` tuple taken from the code
    object of the underlying plain function, or ``None`` when no plain
    function can be reached.
    """
    # Strip decorator wrappers: the whole chain when inspect.unwrap is
    # usable (_PY34), otherwise a single ``__wrapped__`` hop.
    if _PY34:
        func = inspect.unwrap(func)
    else:
        func = getattr(func, '__wrapped__', func)

    if inspect.isfunction(func):
        code_obj = func.__code__
        return (code_obj.co_filename, code_obj.co_firstlineno)

    # functools.partialmethod is only looked up when _PY34 is set, so the
    # attribute access is never attempted otherwise.
    wraps_callable = isinstance(func, functools.partial) or (
        _PY34 and isinstance(func, functools.partialmethod))
    if wraps_callable:
        return _get_function_source(func.func)
    return None
python类partialmethod()的实例源码
def _define_field_accessors(cls):
    """Install per-field accessors on ``cls``.

    For every name in ``POSFIELDS`` + ``PREDEFINED_TAGS`` (plus
    ``NAME_FIELD`` when it is set and not already listed) two attributes
    are set on the class:

    * ``<name>``: a ``DynamicField`` built from a getter calling
      ``self.get(name)`` and a setter calling
      ``self._set_existing_field(name, value)``;
    * ``try_get_<name>``: a ``partialmethod`` calling ``self.try_get(name)``.

    When ``PREDEFINED_TAGS`` is empty it is first filled with the
    ``DATATYPE`` keys that are not positional fields.
    """
    if not cls.PREDEFINED_TAGS:
        declared = set(cls.DATATYPE.keys())
        positional = set(cls.POSFIELDS)
        cls.PREDEFINED_TAGS = list(declared - positional)

    # Concatenation builds a new list, so the append below does not touch
    # the class-level lists.
    names = cls.POSFIELDS + cls.PREDEFINED_TAGS
    name_field = cls.NAME_FIELD
    if name_field and name_field not in names:
        names.append(name_field)

    for name in names:
        def get_method(self, fieldname):
            return self.get(fieldname)

        def set_method(self, value, fieldname):
            return self._set_existing_field(fieldname, value)

        def try_get_method(self, fieldname):
            return self.try_get(fieldname)

        getter = partial(get_method, fieldname=name)
        setter = partial(set_method, fieldname=name)
        setattr(cls, name, DynamicField(getter, setter))

        try_getter = partialmethod(try_get_method, fieldname=name)
        setattr(cls, "try_get_" + name, try_getter)
async def test_partialmethod_basic(check_lru, loop):
    """Cache a ``partialmethod``-wrapped coroutine method with ``alru_cache``.

    Five calls are created before any of them is awaited; after gathering
    them the cache is expected to report one miss and four hits, and every
    call must return the pre-bound value ``2``.

    The test body uses ``await``, so it has to be a coroutine function
    (``async def``); a plain ``def`` here is a SyntaxError.
    NOTE(review): running it requires an async-aware test runner or marker,
    which is not visible in this snippet.
    """
    class Obj:
        async def _coro(self, val):
            return val

        coro = alru_cache(partialmethod(_coro, 2), loop=loop)

    obj = Obj()

    coros = [obj.coro() for _ in range(5)]

    # Nothing has been awaited yet, so every counter is expected to be zero.
    check_lru(obj.coro, hits=0, misses=0, cache=0, tasks=0)

    ret = await asyncio.gather(*coros, loop=loop)

    check_lru(obj.coro, hits=4, misses=1, cache=1, tasks=0)

    assert ret == [2, 2, 2, 2, 2]
async def test_partialmethod_partial(check_lru, loop):
    """Cache a ``partial`` built on top of a ``partialmethod`` coroutine.

    ``_coro`` pre-binds ``val1=1`` via ``partialmethod``; each instance then
    pre-binds ``val2=2`` via ``partial`` and wraps the result in
    ``alru_cache``. Five gathered calls are expected to give one miss, four
    hits and the value ``3`` each time.

    The test body uses ``await``, so it has to be a coroutine function
    (``async def``); a plain ``def`` here is a SyntaxError.
    NOTE(review): running it requires an async-aware test runner or marker,
    which is not visible in this snippet.
    """
    class Obj:
        def __init__(self):
            self.coro = alru_cache(partial(self._coro, 2), loop=loop)

        # The double-underscore name is mangled inside the class body; the
        # reference on the ``_coro`` line below is mangled the same way.
        async def __coro(self, val1, val2):
            return val1 + val2

        _coro = partialmethod(__coro, 1)

    obj = Obj()

    coros = [obj.coro() for _ in range(5)]

    # Nothing has been awaited yet, so every counter is expected to be zero.
    check_lru(obj.coro, hits=0, misses=0, cache=0, tasks=0)

    ret = await asyncio.gather(*coros, loop=loop)

    check_lru(obj.coro, hits=4, misses=1, cache=1, tasks=0)

    assert ret == [3, 3, 3, 3, 3]
async def test_partialmethod_cls_loop(check_lru, loop):
    """Cache a ``partialmethod`` coroutine with ``cls=True, loop='_loop'``.

    The instance stores the event loop in its ``_loop`` attribute, whose
    name is what is passed to ``alru_cache`` as ``loop``. Five gathered
    calls are expected to give one miss, four hits and the value ``2`` each
    time.

    The test body uses ``await``, so it has to be a coroutine function
    (``async def``); a plain ``def`` here is a SyntaxError.
    NOTE(review): running it requires an async-aware test runner or marker,
    which is not visible in this snippet.
    """
    class Obj:
        def __init__(self, loop):
            self._loop = loop

        async def _coro(self, val):
            return val

        coro = alru_cache(partialmethod(_coro, 2), cls=True, loop='_loop')

    obj = Obj(loop=loop)

    coros = [obj.coro() for _ in range(5)]

    # Nothing has been awaited yet, so every counter is expected to be zero.
    check_lru(obj.coro, hits=0, misses=0, cache=0, tasks=0)

    ret = await asyncio.gather(*coros, loop=loop)

    check_lru(obj.coro, hits=4, misses=1, cache=1, tasks=0)

    assert ret == [2, 2, 2, 2, 2]
async def test_partialmethod_kwargs_loop(check_lru, loop):
    """Cache a ``partialmethod`` coroutine with ``kwargs=True, loop='_loop'``.

    Each call passes the event loop as the keyword-only ``_loop`` argument,
    whose name is what is passed to ``alru_cache`` as ``loop``. Five
    gathered calls are expected to give one miss, four hits and the value
    ``2`` each time.

    The test body uses ``await``, so it has to be a coroutine function
    (``async def``); a plain ``def`` here is a SyntaxError.
    NOTE(review): running it requires an async-aware test runner or marker,
    which is not visible in this snippet.
    """
    class Obj:
        async def _coro(self, val, *, _loop):
            return val

        coro = alru_cache(partialmethod(_coro, 2), kwargs=True, loop='_loop')

    obj = Obj()

    coros = [obj.coro(_loop=loop) for _ in range(5)]

    # Nothing has been awaited yet, so every counter is expected to be zero.
    check_lru(obj.coro, hits=0, misses=0, cache=0, tasks=0)

    ret = await asyncio.gather(*coros, loop=loop)

    check_lru(obj.coro, hits=4, misses=1, cache=1, tasks=0)

    assert ret == [2, 2, 2, 2, 2]
def test_handle_repr(self):
    """Check ``repr()`` of ``asyncio.Handle`` for several callback kinds.

    Covers a plain function, a cancelled handle, a function wrapped by
    ``asyncio.coroutine``, a ``functools.partial`` and, on Python 3.4+, a
    ``functools.partialmethod``.
    """
    self.loop.get_debug.return_value = False

    # Plain function: repr shows the call and the source location.
    handle = asyncio.Handle(noop, (1, 2), self.loop)
    filename, lineno = test_utils.get_function_source(noop)
    expected = '<Handle noop(1, 2) at %s:%s>' % (filename, lineno)
    self.assertEqual(repr(handle), expected)

    # Once cancelled, the same handle only reports its cancelled state.
    handle.cancel()
    self.assertEqual(repr(handle), '<Handle cancelled>')

    # Decorated function: expected to report the location of ``noop``.
    decorated = asyncio.coroutine(noop)
    handle = asyncio.Handle(decorated, (), self.loop)
    expected = '<Handle noop() at %s:%s>' % (filename, lineno)
    self.assertEqual(repr(handle), expected)

    # functools.partial: bound arguments and call arguments both appear.
    bound = functools.partial(noop, 1, 2)
    handle = asyncio.Handle(bound, (3,), self.loop)
    pattern = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
               % (re.escape(filename), lineno))
    self.assertRegex(repr(handle), pattern)

    # functools.partialmethod is only exercised on Python 3.4 and later.
    if sys.version_info < (3, 4):
        return

    method = HandleTests.test_handle_repr
    bound_method = functools.partialmethod(method)
    method_file, method_line = test_utils.get_function_source(method)
    handle = asyncio.Handle(bound_method, (), self.loop)

    func_pattern = r'<function HandleTests.test_handle_repr .*>'
    callback_pattern = r'functools.partialmethod\(%s, , \)\(\)' % func_pattern
    pattern = (r'^<Handle %s at %s:%s>$'
               % (callback_pattern, re.escape(method_file), method_line))
    self.assertRegex(repr(handle), pattern)
def __init_subclass__(cls, **kwargs):
    """Build ``cls._reaction_map`` (emoji -> method name) for a subclass.

    Every public attribute found in the ``__dict__`` of each class in the
    MRO is inspected in definition order. Callables (and
    ``functools.partialmethod`` objects) carrying a ``__reaction_emoji__``
    attribute are registered under that emoji. Because the MRO is walked
    from the most derived class, the first definition of a name wins;
    assigning ``None`` to a name suppresses it for the classes that follow.
    An entry mapped to ``'stop'`` is moved to the end of the map.
    """
    super().__init_subclass__(**kwargs)
    cls._reaction_map = OrderedDict()
    _suppressed_methods = set()

    # We can't use inspect.getmembers because it returns the members in
    # lexicographical order, rather than definition order.
    for name, member in itertools.chain.from_iterable(b.__dict__.items() for b in cls.__mro__):
        if name.startswith('_'):
            continue

        # Support for using functools.partialmethod as a means of simplifying pages.
        is_callable = callable(member) or isinstance(member, functools.partialmethod)

        # Support suppressing page methods by assigning them to None
        if not (member is None or is_callable):
            continue

        # Let sub-classes override the current methods.
        if name in cls._reaction_map.values():
            continue

        # Let subclasses suppress page methods.
        if name in _suppressed_methods:
            continue

        if member is None:
            _suppressed_methods.add(name)
            continue

        emoji = getattr(member, '__reaction_emoji__', None)
        if emoji:
            cls._reaction_map[emoji] = name
            # The log arguments are evaluated eagerly, so they must not
            # raise: functools.partialmethod objects have no __qualname__
            # (fall back to the attribute name), and an emoji may consist
            # of several code points, which ord() rejects as a whole.
            code_points = ' '.join(hex(ord(char)) for char in emoji)
            member_name = getattr(member, '__qualname__', name)
            _log.debug('Initializing emoji %s for method %s in class %s',
                       code_points, member_name, cls.__name__)

    # We need to move stop to the end (assuming it exists).
    # Otherwise it will show up somewhere in the middle
    with contextlib.suppress(StopIteration):
        key = next(k for k, v in cls._reaction_map.items() if v == 'stop')
        cls._reaction_map.move_to_end(key)
def _get_function_source(func):
    """Locate the source of the function underlying ``func``.

    Returns a ``(filename, first_line_number)`` tuple taken from the code
    object of the underlying plain function, or ``None`` when no plain
    function can be reached.
    """
    # Strip decorator wrappers: the whole chain when inspect.unwrap is
    # usable (compat.PY34), otherwise a single ``__wrapped__`` hop.
    if compat.PY34:
        func = inspect.unwrap(func)
    else:
        func = getattr(func, '__wrapped__', func)

    if inspect.isfunction(func):
        code_obj = func.__code__
        return (code_obj.co_filename, code_obj.co_firstlineno)

    # functools.partialmethod is only looked up when compat.PY34 is set, so
    # the attribute access is never attempted otherwise.
    wraps_callable = isinstance(func, functools.partial) or (
        compat.PY34 and isinstance(func, functools.partialmethod))
    if wraps_callable:
        return _get_function_source(func.func)
    return None
def _inject_api_requests_methods(cls):
    """
    Mirror the public functions of ``requests.api`` as methods on ``cls``.

    Every public attribute of ``requests.api`` that is a plain function
    defined in that module itself is attached to ``cls`` under the same
    name, as a ``functools.partialmethod`` of ``cls._make_call`` with the
    requests function pre-bound as its first argument.
    """
    api = requests.api
    for attr_name in dir(api):
        if attr_name.startswith('_'):
            continue
        candidate = getattr(api, attr_name)
        if not isinstance(candidate, types.FunctionType):
            continue
        # Skip functions that were merely imported into requests.api.
        if candidate.__module__ != api.__name__:
            continue
        setattr(cls, attr_name, functools.partialmethod(cls._make_call, candidate))
def _get_function_source(func):
    """Return ``(filename, first_line_number)`` for ``func``, or ``None``.

    The location comes from the code object of the plain function that
    ``func`` ultimately wraps; ``None`` is returned when no plain function
    can be reached.
    """
    # Remove decorator layers first: all of them through inspect.unwrap
    # when compat.PY34 is set, otherwise just one ``__wrapped__`` level.
    if compat.PY34:
        func = inspect.unwrap(func)
    else:
        func = getattr(func, '__wrapped__', func)

    if inspect.isfunction(func):
        code_obj = func.__code__
        return (code_obj.co_filename, code_obj.co_firstlineno)

    # Short-circuiting keeps functools.partialmethod from being looked up
    # unless compat.PY34 is set.
    wraps_callable = isinstance(func, functools.partial) or (
        compat.PY34 and isinstance(func, functools.partialmethod))
    if wraps_callable:
        return _get_function_source(func.func)
    return None
def test_invalid_args(self):
    """``partialmethod(None, 1)`` in a class body must raise TypeError."""
    expect_type_error = self.assertRaises(TypeError)
    with expect_type_error:
        # The error is expected while the class body is being executed.
        class B(object):
            method = functools.partialmethod(None, 1)
def test_repr(self):
    """Check the repr of the raw ``both`` partialmethod stored on ``A``."""
    # vars() gives the object stored in the class dict, bypassing the
    # descriptor protocol.
    both = vars(self.A)['both']
    expected = 'functools.partialmethod({}, 3, b=4)'.format(capture)
    self.assertEqual(repr(both), expected)
def test_abstract(self):
    """A partialmethod over an abstract method must itself be abstract.

    The partialmethods defined on ``self.A`` are expected not to report
    ``__isabstractmethod__`` as true.
    """
    class Abstract(abc.ABCMeta):

        @abc.abstractmethod
        def add(self, x, y):
            pass

        add5 = functools.partialmethod(add, 5)

    for attr_name in ('add', 'add5'):
        self.assertTrue(getattr(Abstract, attr_name).__isabstractmethod__)

    concrete = (self.A.static, self.A.cls, self.A.over_partial,
                self.A.nested, self.A.both)
    for func in concrete:
        self.assertFalse(getattr(func, '__isabstractmethod__', False))
def test_handle_repr(self):
    """Check ``repr()`` of ``asyncio.Handle`` for several callback kinds.

    Covers a plain function, a cancelled handle, a function wrapped by
    ``asyncio.coroutine``, a ``functools.partial`` and, on Python 3.4+, a
    ``functools.partialmethod``.
    """
    self.loop.get_debug.return_value = False

    # Plain function: repr shows the call and the source location.
    handle = asyncio.Handle(noop, (1, 2), self.loop)
    filename, lineno = test_utils.get_function_source(noop)
    expected = '<Handle noop(1, 2) at %s:%s>' % (filename, lineno)
    self.assertEqual(repr(handle), expected)

    # Once cancelled, the same handle only reports its cancelled state.
    handle.cancel()
    self.assertEqual(repr(handle), '<Handle cancelled>')

    # Decorated function: expected to report the location of ``noop``.
    decorated = asyncio.coroutine(noop)
    handle = asyncio.Handle(decorated, (), self.loop)
    expected = '<Handle noop() at %s:%s>' % (filename, lineno)
    self.assertEqual(repr(handle), expected)

    # functools.partial: bound arguments and call arguments both appear.
    bound = functools.partial(noop, 1, 2)
    handle = asyncio.Handle(bound, (3,), self.loop)
    pattern = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
               % (re.escape(filename), lineno))
    self.assertRegex(repr(handle), pattern)

    # functools.partialmethod is only exercised on Python 3.4 and later.
    if sys.version_info < (3, 4):
        return

    method = HandleTests.test_handle_repr
    bound_method = functools.partialmethod(method)
    method_file, method_line = test_utils.get_function_source(method)
    handle = asyncio.Handle(bound_method, (), self.loop)

    func_pattern = r'<function HandleTests.test_handle_repr .*>'
    callback_pattern = r'functools.partialmethod\(%s, , \)\(\)' % func_pattern
    pattern = (r'^<Handle %s at %s:%s>$'
               % (callback_pattern, re.escape(method_file), method_line))
    self.assertRegex(repr(handle), pattern)
def test_signature_on_partialmethod(self):
    """Check ``inspect.signature`` results for ``partialmethod`` attributes.

    A partialmethod over a function with no parameters must make
    ``inspect.signature`` raise ValueError; one over a regular method must
    report the remaining parameters, with the pre-bound keyword shown as a
    default.
    """
    from functools import partialmethod

    class Spam:
        def test():
            pass
        ham = partialmethod(test)

    # ``test`` takes no parameters at all.
    with self.assertRaisesRegex(ValueError, "has incorrect arguments"):
        inspect.signature(Spam.ham)

    class Spam:
        def test(it, a, *, c) -> 'spam':
            pass
        ham = partialmethod(test, c=1)

    it_param = ('it', ..., ..., 'positional_or_keyword')
    a_param = ('a', ..., ..., 'positional_or_keyword')
    c_param = ('c', 1, ..., 'keyword_only')

    # Accessed on the class, the first parameter is still listed.
    self.assertEqual(self.signature(Spam.ham),
                     ((it_param, a_param, c_param), 'spam'))

    # Accessed on an instance, the first parameter is no longer listed.
    self.assertEqual(self.signature(Spam().ham),
                     ((a_param, c_param), 'spam'))
def partialmethod(method, **kwargs):
    """Return a function that calls ``method`` with ``kwargs`` pre-filled.

    The result is meant to be assigned as a class attribute so that it is
    bound like a normal method: ``self`` is passed through as the first
    argument of ``method``.

    Arguments given at call time are forwarded as well; call-time keyword
    arguments override the pre-filled ones. Calling the result with only
    ``self`` behaves exactly as before.

    :param method: callable taking the instance as its first argument.
    :param kwargs: keyword arguments to pre-fill on every call.
    :returns: a function ``(self, *args, **kwargs)`` delegating to ``method``.
    """
    def bound(self, *call_args, **call_kwargs):
        # Copy so the pre-filled defaults are never mutated by a call.
        merged = dict(kwargs)
        merged.update(call_kwargs)
        return method(self, *call_args, **merged)
    return bound
def partialmethod(method, **kwargs):
    """Return a function that calls ``method`` with ``kwargs`` pre-filled.

    The result is meant to be assigned as a class attribute so that it is
    bound like a normal method: ``self`` is passed through as the first
    argument of ``method``.

    Arguments given at call time are forwarded as well; call-time keyword
    arguments override the pre-filled ones. Calling the result with only
    ``self`` behaves exactly as before.

    :param method: callable taking the instance as its first argument.
    :param kwargs: keyword arguments to pre-fill on every call.
    :returns: a function ``(self, *args, **kwargs)`` delegating to ``method``.
    """
    def bound(self, *call_args, **call_kwargs):
        # Copy so the pre-filled defaults are never mutated by a call.
        merged = dict(kwargs)
        merged.update(call_kwargs)
        return method(self, *call_args, **merged)
    return bound
def _get_function_source(func):
    """Return ``(filename, first_line_number)`` for ``func``, or ``None``.

    The location comes from the code object of the plain function that
    ``func`` ultimately wraps; ``None`` is returned when no plain function
    can be reached.
    """
    # Remove decorator layers first: all of them through inspect.unwrap
    # when _PY34 is set, otherwise just one ``__wrapped__`` level.
    if _PY34:
        func = inspect.unwrap(func)
    else:
        func = getattr(func, '__wrapped__', func)

    if inspect.isfunction(func):
        code_obj = func.__code__
        return (code_obj.co_filename, code_obj.co_firstlineno)

    # Short-circuiting keeps functools.partialmethod from being looked up
    # unless _PY34 is set.
    wraps_callable = isinstance(func, functools.partial) or (
        _PY34 and isinstance(func, functools.partialmethod))
    if wraps_callable:
        return _get_function_source(func.func)
    return None
def test_invalid_args(self):
    """``partialmethod(None, 1)`` in a class body must raise TypeError."""
    expect_type_error = self.assertRaises(TypeError)
    with expect_type_error:
        # The error is expected while the class body is being executed.
        class B(object):
            method = functools.partialmethod(None, 1)
def test_repr(self):
    """Check the repr of the raw ``both`` partialmethod stored on ``A``."""
    # vars() gives the object stored in the class dict, bypassing the
    # descriptor protocol.
    both = vars(self.A)['both']
    expected = 'functools.partialmethod({}, 3, b=4)'.format(capture)
    self.assertEqual(repr(both), expected)
def test_abstract(self):
    """A partialmethod over an abstract method must itself be abstract.

    The partialmethods defined on ``self.A`` are expected not to report
    ``__isabstractmethod__`` as true.
    """
    class Abstract(abc.ABCMeta):

        @abc.abstractmethod
        def add(self, x, y):
            pass

        add5 = functools.partialmethod(add, 5)

    for attr_name in ('add', 'add5'):
        self.assertTrue(getattr(Abstract, attr_name).__isabstractmethod__)

    concrete = (self.A.static, self.A.cls, self.A.over_partial,
                self.A.nested, self.A.both)
    for func in concrete:
        self.assertFalse(getattr(func, '__isabstractmethod__', False))
def test_handle_repr(self):
    """Check ``repr()`` of ``asyncio.Handle`` for several callback kinds.

    Covers a plain function, a cancelled handle, a function wrapped by
    ``asyncio.coroutine``, a ``functools.partial`` and, on Python 3.4+, a
    ``functools.partialmethod``.
    """
    self.loop.get_debug.return_value = False

    # Plain function: repr shows the call and the source location.
    handle = asyncio.Handle(noop, (1, 2), self.loop)
    filename, lineno = test_utils.get_function_source(noop)
    expected = '<Handle noop(1, 2) at %s:%s>' % (filename, lineno)
    self.assertEqual(repr(handle), expected)

    # Once cancelled, the same handle only reports its cancelled state.
    handle.cancel()
    self.assertEqual(repr(handle), '<Handle cancelled>')

    # Decorated function: expected to report the location of ``noop``.
    decorated = asyncio.coroutine(noop)
    handle = asyncio.Handle(decorated, (), self.loop)
    expected = '<Handle noop() at %s:%s>' % (filename, lineno)
    self.assertEqual(repr(handle), expected)

    # functools.partial: bound arguments and call arguments both appear.
    bound = functools.partial(noop, 1, 2)
    handle = asyncio.Handle(bound, (3,), self.loop)
    pattern = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
               % (re.escape(filename), lineno))
    self.assertRegex(repr(handle), pattern)

    # functools.partialmethod is only exercised on Python 3.4 and later.
    if sys.version_info < (3, 4):
        return

    method = HandleTests.test_handle_repr
    bound_method = functools.partialmethod(method)
    method_file, method_line = test_utils.get_function_source(method)
    handle = asyncio.Handle(bound_method, (), self.loop)

    func_pattern = r'<function HandleTests.test_handle_repr .*>'
    callback_pattern = r'functools.partialmethod\(%s, , \)\(\)' % func_pattern
    pattern = (r'^<Handle %s at %s:%s>$'
               % (callback_pattern, re.escape(method_file), method_line))
    self.assertRegex(repr(handle), pattern)
def test_signature_on_partialmethod(self):
    """Check ``inspect.signature`` results for ``partialmethod`` attributes.

    A partialmethod over a function with no parameters must make
    ``inspect.signature`` raise ValueError; one over a regular method must
    report the remaining parameters, with the pre-bound keyword shown as a
    default.
    """
    from functools import partialmethod

    class Spam:
        def test():
            pass
        ham = partialmethod(test)

    # ``test`` takes no parameters at all.
    with self.assertRaisesRegex(ValueError, "has incorrect arguments"):
        inspect.signature(Spam.ham)

    class Spam:
        def test(it, a, *, c) -> 'spam':
            pass
        ham = partialmethod(test, c=1)

    it_param = ('it', ..., ..., 'positional_or_keyword')
    a_param = ('a', ..., ..., 'positional_or_keyword')
    c_param = ('c', 1, ..., 'keyword_only')

    # Accessed on the class, the first parameter is still listed.
    self.assertEqual(self.signature(Spam.ham),
                     ((it_param, a_param, c_param), 'spam'))

    # Accessed on an instance, the first parameter is no longer listed.
    self.assertEqual(self.signature(Spam().ham),
                     ((a_param, c_param), 'spam'))