def default(self, obj):
    """Reduce ``obj`` to a plainer value for serialisation.

    * An object exposing ``to_json`` is converted through that hook and the
      result is passed through this method again.
    * Any other object that has a ``__dict__`` is flattened into a dict of
      its members, skipping names that start with a double underscore and
      skipping routines, generators and abstract classes.
    * Anything else is handed back unchanged.
    """
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())
    if not hasattr(obj, "__dict__"):
        return obj

    # A member matching any of these predicates is left out of the result.
    rejected = (
        inspect.isabstract,
        inspect.isbuiltin,
        inspect.isfunction,
        inspect.isgenerator,
        inspect.isgeneratorfunction,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.isroutine,
    )
    members = {}
    for name, member in inspect.getmembers(obj):
        if name.startswith("__"):
            continue
        if any(check(member) for check in rejected):
            continue
        members[name] = member
    return self.default(members)
python类isgenerator()的实例源码
def test_excluding_predicates(self):
    """Feed predicate/expression pairs to ``self.istest``.

    ``self.istest`` is defined elsewhere; each call passes an ``inspect``
    predicate together with the source text of an expression.
    """
    self.istest(inspect.isbuiltin, 'sys.exit')
    if check_impl_detail():
        self.istest(inspect.isbuiltin, '[].append')

    cases = (
        (inspect.iscode, 'mod.spam.__code__'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.isfunction, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, 'collections.defaultdict.default_factory'),
        (inspect.isgenerator, '(x for x in range(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    )
    for predicate, expression in cases:
        self.istest(predicate, expression)

    # The descriptor types are only checked positively where ``types``
    # exposes them; otherwise the predicate must report False.
    if hasattr(types, 'GetSetDescriptorType'):
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        self.assertFalse(
            inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))

    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor,
                    'type(lambda: None).__globals__')
    else:
        self.assertFalse(
            inspect.ismemberdescriptor(datetime.timedelta.days))
def _uid_str(uid_list: 'str | list[str] | tuple[str, ...] | set[str] | Generator') -> str:
    """
    Prepare list of uid for use in commands: delete/copy/move/seen

    uid_list can be: str, list, tuple, set, fetch generator

    A str is split on commas.  A generator is consumed and the truthy
    ``uid`` attribute of each yielded item is collected.  Every uid must be
    a str made of digits (surrounding whitespace is ignored).

    :param uid_list: uids in one of the accepted forms (subclasses of str,
        list, tuple and set are accepted as well)
    :return: comma separated uids with whitespace stripped
    :raises MailBox.MailBoxUidParamError: on empty input, unsupported
        container type, non-str uid or non-digit uid
    """
    if not uid_list:
        raise MailBox.MailBoxUidParamError('uid_list should not be empty')
    if isinstance(uid_list, str):
        uid_list = uid_list.split(',')
    elif inspect.isgenerator(uid_list):
        uid_list = [msg.uid for msg in uid_list if msg.uid]
    if not isinstance(uid_list, (list, tuple, set)):
        raise MailBox.MailBoxUidParamError('Wrong uid_list type: {}'.format(type(uid_list)))
    cleaned = []
    for uid in uid_list:
        if not isinstance(uid, str):
            raise MailBox.MailBoxUidParamError('uid {} is not string'.format(str(uid)))
        stripped = uid.strip()
        if not stripped.isdigit():
            raise MailBox.MailBoxUidParamError('Wrong uid: {}'.format(uid))
        # Keep the stripped form so it is not recomputed when joining.
        cleaned.append(stripped)
    return ','.join(cleaned)
