Don’t let clients handle connection failures
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Fri, 10 Jul 2015 12:39:21 +0000 (14:39 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Fri, 10 Jul 2015 12:39:21 +0000 (14:39 +0200)
src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java
src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java

index 575d94b..4439cce 100644 (file)
@@ -171,11 +171,6 @@ class ClientGetCommandImpl implements ClientGetCommand {
                }
 
                @Override
-               protected void consumeConnectionClosed(Throwable throwable) {
-                       failed.set(true);
-               }
-
-               @Override
                public ListenableFuture<Optional<Data>> send(FcpMessage fcpMessage) throws IOException {
                        identifier.set(fcpMessage.getField("Identifier"));
                        return super.send(fcpMessage);
index 8a2ebfe..0228cce 100644 (file)
@@ -184,11 +184,6 @@ class ClientPutCommandImpl implements ClientPutCommand {
                        sendMessage(originalClientPut.get());
                }
 
-               @Override
-               protected void consumeConnectionClosed(Throwable throwable) {
-                       putFinished.set(true);
-               }
-
        }
 
 }
index 2c2dfe1..8bdaf5c 100644 (file)
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -66,6 +67,7 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        private final FcpConnection fcpConnection;
        private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
        private final AtomicReference<String> identifier = new AtomicReference<>();
+       private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
                this.executorService = MoreExecutors.listeningDecorator(executorService);
@@ -84,17 +86,21 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                messages.add(fcpMessage);
                return executorService.submit(() -> {
                        synchronized (syncObject) {
-                               while (!isFinished() || !messages.isEmpty()) {
+                               while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) {
                                        while (messages.peek() != null) {
                                                FcpMessage message = messages.poll();
                                                fcpConnection.sendMessage(message);
                                        }
-                                       if (isFinished()) {
+                                       if (isFinished() || (connectionFailureReason.get() != null)) {
                                                continue;
                                        }
                                        syncObject.wait();
                                }
                        }
+                       Throwable throwable = connectionFailureReason.get();
+                       if (throwable != null) {
+                               throw new ExecutionException(throwable);
+                       }
                        return getResult();
                });
        }
@@ -143,7 +149,7 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        }
 
        private void consumeClose(Throwable throwable) {
-               consumeConnectionClosed(throwable);
+               connectionFailureReason.set(throwable);
                notifySyncObject();
        }
 
@@ -423,6 +429,4 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                consumeClose(throwable);
        }
 
-       protected void consumeConnectionClosed(Throwable throwable) { }
-
 }
index 6632d59..c01d3b7 100644 (file)
@@ -78,6 +78,18 @@ public class DefaultFcpClientTest {
                keyPairFuture.get();
        }
 
+       @Test(expected = ExecutionException.class)
+       public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
+       throws IOException, ExecutionException, InterruptedException {
+               Logger.getAnonymousLogger().getParent().setLevel(Level.FINEST);
+               Logger.getAnonymousLogger().getParent().getHandlers()[0].setLevel(Level.FINEST);
+               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+               fcpServer.connect().get();
+               fcpServer.collectUntil(is("EndMessage"));
+               fcpServer.close();
+               keyPairFuture.get();
+       }
+
        @Test
        public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
                Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
@@ -221,15 +233,14 @@ public class DefaultFcpClientTest {
                assertThat(data.isPresent(), is(false));
        }
 
-       @Test
+       @Test(expected = ExecutionException.class)
        public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
                Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
                connectNode();
                List<String> lines = fcpServer.collectUntil(is("EndMessage"));
                assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
                fcpServer.close();
-               Optional<Data> data = dataFuture.get();
-               assertThat(data.isPresent(), is(false));
+               dataFuture.get();
        }
 
        @Test
index dc07b52..f81201e 100644 (file)
@@ -1,7 +1,9 @@
 package net.pterodactylus.fcp.quelaton;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
@@ -354,12 +356,19 @@ public class FcpReplySequenceTest {
 
        @Test
        public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
-               replySequence.setExpectedMessage("ConnectionClosed");
+               replySequence.setExpectedMessage("none");
                Future<Boolean> result = replySequence.send(fcpMessage);
                Throwable throwable = new Throwable();
                replySequence.connectionClosed(fcpConnection, throwable);
-               assertThat(result.get(), is(true));
-               assertThat(replySequence.receivedThrowable.get(), is(throwable));
+               try {
+                       result.get();
+               } catch (ExecutionException e) {
+                       Throwable t = e;
+                       while (t.getCause() != null) {
+                               t = t.getCause();
+                       }
+                       assertThat(t, sameInstance(throwable));
+               }
        }
 
        @FunctionalInterface
@@ -373,7 +382,6 @@ public class FcpReplySequenceTest {
 
                private final AtomicReference<String> gotMessage = new AtomicReference<>();
                private final AtomicReference<String> expectedMessage = new AtomicReference<>();
-               private final AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
 
                public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
                        super(executorService, fcpConnection);
@@ -584,12 +592,6 @@ public class FcpReplySequenceTest {
                        gotMessage.set(fcpMessage.getName());
                }
 
-               @Override
-               protected void consumeConnectionClosed(Throwable throwable) {
-                       receivedThrowable.set(throwable);
-                       gotMessage.set("ConnectionClosed");
-               }
-
        }
 
 }