def test_pauseProducing(self):
    """
    Once L{FileBodyProducer.pauseProducing} has been called, running the
    cooperator's next scheduled call copies no further bytes from the input
    file into the L{IConsumer}, and the production Deferred stays unfired.
    """
    payload = b"hello, world"
    chunkSize = 5
    sink = BytesIO()
    sinkConsumer = FileConsumer(sink)
    producer = FileBodyProducer(BytesIO(payload), self.cooperator, chunkSize)
    finished = producer.startProducing(sinkConsumer)
    firstChunk = payload[:chunkSize]

    # Run the first scheduled cooperator iteration: exactly one chunk lands.
    runNext = self._scheduled.pop(0)
    runNext()
    self.assertEqual(sink.getvalue(), firstChunk)

    producer.pauseProducing()

    # This relies on a Cooperator implementation detail: although its only
    # task is now paused, a call is still scheduled. Should Cooperator ever
    # learn to cancel that call in this situation, that would be acceptable.
    runNext = self._scheduled.pop(0)
    runNext()

    # Paused, so nothing beyond the first chunk may have been written, and
    # nothing further is scheduled.
    self.assertEqual(sink.getvalue(), firstChunk)
    self.assertEqual([], self._scheduled)
    self.assertNoResult(finished)
# Example source code using the Python class Cooperator()
def testStoppedRejectsNewTasks(self):
    """
    A L{task.Cooperator} that has already been stopped refuses new work,
    both when no done-Deferred is supplied to C{coiterate} and when one is.
    The outcome is routed through C{self.cbIter} / C{self.ebIter} and
    compared with C{self.RESULT} (all defined elsewhere on this test case).
    """
    def coiterateOnStopped(doneDeferred):
        stoppedCooperator = task.Cooperator()
        stoppedCooperator.stop()
        outcome = stoppedCooperator.coiterate(iter(()), doneDeferred)
        outcome.addCallback(self.cbIter)
        outcome.addErrback(self.ebIter)
        # addCallback forwards extra args: assertEqual(result, self.RESULT).
        outcome.addCallback(self.assertEqual, self.RESULT)
        return outcome

    withoutDeferred = coiterateOnStopped(None)
    # The second Deferred is only created after the first case finishes.
    withoutDeferred.addCallback(
        lambda ignored: coiterateOnStopped(defer.Deferred()))
    return withoutDeferred
def testUnexpectedError(self):
    """
    Checks that an exception raised by the iterator on its very first step
    makes the Deferred returned by L{task.Cooperator.coiterate} fail with
    that exception.
    """
    cooperator = task.Cooperator()

    def failingGenerator():
        raise RuntimeError()
        yield None  # never reached; only makes this function a generator

    outcome = cooperator.coiterate(failingGenerator())
    return self.assertFailure(outcome, RuntimeError)
def testUnexpectedErrorActuallyLater(self):
    """
    Checks that an error delivered asynchronously, through a Deferred the
    iterator yields and which is errbacked from a C{reactor.callLater(0, ...)}
    call, makes the Deferred returned by L{task.Cooperator.coiterate} fail
    with that error.
    """
    def delayedFailure():
        pending = defer.Deferred()
        reactor.callLater(0, pending.errback, RuntimeError())
        yield pending

    cooperator = task.Cooperator()
    return self.assertFailure(cooperator.coiterate(delayedFailure()),
                              RuntimeError)
