/**
 * Starts this receiver.
 * <p>
 * The steps, in order: decide whether a durable topic subscription is requested, look up the
 * {@code ActiveMqSource} via {@code queue.getMsKey()}, obtain a connection (for durable
 * subscribers the connection is requested with the client ID and also stored in
 * {@code durableSubscriberConn}), create the session, the destination and the consumer, set
 * {@code running} to {@code true}, and finally either register an asynchronous
 * {@code MessageListener} or pass the consumer to {@code startSyncReceiveThread}.
 *
 * @throws Throwable if a durable subscriber has no client ID configured, or if any of the
 *                   underlying connection/session/consumer calls throws
 */
@Override
public void start() throws Throwable {
    // Durable subscription is only considered when the channel is a topic.
    final boolean durable = (queue.getType() == ChannelType.Topic) && queue.isDurableSubscribers();
    final String subscriberId = durable ? queue.getClientID() : null;
    if (durable && subscriberId == null) {
        throw new XMLParseException("durable subscribers is missing a clientID: " + queue.getName());
    }

    final ActiveMqSource source =
            (ActiveMqSource) MqContainer.getInstance().getMqSourceManager().getMqSource(queue.getMsKey());

    final Connection conn;
    if (durable) {
        conn = source.getConnection(subscriberId);
        // Keep a handle on the connection obtained for the durable subscriber.
        this.durableSubscriberConn = conn;
    } else {
        conn = source.getConnection();
    }

    session = conn.createSession(queue.isTransacted(), queue.getAcknowledgeMode());

    // Stays null when the channel type is neither Queue nor Topic.
    Destination target = null;
    if (queue.getType() == ChannelType.Queue) {
        target = session.createQueue(queue.getName());
        typeStr = "queue";
    } else if (queue.getType() == ChannelType.Topic) {
        target = session.createTopic(queue.getName());
        typeStr = "topic";
    }

    // The client ID doubles as the durable subscription name.
    final MessageConsumer consumer = durable
            ? session.createDurableSubscriber((Topic) target, subscriberId)
            : session.createConsumer(target);

    // The flag is raised before message delivery is wired up.
    running = true;

    if (!queue.isAsynReceive()) {
        // Synchronous mode: hand the consumer and the configured timeout to the receive thread.
        long timeout = queue.getReceiveTimeout();
        startSyncReceiveThread(consumer, timeout);
        return;
    }

    // Asynchronous mode: the listener receives each delivered message.
    consumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            // TODO: if session.commit() is used, does this need to be synchronized so that
            // one thread does not commit another thread's work?
            try {
                processMessage(message);
            } catch (Throwable failure) {
                // Failures are logged and not rethrown to the caller of onMessage.
                log.error("listen to the [" + queue.getName() + "] error.", failure);
            }
        }
    });
}
ActiveMqReceiver.java 文件源码
java
阅读 18
收藏 0
点赞 0
评论 0
项目:tangyuan2
作者:
评论列表
文章目录