def _memoryTask(settings, logger, master, url_setFrontend, url_getFrontend, url_getBackend, url_setBackend):
    """Entry point of the memory process.

    Builds the slab cache, binds the ZeroMQ sockets used for "get"
    traffic, starts the helper threads and finally runs ``_setThread``
    directly in the calling thread.

    Parameters:
        settings: object providing ``getSlabSize()``,
            ``getPreallocatedPool()`` and ``getGetterThreadNumber()``;
            it is also forwarded to the worker threads.
        logger: logger used here (``debug``) and forwarded to the cache
            and to every thread.
        master: forwarded untouched to the threads started here.
        url_setFrontend: endpoint forwarded to ``_setThread``.
        url_getFrontend: endpoint the ROUTER socket is bound to.
        url_getBackend: endpoint the DEALER socket is bound to; also
            handed to the getter threads and to the slave setter thread.
        url_setBackend: not used by this function.
            NOTE(review): the slave setter thread receives
            ``url_getBackend`` -- confirm that this is intended.

    Returns:
        None explicitly; the return value of ``_setThread`` is discarded.
    """
    from Cache import Slab, CacheSlubLRU

    # Read the configuration (same call order as the settings are queried).
    slab_size = settings.getSlabSize()
    pool_size = settings.getPreallocatedPool()
    getter_count = settings.getGetterThreadNumber()

    # The cache is sized entirely from the settings values read above.
    cache = CacheSlubLRU(pool_size, slab_size, logger)

    logger.debug("Memory Process initialized:" + str(pool_size) + "B, get# = " + str(getter_count))

    # ZeroMQ plumbing for "get" requests: a ROUTER socket on the frontend
    # endpoint and a DEALER socket on the backend endpoint.
    zmq_context = zmq.Context.instance()
    get_frontend = zmq_context.socket(zmq.ROUTER)
    get_frontend.bind(url_getFrontend)
    get_backend = zmq_context.socket(zmq.DEALER)
    get_backend.bind(url_getBackend)

    # Shared timing table: one slot per getter thread (added just before
    # the thread is started) and a single slot for the setter.
    timing = {"getters": [], "setters": [-1]}

    # Both bound sockets are handed to the proxy thread.
    proxy_thread = Thread(
        name='MemoryGetProxy',
        target=_proxyThread,
        args=(logger, master, get_frontend, get_backend, url_getFrontend, url_getBackend),
    )
    proxy_thread.start()

    for getter_index in range(getter_count):
        # The slot must exist before the getter with this index starts.
        timing["getters"].append(-1)
        Thread(
            name='MemoryGetter',
            target=_getThread,
            args=(getter_index, logger, settings, cache, master, url_getBackend, timing),
        ).start()

    # State shared between the slave setter thread and _setThread.
    slave_set_queue = Queue.Queue()
    host_state = {"current": None}

    metricator_thread = Thread(
        name='MemoryPerformanceMetricator',
        target=_memoryMetricatorThread,
        args=(logger, cache, settings, master, timing),
    )
    metricator_thread.start()

    slave_setter_thread = Thread(
        name='MemorySlaveSetter',
        target=_setToSlaveThread,
        args=(logger, settings, cache, master, url_getBackend, slave_set_queue, host_state),
    )
    slave_setter_thread.start()

    # The setter is invoked in the current thread, not in a new Thread.
    _setThread(logger, settings, cache, master, url_setFrontend, slave_set_queue, host_state, timing)
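# Comment list (stray web-page navigation text, not part of the program)
# Article table of contents (stray web-page navigation text, not part of the program)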