def testAuth(self):
self.authComplete = False
outlist = []
ca = ConnectComponentAuthenticator("cjid", "secret")
xs = xmlstream.XmlStream(ca)
xs.transport = DummyTransport(outlist)
xs.addObserver(xmlstream.STREAM_AUTHD_EVENT,
self.authPassed)
# Go...
xs.connectionMade()
xs.dataReceived("<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' from='cjid' id='12345'>")
# Calculate what we expect the handshake value to be
hv = sha.new("%s%s" % ("12345", "secret")).hexdigest()
self.assertEquals(outlist[1], "<handshake>%s</handshake>" % (hv))
xs.dataReceived("<handshake/>")
self.assertEquals(self.authComplete, True)
评论列表
文章目录