From 14d38c217ac5e222e8d0d26eb4264fd69f9395d6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Thu, 9 Jul 2015 07:01:01 +0200 Subject: [PATCH] Allow the reply sequence to initiate own messages --- .../fcp/quelaton/FcpReplySequence.java | 37 +++++++++++++++------- .../fcp/quelaton/FcpReplySequenceTest.java | 4 +-- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java index 9b2db2a..106bd8a 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -1,6 +1,8 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; @@ -59,6 +61,7 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener private final Object syncObject = new Object(); private final ListeningExecutorService executorService; private final FcpConnection fcpConnection; + private final Queue messages = new ConcurrentLinkedQueue<>(); public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { this.executorService = MoreExecutors.listeningDecorator(executorService); @@ -69,10 +72,17 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener public ListenableFuture send(FcpMessage fcpMessage) throws IOException { fcpConnection.addFcpListener(this); - fcpConnection.sendMessage(fcpMessage); + messages.add(fcpMessage); return executorService.submit(() -> { synchronized (syncObject) { - while (!isFinished()) { + while (!isFinished() || !messages.isEmpty()) { + while (messages.peek() != null) { + FcpMessage message = messages.poll(); + fcpConnection.sendMessage(message); + } + if (isFinished()) { + continue; + } syncObject.wait(); } } @@ -80,6 +90,17 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener }); } + protected void sendMessage(FcpMessage fcpMessage) { + messages.add(fcpMessage); + notifySyncObject(); + } + + private void notifySyncObject() { + synchronized (syncObject) { + syncObject.notifyAll(); + } + } + protected R getResult() { return null; } @@ -91,23 +112,17 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener private void consume(Consumer consumer, M message) { consumer.accept(message); - synchronized (syncObject) { - syncObject.notifyAll(); - } + notifySyncObject(); } private void consumeUnknown(FcpMessage fcpMessage) { consumeUnknownMessage(fcpMessage); - synchronized (syncObject) { - syncObject.notifyAll(); - } + notifySyncObject(); } private void consumeClose(Throwable throwable) { consumeConnectionClosed(throwable); - synchronized (syncObject) { - syncObject.notifyAll(); - } + notifySyncObject(); } @Override diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java index 07864ae..dc07b52 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java @@ -69,9 +69,9 @@ public class FcpReplySequenceTest { private final FcpMessage fcpMessage = new FcpMessage("Test"); @Test - public void canSendMessage() throws IOException { + public void canSendMessage() throws IOException, ExecutionException, InterruptedException { FcpReplySequence replySequence = createBasicReplySequence(); - replySequence.send(fcpMessage); + replySequence.send(fcpMessage).get(); verify(fcpConnection).sendMessage(fcpMessage); } -- 2.7.4