In an FCP reply sequence, only match replies with the correct identifier
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Fri, 10 Jul 2015 09:29:17 +0000 (11:29 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Fri, 10 Jul 2015 09:29:17 +0000 (11:29 +0200)
src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java

index 44850af..6e5f7e1 100644 (file)
@@ -168,6 +168,7 @@ class ClientPutCommandImpl implements ClientPutCommand {
                protected void consumeProtocolError(ProtocolError protocolError) {
                        if (protocolError.getIdentifier().equals(identifier.get())) {
                                if (protocolError.getCode() == 25) {
+                                       setIdentifier(directory.get());
                                        sendMessage(new TestDDARequest(directory.get(), true, false));
                                } else {
                                        putFinished.set(true);
@@ -190,6 +191,7 @@ class ClientPutCommandImpl implements ClientPutCommand {
                @Override
                protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
                        if (testDDAComplete.getDirectory().equals(directory.get())) {
+                               setIdentifier(originalClientPut.get().getField("Identifier"));
                                sendMessage(originalClientPut.get());
                        }
                }
index 106bd8a..afaac4c 100644 (file)
@@ -1,12 +1,15 @@
 package net.pterodactylus.fcp.quelaton;
 
 import java.io.IOException;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
 import net.pterodactylus.fcp.ConfigData;
 import net.pterodactylus.fcp.DataFound;
@@ -62,15 +65,21 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
        private final ListeningExecutorService executorService;
        private final FcpConnection fcpConnection;
        private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
+       private final AtomicReference<String> identifier = new AtomicReference<>();
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
                this.executorService = MoreExecutors.listeningDecorator(executorService);
                this.fcpConnection = fcpConnection;
        }
 
+       protected void setIdentifier(String identifier) {
+               this.identifier.set(identifier);
+       }
+
        protected abstract boolean isFinished();
 
        public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
+               setIdentifier(fcpMessage.getField("Identifier"));
                fcpConnection.addFcpListener(this);
                messages.add(fcpMessage);
                return executorService.submit(() -> {
@@ -110,14 +119,23 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
                fcpConnection.removeFcpListener(this);
        }
 
-       private <M> void consume(Consumer<M> consumer,  M message) {
-               consumer.accept(message);
-               notifySyncObject();
+       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
+               consume(consumer, message, "Identifier");
+       }
+
+       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
+                       String identifier) {
+               if (Objects.equals(message.getField(identifier), this.identifier.get())) {
+                       consumer.accept(message);
+                       notifySyncObject();
+               }
        }
 
        private void consumeUnknown(FcpMessage fcpMessage) {
-               consumeUnknownMessage(fcpMessage);
-               notifySyncObject();
+               if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) {
+                       consumeUnknownMessage(fcpMessage);
+                       notifySyncObject();
+               }
        }
 
        private void consumeClose(Throwable throwable) {
@@ -191,14 +209,14 @@ public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
 
        @Override
        public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
-               consume(this::consumeTestDDAReply, testDDAReply);
+               consume(this::consumeTestDDAReply, testDDAReply, "Directory");
        }
 
        protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
 
        @Override
        public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
-               consume(this::consumeTestDDAComplete, testDDAComplete);
+               consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
        }
 
        protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
index da3ba42..7a43771 100644 (file)
@@ -60,11 +60,12 @@ public class DefaultFcpClientTest {
        public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
                Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
                connectNode();
-               fcpServer.collectUntil(is("EndMessage"));
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               String identifier = extractIdentifier(lines);
                fcpServer.writeLine("SSKKeypair",
                        "InsertURI=" + INSERT_URI + "",
                        "RequestURI=" + REQUEST_URI + "",
-                       "Identifier=My Identifier from GenerateSSK",
+                       "Identifier=" + identifier,
                        "EndMessage");
                FcpKeyPair keyPair = keyPairFuture.get();
                assertThat(keyPair.getPublicKey(), is(REQUEST_URI));