def _authenticateUsernamePassword(self, dn, password):
"""
Open a secondary connection to the LDAP server and try binding to it
with the given credentials
@returns: True if the password is correct, False otherwise
@rtype: deferred C{bool}
@raises: L{LDAPConnectionError} if unable to connect.
"""
d = deferToThreadPool(
reactor, self.threadpool,
self._authenticateUsernamePassword_inThread, dn, password
)
qsize = self.threadpool._queue.qsize()
if qsize > 0:
self.log.error("LDAP thread pool overflowing: {qsize}", qsize=qsize)
self.poolStats["connection-thread-blocked"] += 1
return d
python类deferToThreadPool()的实例源码
def _recordsFromQueryString(
self, queryString, recordTypes=None,
limitResults=None, timeoutSeconds=None
):
d = deferToThreadPool(
reactor, self.threadpool,
self._recordsFromQueryString_inThread,
queryString,
recordTypes,
limitResults=limitResults,
timeoutSeconds=timeoutSeconds
)
qsize = self.threadpool._queue.qsize()
if qsize > 0:
self.log.error("LDAP thread pool overflowing: {qsize}", qsize=qsize)
self.poolStats["connection-thread-blocked"] += 1
return d
def _deferToThreadPool(self, f, *args, **kwargs):
"""Defer execution of ``f(*args, **kwargs)`` to the thread pool.
This returns a deferred which will callback with the result of
that expression, or errback with a failure wrapping the raised
exception.
"""
if self._pool.joined:
return fail(
ReactorNotRunning("This thimble's threadpool already stopped.")
)
if not self._pool.started:
self._pool.start()
self._reactor.addSystemEventTrigger(
'during', 'shutdown', self._pool.stop)
return deferToThreadPool(self._reactor, self._pool, f, *args, **kwargs)
def getHostByName(self, name, timeout = (1, 3, 11, 45)):
"""
See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
Note that the elements of C{timeout} are summed and the result is used
as a timeout for the lookup. Any intermediate timeout or retry logic
is left up to the platform via L{socket.gethostbyname}.
"""
if timeout:
timeoutDelay = sum(timeout)
else:
timeoutDelay = 60
userDeferred = defer.Deferred()
lookupDeferred = threads.deferToThreadPool(
self.reactor, self.reactor.getThreadPool(),
socket.gethostbyname, name)
cancelCall = self.reactor.callLater(
timeoutDelay, self._cleanup, name, lookupDeferred)
self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
return userDeferred
def render_GET(self, request):
# pylint: disable=invalid-name
if len(self.thread_pool.working) > self.max_workers:
return self.error_response(
request, http.SERVICE_UNAVAILABLE,
'Service is unavailable at this time, Please try again later')
else:
d = threads.deferToThreadPool(reactor, self.thread_pool,
self.do_get, request)
d.addCallback(self.final, request)
d.addErrback(self.error_callback, request)
return server.NOT_DONE_YET
def render_POST(self, request):
# pylint: disable=invalid-name
if len(self.thread_pool.working) > self.max_workers:
return self.error_response(
request, http.SERVICE_UNAVAILABLE,
'Service is unavailable at this time, Please try again later')
else:
d = threads.deferToThreadPool(reactor, self.thread_pool,
self.do_post, request)
d.addCallback(self.final, request)
d.addErrback(self.error_callback, request)
return server.NOT_DONE_YET
def testGetQueryParserInThread(self):
"""
L{getQueryParser} is not thread-safe. A L{FeatureError} is raised if
its called outside the main thread.
"""
deferred = deferToThreadPool(reactor, self.threadPool, getQueryParser)
return self.assertFailure(deferred, FeatureError)
def run(self, function, *args, **kwargs):
"""Run C{function} in a thread.
C{function} is run in a thread within a transaction wrapper, which
commits the transaction if C{function} succeeds. If it raises an
exception the transaction is aborted.
@param function: The function to run.
@param args: Positional arguments to pass to C{function}.
@param kwargs: Keyword arguments to pass to C{function}.
@return: A C{Deferred} that will fire after the function has been run.
"""
return deferToThreadPool(reactor, self._threadPool, self._transact,
function, *args, **kwargs)
def _recordWithDN(self, dn):
d = deferToThreadPool(
reactor, self.threadpool,
self._recordWithDN_inThread, dn
)
qsize = self.threadpool._queue.qsize()
if qsize > 0:
self.log.error("LDAP thread pool overflowing: {qsize}", qsize=qsize)
self.poolStats["connection-thread-blocked"] += 1
return d
def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
addressTypes=None, transportSemantics='TCP'):
"""
See L{IHostnameResolver.resolveHostName}
@param resolutionReceiver: see interface
@param hostName: see interface
@param portNumber: see interface
@param addressTypes: see interface
@param transportSemantics: see interface
@return: see interface
"""
pool = self._getThreadPool()
addressFamily = _typesToAF[_any if addressTypes is None
else frozenset(addressTypes)]
socketType = _transportToSocket[transportSemantics]
def get():
try:
return self._getaddrinfo(hostName, portNumber, addressFamily,
socketType)
except gaierror:
return []
d = deferToThreadPool(self._reactor, pool, get)
resolution = HostResolution(hostName)
resolutionReceiver.resolutionBegan(resolution)
@d.addCallback
def deliverResults(result):
for family, socktype, proto, cannoname, sockaddr in result:
addrType = _afToType[family]
resolutionReceiver.addressResolved(
addrType(_socktypeToType.get(socktype, 'TCP'), *sockaddr)
)
resolutionReceiver.resolutionComplete()
return resolution
def test_deferredResult(self):
"""
L{threads.deferToThreadPool} executes the function passed, and
correctly handles the positional and keyword arguments given.
"""
d = threads.deferToThreadPool(reactor, self.tp,
lambda x, y=5: x + y, 3, y=4)
d.addCallback(self.assertEqual, 7)
return d
def test_deferredFailure(self):
"""
Check that L{threads.deferToThreadPool} return a failure object with an
appropriate exception instance when the called function raises an
exception.
"""
class NewError(Exception):
pass
def raiseError():
raise NewError()
d = threads.deferToThreadPool(reactor, self.tp, raiseError)
return self.assertFailure(d, NewError)
def runWithConnection(self, func, *args, **kw):
"""
Execute a function with a database connection and return the result.
@param func: A callable object of one argument which will be executed
in a thread with a connection from the pool. It will be passed as
its first argument a L{Connection} instance (whose interface is
mostly identical to that of a connection object for your DB-API
module of choice), and its results will be returned as a
L{Deferred}. If the method raises an exception the transaction will
be rolled back. Otherwise, the transaction will be committed.
B{Note} that this function is B{not} run in the main thread: it
must be threadsafe.
@param *args: positional arguments to be passed to func
@param **kw: keyword arguments to be passed to func
@return: a L{Deferred} which will fire the return value of
C{func(Transaction(...), *args, **kw)}, or a
L{twisted.python.failure.Failure}.
"""
from twisted.internet import reactor
return threads.deferToThreadPool(reactor, self.threadpool,
self._runWithConnection,
func, *args, **kw)
def augment_twisted_deferToThreadPool():
"""Wrap every function deferred to a thread in `synchronous`."""
from twisted.internet import threads
from twisted.internet.threads import deferToThreadPool
from provisioningserver.utils.twisted import ISynchronous, synchronous
def new_deferToThreadPool(reactor, threadpool, f, *args, **kwargs):
"""Variant of Twisted's that wraps all functions in `synchronous`."""
func = f if ISynchronous.providedBy(f) else synchronous(f)
return deferToThreadPool(reactor, threadpool, func, *args, **kwargs)
if threads.deferToThreadPool.__module__ != __name__:
threads.deferToThreadPool = new_deferToThreadPool