/**
 * Opens an {@code EventSource} on {@code uri()} and bridges its events to the given subscriber.
 *
 * <p>Each "message" event has its string payload run through {@code parse} and handed to a
 * {@code QueuedProducer} wrapping the subscriber. An "error" event is always logged, but is
 * only turned into {@code onError} once the source reports {@code CLOSED}; while the source is
 * in any other state the stream is left running.
 *
 * <p>Everything registered here (the close action and the three listeners) is attached to the
 * subscriber via {@code s.add(...)}, so unsubscribing releases it. If wiring fails part-way,
 * the failure is reported through {@code s.onError} and the subscriber is unsubscribed so the
 * already-opened connection does not leak.
 *
 * @param s   subscriber that receives parsed messages and the terminal error
 * @param <T> element type produced by {@code parse}
 */
private <T> void eventSourceSubscription(Subscriber<T> s) {
    final EventSource source = new EventSource(uri());
    final QueuedProducer<T> producer = new QueuedProducer<>(s);
    try {
        // Register the close action before anything else: the connection is already open at
        // this point, so it must be releasable even if a later registration step throws.
        s.add(Subscriptions.create(() -> {
            // hack because elemental API EventSource.close is missing
            Js.<MessagePort>uncheckedCast(source).close();
        }));
        s.add(subscribeEventListener(source, "message", evt -> {
            producer.onNext(parse(Js.<MessageEvent<String>>cast(evt).data));
        }));
        s.add(subscribeEventListener(source, "open", evt -> {
            log.fine("Connection opened: " + uri());
        }));
        s.add(subscribeEventListener(source, "error", evt -> {
            log.log(Level.SEVERE, "Error: " + evt);
            // Only a CLOSED source is treated as terminal; other states are left alone.
            if (source.readyState == source.CLOSED) {
                producer.onError(new RuntimeException("Event source error"));
            }
        }));
        s.setProducer(producer);
    } catch (Throwable e) {
        log.log(Level.FINE, "Received http error for: " + uri(), e);
        try {
            s.onError(new RuntimeException("Event source error", e));
        } finally {
            // Tear down whatever was registered before the failure (close action, listeners);
            // without this a half-wired EventSource would stay connected.
            s.unsubscribe();
        }
    }
}
XhrResourceBuilder.java 文件源码
java
阅读 100
收藏 0
点赞 0
评论 0
项目:autorest-streaming-example
作者:
评论列表
文章目录