private final int port;
private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
private final Supplier<String> clientName;
+ private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions();
public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName) {
this.threadPool = MoreExecutors.listeningDecorator(threadPool);
}
fcpConnection = createConnection();
this.fcpConnection.set(fcpConnection);
+ try {
+ activeSubscriptions.renew(fcpConnection::addFcpListener, this::subscribeUsk);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e);
+ }
return fcpConnection;
}
@Override
public SubscribeUskCommand subscribeUsk() {
- return new SubscribeUskCommandImpl(threadPool, this::connect);
+ return new SubscribeUskCommandImpl(threadPool, this::connect, activeSubscriptions);
}
}