def __start(self):
cls = self.__class__
logging.debug("{:s}.start : Starting execution queue thread. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread))
self.ev_terminating.clear(), self.ev_unpaused.clear()
self.thread.daemon = True
return self.thread.start()
python类Thread()的实例源码
def __stop(self):
cls = self.__class__
logging.debug("{:s}.stop : Terminating execution queue thread. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread))
if not self.thread.is_alive():
cls = self.__class__
logging.warn("{:s}.stop : Execution queue has already been terminated. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self))
return
self.ev_unpaused.set(), self.ev_terminating.set()
self.queue.acquire()
self.queue.notify_all()
self.queue.release()
return self.thread.join()
def start(self):
'''Start to dispatch callables in the execution queue.'''
cls = self.__class__
if not self.thread.is_alive():
logging.fatal("{:s}.start : Unable to resume an already terminated execution queue. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self))
return False
logging.info("{:s}.start : Resuming execution queue. :{!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread))
res, _ = self.ev_unpaused.is_set(), self.ev_unpaused.set()
self.queue.acquire()
self.queue.notify_all()
self.queue.release()
return not res
def stop(self):
'''Pause the execution queue.'''
cls = self.__class__
if not self.thread.is_alive():
logging.fatal("{:s}.stop : Unable to pause an already terminated execution queue. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self))
return False
logging.info("{:s}.stop : Pausing execution queue. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread))
res, _ = self.ev_unpaused.is_set(), self.ev_unpaused.clear()
self.queue.acquire()
self.queue.notify_all()
self.queue.release()
return res
def __run__(self):
cls = self.__class__
consumer = self.__consume(self.ev_terminating, self.queue, self.state)
executor = self.__dispatch(self.lock); next(executor)
logging.debug("{:s}.running : Execution queue is now running. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread))
while not self.ev_terminating.is_set():
# check if we're allowed to execute
if not self.ev_unpaused.is_set():
self.ev_unpaused.wait()
# pull a callable out of the queue
logging.debug("{:s}.running : Waiting for an item.. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), self.thread))
self.queue.acquire()
item = next(consumer)
self.queue.release()
if not self.ev_unpaused.is_set():
self.ev_unpaused.wait()
# check if we're terminating
if self.ev_terminating.is_set(): break
# now we can execute it
logging.debug("{:s}.running : Executing {!r} asynchronously. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), item, self.thread))
res, err = executor.send(item)
# and stash our result
logging.debug("{:s}.running : Received result {!r} from {!r}. : {!r}".format('.'.join(('internal',__name__,cls.__name__)), (res,err), item, self.thread))
self.result.put((item,res,err))
return
# FIXME: figure out how to match against a bounds
def stop(self):
"""
Stop a looping thread
"""
self._stop.set()
def kill(self):
"""Kill the main association thread loop."""
self._kill = True
self.is_established = False
while not self.dul.stop_dul():
time.sleep(0.001)
self.ae.cleanup_associations()
def __start_updater(self, daemon=True, timeout=0):
"""Start the updater thread. **used internally**"""
import Queue
def task_exec(emit, data):
if hasattr(emit,'send'):
res = emit.send(data)
res and P.write(res)
else: emit(data)
def task_get_timeout(P, timeout):
try:
emit,data = P.taskQueue.get(block=True, timeout=timeout)
except Queue.Empty:
_,_,tb = sys.exc_info()
P.exceptionQueue.put(StopIteration,StopIteration(),tb)
return ()
return emit,data
def task_get_notimeout(P, timeout):
return P.taskQueue.get(block=True)
task_get = task_get_timeout if timeout > 0 else task_get_notimeout
def update(P, timeout):
P.eventWorking.wait()
while P.eventWorking.is_set():
res = task_get(P, timeout)
if not res: continue
emit,data = res
try:
task_exec(emit,data)
except StopIteration:
P.eventWorking.clear()
except:
P.exceptionQueue.put(sys.exc_info())
finally:
P.taskQueue.task_done()
continue
return
self.__updater = updater = threading.Thread(target=update, name="thread-%x.update"% self.id, args=(self,timeout))
updater.daemon = daemon
updater.start()
return updater