Allow the reply sequence to initiate own messages
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Thu, 9 Jul 2015 05:01:01 +0000 (07:01 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Thu, 9 Jul 2015 05:01:01 +0000 (07:01 +0200)
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java

index 9b2db2a..106bd8a 100644 (file)
@@ -1,6 +1,8 @@
 package net.pterodactylus.fcp.quelaton;
 
 import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 
@@ -59,6 +61,7 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        private final Object syncObject = new Object();
        private final ListeningExecutorService executorService;
        private final FcpConnection fcpConnection;
+       private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
                this.executorService = MoreExecutors.listeningDecorator(executorService);
@@ -69,10 +72,17 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
 
        public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
                fcpConnection.addFcpListener(this);
-               fcpConnection.sendMessage(fcpMessage);
+               messages.add(fcpMessage);
                return executorService.submit(() -> {
                        synchronized (syncObject) {
-                               while (!isFinished()) {
+                               while (!isFinished() || !messages.isEmpty()) {
+                                       while (messages.peek() != null) {
+                                               FcpMessage message = messages.poll();
+                                               fcpConnection.sendMessage(message);
+                                       }
+                                       if (isFinished()) {
+                                               continue;
+                                       }
                                        syncObject.wait();
                                }
                        }
@@ -80,6 +90,17 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                });
        }
 
+       protected void sendMessage(FcpMessage fcpMessage) {
+               messages.add(fcpMessage);
+               notifySyncObject();
+       }
+
+       private void notifySyncObject() {
+               synchronized (syncObject) {
+                       syncObject.notifyAll();
+               }
+       }
+
        protected R getResult() {
                return null;
        }
@@ -91,23 +112,17 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
 
        private <M> void consume(Consumer<M> consumer,  M message) {
                consumer.accept(message);
-               synchronized (syncObject) {
-                       syncObject.notifyAll();
-               }
+               notifySyncObject();
        }
 
        private void consumeUnknown(FcpMessage fcpMessage) {
                consumeUnknownMessage(fcpMessage);
-               synchronized (syncObject) {
-                       syncObject.notifyAll();
-               }
+               notifySyncObject();
        }
 
        private void consumeClose(Throwable throwable) {
                consumeConnectionClosed(throwable);
-               synchronized (syncObject) {
-                       syncObject.notifyAll();
-               }
+               notifySyncObject();
        }
 
        @Override
index 07864ae..dc07b52 100644 (file)
@@ -69,9 +69,9 @@ public class FcpReplySequenceTest {
        private final FcpMessage fcpMessage = new FcpMessage("Test");
 
        @Test
-       public void canSendMessage() throws IOException {
+       public void canSendMessage() throws IOException, ExecutionException, InterruptedException {
                FcpReplySequence replySequence = createBasicReplySequence();
-               replySequence.send(fcpMessage);
+               replySequence.send(fcpMessage).get();
                verify(fcpConnection).sendMessage(fcpMessage);
        }