def getargspec(func):
"""Variation of inspect.getargspec that works for more functions.
This function works for Cythonized, non-cpdef functions, which expose argspec information but
are not accepted by getargspec. It also works for Python 3 functions that use annotations, which
are simply ignored. However, keyword-only arguments are not supported.
"""
if inspect.ismethod(func):
func = func.__func__
# Cythonized functions have a .__code__, but don't pass inspect.isfunction()
try:
code = func.__code__
except AttributeError:
raise TypeError('{!r} is not a Python function'.format(func))
if hasattr(code, 'co_kwonlyargcount') and code.co_kwonlyargcount > 0:
raise ValueError('keyword-only arguments are not supported by getargspec()')
args, varargs, varkw = inspect.getargs(code)
return inspect.ArgSpec(args, varargs, varkw, func.__defaults__)
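# Minimal usage sketch for the helper above (the function name `scale` is made
# up for illustration); assumes `inspect` is imported and that this runs on a
# Python version where inspect.ArgSpec still exists (it was removed in 3.11):
def scale(x, factor=2, *rest, **options):
    return x * factor

spec = getargspec(scale)
assert spec.args == ['x', 'factor']
assert spec.varargs == 'rest'
assert spec.keywords == 'options'
assert spec.defaults == (2,)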
def _from_module(self, module, object):
"""
Return true if the given object is defined in the given
module.
"""
if module is None:
return True
elif inspect.getmodule(object) is not None:
return module is inspect.getmodule(object)
elif inspect.isfunction(object):
return module.__dict__ is object.func_globals
elif inspect.isclass(object):
return module.__name__ == object.__module__
elif hasattr(object, '__module__'):
return module.__name__ == object.__module__
elif isinstance(object, property):
return True # [XX] no way to be sure.
else:
raise ValueError("object must be a class or function")
def decorator(target):
"""A signature-matching decorator factory."""
def decorate(fn):
if not inspect.isfunction(fn):
raise Exception("not a decoratable function")
spec = compat.inspect_getfullargspec(fn)
names = tuple(spec[0]) + spec[1:3] + (fn.__name__,)
targ_name, fn_name = _unique_symbols(names, 'target', 'fn')
metadata = dict(target=targ_name, fn=fn_name)
metadata.update(format_argspec_plus(spec, grouped=False))
metadata['name'] = fn.__name__
code = """\
def %(name)s(%(args)s):
return %(target)s(%(fn)s, %(apply_kw)s)
""" % metadata
decorated = _exec_code_in_env(code,
{targ_name: target, fn_name: fn},
fn.__name__)
decorated.__defaults__ = getattr(fn, 'im_func', fn).__defaults__
decorated.__wrapped__ = fn
return update_wrapper(decorated, fn)
return update_wrapper(decorate, target)
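# Hedged usage sketch of the signature-matching factory above, assuming its
# helpers (compat.inspect_getfullargspec, format_argspec_plus, _unique_symbols,
# _exec_code_in_env, update_wrapper) are available in this module:
@decorator
def log_calls(fn, *args, **kwargs):
    print("calling %s" % fn.__name__)
    return fn(*args, **kwargs)

@log_calls
def add(a, b=1):
    return a + b

add(2, b=3)  # prints "calling add" and returns 5; add() keeps its original signature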
def _from_module(self, module, object):
"""
Return true if the given object is defined in the given
module.
"""
if module is None:
return True
elif inspect.isfunction(object):
return module.__dict__ is func_globals(object)
elif inspect.isclass(object):
return module.__name__ == object.__module__
elif inspect.getmodule(object) is not None:
return module is inspect.getmodule(object)
elif hasattr(object, '__module__'):
return module.__name__ == object.__module__
elif isinstance(object, property):
return True # [XX] no way to be sure.
else:
raise ValueError("object must be a class or function")
def __call__(self, func_or_cls):
condition = self.condition
reason = self.reason
if inspect.isfunction(func_or_cls):
@six.wraps(func_or_cls)
def wrapped(*args, **kwargs):
if condition:
raise testtools.TestCase.skipException(reason)
return func_or_cls(*args, **kwargs)
return wrapped
elif inspect.isclass(func_or_cls):
orig_func = getattr(func_or_cls, 'setUp')
@six.wraps(orig_func)
def new_func(self, *args, **kwargs):
if condition:
raise testtools.TestCase.skipException(reason)
orig_func(self, *args, **kwargs)
func_or_cls.setUp = new_func
return func_or_cls
else:
raise TypeError('skipUnless can be used only with functions or '
'classes')
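# Hypothetical usage, assuming the enclosing class (called skip_if here purely
# for illustration) stores `condition` and `reason` in its __init__ and defines
# the __call__ above, and that testtools/six are importable:
import testtools

class ExampleTest(testtools.TestCase):

    @skip_if(condition=True, reason='feature not available on this backend')
    def test_feature(self):
        self.fail('skipped before it can fail')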
def _get_provider_submodule_method(module_name: str, submodule_name: str, method_name: str) -> Optional[Callable]:
sub_module = "{}.{}".format(module_name, submodule_name)
try:
importlib.import_module(module_name, package='__path__')
except ImportError:
return None
if importlib.util.find_spec(sub_module):
site = importlib.import_module(sub_module, package=module_name)
if hasattr(site, method_name):
obj = getattr(site, method_name)
if inspect.isfunction(obj):
return obj
return None
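# Illustrative call; the module, submodule, and method names below are made up:
maybe_fn = _get_provider_submodule_method('someprovider', 'api', 'fetch')
if maybe_fn is not None:
    maybe_fn()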
# We should only create one ProviderContext over the program lifetime,
# to avoid having to search the file system every time one is created.
# That is why this lives outside Settings.
def update_inputs(av_states, socket_send, *input_variable_names):
# From each input variable name, extract the function name, ids of AvStates to be passed as arguments,
# and the corresponding function from psaltlib.Inputs. Call the function with the id-key'd AvStates as
# arguments. Aggregate results in order in a list.
input_state = []
for term in input_variable_names:
term_elements = re.split('_', term)
function_name = term_elements[0]
av_id_args = term_elements[1:]
args = []
for av_id in av_id_args:
args.append(av_states[int(av_id)])
args.append(socket_send)
func = [o for o in getmembers(psaltlib.Inputs) if isfunction(o[1]) and
o[0] == function_name]
input_state.append(func[0][1](*args))
return input_state
def get_category(attr):
if inspect.isclass(attr):
return EXCEPTION if issubclass(attr, Exception) else CLASS
elif inspect.isfunction(attr):
return FUNCTION
elif inspect.ismethod(attr):
return FUNCTION
elif inspect.isbuiltin(attr):
return FUNCTION
elif isinstance(attr, method_descriptor):
# Technically, method_descriptor is a descriptor, but since these
# objects act as functions, treat them as functions.
return FUNCTION
elif is_descriptor(attr):
# Maybe add getsetdescriptor and memberdescriptor in the future.
return DESCRIPTOR
else:
return DEFAULT_CATEGORY
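# Rough illustration, assuming the EXCEPTION/CLASS/FUNCTION/DEFAULT_CATEGORY
# constants, method_descriptor, and is_descriptor() are defined elsewhere in
# this module:
assert get_category(ValueError) == EXCEPTION
assert get_category(dict) == CLASS
assert get_category(len) == FUNCTION        # builtin
assert get_category(str.join) == FUNCTION   # method_descriptor
assert get_category(42) == DEFAULT_CATEGORY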
def loadMacros():
path = os.path.abspath(os.path.dirname(__file__))
p = lambda x: os.path.splitext(x)[1] == ".py"
modules = [x for x in os.listdir(path) if p(x) and not x == "__init__.py"]
macros = {}
for module in modules:
name, _ = os.path.splitext(module)
moduleName = "%s.%s" % (__package__, name)
m = __import__(moduleName, globals(), locals(), __package__)
p = lambda x: isfunction(x) and getmodule(x) is m
for name, function in getmembers(m, p):
name = name.replace("_", "-")
try:
macros[name] = function
except Exception:
continue
return macros
def default(self, obj):
if hasattr(obj, "to_json"):
return self.default(obj.to_json())
elif hasattr(obj, "__dict__"):
d = dict(
(key, value)
for key, value in inspect.getmembers(obj)
if not key.startswith("__")
and not inspect.isabstract(value)
and not inspect.isbuiltin(value)
and not inspect.isfunction(value)
and not inspect.isgenerator(value)
and not inspect.isgeneratorfunction(value)
and not inspect.ismethod(value)
and not inspect.ismethoddescriptor(value)
and not inspect.isroutine(value)
)
return self.default(d)
return obj
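# Usage sketch: this default() hook is meant to live on a json.JSONEncoder
# subclass (called ObjectEncoder here purely for illustration); any object
# without a to_json() method is reduced to its non-dunder, non-callable members:
import json

class Point(object):
    def __init__(self, x, y):
        self.x, self.y = x, y

print(json.dumps(Point(1, 2), cls=ObjectEncoder))  # e.g. {"x": 1, "y": 2}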
def command(*args, **kwargs):
def set_command(func):
if inspect.isfunction(func):
if not hasattr(func, '_command'):
func._command = []
for kw in kwargs:
setattr(func, '_' + kw, kwargs.get(kw, False))
for arg in args:
if arg not in func._command:
func._command.append(arg)
return func
return set_command
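# Illustrative application of the command() decorator above; the command names
# and the 'admin' keyword are made up for the example:
@command('greet', 'hello', admin=True)
def greet(bot, channel):
    return 'hi there'

assert greet._command == ['greet', 'hello']
assert greet._admin is True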
def event(*args, **kwargs):
def set_event(func):
if inspect.isfunction(func):
if not hasattr(func, '_event'):
func._event = []
for kw in kwargs:
setattr(func, '_' + kw, kwargs.get(kw, False))
for arg in args:
if arg not in func._event:
func._event.append(arg)
return func
return set_event
def __new__(cls, name, bases, attrs):
# A list of all functions marked with 'is_cronjob=True'
cron_jobs = []
# The min_tick is the greatest common divisor (GCD) of the intervals of the cron jobs.
# This value is queried by the scheduler when the project is first loaded.
# The scheduler may then send the _on_cronjob task only every min_tick seconds,
# which reduces the number of tasks it has to send.
min_tick = 0
for each in attrs.values():
if inspect.isfunction(each) and getattr(each, 'is_cronjob', False):
cron_jobs.append(each)
min_tick = fractions.gcd(min_tick, each.tick)
newcls = type.__new__(cls, name, bases, attrs)
newcls._cron_jobs = cron_jobs
newcls._min_tick = min_tick
return newcls
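# Rough sketch of how this metaclass collects cron jobs; the metaclass name
# (CronJobMeta) is assumed here, and the is_cronjob/tick attributes would
# normally be set by a decorator rather than by hand:
def refresh(self):
    pass
refresh.is_cronjob = True
refresh.tick = 60

def cleanup(self):
    pass
cleanup.is_cronjob = True
cleanup.tick = 90

Handler = CronJobMeta('Handler', (object,), {'refresh': refresh, 'cleanup': cleanup})
assert set(Handler._cron_jobs) == {refresh, cleanup}
assert Handler._min_tick == 30   # gcd of 60 and 90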
def test_stacking_instead(decor):
"""Test stacking when instead is specified last (which is WRONG)
Note that putting instead late in the stack WILL override
any previous decorators!
"""
tracker = Mock(name='tracker', return_value=None)
decorated = Mock(name='decorated', return_value='decorated')
tracker.__name__ = str('tracker')
decorated.__name__ = str('decorated')
fn = decor(tracker)(decorated)
fn = instead(tracker)(fn)
fn()
decorated.assert_not_called()
assert tracker.call_count == 1
assert tracker.call_args[0][:2] == ((), {})
assert isfunction(tracker.call_args[0][2])
def test_parse_identifiers():
"""pyessv-tests: parsing: identifiers
"""
def positive_test(parser, project, identifier):
parser(project, identifier)
@nose.tools.raises(LIB.TemplateParsingError)
def negative_test(parser, project, identifier):
parser(project, identifier)
# Iterate identifiers & perform +ve / -ve tests:
for project, parser, seperator, identifiers in _CONFIG:
assert inspect.isfunction(parser)
for identifier in identifiers:
# ... +ve test:
desc = 'identifier parsing test (+ve) --> {} :: {}'.format(project, identifier)
tu.init(positive_test, desc)
yield positive_test, parser, project, identifier
# ... -ve tests:
for invalid_identifier in _get_invalid_identifiers(identifier, seperator):
desc = 'identifier parsing test (-ve) --> {} :: {}'.format(project, invalid_identifier)
tu.init(negative_test, desc)
yield negative_test, parser, project, invalid_identifier
def __new__(cls, name, bases, attrs):
super_new = super(AccountActionMetaclass, cls).__new__
parents = [base for base in bases if isinstance(base, AccountActionMetaclass)]
if not parents:
# We stop here if we are considering AccountActionBase and not
# one of its subclasses
return super_new(cls, name, bases, attrs)
new_action = super_new(cls, name, bases, attrs)
# Performs some checks
action_name = getattr(new_action, 'name', None)
if action_name is None or not isinstance(action_name, six.string_types):
raise ImproperlyConfigured('The "name" attribute must be a string')
execute_method = getattr(new_action, 'execute', None)
if execute_method is None or \
not (inspect.ismethod(execute_method) or inspect.isfunction(execute_method)):
raise ImproperlyConfigured('The "execute" method must be configured')
return new_action
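# Hypothetical concrete action that satisfies the checks above; AccountActionBase
# is assumed to be the declared base class that uses AccountActionMetaclass:
class DeactivateAccount(AccountActionBase):
    name = 'deactivate'

    def execute(self, account):
        account.is_active = False
        account.save()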
def extract_data_from_funcs(node_funcs):
print("node_funcs is:", node_funcs)
if isinstance(node_funcs, list):
node_data = []
for func in node_funcs:
if inspect.isfunction(func):
node_data.append(func())
else:
node_data.append(func)
return node_data
elif inspect.isfunction(node_funcs):
return node_funcs()
else:
return node_funcs
def append_dataset_arrays_or_functions(dataset1, dataset2, verbose=True):
"""Concatenates the features of two ndarrays (or functions returning ndarrays)."""
if inspect.isfunction(dataset1):
if verbose:
print("Executing dataset1")
d1 = dataset1()
else:
if verbose:
print("Dataset1 is an array")
d1 = dataset1
if inspect.isfunction(dataset2):
if verbose:
print("Executing dataset2")
d2 = dataset2()
else:
if verbose:
print("Dataset2 is an array")
d2 = dataset2
n1 = d1.shape[0]
n2 = d2.shape[0]
if n1 != n2:
er = "incompatible number of samples: ", n1, " and ", n2
raise Exception(er)
return numpy.concatenate((d1, d2), axis=1)
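# Quick numeric sketch; a lambda satisfies inspect.isfunction(), so the first
# dataset is called while the second is used directly:
import numpy

combined = append_dataset_arrays_or_functions(lambda: numpy.ones((4, 2)),
                                               numpy.zeros((4, 3)),
                                               verbose=False)
print(combined.shape)  # (4, 5)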