def fetch_all(feeds):
    """Fetch all feeds in batches and return a Deferred covering every batch.

    ``feeds`` is split into groups of ``BATCH_SIZE`` by ``batch_gen``. Every
    feed of a group is passed to ``fetch_single`` through a semaphore sized to
    that group, the group's results are handed to ``store_fetched_data``, and
    once all groups are gathered ``clean_up_and_exit`` is called. A failure of
    the overall gather is replaced by ``None``.
    """
    BATCH_SIZE = 5
    pending_batches = []
    for feeds_batch in batch_gen(feeds, BATCH_SIZE):
        # The semaphore gets as many tokens as the batch has feeds.
        limiter = DeferredSemaphore(len(feeds_batch))
        fetches = [
            limiter.run(fetch_single, feed_meta=feed_meta)
            for feed_meta in feeds_batch
        ]
        batch_done = gatherResults(fetches, consumeErrors=False)
        batch_done.addCallback(store_fetched_data)
        pending_batches.append(batch_done)

    # Rendezvous for all batches that were started above.
    all_done = gatherResults(pending_batches, consumeErrors=False)
    all_done.addCallbacks(clean_up_and_exit, errback=lambda x: None)
    return all_done
# Example source code for the Python class DeferredSemaphore()
def __init__(self, settings):
    """Create the semaphore and driver queue and hook up the close handler.

    Reads ``PHANTOMJS_OPTIONS`` (default ``{}``) and ``PHANTOMJS_MAXRUN``
    (default ``10``) from ``settings``.
    """
    # Options mapping read from the settings; stored as-is.
    self.options = settings.get('PHANTOMJS_OPTIONS', {})
    # The same number sizes both the semaphore and the queue below.
    concurrency_limit = settings.get('PHANTOMJS_MAXRUN', 10)
    self.sem = defer.DeferredSemaphore(concurrency_limit)
    # Last-in-first-out queue bounded by the concurrency limit.
    self.queue = Queue.LifoQueue(maxsize=concurrency_limit)
    # Have self._close called when the spider_closed signal is sent.
    signal_manager = SignalManager(dispatcher.Any)
    signal_manager.connect(receiver=self._close, signal=signals.spider_closed)
def testSemaphore(self):
    """
    Exercise C{run}, C{acquire} and C{release} of a L{DeferredSemaphore}
    created with C{N} tokens.

    The test checks that C{run} calls the helper right away but only fires
    its result once the helper's Deferred fires, that C{N} acquisitions
    fire immediately, that one more acquisition waits until a release, and
    that further releases fire no additional callbacks.
    """
    N = 13
    sem = defer.DeferredSemaphore(N)

    controlDeferred = defer.Deferred()

    def helper(self, arg):
        # Record the argument so the test can see that run() invoked us.
        self.arg = arg
        return controlDeferred

    results = []
    uniqueObject = object()
    resultDeferred = sem.run(helper, self=self, arg=uniqueObject)
    resultDeferred.addCallback(results.append)
    resultDeferred.addCallback(self._incr)
    # assertEqual is used instead of the deprecated assertEquals alias,
    # which was removed from unittest in Python 3.12.
    self.assertEqual(results, [])
    self.assertEqual(self.arg, uniqueObject)
    controlDeferred.callback(None)
    self.assertEqual(results.pop(), None)
    self.assertEqual(self.counter, 1)

    self.counter = 0
    for i in range(1, 1 + N):
        sem.acquire().addCallback(self._incr)
        self.assertEqual(self.counter, i)

    # All tokens are taken: this acquisition must not fire yet.
    sem.acquire().addCallback(self._incr)
    self.assertEqual(self.counter, N)

    # One release lets the waiting acquisition fire.
    sem.release()
    self.assertEqual(self.counter, N + 1)

    # Nobody is waiting any more, so releasing fires nothing further.
    for i in range(1, 1 + N):
        sem.release()
        self.assertEqual(self.counter, N + 1)
def scanForCovers(self, data):
    """Start a download for every entry of ``data``, two at a time.

    Resets the progress counters, then for each 8-tuple in ``data`` runs
    ``self.download(url)`` through a two-token semaphore, passes the result
    to ``self.parseWebpage`` together with the entry's fields, and routes
    failures to ``self.dataErrorInfo``.
    """
    self.start_time = time.clock()
    self.guilist = []
    self.counting = 0
    self.found = 0
    self.notfound = 0
    self.error = 0

    ds = defer.DeferredSemaphore(tokens=2)
    downloads = []
    for which, kind, ident, filename, title, url, season, episode in data:
        fetch = ds.run(self.download, url)
        fetch.addCallback(self.parseWebpage, which, kind, ident, filename,
                          title, url, season, episode)
        fetch.addErrback(self.dataErrorInfo)
        downloads.append(fetch)

    # Collect all downloads; the combined Deferred is not kept.
    defer.DeferredList(downloads).addErrback(self.dataErrorInfo)
