Handle connection failures in the reply sequence
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Fri, 10 Jul 2015 20:40:12 +0000 (22:40 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Fri, 10 Jul 2015 20:40:12 +0000 (22:40 +0200)
src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java

index 9c03291..c093e89 100644 (file)
@@ -62,7 +62,6 @@ public class ClientHelloImpl {
        private class ClientHelloReplySequence extends FcpReplySequence<Boolean> {
 
                private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
-               private final AtomicBoolean receivedClosed = new AtomicBoolean();
 
                public ClientHelloReplySequence(FcpConnection connection) {
                        super(ClientHelloImpl.this.threadPool, connection);
@@ -70,7 +69,7 @@ public class ClientHelloImpl {
 
                @Override
                protected boolean isFinished() {
-                       return receivedNodeHello.get() != null || receivedClosed.get();
+                       return receivedNodeHello.get() != null;
                }
 
                @Override
@@ -83,12 +82,6 @@ public class ClientHelloImpl {
                        receivedNodeHello.set(nodeHello);
                }
 
-               @Override
-               protected void consumeCloseConnectionDuplicateClientName(
-                       CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-                       receivedClosed.set(true);
-               }
-
        }
 
 }
index ea5d084..9b44b9a 100644 (file)
@@ -6,6 +6,7 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -67,6 +68,7 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        private final FcpConnection fcpConnection;
        private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
        private final AtomicReference<String> identifier = new AtomicReference<>();
+       private final AtomicBoolean connectionClosed = new AtomicBoolean();
        private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
@@ -86,12 +88,12 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                messages.add(fcpMessage);
                return executorService.submit(() -> {
                        synchronized (syncObject) {
-                               while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) {
+                               while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
                                        while (messages.peek() != null) {
                                                FcpMessage message = messages.poll();
                                                fcpConnection.sendMessage(message);
                                        }
-                                       if (isFinished() || (connectionFailureReason.get() != null)) {
+                                       if (isFinished() || connectionClosed.get()) {
                                                continue;
                                        }
                                        syncObject.wait();
@@ -148,6 +150,7 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
 
        private void consumeClose(Throwable throwable) {
                connectionFailureReason.set(throwable);
+               connectionClosed.set(true);
                notifySyncObject();
        }
 
@@ -161,11 +164,11 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        @Override
        public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
                        CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-               consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
+               connectionFailureReason.set(new IOException("duplicate client name"));
+               connectionClosed.set(true);
+               notifySyncObject();
        }
 
-       protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
-
        @Override
        public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
                consume(this::consumeSSKKeypair, sskKeypair);
index f81201e..9300ee8 100644 (file)
@@ -127,9 +127,13 @@ public class FcpReplySequenceTest {
                waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new);
        }
 
-       @Test
+       @Test(expected = ExecutionException.class)
        public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
-               waitForASpecificMessage( replySequence::receivedCloseConnectionDuplicateClientName, CloseConnectionDuplicateClientName.class, CloseConnectionDuplicateClientName::new);
+               replySequence.setExpectedMessage("");
+               Future<Boolean> result = replySequence.send(fcpMessage);
+               replySequence.receivedCloseConnectionDuplicateClientName(fcpConnection,
+                       new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+               result.get();
        }
 
        @Test
@@ -407,12 +411,6 @@ public class FcpReplySequenceTest {
                }
 
                @Override
-               protected void consumeCloseConnectionDuplicateClientName(
-                       CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-                       gotMessage.set(closeConnectionDuplicateClientName.getName());
-               }
-
-               @Override
                protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
                        gotMessage.set(sskKeypair.getName());
                }