def __init__(
    self,
    REMOTE_IP,
    REMOTE_PORT,
    loop: asyncio.AbstractEventLoop = None,
    executor: futures.Executor = None
):
    """Record the remote endpoint and the asyncio objects used by this instance.

    :param REMOTE_IP: address of the remote peer; stored unchanged
    :param REMOTE_PORT: port of the remote peer; stored unchanged
    :param loop: event loop to use; ``asyncio.get_event_loop()`` is used
        when it is omitted
    :param executor: executor stored on the instance (``None`` is kept as is)
    """
    # Annotations name the classes themselves. The previous version called
    # ``AbstractEventLoop()`` / ``Executor()`` at definition time, building
    # throw-away instances merely to annotate the parameters.
    self._input_list = []
    self._input_queue = asyncio.Queue()
    self.loop = loop if loop is not None else asyncio.get_event_loop()
    self.executor = executor
    self.REMOTE_IP = REMOTE_IP
    self.REMOTE_PORT = REMOTE_PORT
# Example source code using the Python class Executor()
def __init__(
    self,
    device_list=None,
    connection_list=None,
    loop: asyncio.AbstractEventLoop = None,
    executor: futures.Executor = None
):
    """Set up the event loop, the executor and the device/connection lists.

    :param device_list: list stored as ``self.device_list``; a fresh empty
        list is created when omitted
    :param connection_list: list stored as ``self.connection_list``; a fresh
        empty list is created when omitted
    :param loop: event loop to use; ``asyncio.get_event_loop()`` is used
        when it is omitted
    :param executor: executor to use; a new
        ``futures.ProcessPoolExecutor`` is created when omitted. Either way
        it is installed as the loop's default executor.
    """
    self.loop = loop if loop is not None else asyncio.get_event_loop()
    if executor is not None:
        self.executor = executor
    else:
        self.executor = futures.ProcessPoolExecutor()
    # NOTE(review): the asyncio docs say that from Python 3.11 on
    # ``set_default_executor`` only accepts a ThreadPoolExecutor, so the
    # process-pool fallback above may be rejected there -- confirm the
    # Python versions this must support.
    self.loop.set_default_executor(self.executor)
    # ``None`` defaults instead of ``[]``: a list literal in the signature is
    # created once and would be shared (and mutated) by every instance.
    self.device_list = device_list if device_list is not None else []
    self.connection_list = (
        connection_list if connection_list is not None else []
    )
def open_async(file: Union[str, Path], *args, executor: Executor = None,
               **kwargs) -> AsyncFileWrapper:
    """
    Build an :class:`~AsyncFileWrapper` for the given file.

    Reading a whole file::

        async def read_file_contents(path: str) -> bytes:
            async with open_async(path, 'rb') as f:
                return await f.read()

    Iterating over the lines asynchronously::

        async def read_file_lines(path: str):
            async for line in open_async(path):
                print(line)

    :param file: path of the file; converted with ``str()`` before use
    :param args: positional arguments forwarded for :func:`open`
    :param executor: passed through as the ``executor`` argument of
        :class:`~AsyncFileWrapper`
    :param kwargs: keyword arguments forwarded for :func:`open`
    :return: the wrapper object
    """
    # Path objects are normalised to plain strings for the wrapper.
    path_text = str(file)
    wrapper = AsyncFileWrapper(path_text, args, kwargs, executor)
    return wrapper
def call_in_executor(func: Callable, *args, executor: Executor = None, **kwargs) -> Future:
    """
    Run a callable in an executor via the current event loop.

    A more convenient spelling of::

        get_event_loop().run_in_executor(executor, func, *args)

    which, unlike the plain call, also accepts keyword arguments for ``func``.
    Keyword arguments that are themselves called ``func`` or ``executor``
    cannot be passed this way; wrap the callable with
    :func:`functools.partial` first.

    :param func: the callable to run
    :param args: positional arguments for ``func``
    :param executor: executor handed to ``run_in_executor``
    :param kwargs: keyword arguments for ``func``
    :return: the future returned by ``run_in_executor`` for this call
    """
    # run_in_executor() takes positional arguments only, so bind everything
    # into a zero-argument callable before handing it over.
    bound_call = partial(func, *args, **kwargs)
    event_loop = get_event_loop()
    return event_loop.run_in_executor(executor, bound_call)
def __init__(self, on_exit: ExitOption=ExitOption.ABANDON) -> None:
    """
    Set up the executor's initial state.

    Args:
        on_exit: what to do on exit when the executor is used as a context manager
            ExitOption.ABANDON: the default. All remaining threads are abandoned.
            ExitOption.NO_WAIT: shut down without waiting for remaining threads to complete.
            ExitOption.WAIT: shut down and wait for remaining threads to complete. A vanilla
                `futures.Executor` behaves this way by default; `ThreadExecutor` does not.

    Returns: None

    Raises: None

    Required Tests: None
    """
    # State touched only by the main thread.
    self._on_exit = on_exit
    self._abandon = False
    self._is_shutdown = False

    # Re-entrant lock for the shared state set up below.
    self._monitored_thread_lock = threading.RLock()

    # Shared state, guarded by the lock above.
    self._monitor_thread = None  # type: Optional[threading.Thread]
    self._monitored_threads = {}  # type: dict
def __exit__(self, exc_type: type, exc_val: Exception, exc_tb: Any) -> bool:
    """
    Leave the context manager.

    A plain `futures.Executor` shuts down and waits on exit. `ThreadExecutor` instead runs
    the exit function selected by the `on_exit` value given to `__init__`.

    Args:
        exc_type, exc_val, exc_tb: details of the exception raised inside the managed
            context, or None when no exception was raised.

    Returns:
        Always False, i.e. an exception raised inside the managed context is not treated
        as handled and will propagate.

    Raises: None

    Required Tests: None
    """
    # Always-true assertion; it exists only so the arguments count as used.
    assert exc_type or exc_val or exc_tb or True  # Vulture
    exit_action = self._exit_function(self._on_exit)
    exit_action()
    return False
def __init__(
    self,
    CALLBACK=None,
    BUF_SIZE=1,
    FILTER=None,
    WAIT_TIME=0.1,
    loop: asyncio.AbstractEventLoop = None,
    executor: futures.Executor = None,
    step_time=0.01,
    returnNone=False,
    count=0
):
    """Store the buffering configuration and create the internal queues.

    :param CALLBACK: stored as ``self.CALLBACK`` only when truthy; otherwise
        the attribute is left untouched
    :param BUF_SIZE: buffer size; together with ``WAIT_TIME`` it determines
        ``self.buf_time_out`` (``BUF_SIZE * WAIT_TIME * 2``)
    :param FILTER: set stored as ``self.FILTER``; a fresh ``{'data'}`` set is
        created when omitted (or when ``None`` is passed)
    :param WAIT_TIME: wait time, stored unchanged
    :param loop: event loop to use; ``asyncio.get_event_loop()`` is used
        when it is omitted
    :param executor: executor stored on the instance (``None`` is kept as is)
    :param step_time: stored unchanged
    :param returnNone: stored unchanged; see the note at the assignment below
    :param count: stored unchanged
    """
    self.WAIT_TIME = WAIT_TIME
    # ``None`` default instead of a set literal: a literal in the signature
    # is created once and would be shared (and mutated) by every instance.
    self.FILTER = FILTER if FILTER is not None else {'data'}
    self.BUF_SIZE = BUF_SIZE
    self.step_time = step_time
    self.buf_time_out = self.BUF_SIZE * self.WAIT_TIME * 2
    self._input_list = []
    self._output_list = []
    self._input_queue = asyncio.Queue()
    self.loop = loop if loop is not None else asyncio.get_event_loop()
    self.executor = executor
    if CALLBACK:
        self.CALLBACK = CALLBACK
    # Original note: when False, time stamps that partially timed out are
    # discarded; when True, None is returned for the timed-out data.
    self.returnNone = returnNone
    self.count = count
def setNode(self, node: Node, items: List[Node]):
    """Remember ``node`` and rebuild the list contents from ``items``.

    The widget and the ``nodes_to_items`` mapping are cleared first, then
    every truthy entry of ``items`` is inserted at consecutive positions
    through ``insertCallListItem``.
    """
    self.node = node
    self.clear()
    self.nodes_to_items.clear()

    # Drop null items -- TODO: find better place for filter
    non_null_items = (entry for entry in items if tz.identity(entry))
    for position, entry in enumerate(non_null_items):
        self.insertCallListItem(position, entry)
    # NOTE: previously `setNode` involved submitting a job to an Executor
    # and appending the future to self.populate_futures. Currently
    # self.populate_futures is always empty.
def __init__(self,
             task_list: List[Task]=None,
             config: Config=Config,
             stats: Stats=Stats,
             executor: Executor=None,
             debug: bool=False) -> None:
    '''Initialize Tasky and record the initial list of tasks.

    One of the following methods must be called on the resulting object
    to start the event loop: `run_forever()`, `run_until_complete()`, or
    `run_for_time()`.

    Args:
        task_list: tasks to start with; copied into `self.initial_tasks`.
            May be omitted, in which case the initial list is empty.
        config: stored as `self.configuration`.
        stats: stored as `self.stats`.
        executor: stored as `self.executor` (`None` is kept as is).
        debug: when true, asyncio debug mode is enabled on the new loop.
    '''
    if uvloop:
        Log.debug('using uvloop event loop')
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

    # A fresh loop is created and installed as the current event loop.
    self.loop = asyncio.new_event_loop()
    self.loop.add_signal_handler(signal.SIGINT, self.sigint)
    self.loop.add_signal_handler(signal.SIGTERM, self.sigterm)
    self.loop.set_exception_handler(self.exception)
    asyncio.set_event_loop(self.loop)

    if debug:
        Log.debug('enabling asyncio debug mode')
        self.loop.set_debug(True)

    self.all_tasks = {}
    self.running_tasks = set()
    # `task_list` defaults to None, and `list(None)` raises TypeError, so
    # the documented default used to crash; fall back to an empty list.
    self.initial_tasks = list(task_list) if task_list is not None else []

    self.configuration = config
    self.stats = stats
    self.executor = executor

    self.monitor = False
    self.terminate_on_finish = False
    self.stop_attempts = 0
async def execute(self, fn, *args, **kwargs):
    '''Execute an arbitrary function outside the event loop using
    a shared Executor.

    The positional and keyword arguments are bound to `fn` with
    `functools.partial`, and the bound call is run through
    `self.loop.run_in_executor(self.executor, ...)`.

    Returns the value returned by `fn`.'''
    # Must be a coroutine function: `await` inside a plain `def` is a
    # SyntaxError. The former `-> None` annotation was dropped because the
    # result of the call is returned.
    fn = functools.partial(fn, *args, **kwargs)
    return await self.loop.run_in_executor(self.executor, fn)
def submit(self, fn, *args, **kwargs):
    """Start a new thread that runs ``_worker`` for ``fn`` and return its future.

    The whole operation happens while holding ``self._shutdown_lock``.

    :param fn: callable handed to ``_worker`` together with its arguments
    :param args: positional arguments for ``fn``
    :param kwargs: keyword arguments for ``fn``
    :return: the newly created ``futures.Future`` passed to ``_worker``
    :raises RuntimeError: if ``self._shutdown`` is set
    """
    with self._shutdown_lock:
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')

        future = futures.Future()
        # The thread name records the call to make it identifiable.
        thread_name = 'Executor for %s args=%s kwargs=%s' % (fn, args, kwargs)
        worker_thread = threading.Thread(
            target=_worker,
            args=(future, fn, args, kwargs),
            name=thread_name,
        )
        worker_thread.start()
        return future
