From ff2e4b93bfb1030fd11ec295f4ae6e9bd29b6966 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Fri, 10 Jul 2015 22:40:12 +0200 Subject: [PATCH] Handle connection failures in the reply sequence --- .../net/pterodactylus/fcp/quelaton/ClientHelloImpl.java | 9 +-------- .../net/pterodactylus/fcp/quelaton/FcpReplySequence.java | 13 ++++++++----- .../pterodactylus/fcp/quelaton/FcpReplySequenceTest.java | 14 ++++++-------- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java index 9c03291..c093e89 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java @@ -62,7 +62,6 @@ public class ClientHelloImpl { private class ClientHelloReplySequence extends FcpReplySequence { private final AtomicReference receivedNodeHello = new AtomicReference<>(); - private final AtomicBoolean receivedClosed = new AtomicBoolean(); public ClientHelloReplySequence(FcpConnection connection) { super(ClientHelloImpl.this.threadPool, connection); @@ -70,7 +69,7 @@ public class ClientHelloImpl { @Override protected boolean isFinished() { - return receivedNodeHello.get() != null || receivedClosed.get(); + return receivedNodeHello.get() != null; } @Override @@ -83,12 +82,6 @@ public class ClientHelloImpl { receivedNodeHello.set(nodeHello); } - @Override - protected void consumeCloseConnectionDuplicateClientName( - CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - receivedClosed.set(true); - } - } } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java index ea5d084..9b44b9a 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -6,6 +6,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -67,6 +68,7 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener private final FcpConnection fcpConnection; private final Queue messages = new ConcurrentLinkedQueue<>(); private final AtomicReference identifier = new AtomicReference<>(); + private final AtomicBoolean connectionClosed = new AtomicBoolean(); private final AtomicReference connectionFailureReason = new AtomicReference<>(); public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { @@ -86,12 +88,12 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener messages.add(fcpMessage); return executorService.submit(() -> { synchronized (syncObject) { - while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) { + while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) { while (messages.peek() != null) { FcpMessage message = messages.poll(); fcpConnection.sendMessage(message); } - if (isFinished() || (connectionFailureReason.get() != null)) { + if (isFinished() || connectionClosed.get()) { continue; } syncObject.wait(); @@ -148,6 +150,7 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener private void consumeClose(Throwable throwable) { connectionFailureReason.set(throwable); + connectionClosed.set(true); notifySyncObject(); } @@ -161,11 +164,11 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener @Override public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName); + connectionFailureReason.set(new IOException("duplicate client name")); + connectionClosed.set(true); + notifySyncObject(); } - protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { } - @Override public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { consume(this::consumeSSKKeypair, sskKeypair); diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java index f81201e..9300ee8 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java @@ -127,9 +127,13 @@ public class FcpReplySequenceTest { waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new); } - @Test + @Test(expected = ExecutionException.class) public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException { - waitForASpecificMessage( replySequence::receivedCloseConnectionDuplicateClientName, CloseConnectionDuplicateClientName.class, CloseConnectionDuplicateClientName::new); + replySequence.setExpectedMessage(""); + Future result = replySequence.send(fcpMessage); + replySequence.receivedCloseConnectionDuplicateClientName(fcpConnection, + new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName"))); + result.get(); } @Test @@ -407,12 +411,6 @@ public class FcpReplySequenceTest { } @Override - protected void consumeCloseConnectionDuplicateClientName( - CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - gotMessage.set(closeConnectionDuplicateClientName.getName()); - } - - @Override protected void consumeSSKKeypair(SSKKeypair sskKeypair) { gotMessage.set(sskKeypair.getName()); } -- 2.7.4