From 9eb54fff8cfc8a6b8ecf89d49a9a9898ec8f0215 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Fri, 10 Jul 2015 11:29:17 +0200 Subject: [PATCH] In an FCP reply sequence, only match replies with the correct identifier --- .../fcp/quelaton/ClientPutCommandImpl.java | 2 ++ .../fcp/quelaton/FcpReplySequence.java | 32 +++++++++++++++++----- .../fcp/quelaton/DefaultFcpClientTest.java | 5 ++-- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java index 44850af..6e5f7e1 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java @@ -168,6 +168,7 @@ class ClientPutCommandImpl implements ClientPutCommand { protected void consumeProtocolError(ProtocolError protocolError) { if (protocolError.getIdentifier().equals(identifier.get())) { if (protocolError.getCode() == 25) { + setIdentifier(directory.get()); sendMessage(new TestDDARequest(directory.get(), true, false)); } else { putFinished.set(true); @@ -190,6 +191,7 @@ class ClientPutCommandImpl implements ClientPutCommand { @Override protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { if (testDDAComplete.getDirectory().equals(directory.get())) { + setIdentifier(originalClientPut.get().getField("Identifier")); sendMessage(originalClientPut.get()); } } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java index 106bd8a..afaac4c 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -1,12 +1,15 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.BaseMessage; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.ConfigData; import net.pterodactylus.fcp.DataFound; @@ -62,15 +65,21 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener private final ListeningExecutorService executorService; private final FcpConnection fcpConnection; private final Queue messages = new ConcurrentLinkedQueue<>(); + private final AtomicReference identifier = new AtomicReference<>(); public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { this.executorService = MoreExecutors.listeningDecorator(executorService); this.fcpConnection = fcpConnection; } + protected void setIdentifier(String identifier) { + this.identifier.set(identifier); + } + protected abstract boolean isFinished(); public ListenableFuture send(FcpMessage fcpMessage) throws IOException { + setIdentifier(fcpMessage.getField("Identifier")); fcpConnection.addFcpListener(this); messages.add(fcpMessage); return executorService.submit(() -> { @@ -110,14 +119,23 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener fcpConnection.removeFcpListener(this); } - private void consume(Consumer consumer, M message) { - consumer.accept(message); - notifySyncObject(); + private void consume(Consumer consumer, M message) { + consume(consumer, message, "Identifier"); + } + + private void consume(Consumer consumer, M message, + String identifier) { + if (Objects.equals(message.getField(identifier), this.identifier.get())) { + consumer.accept(message); + notifySyncObject(); + } } private void consumeUnknown(FcpMessage fcpMessage) { - consumeUnknownMessage(fcpMessage); - notifySyncObject(); + if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) { + consumeUnknownMessage(fcpMessage); + notifySyncObject(); + } } private void consumeClose(Throwable throwable) { @@ -191,14 +209,14 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener @Override public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { - consume(this::consumeTestDDAReply, testDDAReply); + consume(this::consumeTestDDAReply, testDDAReply, "Directory"); } protected void consumeTestDDAReply(TestDDAReply testDDAReply) { } @Override public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { - consume(this::consumeTestDDAComplete, testDDAComplete); + consume(this::consumeTestDDAComplete, testDDAComplete, "Directory"); } protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { } diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java index da3ba42..7a43771 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java @@ -60,11 +60,12 @@ public class DefaultFcpClientTest { public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException { Future keyPairFuture = fcpClient.generateKeypair().execute(); connectNode(); - fcpServer.collectUntil(is("EndMessage")); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); fcpServer.writeLine("SSKKeypair", "InsertURI=" + INSERT_URI + "", "RequestURI=" + REQUEST_URI + "", - "Identifier=My Identifier from GenerateSSK", + "Identifier=" + identifier, "EndMessage"); FcpKeyPair keyPair = keyPairFuture.get(); assertThat(keyPair.getPublicKey(), is(REQUEST_URI)); -- 2.7.4