Add new “quelaton” FCP client API
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sat, 27 Jun 2015 22:01:03 +0000 (00:01 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sat, 27 Jun 2015 22:14:27 +0000 (00:14 +0200)
pom.xml
src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommand.java [new file with mode: 0644]
src/test/java/net/pterodactylus/fcp/fake/FakeTcpServer.java [new file with mode: 0644]
src/test/java/net/pterodactylus/fcp/fake/FakeTcpServerTest.java [new file with mode: 0644]
src/test/java/net/pterodactylus/fcp/fake/TextSocket.java [new file with mode: 0644]
src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java [new file with mode: 0644]
src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 0e832fa..4c9da2a 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                        <version>1.3</version>
                </dependency>
                <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-all</artifactId>
+                       <version>1.10.19</version>
+               </dependency>
+               <dependency>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                        <version>16.0.1</version>
@@ -46,8 +51,8 @@
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-compiler-plugin</artifactId>
                                <configuration>
-                                       <source>1.6</source>
-                                       <target>1.6</target>
+                                       <source>1.8</source>
+                                       <target>1.8</target>
                                        <encoding>UTF-8</encoding>
                                </configuration>
                        </plugin>
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java
new file mode 100644 (file)
index 0000000..e0c6e12
--- /dev/null
@@ -0,0 +1,97 @@
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import net.pterodactylus.fcp.ClientHello;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpKeyPair;
+import net.pterodactylus.fcp.GenerateSSK;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.SSKKeypair;
+
+/**
+ * Default {@link FcpClient} implementation.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class DefaultFcpClient implements FcpClient {
+
+       private final ExecutorService threadPool;
+       private final String hostname;
+       private final int port;
+       private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
+       private final Supplier<String> clientName;
+       private final Supplier<String> expectedVersion;
+
+       public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
+                       Supplier<String> expectedVersion) {
+               this.threadPool = threadPool;
+               this.hostname = hostname;
+               this.port = port;
+               this.clientName = clientName;
+               this.expectedVersion = expectedVersion;
+       }
+
+       private void connect() throws IOException {
+               if (fcpConnection.get() != null) {
+                       return;
+               }
+               fcpConnection.compareAndSet(null, createConnection());
+       }
+
+       private FcpConnection createConnection() throws IOException {
+               FcpConnection connection = new FcpConnection(hostname, port);
+               connection.connect();
+               AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
+               AtomicBoolean receivedClosed = new AtomicBoolean();
+               FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
+               nodeHelloSequence
+                               .handle(NodeHello.class)
+                               .with((nodeHello) -> receivedNodeHello.set(nodeHello));
+               nodeHelloSequence
+                               .handle(CloseConnectionDuplicateClientName.class)
+                               .with((closeConnection) -> receivedClosed.set(true));
+               nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
+               ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
+               try {
+                       nodeHelloSequence.send(clientHello).get();
+               } catch (InterruptedException | ExecutionException e) {
+                       connection.close();
+                       throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
+               }
+               return connection;
+       }
+
+       @Override
+       public GenerateKeypairCommand generateKeypair() {
+               return new GenerateKeypairCommandImpl();
+       }
+
+       private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
+
+               @Override
+               public Future<FcpKeyPair> execute() {
+                       return threadPool.submit(() -> {
+                               connect();
+                               GenerateSSK generateSSK = new GenerateSSK();
+                               AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
+                               FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
+                               replySequence.handle(SSKKeypair.class)
+                                               .with((message) -> keyPair.set(
+                                                               new FcpKeyPair(message.getRequestURI(), message.getInsertURI())));
+                               replySequence.waitFor(() -> keyPair.get() != null);
+                               replySequence.send(generateSSK).get();
+                               return keyPair.get();
+                       });
+               }
+
+       }
+
+}
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java
new file mode 100644 (file)
index 0000000..065f68a
--- /dev/null
@@ -0,0 +1,12 @@
+package net.pterodactylus.fcp.quelaton;
+
+/**
+ * FCP client used to communicate with a Freenet node.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public interface FcpClient {
+
+       GenerateKeypairCommand generateKeypair();
+
+}
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
new file mode 100644 (file)
index 0000000..0c54d7c
--- /dev/null
@@ -0,0 +1,368 @@
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.ConfigData;
+import net.pterodactylus.fcp.DataFound;
+import net.pterodactylus.fcp.EndListPeerNotes;
+import net.pterodactylus.fcp.EndListPeers;
+import net.pterodactylus.fcp.EndListPersistentRequests;
+import net.pterodactylus.fcp.FCPPluginReply;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpListener;
+import net.pterodactylus.fcp.FcpMessage;
+import net.pterodactylus.fcp.FinishedCompression;
+import net.pterodactylus.fcp.GetFailed;
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.NodeData;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Peer;
+import net.pterodactylus.fcp.PeerNote;
+import net.pterodactylus.fcp.PeerRemoved;
+import net.pterodactylus.fcp.PersistentGet;
+import net.pterodactylus.fcp.PersistentPut;
+import net.pterodactylus.fcp.PersistentPutDir;
+import net.pterodactylus.fcp.PersistentRequestModified;
+import net.pterodactylus.fcp.PersistentRequestRemoved;
+import net.pterodactylus.fcp.PluginInfo;
+import net.pterodactylus.fcp.ProtocolError;
+import net.pterodactylus.fcp.PutFailed;
+import net.pterodactylus.fcp.PutFetchable;
+import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.ReceivedBookmarkFeed;
+import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.SentFeed;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.URIGenerated;
+import net.pterodactylus.fcp.UnknownNodeIdentifier;
+import net.pterodactylus.fcp.UnknownPeerNoteType;
+
+/**
+ * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FcpReplySequence implements AutoCloseable, FcpListener {
+
+       private final ExecutorService executorService;
+       private final FcpConnection fcpConnection;
+       private final Map<Class<? extends BaseMessage>, Consumer<BaseMessage>> expectedMessageActions = new HashMap<>();
+       private final List<Consumer<FcpMessage>> unknownMessageHandlers = new ArrayList<>();
+       private final List<Consumer<Throwable>> closeHandlers = new ArrayList<>();
+       private Supplier<Boolean> endPredicate;
+
+       public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
+               this.executorService = executorService;
+               this.fcpConnection = fcpConnection;
+       }
+
+       public <M extends BaseMessage> $1<M> handle(Class<M> messageClass) {
+               return new $1<>(messageClass);
+       }
+
+       public class $1<M extends BaseMessage> {
+
+               private Class<M> messageClass;
+
+               private $1(Class<M> messageClass) {
+                       this.messageClass = messageClass;
+               }
+
+               public FcpReplySequence with(Consumer<M> action) {
+                       expectedMessageActions.put(messageClass, (Consumer<BaseMessage>) action);
+                       return FcpReplySequence.this;
+               }
+
+       }
+
+       public $2 handleUnknown() {
+               return new $2();
+       }
+
+       public class $2 {
+
+               public FcpReplySequence with(Consumer<FcpMessage> consumer) {
+                       unknownMessageHandlers.add(consumer);
+                       return FcpReplySequence.this;
+               }
+
+       }
+
+       public $3 handleClose() {
+               return new $3();
+       }
+
+       public class $3 {
+
+               public FcpReplySequence with(Consumer<Throwable> consumer) {
+                       closeHandlers.add(consumer);
+                       return FcpReplySequence.this;
+               }
+
+       }
+
+       public void waitFor(Supplier<Boolean> endPredicate) {
+               this.endPredicate = endPredicate;
+       }
+
+       public Future<?> send(FcpMessage fcpMessage) throws IOException {
+               fcpConnection.addFcpListener(this);
+               fcpConnection.sendMessage(fcpMessage);
+               return executorService.submit(() -> {
+                       synchronized (endPredicate) {
+                               while (!endPredicate.get()) {
+                                       endPredicate.wait();
+                               }
+                       }
+                       return null;
+               });
+       }
+
+       @Override
+       public void close() {
+               fcpConnection.removeFcpListener(this);
+       }
+
+       private <M extends BaseMessage> void consume(Class<M> fcpMessageClass, BaseMessage fcpMessage) {
+               if (expectedMessageActions.containsKey(fcpMessageClass)) {
+                       expectedMessageActions.get(fcpMessageClass).accept(fcpMessage);
+               }
+               synchronized (endPredicate) {
+                       endPredicate.notifyAll();
+               }
+       }
+
+       private void consumeUnknown(FcpMessage fcpMessage) {
+               for (Consumer<FcpMessage> unknownMessageHandler : unknownMessageHandlers) {
+                       unknownMessageHandler.accept(fcpMessage);
+               }
+               synchronized (endPredicate) {
+                       endPredicate.notifyAll();
+               }
+       }
+
+       private void consumeClose(Throwable throwable) {
+               for (Consumer<Throwable> closeHandler : closeHandlers) {
+                       closeHandler.accept(throwable);
+               }
+               synchronized (endPredicate) {
+                       endPredicate.notifyAll();
+               }
+       }
+
+       @Override
+       public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
+               consume(NodeHello.class, nodeHello);
+       }
+
+       @Override
+       public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
+                       CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+               consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName);
+       }
+
+       @Override
+       public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
+               consume(SSKKeypair.class, sskKeypair);
+       }
+
+       @Override
+       public void receivedPeer(FcpConnection fcpConnection, Peer peer) {
+               consume(Peer.class, peer);
+       }
+
+       @Override
+       public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
+               consume(EndListPeers.class, endListPeers);
+       }
+
+       @Override
+       public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
+               consume(PeerNote.class, peerNote);
+       }
+
+       @Override
+       public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
+               consume(EndListPeerNotes.class, endListPeerNotes);
+       }
+
+       @Override
+       public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
+               consume(PeerRemoved.class, peerRemoved);
+       }
+
+       @Override
+       public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
+               consume(NodeData.class, nodeData);
+       }
+
+       @Override
+       public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
+               consume(TestDDAReply.class, testDDAReply);
+       }
+
+       @Override
+       public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
+               consume(TestDDAComplete.class, testDDAComplete);
+       }
+
+       @Override
+       public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
+               consume(PersistentGet.class, persistentGet);
+       }
+
+       @Override
+       public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
+               consume(PersistentPut.class, persistentPut);
+       }
+
+       @Override
+       public void receivedEndListPersistentRequests(FcpConnection fcpConnection,
+                       EndListPersistentRequests endListPersistentRequests) {
+               consume(EndListPersistentRequests.class, endListPersistentRequests);
+       }
+
+       @Override
+       public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
+               consume(URIGenerated.class, uriGenerated);
+       }
+
+       @Override
+       public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
+               consume(DataFound.class, dataFound);
+       }
+
+       @Override
+       public void receivedAllData(FcpConnection fcpConnection, AllData allData) {
+               consume(AllData.class, allData);
+       }
+
+       @Override
+       public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
+               consume(SimpleProgress.class, simpleProgress);
+       }
+
+       @Override
+       public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
+               consume(StartedCompression.class, startedCompression);
+       }
+
+       @Override
+       public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
+               consume(FinishedCompression.class, finishedCompression);
+       }
+
+       @Override
+       public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
+               consume(UnknownPeerNoteType.class, unknownPeerNoteType);
+       }
+
+       @Override
+       public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
+                       UnknownNodeIdentifier unknownNodeIdentifier) {
+               consume(UnknownNodeIdentifier.class, unknownNodeIdentifier);
+       }
+
+       @Override
+       public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
+               consume(ConfigData.class, configData);
+       }
+
+       @Override
+       public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
+               consume(GetFailed.class, getFailed);
+       }
+
+       @Override
+       public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
+               consume(PutFailed.class, putFailed);
+       }
+
+       @Override
+       public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
+               consume(IdentifierCollision.class, identifierCollision);
+       }
+
+       @Override
+       public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
+               consume(PersistentPutDir.class, persistentPutDir);
+       }
+
+       @Override
+       public void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
+                       PersistentRequestRemoved persistentRequestRemoved) {
+               consume(PersistentRequestRemoved.class, persistentRequestRemoved);
+       }
+
+       @Override
+       public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+               consume(SubscribedUSKUpdate.class, subscribedUSKUpdate);
+       }
+
+       @Override
+       public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
+               consume(PluginInfo.class, pluginInfo);
+       }
+
+       @Override
+       public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
+               consume(FCPPluginReply.class, fcpPluginReply);
+       }
+
+       @Override
+       public void receivedPersistentRequestModified(FcpConnection fcpConnection,
+                       PersistentRequestModified persistentRequestModified) {
+               consume(PersistentRequestModified.class, persistentRequestModified);
+       }
+
+       @Override
+       public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
+               consume(PutSuccessful.class, putSuccessful);
+       }
+
+       @Override
+       public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
+               consume(PutFetchable.class, putFetchable);
+       }
+
+       @Override
+       public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
+               consume(SentFeed.class, sentFeed);
+       }
+
+       @Override
+       public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
+               consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed);
+       }
+
+       @Override
+       public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
+               consume(ProtocolError.class, protocolError);
+       }
+
+       @Override
+       public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+               consumeUnknown(fcpMessage);
+       }
+
+       @Override
+       public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
+               consumeClose(throwable);
+       }
+
+}
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommand.java b/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommand.java
new file mode 100644 (file)
index 0000000..b919537
--- /dev/null
@@ -0,0 +1,16 @@
+package net.pterodactylus.fcp.quelaton;
+
+import net.pterodactylus.fcp.FcpKeyPair;
+
+import java.util.concurrent.Future;
+
+/**
+ * Command to generate an SSK key pair.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public interface GenerateKeypairCommand {
+
+       Future<FcpKeyPair> execute();
+
+}
diff --git a/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServer.java b/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServer.java
new file mode 100644 (file)
index 0000000..07afb24
--- /dev/null
@@ -0,0 +1,55 @@
+package net.pterodactylus.fcp.fake;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.hamcrest.Matcher;
+
+/**
+ * TODO
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FakeTcpServer {
+
+       private final ServerSocket serverSocket;
+       private final ExecutorService executorService;
+       private final AtomicReference<TextSocket> clientSocket = new AtomicReference<>();
+
+       public FakeTcpServer(ExecutorService executorService) throws IOException {
+               this.executorService = executorService;
+               this.serverSocket = new ServerSocket(0);
+       }
+
+       public int getPort() {
+               return serverSocket.getLocalPort();
+       }
+
+       public Future<?> connect() throws IOException {
+               return executorService.submit(new Callable<Void>() {
+                       @Override
+                       public Void call() throws Exception {
+                               clientSocket.set(new TextSocket(serverSocket.accept()));
+                               return null;
+                       }
+               });
+       }
+
+       public List<String> collectUntil(Matcher<String> lineMatcher) throws IOException {
+               return clientSocket.get().collectUntil(lineMatcher);
+       }
+
+       public void writeLine(String line) throws IOException {
+               clientSocket.get().writeLine(line);
+       }
+
+       public String readLine() throws IOException {
+               return clientSocket.get().readLine();
+       }
+
+}
diff --git a/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServerTest.java b/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServerTest.java
new file mode 100644 (file)
index 0000000..976dcd6
--- /dev/null
@@ -0,0 +1,61 @@
+package net.pterodactylus.fcp.fake;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.net.Proxy;
+import java.net.ProxySelector;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+/**
+ * TODO
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FakeTcpServerTest {
+
+       private final ExecutorService sameThread = Executors.newSingleThreadExecutor();
+       private final FakeTcpServer tcpServer;
+
+       public FakeTcpServerTest() throws IOException {
+               this.tcpServer = new FakeTcpServer(sameThread);
+       }
+
+       @Test
+       public void testConnect() throws IOException, ExecutionException, InterruptedException {
+               ProxySelector.setDefault(new ProxySelector() {
+                       @Override
+                       public List<Proxy> select(URI uri) {
+                               return asList(Proxy.NO_PROXY);
+                       }
+
+                       @Override
+                       public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
+                       }
+               });
+               tcpServer.connect();
+               try (TextSocket clientSocket = new TextSocket(new Socket("127.0.0.1", tcpServer.getPort()))) {
+                       clientSocket.writeLine("Hello");
+                       clientSocket.writeLine("Bye");
+                       List<String> receivedLines = tcpServer.collectUntil(is("Bye"));
+                       assertThat(receivedLines, contains("Hello", "Bye"));
+                       tcpServer.writeLine("Yes");
+                       tcpServer.writeLine("Quit");
+                       receivedLines = clientSocket.collectUntil(is("Quit"));
+                       assertThat(receivedLines, contains("Yes", "Quit"));
+               }
+       }
+
+}
diff --git a/src/test/java/net/pterodactylus/fcp/fake/TextSocket.java b/src/test/java/net/pterodactylus/fcp/fake/TextSocket.java
new file mode 100644 (file)
index 0000000..c85f346
--- /dev/null
@@ -0,0 +1,72 @@
+package net.pterodactylus.fcp.fake;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hamcrest.Matcher;
+
+/**
+ * TODO
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+class TextSocket implements Closeable {
+
+       private final Socket socket;
+       private final InputStream socketInput;
+       private final OutputStream socketOutput;
+       private final BufferedReader inputReader;
+       private final Writer outputWriter;
+
+       TextSocket(Socket socket) throws IOException {
+               this.socket = socket;
+               this.socketInput = socket.getInputStream();
+               this.socketOutput = socket.getOutputStream();
+               this.inputReader = new BufferedReader(new InputStreamReader(socketInput, "UTF-8"));
+               this.outputWriter = new OutputStreamWriter(socketOutput, "UTF-8");
+       }
+
+       public String readLine() throws IOException {
+               return inputReader.readLine();
+       }
+
+       public void writeLine(String line) throws IOException {
+               outputWriter.write(line + "\n");
+               outputWriter.flush();
+       }
+
+       public List<String> collectUntil(Matcher<String> lineMatcher) throws IOException {
+               List<String> collectedLines = new ArrayList<String>();
+               while (true) {
+                       String line = readLine();
+                       if (line == null) {
+                               throw new EOFException();
+                       }
+                       collectedLines.add(line);
+                       if (lineMatcher.matches(line)) {
+                               break;
+                       }
+               }
+               return collectedLines;
+       }
+
+       @Override
+       public void close() throws IOException {
+               outputWriter.close();
+               inputReader.close();
+               socketOutput.close();
+               socketInput.close();
+               socket.close();
+       }
+
+}
diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java
new file mode 100644 (file)
index 0000000..86e7fae
--- /dev/null
@@ -0,0 +1,68 @@
+package net.pterodactylus.fcp.quelaton;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import net.pterodactylus.fcp.FcpKeyPair;
+import net.pterodactylus.fcp.fake.FakeTcpServer;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DefaultFcpClient}.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class DefaultFcpClientTest {
+
+       private final ExecutorService threadPool = Executors.newCachedThreadPool();
+       private final FakeTcpServer fcpServer;
+       private final DefaultFcpClient fcpClient;
+
+       public DefaultFcpClientTest() throws IOException {
+               fcpServer = new FakeTcpServer(threadPool);
+               fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test", () -> "2.0");
+       }
+
+       @Test
+       public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
+               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+               connectNode();
+               fcpServer.collectUntil(is("EndMessage"));
+               fcpServer.writeLine("SSKKeypair\n"
+                               + "InsertURI=freenet:SSK@AKTTKG6YwjrHzWo67laRcoPqibyiTdyYufjVg54fBlWr,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM/\n"
+                               + "RequestURI=freenet:SSK@BnHXXv3Fa43w~~iz1tNUd~cj4OpUuDjVouOWZ5XlpX0,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM,AQABAAE/\n"
+                               + "Identifier=My Identifier from GenerateSSK\n"
+                               + "EndMessage");
+               FcpKeyPair keyPair = keyPairFuture.get();
+               assertThat(keyPair.getPublicKey(),
+                               is("freenet:SSK@BnHXXv3Fa43w~~iz1tNUd~cj4OpUuDjVouOWZ5XlpX0,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM,AQABAAE/"));
+               assertThat(keyPair.getPrivateKey(), is(
+                               "freenet:SSK@AKTTKG6YwjrHzWo67laRcoPqibyiTdyYufjVg54fBlWr,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM/"));
+       }
+
+       private void connectNode() throws InterruptedException, ExecutionException, IOException {
+               fcpServer.connect().get();
+               fcpServer.collectUntil(is("EndMessage"));
+               fcpServer.writeLine("NodeHello\n"
+                               + "FCPVersion=2.0\n"
+                               + "ConnectionIdentifier=754595fc35701d76096d8279d15c57e6\n"
+                               + "Version=Fred,0.7,1.0,1231\n"
+                               + "Node=Fred\n"
+                               + "NodeLanguage=ENGLISH\n"
+                               + "ExtRevision=23771\n"
+                               + "Build=1231\n"
+                               + "Testnet=false\n"
+                               + "ExtBuild=26\n"
+                               + "CompressionCodecs=3 - GZIP(0), BZIP2(1), LZMA(2)\n"
+                               + "Revision=@custom@\n"
+                               + "EndMessage");
+       }
+
+}
diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java
new file mode 100644 (file)
index 0000000..fb849fe
--- /dev/null
@@ -0,0 +1,382 @@
+package net.pterodactylus.fcp.quelaton;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.ConfigData;
+import net.pterodactylus.fcp.DataFound;
+import net.pterodactylus.fcp.EndListPeerNotes;
+import net.pterodactylus.fcp.EndListPeers;
+import net.pterodactylus.fcp.EndListPersistentRequests;
+import net.pterodactylus.fcp.FCPPluginReply;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpMessage;
+import net.pterodactylus.fcp.FinishedCompression;
+import net.pterodactylus.fcp.GetFailed;
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.NodeData;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Peer;
+import net.pterodactylus.fcp.PeerNote;
+import net.pterodactylus.fcp.PeerRemoved;
+import net.pterodactylus.fcp.PersistentGet;
+import net.pterodactylus.fcp.PersistentPut;
+import net.pterodactylus.fcp.PersistentPutDir;
+import net.pterodactylus.fcp.PersistentRequestModified;
+import net.pterodactylus.fcp.PersistentRequestRemoved;
+import net.pterodactylus.fcp.PluginInfo;
+import net.pterodactylus.fcp.ProtocolError;
+import net.pterodactylus.fcp.PutFailed;
+import net.pterodactylus.fcp.PutFetchable;
+import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.ReceivedBookmarkFeed;
+import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.SentFeed;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.URIGenerated;
+import net.pterodactylus.fcp.UnknownNodeIdentifier;
+import net.pterodactylus.fcp.UnknownPeerNoteType;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link FcpReplySequence}.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FcpReplySequenceTest {
+
+       private final FcpConnection fcpConnection = mock(FcpConnection.class);
+       private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+       private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection);
+       private final FcpMessage fcpMessage = new FcpMessage("Test");
+
+       @Test
+       public void canSendMessage() throws IOException {
+               replyWaiter.send(fcpMessage);
+               verify(fcpConnection).sendMessage(fcpMessage);
+       }
+
+       @Test
+       public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
+               replyWaiter.send(fcpMessage);
+               verify(fcpConnection).addFcpListener(replyWaiter);
+       }
+
+       @Test
+       public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
+               replyWaiter.send(fcpMessage);
+               replyWaiter.close();
+               verify(fcpConnection).removeFcpListener(replyWaiter);
+       }
+
+       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver,
+                       Class<M> messageClass, Supplier<M> message) throws IOException, InterruptedException, ExecutionException {
+               AtomicBoolean gotMessage = setupMessage(messageClass);
+               Future<?> result = replyWaiter.send(fcpMessage);
+               sendMessage(messageReceiver, message.get());
+               result.get();
+               assertThat(gotMessage.get(), is(true));
+       }
+
+       private <M extends BaseMessage> void sendMessage(MessageReceiver<M> messageReceiver, M message) {
+               messageReceiver.receive(fcpConnection, message);
+       }
+
+       private interface MessageReceiver<M extends BaseMessage> {
+
+               void receive(FcpConnection fcpConnection, M message);
+       }
+
+       private <M extends BaseMessage> AtomicBoolean setupMessage(Class<M> messageClass) {
+               AtomicBoolean gotMessage = new AtomicBoolean();
+               replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true));
+               replyWaiter.waitFor(() -> gotMessage.get());
+               return gotMessage;
+       }
+
+       @Test
+       public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
+               waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class,
+                               () -> new NodeHello(new FcpMessage("NodeHello")));
+       }
+
+       @Test
+       public void waitingForConnectionClosedDuplicateClientNameWorks()
+       throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName,
+                               CloseConnectionDuplicateClientName.class,
+                               () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+       }
+
+       @Test
+       public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class,
+                               () -> new SSKKeypair(new FcpMessage("SSKKeypair")));
+       }
+
+       @Test
+       public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer")));
+       }
+
+       @Test
+       public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class,
+                               () -> new EndListPeers(new FcpMessage("EndListPeers")));
+       }
+
+       @Test
+       public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class,
+                               () -> new PeerNote(new FcpMessage("PeerNote")));
+       }
+
+       @Test
+       public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class,
+                               () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes")));
+       }
+
+       @Test
+       public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class,
+                               () -> new PeerRemoved(new FcpMessage("PeerRemoved")));
+       }
+
+       @Test
+       public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class,
+                               () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "")
+                                               .put(
+                                                               "ark.number", "0")
+                                               .put("auth.negTypes", "")
+                                               .put("version", "0,0,0,0")
+                                               .put("lastGoodVersion", "0,0,0,0")));
+       }
+
+       @Test
+       public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class,
+                               () -> new TestDDAReply(new FcpMessage("TestDDAReply")));
+       }
+
+       @Test
+       public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class,
+                               () -> new TestDDAComplete(new FcpMessage("TestDDAComplete")));
+       }
+
+       @Test
+       public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class,
+                               () -> new PersistentGet(new FcpMessage("PersistentGet")));
+       }
+
+       @Test
+       public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class,
+                               () -> new PersistentPut(new FcpMessage("PersistentPut")));
+       }
+
+       @Test
+       public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class,
+                               () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests")));
+       }
+
+       @Test
+       public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class,
+                               () -> new URIGenerated(new FcpMessage("URIGenerated")));
+       }
+
+       @Test
+       public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class,
+                               () -> new DataFound(new FcpMessage("DataFound")));
+       }
+
+       @Test
+       public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class,
+                               () -> new AllData(new FcpMessage("AllData"), null));
+       }
+
+       @Test
+       public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class,
+                               () -> new SimpleProgress(new FcpMessage("SimpleProgress")));
+       }
+
+       @Test
+       public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class,
+                               () -> new StartedCompression(new FcpMessage("StartedCompression")));
+       }
+
+       @Test
+       public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class,
+                               () -> new FinishedCompression(new FcpMessage("FinishedCompression")));
+       }
+
+       @Test
+       public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class,
+                               () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType")));
+       }
+
+       @Test
+       public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class,
+                               () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier")));
+       }
+
+       @Test
+       public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class,
+                               () -> new ConfigData(new FcpMessage("ConfigData")));
+       }
+
+       @Test
+       public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class,
+                               () -> new GetFailed(new FcpMessage("GetFailed")));
+       }
+
+       @Test
+       public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class,
+                               () -> new PutFailed(new FcpMessage("PutFailed")));
+       }
+
+       @Test
+       public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class,
+                               () -> new IdentifierCollision(new FcpMessage("IdentifierCollision")));
+       }
+
+       @Test
+       public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class,
+                               () -> new PersistentPutDir(new FcpMessage("PersistentPutDir")));
+       }
+
+       @Test
+       public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class,
+                               () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved")));
+       }
+
+       @Test
+       public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class,
+                               () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate")));
+       }
+
+       @Test
+       public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class,
+                               () -> new PluginInfo(new FcpMessage("PluginInfo")));
+       }
+
+       @Test
+       public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class,
+                               () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
+       }
+
+       @Test
+       public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class,
+                               () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified")));
+       }
+
+       @Test
+       public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class,
+                               () -> new PutSuccessful(new FcpMessage("PutSuccessful")));
+       }
+
+       @Test
+       public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class,
+                               () -> new PutFetchable(new FcpMessage("PutFetchable")));
+       }
+
+       @Test
+       public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class,
+                               () -> new SentFeed(new FcpMessage("SentFeed")));
+       }
+
+       @Test
+       public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class,
+                               () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed")));
+       }
+
+       @Test
+       public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class,
+                               () -> new ProtocolError(new FcpMessage("ProtocolError")));
+       }
+
+       @Test
+       public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
+               AtomicReference<FcpMessage> receivedMessage = new AtomicReference<>();
+               replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message));
+               replyWaiter.waitFor(() -> receivedMessage.get() != null);
+               Future<?> result = replyWaiter.send(fcpMessage);
+               replyWaiter.receivedMessage(fcpConnection, fcpMessage);
+               result.get();
+               assertThat(receivedMessage.get(), is(fcpMessage));
+       }
+
+       @Test
+       public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
+               AtomicBoolean gotPutFailed = new AtomicBoolean();
+               replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true));
+               AtomicBoolean gotGetFailed = new AtomicBoolean();
+               replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true));
+               replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get());
+               Future<?> result = replyWaiter.send(fcpMessage);
+               assertThat(result.isDone(), is(false));
+               replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
+               assertThat(result.isDone(), is(false));
+               replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
+               result.get();
+       }
+
+       @Test
+       public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
+               AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
+               replyWaiter.handleClose().with((e) -> receivedThrowable.set(e));
+               replyWaiter.waitFor(() -> receivedThrowable.get() != null);
+               Future<?> result = replyWaiter.send(fcpMessage);
+               Throwable throwable = new Throwable();
+               replyWaiter.connectionClosed(fcpConnection, throwable);
+               result.get();
+               assertThat(receivedThrowable.get(), is(throwable));
+       }
+
+}