def testSemaphore(self):
    """
    Exercise C{run}, C{acquire} and C{release} of a L{DeferredSemaphore}
    created with C{N} tokens.

    The test checks that C{run} calls the helper right away but only fires
    its result once the helper's Deferred fires, that C{N} acquisitions
    fire immediately, that one more acquisition waits until a release, and
    that further releases fire no additional callbacks.
    """
    N = 13
    sem = defer.DeferredSemaphore(N)

    controlDeferred = defer.Deferred()

    def helper(self, arg):
        # Record the argument so the test can see that run() invoked us.
        self.arg = arg
        return controlDeferred

    results = []
    uniqueObject = object()
    resultDeferred = sem.run(helper, self=self, arg=uniqueObject)
    resultDeferred.addCallback(results.append)
    resultDeferred.addCallback(self._incr)
    # assertEqual is used instead of the deprecated assertEquals alias,
    # which was removed from unittest in Python 3.12.
    self.assertEqual(results, [])
    self.assertEqual(self.arg, uniqueObject)
    controlDeferred.callback(None)
    self.assertEqual(results.pop(), None)
    self.assertEqual(self.counter, 1)

    self.counter = 0
    for i in range(1, 1 + N):
        sem.acquire().addCallback(self._incr)
        self.assertEqual(self.counter, i)

    # All tokens are taken: this acquisition must not fire yet.
    sem.acquire().addCallback(self._incr)
    self.assertEqual(self.counter, N)

    # One release lets the waiting acquisition fire.
    sem.release()
    self.assertEqual(self.counter, N + 1)

    # Nobody is waiting any more, so releasing fires nothing further.
    for i in range(1, 1 + N):
        sem.release()
        self.assertEqual(self.counter, N + 1)
def do_migration(sd_hashes):
    """Run ``migrate_sd_hash`` for every stream, print a summary, stop the reactor.

    :param sd_hashes: sized iterable of ``(host, sd_hash)`` pairs.

    At most four migrations run through the semaphore at once. This call
    starts the reactor and returns only after ``print_final_result`` has
    stopped it. ``start_time`` is not defined in this function; it is read
    from an enclosing or module scope.
    """
    def print_final_result(result):
        # ``result`` is the DeferredList payload: (success, value) pairs.
        num_successes = 0
        num_fails = 0
        num_blobs = 0
        for success, value in result:
            if success:
                num_successes += 1
                num_blobs += value['blobs']
                print("Success:{}".format(value))
                print('num success:{}, num fail:{}, total:{}'.format(
                    num_successes, num_fails, len(sd_hashes)))
            else:
                num_fails += 1
                print("Fail:{}".format(value))

        time_taken = time.time() - start_time
        # Seconds per blob is elapsed time divided by the blob count (the
        # previous code computed blobs per second). Report 0 when no blob
        # was moved instead of dividing by zero.
        sec_per_blob = time_taken / num_blobs if num_blobs else 0
        print("All Finished! Streams: {} Successes:{}, Fails:{}, Blobs moved:{}, Min to finish:{}, Sec per blob:{}".format(
            len(sd_hashes), num_successes, num_fails, num_blobs, time_taken/60, sec_per_blob))
        reactor.stop()

    ds = []
    sem = defer.DeferredSemaphore(4)
    for host, sd_hash in sd_hashes:
        ds.append(sem.run(migrate_sd_hash, sd_hash, host))

    # consumeErrors=True so individual failures arrive as (False, failure)
    # entries in the summary rather than being left unhandled.
    d = defer.DeferredList(ds, consumeErrors=True)
    d.addCallback(print_final_result)
    reactor.run()
def runScript(self, commands):
    """
    Execute the given sftp commands one after another.

    @param commands: A list of strings containing sftp commands.

    @return: A C{Deferred} that fires once every command has completed.
        Its payload is the list of response strings from the server, in
        the same order as C{commands}.
    """
    # A single token means only one command is in flight at a time.
    serializer = defer.DeferredSemaphore(1)
    pending = []
    for command in commands:
        pending.append(serializer.run(self.runCommand, command))
    return defer.gatherResults(pending)
