def test_runStopAfterTests(self):
    """
    L{DistTrialRunner} calls C{reactor.stop} and unlocks the test directory
    once the tests have run.

    The runner is driven with a fake reactor whose spawned workers report
    success, and which records every shutdown trigger registered on it.  The
    assertions are made from a call scheduled on the real reactor, after
    C{run} has returned.

    @return: the L{Deferred} returned by C{deferLater}, which fires once the
        scheduled assertions have run.
    """
    # Shutdown triggers the runner registers on the fake reactor.
    functions = []

    class FakeReactorWithSuccess(FakeReactor):
        def spawnProcess(self, worker, *args, **kwargs):
            # Connect the worker to a fake transport instead of starting a
            # real process, and make its test run succeed.
            worker.makeConnection(FakeTransport())
            self.spawnCount += 1
            worker._ampProtocol.run = self.succeedingRun

        def succeedingRun(self, case, result):
            return succeed(None)

        def addSystemEventTrigger(oself, phase, event, function):
            # The receiver is named C{oself} so that C{self} still refers
            # to the enclosing test case, whose assertions are used here.
            self.assertEqual('before', phase)
            self.assertEqual('shutdown', event)
            functions.append(function)

    workingDirectory = self.runner._workingDirectory

    fakeReactor = FakeReactorWithSuccess()
    self.runner.run(TestCase(), fakeReactor)

    def check():
        # Being able to take the lock ourselves shows that the runner is no
        # longer holding it.
        localLock = FilesystemLock(workingDirectory + ".lock")
        self.assertTrue(localLock.lock())
        # Release the lock taken above when the test finishes, so that the
        # test does not leave the working directory locked behind it.
        self.addCleanup(localLock.unlock)
        self.assertEqual(1, fakeReactor.stopCount)
        # We don't wait for the process deferreds here, so nothing is
        # returned by the function before shutdown
        self.assertIdentical(None, functions[0]())

    return deferLater(reactor, 0, check)
评论列表
文章目录