+ public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
+ setIdentifier(fcpMessage.getField("Identifier"));
+ fcpConnection.addFcpListener(this);
+ messages.add(fcpMessage);
+ return executorService.submit(() -> {
+ synchronized (syncObject) {
+ while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
+ while (messages.peek() != null) {
+ FcpMessage message = messages.poll();
+ fcpConnection.sendMessage(message);
+ }
+ if (isFinished() || connectionClosed.get()) {
+ continue;
+ }
+ syncObject.wait();
+ }
+ }
+ Throwable throwable = connectionFailureReason.get();
+ if (throwable != null) {
+ throw new ExecutionException(throwable);
+ }
+ return getResult();
+ });