def test_semaphoreInvalidTokens(self):
    """
    L{DeferredSemaphore} raises L{ValueError} when it is constructed with
    a token count smaller than one.
    """
    for invalidTokens in (0, -1):
        self.assertRaises(
            ValueError, defer.DeferredSemaphore, invalidTokens)
def test_cancelSemaphoreAfterAcquired(self):
    """
    Canceling a L{Deferred} obtained from a L{DeferredSemaphore} after it
    has already acquired the semaphore should have no effect.
    """
    def unexpectedErrback(_):
        self.fail("Unexpected errback call!")

    acquired = defer.DeferredSemaphore(1).acquire()
    acquired.addErrback(unexpectedErrback)
    acquired.cancel()
def test_cancelSemaphoreBeforeAcquired(self):
    """
    Canceling a L{Deferred} obtained from a L{DeferredSemaphore} before it
    has acquired the semaphore (i.e., before the L{Deferred} has fired)
    should make it fail with L{defer.CancelledError}.
    """
    semaphore = defer.DeferredSemaphore(1)
    # Take the only token so that the next acquisition has to wait.
    semaphore.acquire()
    waiting = semaphore.acquire()
    waiting.cancel()
    self.assertImmediateFailure(waiting, defer.CancelledError)
def getOverallDeferredSemaphore():
    """Return the module-wide semaphore, creating it on first use.

    The token count is ``twistedconcurrenthttp`` from the 'zenpython'
    collector preferences when that utility is registered, otherwise
    ``DEFAULT_TWISTEDCONCURRENTHTTP``.
    """
    global OVERALL_SEMAPHORE
    if OVERALL_SEMAPHORE is not None:
        return OVERALL_SEMAPHORE

    preferences = zope.component.queryUtility(
        ICollectorPreferences, 'zenpython')
    if preferences:
        token_count = preferences.options.twistedconcurrenthttp
    else:
        # When we are running in a daemon other than zenpython, the
        # preferences value will not be available.
        token_count = DEFAULT_TWISTEDCONCURRENTHTTP

    OVERALL_SEMAPHORE = defer.DeferredSemaphore(token_count)
    return OVERALL_SEMAPHORE
def getKeyedDeferredSemaphore(key, limit):
    """Return the semaphore cached under ``key``, creating it on first use.

    :param key: lookup key into ``KEYED_SEMAPHORES``.
    :param limit: token count for a new semaphore, or the requested new
        limit for an existing one.
    :return: the ``DeferredSemaphore`` stored for ``key``.

    When ``limit`` differs from the cached semaphore's ``limit`` attribute,
    the attribute is overwritten if ``limit >= semaphore.tokens``; otherwise
    a warning is logged and the semaphore is left unchanged.
    """
    global KEYED_SEMAPHORES
    if key not in KEYED_SEMAPHORES:
        KEYED_SEMAPHORES[key] = defer.DeferredSemaphore(limit)
    semaphore = KEYED_SEMAPHORES[key]

    if semaphore.limit != limit:
        # NOTE(review): only ``limit`` is updated here, ``tokens`` is left
        # as-is - confirm this yields the intended concurrency.
        if limit >= semaphore.tokens:
            semaphore.limit = limit
            # This branch applies the new limit, so report the change
            # (the previous message claimed the limit could not be lowered).
            log.info("Changed maximum parallel query limit for %s to %d", key, limit)
        else:
            log.warning("Unable to lower maximum parallel query limit for %s to %d at this time (%d connections currently active)", key, limit, semaphore.tokens)
    return semaphore
def make_database_unpool(maxthreads=max_threads_for_database_pool):
    """Build a general-purpose non-pooling "thread pool" for database work.

    It serves the old-school web application: the plain HTTP and HTTP API
    services, plus the WebSocket service behind the responsive web UI.

    Every thread is fully connected to the database.

    Because this is a :class:`ThreadUnpool`, threads are not actually
    pooled; each task gets a newly created thread. That improves isolation
    between tests, which makes it ideal for testing.
    """
    thread_limiter = DeferredSemaphore(maxthreads)
    return ThreadUnpool(thread_limiter, ExclusivelyConnected)