def test_excluding_predicates(self):
    """Feed predicate/expression pairs to ``self.istest``.

    ``self.istest`` is defined elsewhere; each call passes an ``inspect``
    predicate together with the source text of an expression.
    """
    cases = (
        (inspect.isbuiltin, 'sys.exit'),
        (inspect.isbuiltin, '[].append'),
        (inspect.iscode, 'mod.spam.func_code'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.ismethod, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, '__builtin__.file.closed'),
        (inspect.isdatadescriptor, '__builtin__.file.softspace'),
        (inspect.isgenerator, '(x for x in xrange(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    )
    for predicate, expression in cases:
        self.istest(predicate, expression)

    # The descriptor types are only checked positively where ``types``
    # exposes them; otherwise the predicate must report False.
    if hasattr(types, 'GetSetDescriptorType'):
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        self.assertFalse(
            inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))

    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
    else:
        self.assertFalse(
            inspect.ismemberdescriptor(datetime.timedelta.days))
def test_excluding_predicates(self):
    """Run ``self.istest`` over a table of (predicate, expression) pairs.

    ``self.istest`` lives outside this block; it receives an ``inspect``
    predicate and an expression given as source text.
    """
    table = [
        (inspect.isbuiltin, 'sys.exit'),
        (inspect.isbuiltin, '[].append'),
        (inspect.iscode, 'mod.spam.func_code'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.ismethod, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, '__builtin__.file.closed'),
        (inspect.isdatadescriptor, '__builtin__.file.softspace'),
        (inspect.isgenerator, '(x for x in xrange(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    ]
    for check, source in table:
        self.istest(check, source)

    has_getset = hasattr(types, 'GetSetDescriptorType')
    if not has_getset:
        # Without the descriptor type the predicate must report False.
        self.assertFalse(
            inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    else:
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')

    has_member = hasattr(types, 'MemberDescriptorType')
    if not has_member:
        self.assertFalse(
            inspect.ismemberdescriptor(datetime.timedelta.days))
    else:
        self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
def delete(self, paths):
    """Delete L{Namespace}s matching C{paths}.

    The system tag values C{fluiddb/namespaces/description} and
    C{fluiddb/namespaces/path} of every deleted namespace are deleted too.

    @param paths: A sequence of L{Namespace.path}s.
    @raises NotEmptyError: Raised if the L{Namespace} is not empty.
    @return: A C{list} of C{(objectID, Namespace.path)} 2-tuples
        representing the deleted L{Namespace}s.
    """
    if isgenerator(paths):
        # The paths are used by several queries below, so materialise them.
        paths = list(paths)

    hasChildren = (getChildNamespaces(paths).any()
                   or getChildTags(paths).any())
    if hasChildren:
        raise NotEmptyError("Can't delete non-empty namespaces.")

    result = getNamespaces(paths=paths)
    deletedNamespaces = list(
        result.values(Namespace.objectID, Namespace.path))

    systemTags = (u'fluiddb/namespaces/description',
                  u'fluiddb/namespaces/path')
    values = []
    for objectID, _ in deletedNamespaces:
        for systemTag in systemTags:
            values.append((objectID, systemTag))

    if values:
        self._factory.tagValues(self._user).delete(values)
    result.remove()
    return deletedNamespaces
def delete(self, values):
    """Delete L{TagValue}s.

    Tag paths are resolved to tag IDs before removal; when something was
    removed the affected objects are passed to C{touchObjects}.

    @param values: A sequence of C{(objectID, Tag.path)} 2-tuples to
        delete values for.
    @raise FeatureError: Raised if the given list of values is empty.
    @return: The number of values deleted.
    """
    if isgenerator(values):
        # The values are walked more than once below.
        values = list(values)
    if not values:
        raise FeatureError("Can't delete an empty list of tag values.")

    paths = {path for objectID, path in values}
    objectIDs = {objectID for objectID, path in values}
    tagIDs = dict(getTags(paths).values(Tag.path, Tag.id))

    resolved = [(objectID, tagIDs[path]) for objectID, path in values]
    result = getTagValues(resolved).remove()
    if result:
        touchObjects(objectIDs)
    return result
def delete(self, paths):
    """See L{TagAPI.delete}.

    Permissions for deleted L{Tag}s are removed from the cache.  The recent
    activity caches for the affected objects and for the users named by the
    first segment of each path are cleared as well.
    """
    if isgenerator(paths):
        paths = list(paths)

    # FIXME getObjectIDs is called twice--once here and once in
    # TagAPI.delete.  It would be better if we only did this once, not to
    # mention that this breaks encapsulation by bypassing the model layer
    # and accessing the data layer directly. -jkakar
    objectIDs = set(getObjectIDs(paths))
    RecentObjectActivityCache().clear(objectIDs)

    # The first path segment is used as the username.
    usernames = {path.split('/', 1)[0] for path in paths}
    RecentUserActivityCache().clear(usernames)

    PermissionCache().clearTagPermissions(paths)
    return self._api.delete(paths)
def update(self, db_name, coll_name, results, task, query, upsert=True):
    """Write each result dict into ``db_name.coll_name``.

    ``results`` may be a generator, a list, or a single value (which is
    wrapped in a one-element list).  Each result is applied with ``$set``
    and ``updated_at`` is stamped through ``$setOnInsert``.  When ``query``
    is falsy it is replaced by ``{'taskid': task.get('taskid')}`` the first
    time a result is processed.
    """
    self.db_coll = self.db_conn[db_name][coll_name]

    # Generators and lists are iterated as they are; anything else is
    # treated as one result.
    if not (inspect.isgenerator(results) or isinstance(results, list)):
        results = [results]

    for record in results:
        assert isinstance(record, dict), 'result saved to mongodb must be dict.'
        query = query or {'taskid': task.get('taskid')}
        changes = {
            '$set': record,
            '$setOnInsert': {'updated_at': time.time()},
        }
        self.db_coll.update(query, changes, upsert=upsert)
def is_closable_iterator(obj):
    """Return True when ``obj`` is an iterator that can be closed.

    Generators always qualify.  Any other iterator qualifies only when it
    has a callable ``close`` attribute that can be called with no arguments.
    """
    import inspect

    # Not an iterator.
    if not is_iterator(obj):
        return False

    # A generator - the easiest thing to deal with.
    if inspect.isgenerator(obj):
        return True

    # A custom iterator. Look for a close method...
    has_close = hasattr(obj, 'close') and callable(obj.close)
    if not has_close:
        return False

    # ... which doesn't require any arguments.
    try:
        inspect.getcallargs(obj.close)
    except TypeError:
        return False
    return True
def test_excluding_predicates(self):
    """Feed predicate/expression pairs to ``self.istest``.

    ``self.istest`` is defined elsewhere; each call passes an ``inspect``
    predicate together with the source text of an expression.  The
    ``[].append`` case only runs when ``check_impl_detail()`` is true.
    """
    self.istest(inspect.isbuiltin, 'sys.exit')
    if check_impl_detail():
        self.istest(inspect.isbuiltin, '[].append')

    cases = (
        (inspect.iscode, 'mod.spam.func_code'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.ismethod, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, '__builtin__.file.closed'),
        (inspect.isdatadescriptor, '__builtin__.file.softspace'),
        (inspect.isgenerator, '(x for x in xrange(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    )
    for predicate, expression in cases:
        self.istest(predicate, expression)

    # The descriptor types are only checked positively where ``types``
    # exposes them; otherwise the predicate must report False.
    if hasattr(types, 'GetSetDescriptorType'):
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        self.assertFalse(
            inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))

    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor,
                    'type(lambda: None).func_globals')
    else:
        self.assertFalse(
            inspect.ismemberdescriptor(type(lambda: None).func_globals))
def test_excluding_predicates(self):
    """Pass each (predicate, expression) pair below to ``self.istest``.

    ``self.istest`` is provided by the surrounding test case (not visible
    here); expressions are handed over as source text.
    """
    expectations = (
        (inspect.isbuiltin, 'sys.exit'),
        (inspect.isbuiltin, '[].append'),
        (inspect.iscode, 'mod.spam.func_code'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.ismethod, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, '__builtin__.file.closed'),
        (inspect.isdatadescriptor, '__builtin__.file.softspace'),
        (inspect.isgenerator, '(x for x in xrange(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    )
    for pred, expr in expectations:
        self.istest(pred, expr)

    # Descriptor predicates: positive check when the type exists in
    # ``types``, negative check otherwise.
    if hasattr(types, 'GetSetDescriptorType'):
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        self.assertFalse(
            inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))

    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
    else:
        self.assertFalse(
            inspect.ismemberdescriptor(datetime.timedelta.days))
def default(self, obj):
    """Reduce ``obj`` to a plainer value for serialisation.

    * An ``Enum`` member becomes its ``name``.
    * Any other object that has a ``__dict__`` is flattened into a dict of
      its members, skipping names that start with a double underscore,
      routines, generators, abstract classes, values for which
      ``self.isempty`` is true, and ``None``.
    * Anything else is handed back unchanged.
    """
    # NOTE: a ``to_json`` hook used to be consulted first; it was already
    # commented out in the original code.
    if isinstance(obj, Enum):
        return obj.name
    if not hasattr(obj, "__dict__"):
        return obj

    # A member matching any of these predicates is left out of the result.
    rejected = (
        inspect.isabstract,
        inspect.isbuiltin,
        inspect.isfunction,
        inspect.isgenerator,
        inspect.isgeneratorfunction,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.isroutine,
    )
    members = {}
    for name, member in inspect.getmembers(obj):
        if name.startswith("__"):
            continue
        if any(check(member) for check in rejected):
            continue
        # isempty() is consulted before the None check, as in the original.
        if self.isempty(member):
            continue
        if member is None:
            continue
        members[name] = member
    return self.default(members)
def call_agents(self, data):
    """call a method on all agents

    ``data`` supplies ``func`` and optionally ``args`` / ``kwargs``.  When
    the first call result is a generator, all results are gathered with
    ``asyncio.gather``.  Returns a dict with ``status`` ``'ok'`` and the
    results, or ``status`` ``'failed'`` with the exception and its
    formatted traceback.
    """
    try:
        request = {'args': [], 'kwargs': {}}
        request.update(data)
        if not self.agents:
            results = []
        else:
            results = [
                getattr(agent, request['func'])(*request['args'], **request['kwargs'])
                for agent in self.agents.values()
            ]
            # Only the first result is inspected to decide whether the
            # calls need to be driven to completion.
            if inspect.isgenerator(results[0]):
                results = yield from asyncio.gather(*results)
        return {'status': 'ok', 'results': results}
    except Exception as e:
        tb = traceback.format_exc()
        logger.exception(e)
        logger.exception(tb)
        return {'status': 'failed', 'exception': e, 'traceback': tb}
def call_agent(self, data):
    """call a method on an agent and get the result

    ``data`` supplies ``id``, ``func`` and optionally ``args`` / ``kwargs``.
    A locally known agent is called directly (a generator result is
    delegated to with ``yield from``); otherwise the request is tagged with
    ``cmd='call_agent'`` and passed to ``self.arbiter.send_recv``.
    """
    request = {'args': [], 'kwargs': {}}
    request.update(data)
    agent_id = request['id']

    # pass request to the arbiter
    if agent_id not in self.agents:
        request['cmd'] = 'call_agent'
        return (yield from self.arbiter.send_recv(request))

    # check locally
    agent = self.agents[agent_id]
    result = getattr(agent, request['func'])(*request['args'], **request['kwargs'])
    if inspect.isgenerator(result):
        result = yield from result
    return result
def __getattr__(self, item):
    """Fetch ``item`` from ``self.delegate`` and route results through ``self._wrap``.

    * coroutine object: returns a new coroutine that awaits it and wraps
      the result;
    * coroutine function (or an attribute with a truthy ``_is_coroutine``):
      returns an ``async`` wrapper that wraps the awaited result;
    * generator object: returns a generator yielding each entry wrapped;
    * generator function: returns a generator function doing the same;
    * plain function: returns a wrapper that wraps the return value;
    * anything else: returns ``self._wrap(attr)``.

    Coroutine and generator *objects* are awaited / iterated directly.
    They are not callable, so calling them (as the previous code did)
    raised ``TypeError``.
    """
    attr = getattr(self.delegate, item)

    if inspect.iscoroutine(attr):
        async def wrapped_coroutine():
            return self._wrap(await attr)
        return wrapped_coroutine()

    if inspect.iscoroutinefunction(attr) or getattr(attr, "_is_coroutine", False):
        async def wrapper(*args, **kwargs):
            return self._wrap(await attr(*args, **kwargs))
        return wrapper

    if inspect.isgenerator(attr):
        def wrapped_generator():
            for entry in attr:
                yield self._wrap(entry)
        return wrapped_generator()

    if inspect.isgeneratorfunction(attr):
        def wrapper(*args, **kwargs):
            for entry in attr(*args, **kwargs):
                yield self._wrap(entry)
        return wrapper

    if inspect.isfunction(attr):
        def wrapper(*args, **kwargs):
            return self._wrap(attr(*args, **kwargs))
        return wrapper

    return self._wrap(attr)
def is_closable_iterator(obj):
    """Tell whether ``obj`` is an iterator offering a no-argument ``close``.

    Non-iterators give False and generators give True.  Other iterators
    give True only if ``obj.close`` exists, is callable, and accepts being
    called without arguments.
    """
    import inspect

    # Not an iterator.
    if not is_iterator(obj):
        return False

    # A generator - the easiest thing to deal with.
    if inspect.isgenerator(obj):
        return True

    # A custom iterator. Look for a close method...
    if not hasattr(obj, 'close') or not callable(obj.close):
        return False

    # ... which doesn't require any arguments.
    try:
        inspect.getcallargs(obj.close)
    except TypeError:
        closable = False
    else:
        closable = True
    return closable
def test_tree(self):
    """Exercise item assignment, lookup and containment on ``PrefixTree``."""
    tree = PrefixTree()

    # Test setter
    for name in ('amy', 'ann', 'anne', 'emma', 'rob', 'roger', 'anna'):
        tree[name] = name

    # Test getter
    matches = tree['ann']
    self.assertTrue(matches)
    self.assertTrue(inspect.isgenerator(matches))
    # expect 'ann', 'anne' and 'anna'
    found = list(matches)
    self.assertEqual(len(found), 3)

    # Test containment
    self.assertTrue('amy' in tree)
    self.assertFalse('am' in tree)
async def _send_code(self, code, wire_dir=None):
    """Send ``code`` through ``self._send_text``.

    ``code`` is either a single item or a generator of items; nested
    generators are flattened depth-first and each item is sent in order.

    This must be a coroutine function because the body awaits
    ``self._send_text``; ``await`` inside a plain ``def`` is a syntax error.

    :param code: one item, or a (possibly nested) generator of items
    :param wire_dir: ``None`` (sent as ``b''``), bytes/bytearray (used
        as-is), or any other value (converted with ``str`` and UTF-8 encoded)
    :raises RuntimeError: when ``self.connected`` is falsy
    """
    if not self.connected:
        raise RuntimeError('HBI wire not connected')

    # convert wire_dir as early as possible, will save conversions in case the code is of complex structure
    if wire_dir is None:
        wire_dir = b''
    elif not isinstance(wire_dir, (bytes, bytearray)):
        wire_dir = str(wire_dir).encode('utf-8')

    # use a generator function to pull code from hierarchy
    def pull_code(container):
        for mc in container:
            if inspect.isgenerator(mc):
                yield from pull_code(mc)
            else:
                yield mc

    if inspect.isgenerator(code):
        for c in pull_code(code):
            await self._send_text(c, wire_dir)
    else:
        await self._send_text(code, wire_dir)
def default(self, obj):
    """Reduce ``obj`` to a plainer value for serialisation.

    Objects with a ``to_json`` hook are converted through it, and the
    result is fed back into this method.  Other objects carrying a
    ``__dict__`` become a dict of their members, leaving out names that
    start with a double underscore as well as routines, generators and
    abstract classes.  All remaining values are returned as they are.
    """
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())
    if not hasattr(obj, "__dict__"):
        return obj

    excluded_kinds = (
        inspect.isabstract,
        inspect.isbuiltin,
        inspect.isfunction,
        inspect.isgenerator,
        inspect.isgeneratorfunction,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.isroutine,
    )

    def keep(name, member):
        # Dunder names are dropped before any predicate is consulted.
        if name.startswith("__"):
            return False
        return not any(is_kind(member) for is_kind in excluded_kinds)

    data = {
        name: member
        for name, member in inspect.getmembers(obj)
        if keep(name, member)
    }
    return self.default(data)
def default(self, obj):
    """Turn ``obj`` into a simpler value for serialisation.

    ``to_json`` wins when present (its result is processed again by this
    method).  Failing that, an object with a ``__dict__`` is mapped to a
    dict of its members whose names do not start with a double underscore
    and which are not routines, generators or abstract classes.  Other
    values are returned untouched.
    """
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())

    if hasattr(obj, "__dict__"):
        checks = (
            inspect.isabstract,
            inspect.isbuiltin,
            inspect.isfunction,
            inspect.isgenerator,
            inspect.isgeneratorfunction,
            inspect.ismethod,
            inspect.ismethoddescriptor,
            inspect.isroutine,
        )
        data = {
            key: value
            for key, value in inspect.getmembers(obj)
            if not key.startswith("__")
            and not any(check(value) for check in checks)
        }
        return self.default(data)

    return obj
def default(self, obj):
    """Produce a simpler stand-in for ``obj`` for serialisation.

    Order of precedence: the object's own ``to_json`` result (run through
    this method again); otherwise, for objects with a ``__dict__``, a dict
    of members that are neither double-underscore names nor routines,
    generators or abstract classes; otherwise ``obj`` itself.
    """
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())
    if not hasattr(obj, "__dict__"):
        return obj

    skip_if = (
        inspect.isabstract,
        inspect.isbuiltin,
        inspect.isfunction,
        inspect.isgenerator,
        inspect.isgeneratorfunction,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.isroutine,
    )
    data = {}
    for attr_name, attr_value in inspect.getmembers(obj):
        is_dunder = attr_name.startswith("__")
        if is_dunder or any(test(attr_value) for test in skip_if):
            continue
        data[attr_name] = attr_value
    return self.default(data)
def post_process_extensions(self, extensions, resp_obj, request,
                            action_args):
    """Run post-processing extensions and return the first truthy response.

    Generator extensions are resumed with ``send(resp_obj)``; a
    ``StopIteration`` from them is the normal way to finish.  All other
    extensions are called with ``req``, ``resp_obj`` and ``action_args``.
    A ``Fault`` raised by an extension becomes the response.  Returns
    ``None`` when no extension produced a truthy response.
    """
    for ext in extensions:
        response = None
        if not inspect.isgenerator(ext):
            # Regular functions get post-processing...
            try:
                with ResourceExceptionHandler():
                    response = ext(req=request, resp_obj=resp_obj,
                                   **action_args)
            except Fault as ex:
                response = ex
        else:
            # If it's a generator, run the second half of
            # processing
            try:
                with ResourceExceptionHandler():
                    response = ext.send(resp_obj)
            except StopIteration:
                # Normal exit of generator
                continue
            except Fault as ex:
                response = ex

        # We had a response...
        if response:
            return response

    return None
def test_as_completed(monkeypatch):
    """``as_completed`` returns a generator that yields one entry per call."""
    service = RemoteService(DUMMY_SERVICE_URL)
    monkeypatch.setattr(urllib.request, 'urlopen', dummy_urlopen)

    pending = []
    for _ in range(10):
        pending.append(service.call_method_async("test", []))

    data = as_completed(*pending)
    assert inspect.isgenerator(data)

    collected = list(data)
    assert len(collected) == 10
def isgenerator(o):
    """Return True for a generator function or a generator object.

    An ``UnboundMethod`` is checked through its ``_func`` attribute.
    """
    target = o._func if isinstance(o, UnboundMethod) else o
    if inspect.isgeneratorfunction(target):
        return True
    return inspect.isgenerator(target)
def isgenerator(func):
    """Return True when ``func.func_code`` carries the ``CO_GENERATOR`` flag.

    Objects without ``func_code`` (or whose code object has no
    ``co_flags``) yield False.
    """
    try:
        flags = func.func_code.co_flags
    except AttributeError:
        return False
    return flags & CO_GENERATOR != 0
# Make a function to help check if an exception is derived from BaseException.
# In Python 2.4, we just use Exception instead.
def post_process_extensions(self, extensions, resp_obj, request,
                            action_args):
    """Run post-processing extensions and return the first truthy response.

    Generator extensions are resumed with ``send(resp_obj)``; a
    ``StopIteration`` from them is the normal way to finish.  All other
    extensions are called with ``req``, ``resp_obj`` and ``action_args``,
    and one raising ``exception.VersionNotFoundForAPIMethod`` is skipped.
    A ``Fault`` raised by an extension becomes the response.  Returns
    ``None`` when no extension produced a truthy response.
    """
    for ext in extensions:
        response = None
        if not inspect.isgenerator(ext):
            # Regular functions get post-processing...
            try:
                with ResourceExceptionHandler():
                    response = ext(req=request, resp_obj=resp_obj,
                                   **action_args)
            except exception.VersionNotFoundForAPIMethod:
                # If an attached extension (@wsgi.extends) for the
                # method has no version match it is not an error. We
                # just don't run the extends code
                continue
            except Fault as ex:
                response = ex
        else:
            # If it's a generator, run the second half of
            # processing
            try:
                with ResourceExceptionHandler():
                    response = ext.send(resp_obj)
            except StopIteration:
                # Normal exit of generator
                continue
            except Fault as ex:
                response = ex

        # We had a response...
        if response:
            return response

    return None
def underscore_memoization(func):
    """
    Decorator for methods::

        class A(object):
            def x(self):
                if not hasattr(self, '_x'):
                    self._x = 10
                return self._x

    Becomes::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    A now has an attribute ``_x`` written by this decorator.

    The result is stored on the instance under ``'_' + func.__name__`` the
    first time the method runs; a generator result is turned into a list
    before it is stored.  The wrapper keeps the metadata of ``func``
    (``__name__``, ``__doc__``) via ``functools.wraps``.
    """
    import functools

    name = '_' + func.__name__

    @functools.wraps(func)
    def wrapper(self):
        try:
            return getattr(self, name)
        except AttributeError:
            result = func(self)
            if inspect.isgenerator(result):
                # A generator can only be consumed once, so cache a list.
                result = list(result)
            setattr(self, name, result)
            return result

    return wrapper
def is_class_instance(obj):
    """Like inspect.* methods.

    True unless ``obj`` is a class, module, builtin, method, method
    descriptor, code object or generator.
    """
    non_instance_checks = (
        inspect.isclass,
        inspect.ismodule,
        inspect.isbuiltin,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.iscode,
        inspect.isgenerator,
    )
    return not any(check(obj) for check in non_instance_checks)
def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False, second_arg_is_evaluator=False):
    """ This is a typical memoization decorator, BUT there is one difference:
    To prevent recursion it sets defaults.

    Preventing recursion is in this case the much bigger use than speed. I
    don't think, that there is a big speed difference, but there are many cases
    where recursion could happen (think about a = b; b = a).

    The cache is the ``memoize_cache`` attribute of the first argument
    (``evaluator_is_first_arg``), of the second argument
    (``second_arg_is_evaluator``) or of ``obj._evaluator`` otherwise.
    Generator results are stored as lists.
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            # Pick the object that owns the memoize cache.
            if evaluator_is_first_arg:
                owner = obj
            elif second_arg_is_evaluator:  # needed for meta classes
                owner = args[0]
            else:
                owner = obj._evaluator
            cache = owner.memoize_cache

            try:
                memo = cache[function]
            except KeyError:
                memo = cache[function] = {}

            key = (obj, args, frozenset(kwargs.items()))
            if key in memo:
                return memo[key]

            # Store the default first, so that a recursive call with the
            # same key finds an entry instead of recursing again.
            if default is not NO_DEFAULT:
                memo[key] = default
            rv = function(obj, *args, **kwargs)
            if inspect.isgenerator(rv):
                rv = list(rv)
            memo[key] = rv
            return rv
        return wrapper
    return func