def __init__(self, path: str, args: tuple, kwargs: dict, executor: Optional[Executor]) -> None:
    """Remember the arguments for opening the file; nothing is opened here.

    :param path: file path, placed first in the stored positional arguments
    :param args: further positional arguments, appended after ``path``
    :param kwargs: keyword arguments, stored unchanged
    :param executor: executor stored on the instance (may be ``None``)
    """
    self._executor = executor
    self._raw_file = None  # type: IOBase
    self._open_kwargs = kwargs
    # The path always comes first, followed by the extra positional arguments.
    self._open_args = (path,) + args
def __init__(self, executor: Optional[Executor]) -> None:
    """Store the executor (may be ``None``) and start with ``exited`` unset."""
    self.exited = False
    self.executor = executor
def threadpool(arg: Union[Executor, Callable] = None):
    """
    Create a decorator/asynchronous context manager that makes the wrapped function or
    ``with`` block run in an executor.

    Without an explicit executor the current event loop's default executor is used.
    An explicit executor must be a PEP 3148 compliant thread pool executor.

    A wrapped callable has to be awaited when called from the event loop thread. From a
    worker thread it can be called directly, without ``await``.

    As a decorator::

        @threadpool
        def this_runs_in_threadpool():
            return do_something_cpu_intensive()

        async def request_handler():
            result = await this_runs_in_threadpool()

    As an asynchronous context manager::

        async def request_handler(in_url, out_url):
            page = await http_fetch(in_url)

            async with threadpool():
                data = transform_page(page)

            await http_post(out_url, data)

    :param arg: the callable being decorated (bare ``@threadpool``), or the executor in
        which to run the wrapped callable or ``with`` block
    """
    # Bare ``@threadpool``: ``arg`` is the decorated function itself, so no
    # executor was specified and the switcher is applied to it right away.
    if callable(arg):
        return _ThreadSwitcher(None)(arg)

    # ``@threadpool(...)`` or ``async with threadpool(...)``: ``arg`` is the
    # executor (or None) and the switcher itself is handed back.
    return _ThreadSwitcher(arg)
def test_default_exit() -> None:
    """Check that a ThreadExecutor built without arguments stores ExitOption.ABANDON."""
    # When: the executor is constructed with defaults only
    default_executor = ThreadExecutor()

    # Then: the stored exit option is ABANDON
    stored_option = default_executor._on_exit  # pylint: disable=protected-access
    utaw.assertEqual(stored_option, ExitOption.ABANDON)  # pylint: disable=no-member
def __init__(
self,
SERVER_IP='10.0.0.1',
SERVER_TCP_PORT=4712,
CLIENT_TCP_PORT='AUTO',
SERVER_UDP_PORT=4713,
CLIENT_UDP_PORT='AUTO',
MODE='TCP',
IDCODE=1,
loop: asyncio.AbstractEventLoop() = None,
executor: futures.Executor() = None,
parser: Parser() = None,
count=0
):
self.IDCODE = IDCODE
self.SERVER_IP = SERVER_IP
self.SERVER_TCP_PORT = SERVER_TCP_PORT
self.CLIENT_TCP_PORT = CLIENT_TCP_PORT
self.SERVER_UDP_PORT = SERVER_UDP_PORT
self.CLIENT_UDP_PORT = CLIENT_UDP_PORT
self.MODE = MODE
self.executor = executor
if loop:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
if parser:
self.parser = parser
else:
self.parser = Parser()
self._output_list = []
self.count = count
if self.MODE == 'TCP':
async def connect():
self._count = count
print('Connecting to:', self.SERVER_IP, '...')
await self.loop.create_connection(
lambda: self._TCP(self),
host=self.SERVER_IP, port=self.SERVER_TCP_PORT)
async def close():
if self.cmd_transport:
self.cmd_transport.close()
self.connect = connect
self.close = close