import inspect


def underscore_memoization(func):
    """
    Decorator for methods::

        class A(object):
            def x(self):
                if not hasattr(self, '_x'):
                    self._x = 10
                return self._x

    Becomes::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    A now has an attribute ``_x`` written by this decorator.
    """
    name = '_' + func.__name__

    def wrapper(self):
        try:
            return getattr(self, name)
        except AttributeError:
            result = func(self)
            if inspect.isgenerator(result):
                # Exhaust generators so the cached value can be iterated
                # more than once.
                result = list(result)
            setattr(self, name, result)
            return result

    return wrapper
# for fast_parser, should not be deleted
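# A minimal usage sketch (the class and values are illustrative, not from
# the source): the first call computes and caches `_x`; later calls hit the
# getattr fast path. Generators are exhausted into lists before caching.
class A(object):
    @underscore_memoization
    def x(self):
        print('computing')
        return (n for n in range(3))


a = A()
print(a.x())  # prints "computing", then [0, 1, 2]
print(a.x())  # [0, 1, 2] -- served from a._x, no recomputation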
NO_DEFAULT = object()  # sentinel; defined elsewhere in the original module


def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False,
                    second_arg_is_evaluator=False):
    """This is a typical memoization decorator, BUT there is one difference:
    to prevent recursion it sets defaults.

    Preventing recursion is in this case the much bigger use than speed. I
    don't think that there is a big speed difference, but there are many
    cases where recursion could happen (think about ``a = b; b = a``).
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            if evaluator_is_first_arg:
                cache = obj.memoize_cache
            elif second_arg_is_evaluator:  # needed for meta classes
                cache = args[0].memoize_cache
            else:
                cache = obj._evaluator.memoize_cache

            try:
                memo = cache[function]
            except KeyError:
                memo = {}
                cache[function] = memo

            key = (obj, args, frozenset(kwargs.items()))
            if key in memo:
                return memo[key]
            else:
                if default is not NO_DEFAULT:
                    # Seed the cache so a recursive call returns the
                    # default instead of recursing forever.
                    memo[key] = default
                rv = function(obj, *args, **kwargs)
                if inspect.isgenerator(rv):
                    rv = list(rv)
                memo[key] = rv
                return rv
        return wrapper
    return func
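# A self-contained sketch of the recursion guard. `Evaluator` and
# `Definition` are stand-ins invented for this example, not names from the
# source; only the decorator above is real.
class Evaluator(object):
    def __init__(self):
        self.memoize_cache = {}


class Definition(object):
    def __init__(self, evaluator):
        self._evaluator = evaluator

    @memoize_default(default=[])
    def infer(self):
        # A self-referential definition (a = b; b = a) re-enters infer();
        # the pre-seeded default [] is returned instead of recursing forever.
        return self.infer()


print(Definition(Evaluator()).infer())  # [] -- the recursion guard kicked in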
def _is_iterable(data):
    return isinstance(data, (list, tuple, set)) or inspect.isgenerator(data)
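# Quick checks of the helper's behavior; note that it deliberately treats
# dicts and strings as non-iterable.
print(_is_iterable([1, 2]))           # True
print(_is_iterable(x for x in 'ab'))  # True -- a generator
print(_is_iterable('ab'))             # False -- strings are excluded
print(_is_iterable({'a': 1}))         # False -- so are dicts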
def call(self, context, method, *args, **kwargs):
    """Call a glance client method."""
    if self.client is None:
        self.client = self._glance_client(context)

    retry_excs = (glanceclient.exc.ServiceUnavailable,
                  glanceclient.exc.InvalidEndpoint,
                  glanceclient.exc.CommunicationError)

    retries = CONF.glance_num_retries
    if retries < 0:
        LOG.warning("Treating negative retries as 0")
        retries = 0
    num_attempts = retries + 1

    # Pop the controller name once, outside the retry loop, so that a
    # retried attempt uses the same controller instead of the default.
    controller_name = kwargs.pop('controller', 'images')

    for attempt in range(1, num_attempts + 1):
        # Reuse the cached client rather than rebuilding one per attempt.
        client = self.client or self._glance_client(context)
        try:
            controller = getattr(client, controller_name)
            result = getattr(controller, method)(*args, **kwargs)
            if inspect.isgenerator(result):
                # Convert generator results to a list, so that we can
                # catch any potential exceptions now and retry the call.
                return list(result)
            return result
        except retry_excs as e:
            if attempt < num_attempts:
                extra = "retrying"
            else:
                extra = "done trying"
            LOG.exception("Error contacting glance server "
                          "'%(server)s' for '%(method)s', %(extra)s.",
                          {'server': self.api_server,
                           'method': method, 'extra': extra})
            if attempt == num_attempts:
                raise exception.GlanceConnectionFailed(
                    server=str(self.api_server),
                    reason=six.text_type(e))
            time.sleep(1)
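# Why the list() conversion above matters: a lazy glanceclient pager can
# raise mid-iteration, after call() has already returned and outside the
# retry loop. A toy illustration (not glance code):
def lazy_results():
    yield 1
    raise RuntimeError('connection dropped mid-iteration')


gen = lazy_results()  # nothing has raised yet
try:
    list(gen)  # the failure surfaces here, where it can be caught and retried
except RuntimeError as e:
    print(e)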
def fix_http_content_length():
    """Reverse the operation done by CherryPy's `_be_ie_unfriendly`."""
    response = cherrypy.serving.response
    if not inspect.isgenerator(response.body):  # Don't do this in `stream` mode
        response.body = response.collapse_body().strip()
        response.headers['Content-Length'] = str(len(response.collapse_body()))
def unescape_response():
    """Unescape the HTML body that was escaped by `_cpcompat.escape_html()`."""
    response = cherrypy.serving.response
    if not inspect.isgenerator(response.body):  # Don't do this in `stream` mode
        response.body = six.binary_type(unescape_html(response.collapse_body()))
def _json_stream_output(next_handler, *args, **kwargs):
    """Output JSON in stream mode."""
    cherrypy.response.headers['Content-Type'] = "application/json"
    _outputs = next_handler(*args, **kwargs)
    if inspect.isgenerator(_outputs):
        def _stream_outputs():
            for _content in _outputs:
                yield json.dumps(_content)
        return _stream_outputs()
    else:
        return json.dumps(_outputs)
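# The (next_handler, *args, **kwargs) signature matches what CherryPy's
# HandlerWrapperTool passes to its wrapped callable, so the original code
# presumably registers it along these lines. The tool name `json_stream`
# and the Api class are assumptions made for this sketch.
import cherrypy
from cherrypy._cptools import HandlerWrapperTool

cherrypy.tools.json_stream = HandlerWrapperTool(_json_stream_output)


class Api(object):
    @cherrypy.expose
    @cherrypy.tools.json_stream()
    def rows(self):
        # A generator handler: each item is serialized lazily by the tool.
        for i in range(3):
            yield {'row': i}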
def test_filter_all(self, mock_filter_one):
    mock_filter_one.side_effect = [True, False, True]
    filter_obj_list = ['obj1', 'obj2', 'obj3']
    container = {}
    base_filter = base_filters.BaseFilter()
    extra_spec = {}
    result = base_filter.filter_all(filter_obj_list, container, extra_spec)
    self.assertTrue(inspect.isgenerator(result))
    self.assertEqual(['obj1', 'obj3'], list(result))
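# The assertions imply that filter_all is a generator delegating each object
# to _filter_one. A sketch of an implementation consistent with the test
# (the body is an assumption, not the actual source of BaseFilter):
class BaseFilter(object):
    def _filter_one(self, obj, container, extra_spec):
        # Overridden by concrete filters; the test mocks this out.
        return True

    def filter_all(self, filter_obj_list, container, extra_spec):
        # Lazily yield only objects that pass _filter_one -- which is why
        # the test sees a generator rather than a list.
        for obj in filter_obj_list:
            if self._filter_one(obj, container, extra_spec):
                yield obj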
def post_process_extensions(self, extensions, resp_obj, request,
                            action_args):
    for ext in extensions:
        response = None
        if inspect.isgenerator(ext):
            # If it's a generator, run the second half of processing
            try:
                with ResourceExceptionHandler():
                    response = ext.send(resp_obj)
            except StopIteration:
                # Normal exit of generator
                continue
            except Fault as ex:
                response = ex
        else:
            # Regular functions get post-processing...
            try:
                with ResourceExceptionHandler():
                    response = ext(req=request, resp_obj=resp_obj,
                                   **action_args)
            except Fault as ex:
                response = ex

        # We had a response...
        if response:
            return response

    return None
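# A hypothetical extension illustrating the generator protocol assumed
# above: the code before the yield runs as pre-processing, then
# post_process_extensions resumes the generator with the response object
# via send(). The resp_obj.obj access is illustrative, not from the source.
def sample_extension(req, **action_args):
    # Pre-processing happens here, before the main handler runs.
    resp_obj = yield
    # Post-processing: inspect or annotate the response object.
    resp_obj.obj['post_processed'] = True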
def do(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        gen = f(*args, **kwargs)
        if not inspect.isgenerator(gen):
            # f returned a plain value; wrap it in a generator whose
            # StopIteration carries that value (the Python 3.3+ semantics
            # of `return` inside a generator).
            res = gen

            def generator_no_yield():
                return res
                yield  # unreachable, but makes this a generator function

            gen = generator_no_yield()
        return Effect(ChainedIntent(gen))
    return wrapper
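# The return-before-yield trick above is easy to miss; a standalone
# illustration of what it does (nothing here is from the source library):
def wrap(value):
    def gen():
        return value  # becomes StopIteration(value) when the generator runs
        yield         # never reached; only makes gen() a generator function
    return gen()


g = wrap(42)
try:
    next(g)
except StopIteration as stop:
    print(stop.value)  # 42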
def run_task(self, module, task, response):
    """Process the task, catching exceptions and log output, and return a
    `ProcessorResult` object.
    """
    self.logger = logger = module.logger
    result = None
    exception = None
    stdout = sys.stdout
    self.task = task
    if isinstance(response, dict):
        response = rebuild_response(response)
    self.response = response
    self.save = (task.get('track') or {}).get('save', {})

    try:
        if self.__env__.get('enable_stdout_capture', True):
            sys.stdout = ListO(module.log_buffer)
        self._reset()
        result = self._run_task(task, response)
        if inspect.isgenerator(result):
            for r in result:
                self._run_func(self.on_result, r, response, task)
        else:
            self._run_func(self.on_result, result, response, task)
    except Exception as e:
        logger.exception(e)
        exception = e
    finally:
        follows = self._follows
        messages = self._messages
        logs = list(module.log_buffer)
        extinfo = self._extinfo
        save = self.save

        sys.stdout = stdout
        self.task = None
        self.response = None
        self.save = None
        module.log_buffer[:] = []
    return ProcessorResult(result, follows, messages, logs, exception,
                           extinfo, save)
def is_generator(obj):
    import inspect
    return obj is not None and (inspect.isgeneratorfunction(obj)
                                or inspect.isgenerator(obj)
                                or hasattr(obj, 'next')
                                or hasattr(obj, '__next__'))
def isgenerator(o):
    if isinstance(o, UnboundMethod):
        o = o._func
    return inspect.isgeneratorfunction(o) or inspect.isgenerator(o)
CO_GENERATOR = 0x20  # flag bit set on generator code objects (inspect.CO_GENERATOR)


def isgenerator(func):
    try:
        # func_code is the Python 2 spelling; Python 3 uses func.__code__.
        return (func.func_code.co_flags & CO_GENERATOR) != 0
    except AttributeError:
        return False
# Make a function to help check if an exception is derived from BaseException.
# In Python 2.4, we just use Exception instead.
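# A quick sanity check of the flag test, using the Python 3 spelling
# func.__code__ instead of the Python 2 func.func_code:
def gen_fn():
    yield 1


def plain_fn():
    return 1


assert gen_fn.__code__.co_flags & CO_GENERATOR
assert not plain_fn.__code__.co_flags & CO_GENERATOR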
def test_excluding_predicates(self):
    global tb
    self.istest(inspect.isbuiltin, 'sys.exit')
    self.istest(inspect.isbuiltin, '[].append')
    self.istest(inspect.iscode, 'mod.spam.__code__')
    try:
        1/0
    except:
        tb = sys.exc_info()[2]
        self.istest(inspect.isframe, 'tb.tb_frame')
        self.istest(inspect.istraceback, 'tb')
        if hasattr(types, 'GetSetDescriptorType'):
            self.istest(inspect.isgetsetdescriptor,
                        'type(tb.tb_frame).f_locals')
        else:
            self.assertFalse(
                inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    finally:
        # Clear traceback and all the frames and local variables hanging to it.
        tb = None
    self.istest(inspect.isfunction, 'mod.spam')
    self.istest(inspect.isfunction, 'mod.StupidGit.abuse')
    self.istest(inspect.ismethod, 'git.argue')
    self.istest(inspect.ismodule, 'mod')
    self.istest(inspect.isdatadescriptor,
                'collections.defaultdict.default_factory')
    self.istest(inspect.isgenerator, '(x for x in range(2))')
    self.istest(inspect.isgeneratorfunction, 'generator_function_example')
    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
    else:
        self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
def delete(self, paths):
    """Delete L{Tag}s matching C{paths}.

    L{TagValue}s and permissions associated with the deleted L{Tag}s are
    removed by cascading deletes in the database schema.

    @param paths: A sequence of L{Tag.path}s.
    @return: A C{list} of C{(objectID, Tag.path)} 2-tuples representing the
        L{Tag}s that were removed.
    """
    if isgenerator(paths):
        paths = list(paths)
    result = getTags(paths=paths)
    deletedTagPaths = list(result.values(Tag.objectID, Tag.path))
    # Delete the fluiddb/tags/description tag values stored for removed
    # tags.  Associated TagValues are removed by an ON DELETE CASCADE
    # trigger.
    self._factory.tagValues(self._user).delete(
        [(objectID, path) for objectID, _ in deletedTagPaths
         for path in [u'fluiddb/tags/description', u'fluiddb/tags/path']])
    # Touch all the objects for the given tag paths.
    objectIDs = list(getObjectIDs(paths))
    touchObjects(objectIDs)
    result.remove()
    return deletedTagPaths
def delete(self, usernames):
    """Delete L{User}s matching C{username}s.

    @param usernames: A sequence of L{User.username}s.
    @raise FeatureError: Raised if no L{User.username}s are provided.
    @raise UnknownUserError: Raised if one or more usernames don't match
        existing L{User}s.
    @return: A C{list} of C{(objectID, User.username)} 2-tuples
        representing the L{User}s that were removed.
    """
    if isgenerator(usernames):
        usernames = list(usernames)
    if not usernames:
        raise FeatureError('At least one username must be provided.')
    usernames = set(usernames)
    result = getUsers(usernames=usernames)
    existingUsernames = set(result.values(User.username))
    unknownUsernames = usernames - existingUsernames
    if unknownUsernames:
        raise UnknownUserError(list(unknownUsernames))

    admin = getUser(u'fluiddb')
    deletedUsers = list(result.values(User.objectID, User.username))
    # FIXME: Deleting a user will leave the permission exception lists
    # containing the user in a corrupt state.
    result.remove()
    self._factory.tagValues(admin).delete(
        [(objectID, systemTag) for objectID, _ in deletedUsers
         for systemTag in [u'fluiddb/users/username',
                           u'fluiddb/users/name',
                           u'fluiddb/users/email',
                           u'fluiddb/users/role']])
    return deletedUsers
def delete(self, paths):
    """See L{NamespaceAPI.delete}.

    @raise PermissionDeniedError: Raised if the user is not authorized to
        delete a given L{Namespace}.
    """
    if isgenerator(paths):
        paths = list(paths)
    pathsAndOperations = [(path, Operation.DELETE_NAMESPACE)
                          for path in paths]
    deniedOperations = checkPermissions(self._user, pathsAndOperations)
    if deniedOperations:
        raise PermissionDeniedError(self._user.username, deniedOperations)
    return self._api.delete(paths)