def _setToSlaveThread(logger,settings, cache, master,url, queue, hostState):
    """Forward every object taken from ``queue`` to the current slave as a SET command.

    Runs forever when ``master`` is truthy; returns immediately otherwise.
    Presumably used as a thread target (per the name) -- confirm against the caller.

    Parameters:
        logger: object exposing ``debug`` and ``warning``.
        settings: object exposing ``getSlaveSetPort()`` and ``isVerbose()``.
        cache: unused by this function; kept for interface compatibility.
        master: when falsy the function returns without doing anything.
        url: unused by this function; kept for interface compatibility.
        queue: object whose blocking ``get()`` yields items with ``key`` and
            ``value`` attributes.
        hostState: mapping whose ``"current"`` entry is ``None`` until the
            network info is known, and afterwards exposes ``slave.ip``.

    Each item is retried until ``socket.send`` succeeds; failures are logged
    as warnings and retried after a short pause. An exception raised by
    ``queue.get()`` itself is not caught and terminates the loop.
    """
    if not master:
        return
    import time

    # Pause (seconds) used both while waiting for the network info and
    # between failed send attempts, so a dead slave does not cause a hot loop.
    retryDelay = 1

    def currentSlaveAddress():
        # Built from the live hostState so a slave change is picked up on the
        # next attempt.
        return "tcp://" + hostState["current"].slave.ip + ":" + str(settings.getSlaveSetPort())

    socket = zmq.Context.instance().socket(zmq.PUSH)
    while hostState["current"] is None:
        logger.debug("cannot send to slave, net info: " + str(hostState["current"]))
        time.sleep(retryDelay)
    connectedAddress = currentSlaveAddress()
    socket.connect(connectedAddress)
    logger.debug("Finally I'm configured")

    while True:
        objToSend = queue.get()
        sent = False
        while not sent:
            try:
                slaveAddress = currentSlaveAddress()
                if slaveAddress != connectedAddress:
                    newSocket = zmq.Context.instance().socket(zmq.PUSH)
                    try:
                        newSocket.connect(slaveAddress)
                    except Exception:
                        # Do not leak the half-configured socket; the outer
                        # handler logs the error and retries.
                        newSocket.close()
                        raise
                    # Record the new address only once the connect succeeded,
                    # so a failed connect is attempted again on the next retry.
                    oldSocket, socket = socket, newSocket
                    connectedAddress = slaveAddress
                    logger.debug("Change of slave:" + slaveAddress)
                    # Release the socket that pointed at the previous slave.
                    oldSocket.close()
                socket.send(dumps(Command(SETCOMMAND, objToSend.key, objToSend.value)))
                if settings.isVerbose():
                    logger.debug("sended current key to slave: "+str(objToSend.key) +" to " + str(slaveAddress))
                sent = True
            except Exception as e:
                logger.warning("error in slave: " + str(e))
                # Back off before retrying instead of spinning on the error.
                time.sleep(retryDelay)
评论列表
文章目录