Handle connection failures in the reply sequence
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpReplySequence.java
index ea5d084..9b44b9a 100644 (file)
@@ -6,6 +6,7 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -67,6 +68,7 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        private final FcpConnection fcpConnection;
        private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
        private final AtomicReference<String> identifier = new AtomicReference<>();
+       private final AtomicBoolean connectionClosed = new AtomicBoolean();
        private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
@@ -86,12 +88,12 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                messages.add(fcpMessage);
                return executorService.submit(() -> {
                        synchronized (syncObject) {
-                               while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) {
+                               while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
                                        while (messages.peek() != null) {
                                                FcpMessage message = messages.poll();
                                                fcpConnection.sendMessage(message);
                                        }
-                                       if (isFinished() || (connectionFailureReason.get() != null)) {
+                                       if (isFinished() || connectionClosed.get()) {
                                                continue;
                                        }
                                        syncObject.wait();
@@ -148,6 +150,7 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
 
        private void consumeClose(Throwable throwable) {
                connectionFailureReason.set(throwable);
+               connectionClosed.set(true);
                notifySyncObject();
        }
 
@@ -161,11 +164,11 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        @Override
        public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
                        CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-               consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
+               connectionFailureReason.set(new IOException("duplicate client name"));
+               connectionClosed.set(true);
+               notifySyncObject();
        }
 
-       protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
-
        @Override
        public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
                consume(this::consumeSSKKeypair, sskKeypair);