X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FFcpReplySequence.java;h=2c2dfe1eb3464b0ba597b9d1ecd7681c56982009;hb=24c7c279c67bd5fe34bfef81e1ab936d7b555b3a;hp=106bd8abceec60bd1054615215f0f9449247a930;hpb=14d38c217ac5e222e8d0d26eb4264fd69f9395d6;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java index 106bd8a..2c2dfe1 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -1,12 +1,15 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.BaseMessage; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.ConfigData; import net.pterodactylus.fcp.DataFound; @@ -62,15 +65,21 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener private final ListeningExecutorService executorService; private final FcpConnection fcpConnection; private final Queue messages = new ConcurrentLinkedQueue<>(); + private final AtomicReference identifier = new AtomicReference<>(); public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { this.executorService = MoreExecutors.listeningDecorator(executorService); this.fcpConnection = fcpConnection; } + protected void setIdentifier(String identifier) { + this.identifier.set(identifier); + } + protected abstract boolean isFinished(); public ListenableFuture send(FcpMessage fcpMessage) throws IOException { + setIdentifier(fcpMessage.getField("Identifier")); fcpConnection.addFcpListener(this); messages.add(fcpMessage); return executorService.submit(() -> { @@ -110,14 +119,27 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener fcpConnection.removeFcpListener(this); } - private void consume(Consumer consumer, M message) { + private void consume(Consumer consumer, M message) { + consume(consumer, message, "Identifier"); + } + + private void consume(Consumer consumer, M message, + String identifier) { + if (Objects.equals(message.getField(identifier), this.identifier.get())) { + consumeAlways(consumer, message); + } + } + + private void consumeAlways(Consumer consumer, M message) { consumer.accept(message); notifySyncObject(); } private void consumeUnknown(FcpMessage fcpMessage) { - consumeUnknownMessage(fcpMessage); - notifySyncObject(); + if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) { + consumeUnknownMessage(fcpMessage); + notifySyncObject(); + } } private void consumeClose(Throwable throwable) { @@ -135,7 +157,7 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener @Override public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName); + consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName); } protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { } @@ -191,14 +213,14 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener @Override public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { - consume(this::consumeTestDDAReply, testDDAReply); + consume(this::consumeTestDDAReply, testDDAReply, "Directory"); } protected void consumeTestDDAReply(TestDDAReply testDDAReply) { } @Override public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { - consume(this::consumeTestDDAComplete, testDDAComplete); + consume(this::consumeTestDDAComplete, testDDAComplete, "Directory"); } protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }