ChannelTest.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:CellsCycle 作者: AQuadroTeam 项目源码 文件源码
def server_behavior(settings, logger):

    internal_channel = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)

    try:
        internal_channel.generate_internal_channel_server_side()
        msg = loads(internal_channel.wait_int_message(dont_wait=False))
        logger.debug("msg : ")
        logger.debug(msg.printable_message())
        internal_channel.reply_to_int_message(OK)
    except ZMQError as e:
        logger.debug(e)

    external_channel = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)

    external_channel.generate_external_channel_server_side()
    external_channel.external_channel_publish()

    message = Message()
    message.priority = ALIVE
    message.source_flag = EXT
    message.source_id = '1'
    message.target_id = '1'
    message.target_addr = '192.168.1.1'
    message.target_key = '{}:{}'.format(0, 19)

    sleep(1)

    external_channel.forward(dumps(message))

    logger.debug("try_to_connect TEST COMPLETED")

    stop = False
    while not stop:
        try:
            external_channel.forward(dumps(message))
            sleep(1)
        except zmq.Again:
            stop = True
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号