import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
private final FcpConnection fcpConnection;
private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
private final AtomicReference<String> identifier = new AtomicReference<>();
+ private final AtomicBoolean connectionClosed = new AtomicBoolean();
+ private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
this.executorService = MoreExecutors.listeningDecorator(executorService);
messages.add(fcpMessage);
return executorService.submit(() -> {
synchronized (syncObject) {
- while (!isFinished() || !messages.isEmpty()) {
+ while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
while (messages.peek() != null) {
FcpMessage message = messages.poll();
fcpConnection.sendMessage(message);
}
- if (isFinished()) {
+ if (isFinished() || connectionClosed.get()) {
continue;
}
syncObject.wait();
}
}
+ Throwable throwable = connectionFailureReason.get();
+ if (throwable != null) {
+ throw new ExecutionException(throwable);
+ }
return getResult();
});
}
}
private void consumeUnknown(FcpMessage fcpMessage) {
- if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) {
- consumeUnknownMessage(fcpMessage);
- notifySyncObject();
- }
+ consumeUnknownMessage(fcpMessage);
+ notifySyncObject();
}
private void consumeClose(Throwable throwable) {
- consumeConnectionClosed(throwable);
+ connectionFailureReason.set(throwable);
+ connectionClosed.set(true);
notifySyncObject();
}
@Override
public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
- consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
+ connectionFailureReason.set(new IOException("duplicate client name"));
+ connectionClosed.set(true);
+ notifySyncObject();
}
- protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
-
@Override
public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
consume(this::consumeSSKKeypair, sskKeypair);
consumeClose(throwable);
}
- protected void consumeConnectionClosed(Throwable throwable) { }
-
}