def testCallbackReCoiterate(self):
    """
    If a callback on a Deferred returned by coiterate calls coiterate again
    on the same Cooperator, the Cooperator must do only the minimal amount of
    scheduling work: it must never schedule a second call while one is still
    pending. (This test was added to demonstrate a specific bug found while
    writing the scheduler.)
    """
    # Pending fake scheduled calls, in the order they were scheduled.
    calls = []
    class FakeCall:
        # Stand-in for a delayed call: it only remembers the function to run.
        def __init__(self, func):
            self.func = func
        def __repr__(self):
            return '<FakeCall %r>' % (self.func,)
    def sched(f):
        # Fake scheduler: fails if a call is scheduled while another is still
        # pending (i.e. redundant scheduling); otherwise records it.
        self.assertFalse(calls, repr(calls))
        calls.append(FakeCall(f))
        return calls[-1]
    # The termination predicate always answers True; presumably this limits
    # each tick to a single unit of work (Cooperator internals not shown here).
    c = task.Cooperator(scheduler=sched, terminationPredicateFactory=lambda: lambda: True)
    d = c.coiterate(iter(()))
    done = []
    def anotherTask(ign):
        # Re-enter coiterate from inside a callback of the first coiterate.
        c.coiterate(iter(())).addBoth(done.append)
    d.addCallback(anotherTask)
    # Drive the fake scheduler by hand until the second task finishes.
    # `work` counts loop passes plus executed calls and guards against
    # runaway scheduling.
    work = 0
    while not done:
        work += 1
        while calls:
            calls.pop(0).func()
            work += 1
        if work > 50:
            self.fail("Cooperator took too long")
def test_removingLastTaskStopsScheduledCall(self):
    """
    If the last task in a Cooperator is removed, the scheduled call for
    the next tick is cancelled, since it is no longer necessary.
    This behavior is useful for tests that want to assert they have left
    no reactor state behind when they're done.
    """
    # One-element list so the nested sched() can replace its content; it
    # always holds the most recently scheduled fake call.
    calls = [None]
    def sched(f):
        # FakeDelayedCall is defined elsewhere in this module; the assertions
        # below read its `func` and `cancelled` attributes.
        calls[0] = FakeDelayedCall(f)
        return calls[0]
    coop = task.Cooperator(scheduler=sched)
    # Add two tasks; this should schedule the tick:
    task1 = coop.cooperate(iter([1, 2]))
    task2 = coop.cooperate(iter([1, 2]))
    self.assertEqual(calls[0].func, coop._tick)
    # Remove first task; scheduled call should still be going:
    task1.stop()
    self.assertFalse(calls[0].cancelled)
    self.assertEqual(coop._delayedCall, calls[0])
    # Remove second task; scheduled call should be cancelled:
    task2.stop()
    self.assertTrue(calls[0].cancelled)
    self.assertIsNone(coop._delayedCall)
    # Add another task; scheduled call will be recreated (sched() stores a
    # new fake call in calls[0]):
    coop.cooperate(iter([1, 2]))
    self.assertFalse(calls[0].cancelled)
    self.assertEqual(coop._delayedCall, calls[0])
def test_runningWhenStarted(self):
    """
    A L{Cooperator} built with the default arguments (and so started on
    creation) reports C{True} from L{Cooperator.running}.
    """
    freshCooperator = task.Cooperator()
    isRunning = freshCooperator.running
    self.assertTrue(isRunning)
def test_runningWhenRunning(self):
    """
    A L{Cooperator} created with C{started=False} and then explicitly
    started reports C{True} from L{Cooperator.running}.
    """
    lazyCooperator = task.Cooperator(started=False)
    lazyCooperator.start()
    # Stop it again once the test ends so no state is left behind.
    self.addCleanup(lazyCooperator.stop)
    isRunning = lazyCooperator.running
    self.assertTrue(isRunning)
def test_runningWhenStopped(self):
    """
    A L{Cooperator} that was started and then stopped reports C{False}
    from L{Cooperator.running}.
    """
    lazyCooperator = task.Cooperator(started=False)
    lazyCooperator.start()
    lazyCooperator.stop()
    isRunning = lazyCooperator.running
    self.assertFalse(isRunning)
def _main(reactor, args, config):
    """Collect device data for a batch of users, then run the summary plugins.

    :param reactor: not used in this function's body (presumably supplied
        by the Twisted entry point that calls ``_main`` -- confirm).
    :param args: parsed command-line options; this function reads
        ``args.input`` (file-like object or ``None``), ``args.limit`` and
        ``args.collect_only``.
    :param config: configuration mapping; ``config['BATCH_GET_EMAILS']`` is
        called to obtain the user list when ``args.input`` is ``None``.
    :returns: a Deferred that fires once all work items have been processed,
        the totals have been logged and, unless ``args.collect_only`` is set,
        every summary plugin has been handed the collected ``results``.
    """
    summary_hooks = stethoscope.plugins.utils.instantiate_plugins(
        config, namespace='stethoscope.batch.plugins.summary')

    if args.input is None:
        emails = config['BATCH_GET_EMAILS']()
    else:
        # One address per line; surrounding whitespace and quotes are removed.
        emails = [email.strip().strip('"') for email in args.input.readlines()]
    logger.info("retrieving devices for {:d} users", len(emails))

    # Filled in by the work items produced by work_generator (defined
    # elsewhere); presumably maps each user to their devices.
    results = {}
    cooperator = task.Cooperator()
    work = work_generator(args, config, emails, results)
    # All `args.limit` coiterate() calls consume the same `work` iterator,
    # which bounds how many work items are driven concurrently.
    deferreds = [cooperator.coiterate(work) for _ in six.moves.range(args.limit)]
    deferred = defer.gatherResults(deferreds)

    def log_results(passthrough):
        num_devices = sum(len(values) for values in six.itervalues(results))
        logger.info("retrieved {:d} unique devices for {:d} users", num_devices, len(emails))
        return passthrough
    deferred.addCallback(log_results)

    if not args.collect_only:
        def _post_summary(passthrough, summary_hook):
            summary_hook.obj.post(results)
            return passthrough

        for summary_hook in summary_hooks:
            # Pass the hook as a callback argument instead of closing over the
            # loop variable: the callbacks run after the loop has finished, so
            # a closure would see only the last hook and post it repeatedly.
            deferred.addCallback(_post_summary, summary_hook)

    return deferred
def test_whenDone(self):
    """
    Every call to L{CooperativeTask.whenDone} hands back a distinct Deferred
    that fires with the task's iterator once that iterator is exhausted.
    Callbacks chained onto one of those Deferreds do not affect the value
    any other one fires with.
    """
    firstDone = self.task.whenDone()
    secondDone = self.task.whenDone()
    seenByFirst, seenBySecond = [], []
    finalOfFirst, finalOfSecond = [], []

    def recordFirst(result):
        seenByFirst.append(result)
        return 1

    def recordSecond(result):
        seenBySecond.append(result)
        return 2

    # Nothing has fired yet, so each chain is simply queued on its Deferred.
    firstDone.addCallback(recordFirst).addCallback(finalOfFirst.append)
    secondDone.addCallback(recordSecond).addCallback(finalOfSecond.append)

    # Exhaust the task's iterator; pumping the scheduler makes both fire.
    self.stopNext()
    self.scheduler.pump()

    self.assertEqual(len(seenByFirst), 1)
    self.assertEqual(len(seenBySecond), 1)
    self.assertIs(seenByFirst[0], self.task._iterator)
    self.assertIs(seenBySecond[0], self.task._iterator)
    self.assertEqual(finalOfFirst, [1])
    self.assertEqual(finalOfSecond, [2])
def getherproxy_req(self, block=True):
    """Fetch elite proxies from gatherproxy.com, one POST per result page.

    Pages ``1..self.maxpage`` are requested. Each successful (HTTP 200)
    response body is passed to ``self._get_proxy`` along with
    ``self.country``. What ``_get_proxy`` does with it is defined elsewhere;
    presumably it records the proxies it parses.

    :param block: when true (the default), pages are fetched one at a time
        with ``requests``. When false, pages are fetched through Scrapy's
        ``HTTP11DownloadHandler`` by ``self.concurrent`` parallel workers,
        and this call runs the Twisted reactor until every page has been
        processed. NOTE: a Twisted reactor cannot be restarted once stopped.
        The non-blocking mode therefore cannot be combined with any later
        ``reactor.run()`` in the same process (e.g. ``proxy_checker``).
    :returns: in blocking mode, an empty list if a request raises (the
        remaining pages are skipped); ``None`` otherwise.
    """
    url = 'http://gatherproxy.com/proxylist/anonymity/?t=Elite'
    if not block:
        # method 1 - non-blocking
        settings = Settings()

        @defer.inlineCallbacks
        def getpage(request, page):
            try:
                print("Request {},pagenumber:{}".format(request, page))
                response = yield HTTP11DownloadHandler(settings).download_request(request, spider=None)
                if response.status == 200:
                    self._get_proxy(response.body.decode(), country=self.country)
            except Exception as e:
                # Best effort: report the failed page and move on so the
                # remaining pages are still fetched.
                print(e)
                print("[!] Failed: request {} of page:{}".format(request, page))

        # Must be a generator (not a list) so the coiterate() workers below
        # share one queue of pages and actually run concurrently.
        work = (
            getpage(FormRequest(url=url,
                                headers=self.headers,
                                formdata={'Type': 'elite', 'PageIdx': str(page), 'Uptime': '0'},
                                meta={'download_timeout': 60}),
                    page=page)
            for page in range(1, self.maxpage + 1)
        )
        coop = task.Cooperator()
        join = defer.DeferredList(coop.coiterate(work) for _ in range(self.concurrent))
        join.addBoth(lambda _: reactor.stop())
        reactor.run()
        return None

    # method 2 - blocking
    for pagenum in range(1, self.maxpage + 1):
        data = {'Type': 'elite', 'PageIdx': str(pagenum), 'Uptime': '0'}
        headers = copy.copy(self.headers)
        try:
            # requests has no default timeout, so a stalled server would hang
            # this loop forever; 60s matches the non-blocking branch.
            r = requests.post(url, headers=headers, data=data, timeout=60)
        except requests.RequestException as e:
            print(str(e))
            print('[!] Failed: %s' % url)
            return []
        if r.status_code != 200:
            # Like the non-blocking branch, only parse successful pages;
            # an error page contains no proxy list.
            print('[!] Failed: %s (HTTP %d)' % (url, r.status_code))
            continue
        self._get_proxy(r.text, country=self.country)
    return None
def proxy_checker(self):
    """Re-test every proxy in ``self.proxy_list`` and keep the reliable ones.

    Each proxy is used ``self.checknum`` times to fetch
    ``http://myip.dnsdynamic.org``. At most ``self.concurrent`` requests are
    driven at once, each with ``self.timeout`` as its download timeout. A
    proxy whose share of HTTP 200 responses reaches ``self.checkthreshold``
    is added to ``self.passproxy``. When every check has finished, the
    passing proxies are printed and written to ``validProxy.txt`` (one per
    line), and the Twisted reactor is stopped.

    Blocks in ``reactor.run()`` until that happens. NOTE: a Twisted reactor
    cannot be restarted, so this can run at most once per process.
    """
    success = {}
    settings = Settings()

    @defer.inlineCallbacks
    def getResponse(proxy, request):
        try:
            print("Request {} using proxy:{}".format(request, proxy))
            response = yield HTTP11DownloadHandler(settings).download_request(request=request, spider=None)
            if response.status == 200:
                success[proxy] = success.get(proxy, 0) + 1
                print("Successful(+{}/{}) ip:{}".format(success[proxy], self.checknum, proxy))
                # float() so the ratio is not truncated to 0 by integer
                # division on Python 2.
                if float(success[proxy]) / self.checknum >= self.checkthreshold:
                    self.passproxy.add(proxy)
        except Exception:
            # Best effort: an unreachable or misbehaving proxy simply earns
            # no success for this attempt.
            pass

    def output_better_proxy(_):
        """Write the proxies that passed to validProxy.txt, one per line."""
        with open('validProxy.txt', 'w') as f:
            for p in self.passproxy:
                print(p)
                f.write(p + '\n')

    # work must be a generator (a list failed to run concurrently): the
    # coiterate() workers below all pull from this one shared iterator.
    work = (
        getResponse(proxy, Request(url='http://myip.dnsdynamic.org',
                                   headers=self.headers,
                                   meta={'proxy': "http://" + proxy, 'download_timeout': self.timeout}))
        for proxy in self.proxy_list
        for _ in range(self.checknum)
    )
    coop = task.Cooperator()
    join = defer.DeferredList(coop.coiterate(work) for _ in range(self.concurrent))
    join.addCallback(output_better_proxy)
    # addBoth: stop the reactor even if writing the output file failed;
    # otherwise reactor.run() below would never return.
    join.addBoth(lambda _: reactor.stop())
    reactor.run()