Throw an exception on EOF!
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpReplySequence.java
index 9b2db2a..2c2dfe1 100644 (file)
@@ -1,10 +1,15 @@
 package net.pterodactylus.fcp.quelaton;
 
 import java.io.IOException;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
 import net.pterodactylus.fcp.ConfigData;
 import net.pterodactylus.fcp.DataFound;
@@ -59,20 +64,34 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        private final Object syncObject = new Object();
        private final ListeningExecutorService executorService;
        private final FcpConnection fcpConnection;
+       private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
+       private final AtomicReference<String> identifier = new AtomicReference<>();
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
                this.executorService = MoreExecutors.listeningDecorator(executorService);
                this.fcpConnection = fcpConnection;
        }
 
+       protected void setIdentifier(String identifier) {
+               this.identifier.set(identifier);
+       }
+
        protected abstract boolean isFinished();
 
        public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
+               setIdentifier(fcpMessage.getField("Identifier"));
                fcpConnection.addFcpListener(this);
-               fcpConnection.sendMessage(fcpMessage);
+               messages.add(fcpMessage);
                return executorService.submit(() -> {
                        synchronized (syncObject) {
-                               while (!isFinished()) {
+                               while (!isFinished() || !messages.isEmpty()) {
+                                       while (messages.peek() != null) {
+                                               FcpMessage message = messages.poll();
+                                               fcpConnection.sendMessage(message);
+                                       }
+                                       if (isFinished()) {
+                                               continue;
+                                       }
                                        syncObject.wait();
                                }
                        }
@@ -80,6 +99,17 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                });
        }
 
+       protected void sendMessage(FcpMessage fcpMessage) {
+               messages.add(fcpMessage);
+               notifySyncObject();
+       }
+
+       private void notifySyncObject() {
+               synchronized (syncObject) {
+                       syncObject.notifyAll();
+               }
+       }
+
        protected R getResult() {
                return null;
        }
@@ -89,25 +119,32 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                fcpConnection.removeFcpListener(this);
        }
 
-       private <M> void consume(Consumer<M> consumer,  M message) {
-               consumer.accept(message);
-               synchronized (syncObject) {
-                       syncObject.notifyAll();
+       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
+               consume(consumer, message, "Identifier");
+       }
+
+       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
+                       String identifier) {
+               if (Objects.equals(message.getField(identifier), this.identifier.get())) {
+                       consumeAlways(consumer, message);
                }
        }
 
+       private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
+               consumer.accept(message);
+               notifySyncObject();
+       }
+
        private void consumeUnknown(FcpMessage fcpMessage) {
-               consumeUnknownMessage(fcpMessage);
-               synchronized (syncObject) {
-                       syncObject.notifyAll();
+               if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) {
+                       consumeUnknownMessage(fcpMessage);
+                       notifySyncObject();
                }
        }
 
        private void consumeClose(Throwable throwable) {
                consumeConnectionClosed(throwable);
-               synchronized (syncObject) {
-                       syncObject.notifyAll();
-               }
+               notifySyncObject();
        }
 
        @Override
@@ -120,7 +157,7 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        @Override
        public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
                        CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-               consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
+               consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
        }
 
        protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
@@ -176,14 +213,14 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
 
        @Override
        public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
-               consume(this::consumeTestDDAReply, testDDAReply);
+               consume(this::consumeTestDDAReply, testDDAReply, "Directory");
        }
 
        protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
 
        @Override
        public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
-               consume(this::consumeTestDDAComplete, testDDAComplete);
+               consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
        }
 
        protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }