ActiveMqReceiver.java 文件源码

java
阅读 18 收藏 0 点赞 0 评论 0

项目:tangyuan2 作者:
@Override
public void start() throws Throwable {

    boolean durableSubscribers = false;// 持久化订阅
    String clientID = null;
    if (ChannelType.Topic == queue.getType()) {
        // durableSubscribers = (Boolean) queue.getProperties().get(ActiveMqVo.ACTIVEMQ_C_DURABLESUBSCRIBERS);
        durableSubscribers = queue.isDurableSubscribers();
        if (durableSubscribers) {
            // clientID = (String) queue.getProperties().get(ActiveMqVo.ACTIVEMQ_C_CLIENTID);
            clientID = queue.getClientID();
            if (null == clientID) {
                throw new XMLParseException("durable subscribers is missing a clientID: " + queue.getName());
            }
        }
    }

    ActiveMqSource mqSource = (ActiveMqSource) MqContainer.getInstance().getMqSourceManager().getMqSource(queue.getMsKey());

    Connection connection = null;
    if (!durableSubscribers) {
        connection = mqSource.getConnection();
    } else {
        connection = mqSource.getConnection(clientID);
        this.durableSubscriberConn = connection;
    }

    // boolean transacted = (Boolean) queue.getProperties().get(ActiveMqVo.ACTIVEMQ_C_TRANSACTED);
    // int acknowledgeMode = (Integer) queue.getProperties().get(ActiveMqVo.ACTIVEMQ_C_ACKNOWLEDGEMODE);
    boolean transacted = queue.isTransacted();
    int acknowledgeMode = queue.getAcknowledgeMode();
    session = connection.createSession(transacted, acknowledgeMode);

    Destination destination = null;
    if (ChannelType.Queue == queue.getType()) {
        destination = session.createQueue(queue.getName());
        typeStr = "queue";
    } else if (ChannelType.Topic == queue.getType()) {
        destination = session.createTopic(queue.getName());
        typeStr = "topic";
    }

    MessageConsumer messageConsumer = null;
    if (!durableSubscribers) {
        messageConsumer = session.createConsumer(destination);
    } else {
        messageConsumer = session.createDurableSubscriber((Topic) destination, clientID);
    }

    running = true;

    // boolean asynReceiveMessages = (Boolean) queue.getProperties().get(ActiveMqVo.ACTIVEMQ_C_ASYNRECEIVEMESSAGES);
    boolean asynReceiveMessages = queue.isAsynReceive();
    if (asynReceiveMessages) {
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // TODO 如果是session.commit();, 是否需要使用同步关键字, 防止提交别的线程的东西
                    // System.out.println("####################:" + Thread.currentThread().getName());
                    processMessage(message);
                } catch (Throwable e) {
                    log.error("listen to the [" + queue.getName() + "] error.", e);
                }
            }
        });
    } else {
        // long receiveTimeout = (Long) queue.getProperties().get(ActiveMqVo.ACTIVEMQ_C_RECEIVETIMEOUT);
        long receiveTimeout = queue.getReceiveTimeout();
        startSyncReceiveThread(messageConsumer, receiveTimeout);
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号