def register(self):
"""RTI must be registered so it can be located.
"""
if self._location:
return -1
if not inspect.isgeneratorfunction(self._method):
return -1
RTI._pycos._lock.acquire()
if RTI._pycos._rtis.get(self._name, None) is None:
RTI._pycos._rtis[self._name] = self
RTI._pycos._lock.release()
return 0
else:
RTI._pycos._lock.release()
return -1
def __get_generator(task, *args, **kwargs):
if args:
target = args[0]
args = args[1:]
else:
target = kwargs.pop('target', None)
args = kwargs.pop('args', ())
kwargs = kwargs.pop('kwargs', kwargs)
if not inspect.isgeneratorfunction(target):
raise Exception('%s is not a generator!' % target.__name__)
    # Python 2 attribute names (func_defaults / func_code); the Python 3
    # version of this helper below uses __defaults__ and __code__ instead.
    if target.func_defaults and \
            'task' in target.func_code.co_varnames[:target.func_code.co_argcount][-len(target.func_defaults):]:
kwargs['task'] = task
return target(*args, **kwargs)
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
"""Internal use only.
"""
if isinstance(gen, str):
name = gen
else:
name = gen.__name__
if name in self._xfer_funcs:
code = None
else:
# if not inspect.isgeneratorfunction(gen):
# logger.warning('"%s" is not a valid generator function', name)
# raise StopIteration([])
code = inspect.getsource(gen).lstrip()
def _run_req(task=None):
msg = {'req': 'job', 'auth': self._auth,
'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
reply = yield task.receive()
if isinstance(reply, Task):
if self.status_task:
msg = DispycosTaskInfo(reply, args, kwargs, time.time())
self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
if not request.endswith('async'):
reply = yield task.receive()
else:
reply = None
raise StopIteration(reply)
yield Task(_run_req).finish()
def __get_generator(task, *args, **kwargs):
if args:
target = args[0]
args = args[1:]
else:
target = kwargs.pop('target', None)
args = kwargs.pop('args', ())
kwargs = kwargs.pop('kwargs', kwargs)
if not inspect.isgeneratorfunction(target):
raise Exception('%s is not a generator!' % target.__name__)
if target.__defaults__ and \
'task' in target.__code__.co_varnames[:target.__code__.co_argcount][-len(target.__defaults__):]:
kwargs['task'] = task
return target(*args, **kwargs)
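# A minimal sketch of a target generator that __get_generator() would
# dispatch to. Because 'task' is declared with a default value, the
# wrapper injects the running task via kwargs['task']; the names here
# ('compute', 'data') are illustrative only.
def compute(data, task=None):
    msg = yield task.receive()
    print(data, msg)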
def is_awaitable(obj):
    # There is no single check that answers "should this be awaited?" in
    # every case, so this helper combines several, covering the suspected
    # cases: plain func, coro, gen-coro, future,
    # class with sync __call__, class with async __call__,
    # sync method, async method.
if inspect.isawaitable(obj) or inspect.iscoroutinefunction(obj) or inspect.iscoroutine(obj):
return True
elif inspect.isgeneratorfunction(obj):
return True
elif CallChain.is_user_defined_class(obj):
if hasattr(obj, '__call__'):
return CallChain.is_awaitable(obj.__call__)
return False
else:
return False
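# Illustrative probes for the check above (this helper is assumed to be
# exposed as CallChain.is_awaitable, as its recursive call suggests):
async def _coro():
    pass
def _gen():
    yield
# CallChain.is_awaitable(_coro)  -> True, via iscoroutinefunction()
# CallChain.is_awaitable(_gen)   -> True, via isgeneratorfunction()
# CallChain.is_awaitable(print)  -> False, none of the branches match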
def pre_process_extensions(self, extensions, request, action_args):
# List of callables for post-processing extensions
post = []
for ext in extensions:
if inspect.isgeneratorfunction(ext):
response = None
# If it's a generator function, the part before the
# yield is the preprocessing stage
try:
with ResourceExceptionHandler():
gen = ext(req=request, **action_args)
response = next(gen)
except Fault as ex:
response = ex
# We had a response...
if response:
return response, []
# No response, queue up generator for post-processing
post.append(gen)
else:
# Regular functions only perform post-processing
post.append(ext)
# Run post-processing in the reverse order
return None, reversed(post)
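# A hypothetical extension following the protocol above: the code before
# the yield runs as pre-processing, and because nothing truthy is yielded
# the generator is queued so the code after the yield runs as
# post-processing. Names are illustrative.
import time
def timing_extension(req=None, **action_args):
    start = time.time()
    yield
    print('request handled in %.3fs' % (time.time() - start))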
def isgenerator(o):
if isinstance(o, UnboundMethod):
o = o._func
return inspect.isgeneratorfunction(o) or inspect.isgenerator(o)
def _validContext(func):
# Defined inside USBContext so we can access "self.__*".
@contextlib.contextmanager
def refcount(self):
with self.__context_cond:
if not self.__context_p and self.__auto_open:
# BBB
warnings.warn(
'Use "with USBContext() as context:" for safer cleanup'
' on interpreter shutdown. See also USBContext.open().',
DeprecationWarning,
)
self.open()
self.__context_refcount += 1
try:
yield
finally:
with self.__context_cond:
self.__context_refcount -= 1
if not self.__context_refcount:
self.__context_cond.notifyAll()
if inspect.isgeneratorfunction(func):
def wrapper(self, *args, **kw):
with refcount(self):
if self.__context_p:
# pylint: disable=not-callable
for value in func(self, *args, **kw):
# pylint: enable=not-callable
yield value
else:
def wrapper(self, *args, **kw):
with refcount(self):
if self.__context_p:
# pylint: disable=not-callable
return func(self, *args, **kw)
# pylint: enable=not-callable
functools.update_wrapper(wrapper, func)
return wrapper
# pylint: enable=no-self-argument,protected-access
def _process_async_rep_event(self, socket, channel, data):
"""
    Process an ASYNC_REP socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
channel : AgentChannel
AgentChannel associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
message = deserialize_message(message=data,
serializer=channel.serializer)
address_uuid, request_uuid, data, address = message
client_address = address.twin()
if not self.registered(client_address):
self.connect(address)
handler = self.handler[socket]
is_generator = inspect.isgeneratorfunction(handler)
if is_generator:
generator = handler(self, data)
reply = next(generator)
else:
reply = handler(self, data)
self.send(client_address, (address_uuid, request_uuid, reply))
if is_generator:
execute_code_after_yield(generator)
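# A hypothetical ASYNC_REP handler matching the dispatch above: the
# yielded value is sent back to the client first, and the code after the
# yield runs once the reply is on its way (assuming osbrain's
# Agent.log_info; the handler name and messages are illustrative).
def on_request(agent, data):
    yield 'processing %s' % (data,)
    agent.log_info('finished handling %s' % (data,))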
def _process_sync_pub_event(self, socket, channel, data):
"""
Process a SYNC_PUB socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
channel : AgentChannel
AgentChannel associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
message = deserialize_message(message=data,
serializer=channel.serializer)
address_uuid, request_uuid, data = message
handler = self.handler[socket]
is_generator = inspect.isgeneratorfunction(handler)
if is_generator:
generator = handler(self, data)
reply = next(generator)
else:
reply = handler(self, data)
message = (address_uuid, request_uuid, reply)
self._send_channel_sync_pub(channel=channel,
message=message,
topic=address_uuid,
general=False)
if is_generator:
execute_code_after_yield(generator)
def add_route(app,fn):
method = getattr(fn,'__method__',None)
path = getattr(fn,'__route__',None)
if path is None or method is None:
        raise ValueError('@get or @post not defined in %s' % str(fn))
if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
fn = asyncio.coroutine(fn)
logging.info('add route %s %s => %s(%s)' % (method,path,fn.__name__,','.join(inspect.signature(fn).parameters.keys())))
app.router.add_route(method,path,RequestHandler(app,fn))
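# A hypothetical @get decorator matching the attribute convention
# add_route() relies on; the real decorators are defined elsewhere in
# the project and are not shown in this snippet.
def get(path):
    def decorator(fn):
        fn.__method__ = 'GET'
        fn.__route__ = path
        return fn
    return decorator
@get('/hello')
async def hello(request):
    return '<h1>Hello</h1>'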
def ticker_wrapper(handler):
async def decorator(data):
currency_pair = data[0]
last = data[1]
lowest_ask = data[2]
highest_bid = data[3]
percent_change = data[4]
base_volume = data[5]
quote_volume = data[6]
is_frozen = data[7]
day_high = data[8]
day_low = data[9]
kwargs = {
"currency_pair": currency_pair,
"last": last,
"lowest_ask": lowest_ask,
"highest_bid": highest_bid,
"percent_change": percent_change,
"base_volume": base_volume,
"quote_volume": quote_volume,
"is_frozen": is_frozen,
"day_high": day_high,
"day_low": day_low
}
if inspect.isgeneratorfunction(handler):
await handler(**kwargs)
else:
handler(**kwargs)
return decorator
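# isgeneratorfunction() is true for old-style generator-based coroutines
# created with @asyncio.coroutine (removed in Python 3.11), which is why
# such handlers are awaited above. A hypothetical subscriber written in
# that style, names illustrative:
import asyncio
@asyncio.coroutine
def on_ticker(currency_pair, last, **kwargs):
    print(currency_pair, last)
    yield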
def trades_wrapper(topic, handler):
async def decorator(data):
for event in data:
event["currency_pair"] = topic
if inspect.isgeneratorfunction(handler):
await handler(**event)
else:
handler(**event)
return decorator
def add_route(app, fn):
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)
if path is None or method is None:
raise ValueError('@Get or @Post not defined in {}.'.format(str(fn)))
if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
fn = asyncio.coroutine(fn)
logging.info('add route {} {} => {}({})'.format(method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))
app.router.add_route(method, path, RequestHandler(app, fn))
def is_cython_or_generator(fn):
"""Returns whether this function is either a generator function or a Cythonized function."""
if hasattr(fn, '__func__'):
fn = fn.__func__ # Class method, static method
if inspect.isgeneratorfunction(fn):
return True
name = type(fn).__name__
return \
name == 'generator' or \
name == 'method_descriptor' or \
name == 'cython_function_or_method' or \
name == 'builtin_function_or_method'
def return_list(func):
import inspect
from functools import wraps
assert inspect.isgeneratorfunction(func)
@wraps(func)
def wrapped(*args, **kargs):
return list(func(*args, **kargs))
return wrapped
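# Example use of return_list: each call to the decorated generator is
# materialised into a list.
@return_list
def squares(n):
    for i in range(n):
        yield i * i
assert squares(4) == [0, 1, 4, 9]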
def __enqueue(func, pipe, message):
assert isinstance(pipe, Pipe)
assert isinstance(message, simpype.Message)
message.location = pipe
message.resource = pipe.resource
if inspect.isgeneratorfunction(func):
result = yield pipe.env.process(func(pipe, message))
else:
result = func(pipe, message)
pipe.full()
return result
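# Two hypothetical callbacks __enqueue() could dispatch to: a generator
# function is run as a SimPy process (so it may yield timeouts), while a
# plain function is called synchronously. Names are illustrative.
def forward(pipe, message):
    return message
def delay_then_forward(pipe, message):
    yield pipe.env.timeout(1.0)
    return message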