def testMathRandomFunctionWithSeed(self):
logging.info("RandomFunctionExtension TestCase, with seed")
# Creating SiddhiManager
siddhiManager = SiddhiManager()
# Creating Query
streamDefinition = "define stream inputStream (symbol string, price long, volume long);"
query = "@info(name = 'query1') from inputStream select symbol , math:rand(12) as randNumber " + \
"insert into outputStream;"
# Setting up Siddhi App
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streamDefinition + query)
# Setting up callback
_self_shaddow = self
class ConcreteQueryCallback(QueryCallback):
def receive(self, timestamp, inEvents, outEvents):
PrintEvent(timestamp, inEvents, outEvents)
_self_shaddow.count.addAndGet(len(inEvents))
_self_shaddow.eventArrived = True
if len(inEvents) == 3:
randNumbers = [0, 0, 0]
randNumbers[0] = inEvents[0].getData(1)
randNumbers[1] = inEvents[1].getData(1)
randNumbers[2] = inEvents[2].getData(1)
isDuplicatePresent = False
logging.info(randNumbers[0] + ", " + randNumbers[1])
if randNumbers[0] == randNumbers[1] or randNumbers[0] == randNumbers[2] or randNumbers[1] == \
randNumbers[2]:
isDuplicatePresent = True
_self_shaddow.assertEquals(False, isDuplicatePresent)
siddhiAppRuntime.addCallback("query1", ConcreteQueryCallback())
# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("inputStream")
# Starting event processing
siddhiAppRuntime.start()
# Sending events to Siddhi
inputHandler.send(["IBM", 700.0, LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["XYZ", 60.5, LongType(200)])
sleep(0.5)
self.assertEqual(self.count.get(), 3)
self.assertTrue(self.eventArrived)
siddhiManager.shutdown()
python类TestCase()的实例源码
def testMathRandomFunctionWithoutSeed(self):
logging.info("RandomFunctionExtension TestCase, without seed")
# Creating SiddhiManager
siddhiManager = SiddhiManager()
# Creating Query
streamDefinition = "define stream inputStream (symbol string, price long, volume long);"
query = "@info(name = 'query1') from inputStream select symbol , math:rand() as randNumber " + \
"insert into outputStream;"
# Setting up Siddhi App
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streamDefinition + query)
# Setting up callback
_self_shaddow = self
class ConcreteQueryCallback(QueryCallback):
def receive(self, timestamp, inEvents, outEvents):
PrintEvent(timestamp, inEvents, outEvents)
_self_shaddow.count.addAndGet(len(inEvents))
_self_shaddow.eventArrived = True
if len(inEvents) == 3:
randNumbers = [0, 0, 0]
randNumbers[0] = inEvents[0].getData(1)
randNumbers[1] = inEvents[1].getData(1)
randNumbers[2] = inEvents[2].getData(1)
isDuplicatePresent = False
if randNumbers[0] == randNumbers[1] or randNumbers[0] == randNumbers[2] or randNumbers[1] == \
randNumbers[2]:
isDuplicatePresent = True
_self_shaddow.assertEquals(False, isDuplicatePresent)
siddhiAppRuntime.addCallback("query1", ConcreteQueryCallback())
# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("inputStream")
# Starting event processing
siddhiAppRuntime.start()
# Sending events to Siddhi
inputHandler.send(["IBM", 700.0, LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["XYZ", 60.5, LongType(200)])
sleep(0.1)
self.assertEqual(self.count.get(), 3)
self.assertTrue(self.eventArrived)
siddhiManager.shutdown()
def testStringRegexpFunction(self):
logging.info("RegexpFunctionExtensionTestCase TestCase")
# Creating SiddhiManager
siddhiManager = SiddhiManager()
# Creating Query
streamDefinition = "define stream inputStream (symbol string, price long, regex string);"
query = "@info(name = 'query1') from inputStream select symbol , " + \
"str:regexp(symbol, regex) as beginsWithWSO2 " + \
"insert into outputStream"
# Setting up Siddhi App
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streamDefinition + query)
# Setting up callback
_self_shaddow = self
class ConcreteQueryCallback(QueryCallback):
def receive(self, timestamp, inEvents, outEvents):
PrintEvent(timestamp, inEvents, outEvents)
for inEvent in inEvents:
_self_shaddow.count.addAndGet(1)
if _self_shaddow.count.get() == 1:
_self_shaddow.assertEqual(False, inEvent.getData(1))
if _self_shaddow.count.get() == 2:
_self_shaddow.assertEqual(True, inEvent.getData(1))
if _self_shaddow.count.get() == 3:
_self_shaddow.assertEqual(False, inEvent.getData(1))
_self_shaddow.eventArrived = True
siddhiAppRuntime.addCallback("query1", ConcreteQueryCallback())
# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("inputStream")
# Starting event processing
siddhiAppRuntime.start()
# Sending events to Siddhi
inputHandler.send(["hello hi hello", 700.0, "^WSO2(.*)"])
inputHandler.send(["WSO2 abcdh", 60.5, "WSO(.*h)"])
inputHandler.send(["aaWSO2 hi hello", 60.5, "^WSO2(.*)"])
sleep(0.5)
self.assertEqual(self.count.get(), 3)
self.assertTrue(self.eventArrived)
siddhiManager.shutdown()
def testStringContainsFunction(self):
logging.info("ContainsFunctionExtensionTestCase TestCase")
# Creating SiddhiManager
siddhiManager = SiddhiManager()
# Creating Query
streamDefinition = "define stream inputStream (symbol string, price long, volume long);"
query = "@info(name = 'query1') " + \
"from inputStream " + \
"select symbol , str:contains(symbol, 'WSO2') as isContains " + \
"insert into outputStream;"
# Setting up Siddhi App
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streamDefinition + query)
# Setting up callback
_self_shaddow = self
class ConcreteQueryCallback(QueryCallback):
def receive(self, timestamp, inEvents, outEvents):
PrintEvent(timestamp, inEvents, outEvents)
for inEvent in inEvents:
_self_shaddow.count.addAndGet(1)
if _self_shaddow.count.get() == 1:
_self_shaddow.assertEqual(False, inEvent.getData(1))
if _self_shaddow.count.get() == 2:
_self_shaddow.assertEqual(True, inEvent.getData(1))
if _self_shaddow.count.get() == 3:
_self_shaddow.assertEqual(True, inEvent.getData(1))
_self_shaddow.eventArrived = True
siddhiAppRuntime.addCallback("query1", ConcreteQueryCallback())
# Retrieving input handler to push events into Siddhi
inputHandler = siddhiAppRuntime.getInputHandler("inputStream")
# Starting event processing
siddhiAppRuntime.start()
# Sending events to Siddhi
inputHandler.send(["IBM", 700.0, LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["One of the best middleware is from WSO2.", 60.5, LongType(200)])
sleep(0.5)
self.assertEqual(self.count.get(), 3)
self.assertTrue(self.eventArrived)
siddhiManager.shutdown()