Handle connection failures in the reply sequence
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpReplySequence.java
index afaac4c..9b44b9a 100644 (file)
@@ -4,7 +4,9 @@ import java.io.IOException;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -66,6 +68,8 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        private final FcpConnection fcpConnection;
        private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
        private final AtomicReference<String> identifier = new AtomicReference<>();
+       private final AtomicBoolean connectionClosed = new AtomicBoolean();
+       private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
                this.executorService = MoreExecutors.listeningDecorator(executorService);
@@ -84,17 +88,21 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                messages.add(fcpMessage);
                return executorService.submit(() -> {
                        synchronized (syncObject) {
-                               while (!isFinished() || !messages.isEmpty()) {
+                               while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
                                        while (messages.peek() != null) {
                                                FcpMessage message = messages.poll();
                                                fcpConnection.sendMessage(message);
                                        }
-                                       if (isFinished()) {
+                                       if (isFinished() || connectionClosed.get()) {
                                                continue;
                                        }
                                        syncObject.wait();
                                }
                        }
+                       Throwable throwable = connectionFailureReason.get();
+                       if (throwable != null) {
+                               throw new ExecutionException(throwable);
+                       }
                        return getResult();
                });
        }
@@ -126,20 +134,23 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
                        String identifier) {
                if (Objects.equals(message.getField(identifier), this.identifier.get())) {
-                       consumer.accept(message);
-                       notifySyncObject();
+                       consumeAlways(consumer, message);
                }
        }
 
+       private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
+               consumer.accept(message);
+               notifySyncObject();
+       }
+
        private void consumeUnknown(FcpMessage fcpMessage) {
-               if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) {
-                       consumeUnknownMessage(fcpMessage);
-                       notifySyncObject();
-               }
+               consumeUnknownMessage(fcpMessage);
+               notifySyncObject();
        }
 
        private void consumeClose(Throwable throwable) {
-               consumeConnectionClosed(throwable);
+               connectionFailureReason.set(throwable);
+               connectionClosed.set(true);
                notifySyncObject();
        }
 
@@ -153,11 +164,11 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        @Override
        public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
                        CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-               consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
+               connectionFailureReason.set(new IOException("duplicate client name"));
+               connectionClosed.set(true);
+               notifySyncObject();
        }
 
-       protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
-
        @Override
        public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
                consume(this::consumeSSKKeypair, sskKeypair);
@@ -419,6 +430,4 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                consumeClose(throwable);
        }
 
-       protected void consumeConnectionClosed(Throwable throwable) { }
-
 }