def __init__(self, settings):
    """Store the settings and set up the semaphore, driver queue and close hook.

    Reads ``PHANTOMJS_OPTIONS`` (default ``{}``) and ``PHANTOMJS_MAXRUN``
    (default ``5``) from ``settings``.
    """
    self.settings = settings
    # The stray trailing backslash that followed this statement joined it
    # with the next assignment into one logical line (a SyntaxError).
    self.options = settings.get('PHANTOMJS_OPTIONS', {})
    max_run = settings.get('PHANTOMJS_MAXRUN', 5)
    self.sem = defer.DeferredSemaphore(max_run)  # as a means of limiting parallelism
    self.queue = queue.LifoQueue(max_run)  # last in first out, the content is driver not request
    # Have self._close called when the spider_closed signal is sent.
    SignalManager(dispatcher.Any).connect(self._close, signal=signals.spider_closed)
def testSemaphore(self):
    """
    Exercise C{run}, C{acquire}, C{release} and cancellation on a
    L{DeferredSemaphore} created with 13 tokens.

    Checks that C{run} calls the helper immediately and fires its result
    only after the helper's Deferred fires, that as many acquisitions as
    there are tokens fire immediately, that canceling a waiting
    acquisition calls its errback, that another waiting acquisition fires
    after one release, and that further releases fire nothing more.
    """
    token_count = 13
    semaphore = defer.DeferredSemaphore(token_count)
    gate = defer.Deferred()

    def helper(self, arg):
        # Record the argument so the test can see that run() invoked us.
        self.arg = arg
        return gate

    seen = []
    marker = object()
    run_result = semaphore.run(helper, self=self, arg=marker)
    run_result.addCallback(seen.append)
    run_result.addCallback(self._incr)
    self.assertEqual(seen, [])
    self.assertEqual(self.arg, marker)
    gate.callback(None)
    self.assertIsNone(seen.pop())
    self.assertEqual(self.counter, 1)

    self.counter = 0
    for expected in range(1, 1 + token_count):
        semaphore.acquire().addCallback(self._incr)
        self.assertEqual(self.counter, expected)

    outcomes = []

    def on_fired(r):
        # Callback path: the acquisition unexpectedly succeeded.
        outcomes.append(False)

    def on_cancelled(r):
        # Errback path: the acquisition failed, as the cancel intends.
        outcomes.append(True)

    waiter = semaphore.acquire().addCallbacks(on_fired, on_cancelled)
    waiter.cancel()
    self.assertEqual(outcomes, [True])

    # Every token is still held, so this acquisition must wait.
    semaphore.acquire().addCallback(self._incr)
    self.assertEqual(self.counter, token_count)

    # One release lets the waiting acquisition fire.
    semaphore.release()
    self.assertEqual(self.counter, token_count + 1)

    # Nobody is waiting any more, so releasing fires nothing further.
    for _ in range(1, 1 + token_count):
        semaphore.release()
        self.assertEqual(self.counter, token_count + 1)
def request(self, method, uri, headers=None, bodyProducer=None):
    """
    Issue a new request.

    The scheme, host, port and path used for the connection are obtained
    by parsing C{uri}.

    @param method: The request method to send.
    @type method: C{str}

    @param uri: The request URI send.
    @type uri: C{str}

    @param headers: The request headers to send.  If no I{Host} header is
        included, one will be added based on the request URI.
    @type headers: L{Headers}

    @param bodyProducer: An object which will produce the request body or,
        if the request body is to be empty, L{None}.
    @type bodyProducer: L{IBodyProducer} provider

    @return: A L{Deferred} which fires with the result of the request (a
        L{Response} instance), or fails if there is a problem setting up a
        connection over which to issue the request.  It may also fail with
        L{SchemeNotSupported} if the scheme of the given URI is not
        supported.
    @rtype: L{Deferred}
    """
    scheme, host, port, path = _parse(uri)

    if headers is None:
        headers = Headers()
    if not headers.hasHeader('host'):
        # Work on a copy so that the Headers object passed in by the caller
        # is not modified when the Host header is added.
        headers = Headers(dict(headers.getAllRawHeaders()))
        hostValue = self._computeHostValue(scheme, host, port)
        headers.addRawHeader('host', hostValue)

    if not self.persistent:
        return self._request(
            method, scheme, host, port, path, headers, bodyProducer)

    # Persistent mode: one semaphore per (scheme, host, port), created the
    # first time that endpoint is requested.
    endpointKey = (scheme, host, port)
    sem = self._semaphores.get(endpointKey)
    if sem is None:
        sem = DeferredSemaphore(self.maxConnectionsPerHostName)
        self._semaphores[endpointKey] = sem
    return sem.run(self._request, method, scheme, host, port, path,
                   headers, bodyProducer)