Rename FcpReplySequence to FcpDialog
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sat, 11 Jul 2015 18:33:22 +0000 (20:33 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sat, 11 Jul 2015 18:33:22 +0000 (20:33 +0200)
12 files changed:
src/main/java/net/pterodactylus/fcp/quelaton/AddPeerCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/FcpDialog.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java [deleted file]
src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/GetNodeCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/ListPeerCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/ListPeersCommandImpl.java
src/test/java/net/pterodactylus/fcp/quelaton/FcpDialogTest.java [new file with mode: 0644]
src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java [deleted file]

index 5dfef6f..e1f0d5b 100644 (file)
@@ -19,7 +19,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * Default {@link AddPeerCommand} implementation based on {@link FcpReplySequence}.
+ * Default {@link AddPeerCommand} implementation based on {@link FcpDialog}.
  *
  * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
  */
@@ -72,7 +72,7 @@ public class AddPeerCommandImpl implements AddPeerCommand {
                }
        }
 
-       private class AddPeerSequence extends FcpReplySequence<Optional<Peer>> {
+       private class AddPeerSequence extends FcpDialog<Optional<Peer>> {
 
                private final AtomicBoolean finished = new AtomicBoolean();
                private final AtomicReference<Peer> peer = new AtomicReference<>();
index 55d3e1c..0c93fc3 100644 (file)
@@ -3,21 +3,17 @@ package net.pterodactylus.fcp.quelaton;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import net.pterodactylus.fcp.AllData;
 import net.pterodactylus.fcp.ClientGet;
-import net.pterodactylus.fcp.FcpMessage;
 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
 import net.pterodactylus.fcp.GetFailed;
 import net.pterodactylus.fcp.Priority;
 import net.pterodactylus.fcp.ReturnType;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -86,8 +82,8 @@ class ClientGetCommandImpl implements ClientGetCommand {
 
        private Optional<Data> execute(String uri) throws InterruptedException, ExecutionException, IOException {
                ClientGet clientGet = createClientGetCommand(uri);
-               try (ClientGetReplySequence clientGetReplySequence = new ClientGetReplySequence()) {
-                       return clientGetReplySequence.send(clientGet).get();
+               try (ClientGetDialog clientGetDialog = new ClientGetDialog()) {
+                       return clientGetDialog.send(clientGet).get();
                }
        }
 
@@ -115,7 +111,7 @@ class ClientGetCommandImpl implements ClientGetCommand {
                return clientGet;
        }
 
-       private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
+       private class ClientGetDialog extends FcpDialog<Optional<Data>> {
 
                private final AtomicBoolean finished = new AtomicBoolean();
                private final AtomicBoolean failed = new AtomicBoolean();
@@ -124,7 +120,7 @@ class ClientGetCommandImpl implements ClientGetCommand {
                private long dataLength;
                private InputStream payload;
 
-               public ClientGetReplySequence() throws IOException {
+               public ClientGetDialog() throws IOException {
                        super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
                }
 
index 4e3b51c..3ded685 100644 (file)
@@ -3,11 +3,9 @@ package net.pterodactylus.fcp.quelaton;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import net.pterodactylus.fcp.ClientHello;
-import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
 import net.pterodactylus.fcp.FcpConnection;
 import net.pterodactylus.fcp.NodeHello;
 
@@ -16,7 +14,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * Internal <code>ClientHello</code> implementation based on {@link FcpReplySequence}.
+ * Internal <code>ClientHello</code> implementation based on {@link FcpDialog}.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -46,7 +44,7 @@ public class ClientHelloImpl {
                FcpConnection connection = new FcpConnection(hostname, port);
                connection.connect();
                ClientHello clientHello = new ClientHello(clientName.get(), "2.0");
-               try (ClientHelloReplySequence nodeHelloSequence = new ClientHelloReplySequence(connection)) {
+               try (ClientHelloDialog nodeHelloSequence = new ClientHelloDialog(connection)) {
                        if (nodeHelloSequence.send(clientHello).get()) {
                                return connection;
                        }
@@ -58,11 +56,11 @@ public class ClientHelloImpl {
                throw new IOException(String.format("Could not connect to %s:%d.", hostname, port));
        }
 
-       private class ClientHelloReplySequence extends FcpReplySequence<Boolean> {
+       private class ClientHelloDialog extends FcpDialog<Boolean> {
 
                private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
 
-               public ClientHelloReplySequence(FcpConnection connection) {
+               public ClientHelloDialog(FcpConnection connection) {
                        super(ClientHelloImpl.this.threadPool, connection);
                }
 
index df345f3..4f8ddde 100644 (file)
@@ -29,7 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}.
+ * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -84,8 +84,8 @@ class ClientPutCommandImpl implements ClientPutCommand {
        private Optional<Key> execute(String uri) throws InterruptedException, ExecutionException, IOException {
                String identifier = new RandomIdentifierGenerator().generate();
                ClientPut clientPut = createClientPutCommand(uri, identifier);
-               try (ClientPutReplySequence clientPutReplySequence = new ClientPutReplySequence()) {
-                       return clientPutReplySequence.send(clientPut).get();
+               try (ClientPutDialog clientPutDialog = new ClientPutDialog()) {
+                       return clientPutDialog.send(clientPut).get();
                }
        }
 
@@ -123,14 +123,14 @@ class ClientPutCommandImpl implements ClientPutCommand {
                return clientPut;
        }
 
-       private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
+       private class ClientPutDialog extends FcpDialog<Optional<Key>> {
 
                private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
                private final AtomicReference<String> directory = new AtomicReference<>();
                private final AtomicReference<Key> finalKey = new AtomicReference<>();
                private final AtomicBoolean putFinished = new AtomicBoolean();
 
-               public ClientPutReplySequence() throws IOException {
+               public ClientPutDialog() throws IOException {
                        super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
                }
 
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpDialog.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpDialog.java
new file mode 100644 (file)
index 0000000..2aa003c
--- /dev/null
@@ -0,0 +1,433 @@
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.ConfigData;
+import net.pterodactylus.fcp.DataFound;
+import net.pterodactylus.fcp.EndListPeerNotes;
+import net.pterodactylus.fcp.EndListPeers;
+import net.pterodactylus.fcp.EndListPersistentRequests;
+import net.pterodactylus.fcp.FCPPluginReply;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpListener;
+import net.pterodactylus.fcp.FcpMessage;
+import net.pterodactylus.fcp.FinishedCompression;
+import net.pterodactylus.fcp.GetFailed;
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.NodeData;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Peer;
+import net.pterodactylus.fcp.PeerNote;
+import net.pterodactylus.fcp.PeerRemoved;
+import net.pterodactylus.fcp.PersistentGet;
+import net.pterodactylus.fcp.PersistentPut;
+import net.pterodactylus.fcp.PersistentPutDir;
+import net.pterodactylus.fcp.PersistentRequestModified;
+import net.pterodactylus.fcp.PersistentRequestRemoved;
+import net.pterodactylus.fcp.PluginInfo;
+import net.pterodactylus.fcp.ProtocolError;
+import net.pterodactylus.fcp.PutFailed;
+import net.pterodactylus.fcp.PutFetchable;
+import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.ReceivedBookmarkFeed;
+import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.SentFeed;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.URIGenerated;
+import net.pterodactylus.fcp.UnknownNodeIdentifier;
+import net.pterodactylus.fcp.UnknownPeerNoteType;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public abstract class FcpDialog<R> implements AutoCloseable, FcpListener {
+
+       private final Object syncObject = new Object();
+       private final ListeningExecutorService executorService;
+       private final FcpConnection fcpConnection;
+       private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
+       private final AtomicReference<String> identifier = new AtomicReference<>();
+       private final AtomicBoolean connectionClosed = new AtomicBoolean();
+       private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
+
+       public FcpDialog(ExecutorService executorService, FcpConnection fcpConnection) {
+               this.executorService = MoreExecutors.listeningDecorator(executorService);
+               this.fcpConnection = fcpConnection;
+       }
+
+       protected void setIdentifier(String identifier) {
+               this.identifier.set(identifier);
+       }
+
+       protected abstract boolean isFinished();
+
+       public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
+               setIdentifier(fcpMessage.getField("Identifier"));
+               fcpConnection.addFcpListener(this);
+               messages.add(fcpMessage);
+               return executorService.submit(() -> {
+                       synchronized (syncObject) {
+                               while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
+                                       while (messages.peek() != null) {
+                                               FcpMessage message = messages.poll();
+                                               fcpConnection.sendMessage(message);
+                                       }
+                                       if (isFinished() || connectionClosed.get()) {
+                                               continue;
+                                       }
+                                       syncObject.wait();
+                               }
+                       }
+                       Throwable throwable = connectionFailureReason.get();
+                       if (throwable != null) {
+                               throw new ExecutionException(throwable);
+                       }
+                       return getResult();
+               });
+       }
+
+       protected void sendMessage(FcpMessage fcpMessage) {
+               messages.add(fcpMessage);
+               notifySyncObject();
+       }
+
+       private void notifySyncObject() {
+               synchronized (syncObject) {
+                       syncObject.notifyAll();
+               }
+       }
+
+       protected R getResult() {
+               return null;
+       }
+
+       @Override
+       public void close() {
+               fcpConnection.removeFcpListener(this);
+       }
+
+       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
+               consume(consumer, message, "Identifier");
+       }
+
+       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
+                       String identifier) {
+               if (Objects.equals(message.getField(identifier), this.identifier.get())) {
+                       consumeAlways(consumer, message);
+               }
+       }
+
+       private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
+               consumer.accept(message);
+               notifySyncObject();
+       }
+
+       private void consumeUnknown(FcpMessage fcpMessage) {
+               consumeUnknownMessage(fcpMessage);
+               notifySyncObject();
+       }
+
+       private void consumeClose(Throwable throwable) {
+               connectionFailureReason.set(throwable);
+               connectionClosed.set(true);
+               notifySyncObject();
+       }
+
+       @Override
+       public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
+               consume(this::consumeNodeHello, nodeHello);
+       }
+
+       protected void consumeNodeHello(NodeHello nodeHello) { }
+
+       @Override
+       public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
+                       CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+               connectionFailureReason.set(new IOException("duplicate client name"));
+               connectionClosed.set(true);
+               notifySyncObject();
+       }
+
+       @Override
+       public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
+               consume(this::consumeSSKKeypair, sskKeypair);
+       }
+
+       protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
+
+       @Override
+       public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
+               consume(this::consumePeer, peer);
+       }
+
+       protected void consumePeer(Peer peer) { }
+
+       @Override
+       public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
+               consume(this::consumeEndListPeers, endListPeers);
+       }
+
+       protected void consumeEndListPeers(EndListPeers endListPeers) { }
+
+       @Override
+       public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
+               consume(this::consumePeerNote, peerNote);
+       }
+
+       protected void consumePeerNote(PeerNote peerNote) { }
+
+       @Override
+       public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
+               consume(this::consumeEndListPeerNotes, endListPeerNotes);
+       }
+
+       protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
+
+       @Override
+       public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
+               consume(this::consumePeerRemoved, peerRemoved);
+       }
+
+       protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
+
+       @Override
+       public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
+               consume(this::consumeNodeData, nodeData);
+       }
+
+       protected void consumeNodeData(NodeData nodeData) { }
+
+       @Override
+       public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
+               consume(this::consumeTestDDAReply, testDDAReply, "Directory");
+       }
+
+       protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
+
+       @Override
+       public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
+               consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
+       }
+
+       protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
+
+       @Override
+       public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
+               consume(this::consumePersistentGet, persistentGet);
+       }
+
+       protected void consumePersistentGet(PersistentGet persistentGet) { }
+
+       @Override
+       public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
+               consume(this::consumePersistentPut, persistentPut);
+       }
+
+       protected void consumePersistentPut(PersistentPut persistentPut) { }
+
+       @Override
+       public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
+                       EndListPersistentRequests endListPersistentRequests) {
+               consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
+       }
+
+       protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
+
+       @Override
+       public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
+               consume(this::consumeURIGenerated, uriGenerated);
+       }
+
+       protected void consumeURIGenerated(URIGenerated uriGenerated) { }
+
+       @Override
+       public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
+               consume(this::consumeDataFound, dataFound);
+       }
+
+       protected void consumeDataFound(DataFound dataFound) { }
+
+       @Override
+       public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
+               consume(this::consumeAllData, allData);
+       }
+
+       protected void consumeAllData(AllData allData) { }
+
+       @Override
+       public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
+               consume(this::consumeSimpleProgress, simpleProgress);
+       }
+
+       protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
+
+       @Override
+       public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
+               consume(this::consumeStartedCompression, startedCompression);
+       }
+
+       protected void consumeStartedCompression(StartedCompression startedCompression) { }
+
+       @Override
+       public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
+               consume(this::consumeFinishedCompression, finishedCompression);
+       }
+
+       protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
+
+       @Override
+       public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
+               consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
+       }
+
+       protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
+
+       @Override
+       public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
+                       UnknownNodeIdentifier unknownNodeIdentifier) {
+               consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
+       }
+
+       protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
+
+       @Override
+       public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
+               consume(this::consumeConfigData, configData);
+       }
+
+       protected void consumeConfigData(ConfigData configData) { }
+
+       @Override
+       public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
+               consume(this::consumeGetFailed, getFailed);
+       }
+
+       protected void consumeGetFailed(GetFailed getFailed) { }
+
+       @Override
+       public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
+               consume(this::consumePutFailed, putFailed);
+       }
+
+       protected void consumePutFailed(PutFailed putFailed) { }
+
+       @Override
+       public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
+               consume(this::consumeIdentifierCollision, identifierCollision);
+       }
+
+       protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
+
+       @Override
+       public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
+               consume(this::consumePersistentPutDir, persistentPutDir);
+       }
+
+       protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
+
+       @Override
+       public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
+                       PersistentRequestRemoved persistentRequestRemoved) {
+               consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
+       }
+
+       protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
+
+       @Override
+       public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+               consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
+       }
+
+       protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
+
+       @Override
+       public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
+               consume(this::consumePluginInfo, pluginInfo);
+       }
+
+       protected void consumePluginInfo(PluginInfo pluginInfo) { }
+
+       @Override
+       public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
+               consume(this::consumeFCPPluginReply, fcpPluginReply);
+       }
+
+       protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
+
+       @Override
+       public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
+                       PersistentRequestModified persistentRequestModified) {
+               consume(this::consumePersistentRequestModified, persistentRequestModified);
+       }
+
+       protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
+
+       @Override
+       public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
+               consume(this::consumePutSuccessful, putSuccessful);
+       }
+
+       protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
+
+       @Override
+       public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
+               consume(this::consumePutFetchable, putFetchable);
+       }
+
+       protected void consumePutFetchable(PutFetchable putFetchable) { }
+
+       @Override
+       public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
+               consume(this::consumeSentFeed, sentFeed);
+       }
+
+       protected void consumeSentFeed(SentFeed sentFeed) { }
+
+       @Override
+       public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
+               consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
+       }
+
+       protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
+
+       @Override
+       public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
+               consume(this::consumeProtocolError, protocolError);
+       }
+
+       protected void consumeProtocolError(ProtocolError protocolError) { }
+
+       @Override
+       public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+               consumeUnknown(fcpMessage);
+       }
+
+       protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
+
+       @Override
+       public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
+               consumeClose(throwable);
+       }
+
+}
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
deleted file mode 100644 (file)
index 9b44b9a..0000000
+++ /dev/null
@@ -1,433 +0,0 @@
-package net.pterodactylus.fcp.quelaton;
-
-import java.io.IOException;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-
-import net.pterodactylus.fcp.AllData;
-import net.pterodactylus.fcp.BaseMessage;
-import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
-import net.pterodactylus.fcp.ConfigData;
-import net.pterodactylus.fcp.DataFound;
-import net.pterodactylus.fcp.EndListPeerNotes;
-import net.pterodactylus.fcp.EndListPeers;
-import net.pterodactylus.fcp.EndListPersistentRequests;
-import net.pterodactylus.fcp.FCPPluginReply;
-import net.pterodactylus.fcp.FcpConnection;
-import net.pterodactylus.fcp.FcpListener;
-import net.pterodactylus.fcp.FcpMessage;
-import net.pterodactylus.fcp.FinishedCompression;
-import net.pterodactylus.fcp.GetFailed;
-import net.pterodactylus.fcp.IdentifierCollision;
-import net.pterodactylus.fcp.NodeData;
-import net.pterodactylus.fcp.NodeHello;
-import net.pterodactylus.fcp.Peer;
-import net.pterodactylus.fcp.PeerNote;
-import net.pterodactylus.fcp.PeerRemoved;
-import net.pterodactylus.fcp.PersistentGet;
-import net.pterodactylus.fcp.PersistentPut;
-import net.pterodactylus.fcp.PersistentPutDir;
-import net.pterodactylus.fcp.PersistentRequestModified;
-import net.pterodactylus.fcp.PersistentRequestRemoved;
-import net.pterodactylus.fcp.PluginInfo;
-import net.pterodactylus.fcp.ProtocolError;
-import net.pterodactylus.fcp.PutFailed;
-import net.pterodactylus.fcp.PutFetchable;
-import net.pterodactylus.fcp.PutSuccessful;
-import net.pterodactylus.fcp.ReceivedBookmarkFeed;
-import net.pterodactylus.fcp.SSKKeypair;
-import net.pterodactylus.fcp.SentFeed;
-import net.pterodactylus.fcp.SimpleProgress;
-import net.pterodactylus.fcp.StartedCompression;
-import net.pterodactylus.fcp.SubscribedUSKUpdate;
-import net.pterodactylus.fcp.TestDDAComplete;
-import net.pterodactylus.fcp.TestDDAReply;
-import net.pterodactylus.fcp.URIGenerated;
-import net.pterodactylus.fcp.UnknownNodeIdentifier;
-import net.pterodactylus.fcp.UnknownPeerNoteType;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/**
- * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
- *
- * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
- */
-public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
-
-       private final Object syncObject = new Object();
-       private final ListeningExecutorService executorService;
-       private final FcpConnection fcpConnection;
-       private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
-       private final AtomicReference<String> identifier = new AtomicReference<>();
-       private final AtomicBoolean connectionClosed = new AtomicBoolean();
-       private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
-
-       public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
-               this.executorService = MoreExecutors.listeningDecorator(executorService);
-               this.fcpConnection = fcpConnection;
-       }
-
-       protected void setIdentifier(String identifier) {
-               this.identifier.set(identifier);
-       }
-
-       protected abstract boolean isFinished();
-
-       public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
-               setIdentifier(fcpMessage.getField("Identifier"));
-               fcpConnection.addFcpListener(this);
-               messages.add(fcpMessage);
-               return executorService.submit(() -> {
-                       synchronized (syncObject) {
-                               while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
-                                       while (messages.peek() != null) {
-                                               FcpMessage message = messages.poll();
-                                               fcpConnection.sendMessage(message);
-                                       }
-                                       if (isFinished() || connectionClosed.get()) {
-                                               continue;
-                                       }
-                                       syncObject.wait();
-                               }
-                       }
-                       Throwable throwable = connectionFailureReason.get();
-                       if (throwable != null) {
-                               throw new ExecutionException(throwable);
-                       }
-                       return getResult();
-               });
-       }
-
-       protected void sendMessage(FcpMessage fcpMessage) {
-               messages.add(fcpMessage);
-               notifySyncObject();
-       }
-
-       private void notifySyncObject() {
-               synchronized (syncObject) {
-                       syncObject.notifyAll();
-               }
-       }
-
-       protected R getResult() {
-               return null;
-       }
-
-       @Override
-       public void close() {
-               fcpConnection.removeFcpListener(this);
-       }
-
-       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
-               consume(consumer, message, "Identifier");
-       }
-
-       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
-                       String identifier) {
-               if (Objects.equals(message.getField(identifier), this.identifier.get())) {
-                       consumeAlways(consumer, message);
-               }
-       }
-
-       private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
-               consumer.accept(message);
-               notifySyncObject();
-       }
-
-       private void consumeUnknown(FcpMessage fcpMessage) {
-               consumeUnknownMessage(fcpMessage);
-               notifySyncObject();
-       }
-
-       private void consumeClose(Throwable throwable) {
-               connectionFailureReason.set(throwable);
-               connectionClosed.set(true);
-               notifySyncObject();
-       }
-
-       @Override
-       public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
-               consume(this::consumeNodeHello, nodeHello);
-       }
-
-       protected void consumeNodeHello(NodeHello nodeHello) { }
-
-       @Override
-       public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
-                       CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-               connectionFailureReason.set(new IOException("duplicate client name"));
-               connectionClosed.set(true);
-               notifySyncObject();
-       }
-
-       @Override
-       public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
-               consume(this::consumeSSKKeypair, sskKeypair);
-       }
-
-       protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
-
-       @Override
-       public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
-               consume(this::consumePeer, peer);
-       }
-
-       protected void consumePeer(Peer peer) { }
-
-       @Override
-       public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
-               consume(this::consumeEndListPeers, endListPeers);
-       }
-
-       protected void consumeEndListPeers(EndListPeers endListPeers) { }
-
-       @Override
-       public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
-               consume(this::consumePeerNote, peerNote);
-       }
-
-       protected void consumePeerNote(PeerNote peerNote) { }
-
-       @Override
-       public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
-               consume(this::consumeEndListPeerNotes, endListPeerNotes);
-       }
-
-       protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
-
-       @Override
-       public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
-               consume(this::consumePeerRemoved, peerRemoved);
-       }
-
-       protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
-
-       @Override
-       public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
-               consume(this::consumeNodeData, nodeData);
-       }
-
-       protected void consumeNodeData(NodeData nodeData) { }
-
-       @Override
-       public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
-               consume(this::consumeTestDDAReply, testDDAReply, "Directory");
-       }
-
-       protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
-
-       @Override
-       public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
-               consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
-       }
-
-       protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
-
-       @Override
-       public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
-               consume(this::consumePersistentGet, persistentGet);
-       }
-
-       protected void consumePersistentGet(PersistentGet persistentGet) { }
-
-       @Override
-       public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
-               consume(this::consumePersistentPut, persistentPut);
-       }
-
-       protected void consumePersistentPut(PersistentPut persistentPut) { }
-
-       @Override
-       public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
-                       EndListPersistentRequests endListPersistentRequests) {
-               consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
-       }
-
-       protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
-
-       @Override
-       public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
-               consume(this::consumeURIGenerated, uriGenerated);
-       }
-
-       protected void consumeURIGenerated(URIGenerated uriGenerated) { }
-
-       @Override
-       public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
-               consume(this::consumeDataFound, dataFound);
-       }
-
-       protected void consumeDataFound(DataFound dataFound) { }
-
-       @Override
-       public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
-               consume(this::consumeAllData, allData);
-       }
-
-       protected void consumeAllData(AllData allData) { }
-
-       @Override
-       public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
-               consume(this::consumeSimpleProgress, simpleProgress);
-       }
-
-       protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
-
-       @Override
-       public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
-               consume(this::consumeStartedCompression, startedCompression);
-       }
-
-       protected void consumeStartedCompression(StartedCompression startedCompression) { }
-
-       @Override
-       public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
-               consume(this::consumeFinishedCompression, finishedCompression);
-       }
-
-       protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
-
-       @Override
-       public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
-               consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
-       }
-
-       protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
-
-       @Override
-       public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
-                       UnknownNodeIdentifier unknownNodeIdentifier) {
-               consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
-       }
-
-       protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
-
-       @Override
-       public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
-               consume(this::consumeConfigData, configData);
-       }
-
-       protected void consumeConfigData(ConfigData configData) { }
-
-       @Override
-       public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
-               consume(this::consumeGetFailed, getFailed);
-       }
-
-       protected void consumeGetFailed(GetFailed getFailed) { }
-
-       @Override
-       public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
-               consume(this::consumePutFailed, putFailed);
-       }
-
-       protected void consumePutFailed(PutFailed putFailed) { }
-
-       @Override
-       public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
-               consume(this::consumeIdentifierCollision, identifierCollision);
-       }
-
-       protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
-
-       @Override
-       public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
-               consume(this::consumePersistentPutDir, persistentPutDir);
-       }
-
-       protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
-
-       @Override
-       public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
-                       PersistentRequestRemoved persistentRequestRemoved) {
-               consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
-       }
-
-       protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
-
-       @Override
-       public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
-               consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
-       }
-
-       protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
-
-       @Override
-       public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
-               consume(this::consumePluginInfo, pluginInfo);
-       }
-
-       protected void consumePluginInfo(PluginInfo pluginInfo) { }
-
-       @Override
-       public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
-               consume(this::consumeFCPPluginReply, fcpPluginReply);
-       }
-
-       protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
-
-       @Override
-       public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
-                       PersistentRequestModified persistentRequestModified) {
-               consume(this::consumePersistentRequestModified, persistentRequestModified);
-       }
-
-       protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
-
-       @Override
-       public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
-               consume(this::consumePutSuccessful, putSuccessful);
-       }
-
-       protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
-
-       @Override
-       public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
-               consume(this::consumePutFetchable, putFetchable);
-       }
-
-       protected void consumePutFetchable(PutFetchable putFetchable) { }
-
-       @Override
-       public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
-               consume(this::consumeSentFeed, sentFeed);
-       }
-
-       protected void consumeSentFeed(SentFeed sentFeed) { }
-
-       @Override
-       public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
-               consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
-       }
-
-       protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
-
-       @Override
-       public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
-               consume(this::consumeProtocolError, protocolError);
-       }
-
-       protected void consumeProtocolError(ProtocolError protocolError) { }
-
-       @Override
-       public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
-               consumeUnknown(fcpMessage);
-       }
-
-       protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
-
-       @Override
-       public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
-               consumeClose(throwable);
-       }
-
-}
index 564fa71..5f8cd5b 100644 (file)
@@ -34,16 +34,16 @@ class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
        }
 
        private FcpKeyPair executeSequence() throws InterruptedException, ExecutionException, IOException {
-               try (FcpKeyPairReplySequence fcpKeyPairReplySequence = new FcpKeyPairReplySequence()) {
-                       return fcpKeyPairReplySequence.send(new GenerateSSK()).get();
+               try (FcpKeyPairDialog fcpKeyPairDialog = new FcpKeyPairDialog()) {
+                       return fcpKeyPairDialog.send(new GenerateSSK()).get();
                }
        }
 
-       private class FcpKeyPairReplySequence extends FcpReplySequence<FcpKeyPair> {
+       private class FcpKeyPairDialog extends FcpDialog<FcpKeyPair> {
 
                private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
 
-               public FcpKeyPairReplySequence() throws IOException {
+               public FcpKeyPairDialog() throws IOException {
                        super(GenerateKeypairCommandImpl.this.threadPool, GenerateKeypairCommandImpl.this.connectionSupplier.get());
                }
 
index 2447179..025cddc 100644 (file)
@@ -14,7 +14,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * Default {@link GetNodeCommandImpl} implementation based on {@link FcpReplySequence}.
+ * Default {@link GetNodeCommandImpl} implementation based on {@link FcpDialog}.
  *
  * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
  */
@@ -57,16 +57,16 @@ public class GetNodeCommandImpl implements GetNodeCommand {
        private NodeData executeSequence() throws InterruptedException, ExecutionException, IOException {
                GetNode getNode = new GetNode(new RandomIdentifierGenerator().generate(), giveOpennetRef.get(),
                        includePrivate.get(), includeVolatile.get());
-               try (GetNodeReplySequence getNodeReplySequence = new GetNodeReplySequence()) {
-                       return getNodeReplySequence.send(getNode).get();
+               try (GetNodeDialog getNodeDialog = new GetNodeDialog()) {
+                       return getNodeDialog.send(getNode).get();
                }
        }
 
-       private class GetNodeReplySequence extends FcpReplySequence<NodeData> {
+       private class GetNodeDialog extends FcpDialog<NodeData> {
 
                private final AtomicReference<NodeData> nodeData = new AtomicReference<>();
 
-               public GetNodeReplySequence() throws IOException {
+               public GetNodeDialog() throws IOException {
                        super(threadPool, connectionSupplier.get());
                }
 
index dd91133..44648c3 100644 (file)
@@ -14,7 +14,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
 /**
- * Default {@link ListPeerCommand} implementation based on {@link FcpReplySequence}.
+ * Default {@link ListPeerCommand} implementation based on {@link FcpDialog}.
  *
  * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
  */
@@ -58,7 +58,7 @@ public class ListPeerCommandImpl implements ListPeerCommand {
                }
        }
 
-       private class ListPeerSequence extends FcpReplySequence<Peer> {
+       private class ListPeerSequence extends FcpDialog<Peer> {
 
                private final AtomicBoolean finished = new AtomicBoolean();
                private final AtomicReference<Peer> peer = new AtomicReference<>();
index 3d3baed..921c0fa 100644 (file)
@@ -16,7 +16,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * Default {@link ListPeersCommand} implementation based on {@link FcpReplySequence}.
+ * Default {@link ListPeersCommand} implementation based on {@link FcpDialog}.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -52,17 +52,17 @@ public class ListPeersCommandImpl implements ListPeersCommand {
        private Collection<Peer> executeSequence() throws InterruptedException, ExecutionException, IOException {
                String identifier = new RandomIdentifierGenerator().generate();
                ListPeers listPeers = new ListPeers(identifier, includeMetadata.get(), includeVolatile.get());
-               try (ListPeersReplySequence listPeersReplySequence = new ListPeersReplySequence()) {
-                       return listPeersReplySequence.send(listPeers).get();
+               try (ListPeersDialog listPeersDialog = new ListPeersDialog()) {
+                       return listPeersDialog.send(listPeers).get();
                }
        }
 
-       private class ListPeersReplySequence extends FcpReplySequence<Collection<Peer>> {
+       private class ListPeersDialog extends FcpDialog<Collection<Peer>> {
 
                private final Collection<Peer> peers = new HashSet<>();
                private final AtomicBoolean finished = new AtomicBoolean(false);
 
-               public ListPeersReplySequence() throws IOException {
+               public ListPeersDialog() throws IOException {
                        super(threadPool, connectionSupplier.get());
                }
 
diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpDialogTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpDialogTest.java
new file mode 100644 (file)
index 0000000..ee04e6f
--- /dev/null
@@ -0,0 +1,594 @@
+package net.pterodactylus.fcp.quelaton;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.ConfigData;
+import net.pterodactylus.fcp.DataFound;
+import net.pterodactylus.fcp.EndListPeerNotes;
+import net.pterodactylus.fcp.EndListPeers;
+import net.pterodactylus.fcp.EndListPersistentRequests;
+import net.pterodactylus.fcp.FCPPluginReply;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpMessage;
+import net.pterodactylus.fcp.FinishedCompression;
+import net.pterodactylus.fcp.GetFailed;
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.NodeData;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Peer;
+import net.pterodactylus.fcp.PeerNote;
+import net.pterodactylus.fcp.PeerRemoved;
+import net.pterodactylus.fcp.PersistentGet;
+import net.pterodactylus.fcp.PersistentPut;
+import net.pterodactylus.fcp.PersistentPutDir;
+import net.pterodactylus.fcp.PersistentRequestModified;
+import net.pterodactylus.fcp.PersistentRequestRemoved;
+import net.pterodactylus.fcp.PluginInfo;
+import net.pterodactylus.fcp.ProtocolError;
+import net.pterodactylus.fcp.PutFailed;
+import net.pterodactylus.fcp.PutFetchable;
+import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.ReceivedBookmarkFeed;
+import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.SentFeed;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.URIGenerated;
+import net.pterodactylus.fcp.UnknownNodeIdentifier;
+import net.pterodactylus.fcp.UnknownPeerNoteType;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link FcpDialog}.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FcpDialogTest {
+
+       private final FcpConnection fcpConnection = mock(FcpConnection.class);
+       private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+       private final TestFcpDialog dialog = new TestFcpDialog(executorService, fcpConnection);
+       private final FcpMessage fcpMessage = new FcpMessage("Test");
+
+       @Test
+       public void canSendMessage() throws IOException, ExecutionException, InterruptedException {
+               FcpDialog dialog = createBasicDialog();
+               dialog.send(fcpMessage).get();
+               verify(fcpConnection).sendMessage(fcpMessage);
+       }
+
+       private FcpDialog createBasicDialog() {
+               return new FcpDialog(executorService, fcpConnection) {
+                               @Override
+                               protected boolean isFinished() {
+                                       return true;
+                               }
+                       };
+       }
+
+       @Test
+       public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
+               FcpDialog dialog = createBasicDialog();
+               dialog.send(fcpMessage);
+               verify(fcpConnection).addFcpListener(dialog);
+       }
+
+       @Test
+       public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
+               FcpDialog dialog = createBasicDialog();
+               dialog.send(fcpMessage);
+               dialog.close();
+               verify(fcpConnection).removeFcpListener(dialog);
+       }
+
+       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, Class<M> messageClass, MessageCreator<M> messageCreator) throws IOException, InterruptedException, ExecutionException {
+               waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName())));
+       }
+
+       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, M message) throws IOException, InterruptedException, ExecutionException {
+               dialog.setExpectedMessage(message.getName());
+               Future<Boolean> result = dialog.send(fcpMessage);
+               messageReceiver.receiveMessage(fcpConnection, message);
+               assertThat(result.get(), is(true));
+       }
+
+       private <M extends BaseMessage> M createMessage(Class<M> messageClass, MessageCreator<M> messageCreator) {
+               return messageCreator.create(new FcpMessage(messageClass.getSimpleName()));
+       }
+
+       private interface MessageCreator<M extends BaseMessage> {
+
+               M create(FcpMessage fcpMessage);
+
+       }
+
+       @Test
+       public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
+               waitForASpecificMessage(dialog::receivedNodeHello, NodeHello.class, NodeHello::new);
+       }
+
+       @Test(expected = ExecutionException.class)
+       public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
+               dialog.setExpectedMessage("");
+               Future<Boolean> result = dialog.send(fcpMessage);
+               dialog.receivedCloseConnectionDuplicateClientName(fcpConnection,
+                       new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+               result.get();
+       }
+
+       @Test
+       public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new);
+       }
+
+       @Test
+       public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPeer, Peer.class, Peer::new);
+       }
+
+       @Test
+       public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedEndListPeers, EndListPeers.class, EndListPeers::new);
+       }
+
+       @Test
+       public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPeerNote, PeerNote.class, PeerNote::new);
+       }
+
+       @Test
+       public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new);
+       }
+
+       @Test
+       public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new);
+       }
+
+       @Test
+       public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedNodeData, new NodeData(
+                       new FcpMessage("NodeData").put("ark.pubURI", "")
+                                       .put("ark.number", "0")
+                                       .put("auth.negTypes", "")
+                                       .put("version", "0,0,0,0")
+                                       .put("lastGoodVersion", "0,0,0,0")));
+       }
+
+       @Test
+       public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new);
+       }
+
+       @Test
+       public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new);
+       }
+
+       @Test
+       public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPersistentGet, PersistentGet.class, PersistentGet::new);
+       }
+
+       @Test
+       public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPersistentPut, PersistentPut.class, PersistentPut::new);
+       }
+
+       @Test
+       public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new);
+       }
+
+       @Test
+       public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedURIGenerated, URIGenerated.class, URIGenerated::new);
+       }
+
+       @Test
+       public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedDataFound, DataFound.class, DataFound::new);
+       }
+
+       @Test
+       public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedAllData, new AllData(new FcpMessage("AllData"), null));
+       }
+
+       @Test
+       public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new);
+       }
+
+       @Test
+       public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedStartedCompression, StartedCompression.class, StartedCompression::new);
+       }
+
+       @Test
+       public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new);
+       }
+
+       @Test
+       public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new);
+       }
+
+       @Test
+       public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new);
+       }
+
+       @Test
+       public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedConfigData, ConfigData.class, ConfigData::new);
+       }
+
+       @Test
+       public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedGetFailed, GetFailed.class, GetFailed::new);
+       }
+
+       @Test
+       public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPutFailed, PutFailed.class, PutFailed::new);
+       }
+
+       @Test
+       public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new);
+       }
+
+       @Test
+       public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new);
+       }
+
+       @Test
+       public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new);
+       }
+
+       @Test
+       public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new);
+       }
+
+       @Test
+       public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPluginInfo, PluginInfo.class, PluginInfo::new);
+       }
+
+       @Test
+       public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
+       }
+
+       @Test
+       public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new);
+       }
+
+       @Test
+       public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new);
+       }
+
+       @Test
+       public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedPutFetchable, PutFetchable.class, PutFetchable::new);
+       }
+
+       @Test
+       public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedSentFeed, SentFeed.class, SentFeed::new);
+       }
+
+       @Test
+       public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new);
+       }
+
+       @Test
+       public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
+               waitForASpecificMessage(dialog::receivedProtocolError, ProtocolError.class, ProtocolError::new);
+       }
+
+       @Test
+       public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
+               dialog.setExpectedMessage("SomeFcpMessage");
+               Future<Boolean> result = dialog.send(fcpMessage);
+               dialog.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage"));
+               assertThat(result.get(), is(true));
+       }
+
+       @Test
+       public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
+               TestFcpDialog testFcpDialog = new TestFcpDialog(executorService, fcpConnection) {
+                       private final AtomicBoolean gotPutFailed = new AtomicBoolean();
+                       private final AtomicBoolean gotGetFailed = new AtomicBoolean();
+
+                       @Override
+                       protected boolean isFinished() {
+                               return gotPutFailed.get() && gotGetFailed.get();
+                       }
+
+                       @Override
+                       protected Boolean getResult() {
+                               return isFinished();
+                       }
+
+                       @Override
+                       protected void consumePutFailed(PutFailed putFailed) {
+                               gotPutFailed.set(true);
+                       }
+
+                       @Override
+                       protected void consumeGetFailed(GetFailed getFailed) {
+                               gotGetFailed.set(true);
+                       }
+               };
+               Future<?> result = testFcpDialog.send(fcpMessage);
+               assertThat(result.isDone(), is(false));
+               testFcpDialog.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
+               assertThat(result.isDone(), is(false));
+               testFcpDialog.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
+               assertThat(result.get(), is(true));
+       }
+
+       @Test
+       public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
+               dialog.setExpectedMessage("none");
+               Future<Boolean> result = dialog.send(fcpMessage);
+               Throwable throwable = new Throwable();
+               dialog.connectionClosed(fcpConnection, throwable);
+               try {
+                       result.get();
+               } catch (ExecutionException e) {
+                       Throwable t = e;
+                       while (t.getCause() != null) {
+                               t = t.getCause();
+                       }
+                       assertThat(t, sameInstance(throwable));
+               }
+       }
+
+       @FunctionalInterface
+       private interface MessageReceiver<M> {
+
+               void receiveMessage(FcpConnection fcpConnection, M message);
+
+       }
+
+       private static class TestFcpDialog extends FcpDialog<Boolean> {
+
+               private final AtomicReference<String> gotMessage = new AtomicReference<>();
+               private final AtomicReference<String> expectedMessage = new AtomicReference<>();
+
+               public TestFcpDialog(ExecutorService executorService, FcpConnection fcpConnection) {
+                       super(executorService, fcpConnection);
+               }
+
+               public void setExpectedMessage(String expectedMessage) {
+                       this.expectedMessage.set(expectedMessage);
+               }
+
+               @Override
+               protected boolean isFinished() {
+                       return getResult();
+               }
+
+               @Override
+               protected Boolean getResult() {
+                       return expectedMessage.get().equals(gotMessage.get());
+               }
+
+               @Override
+               protected void consumeNodeHello(NodeHello nodeHello) {
+                       gotMessage.set(nodeHello.getName());
+               }
+
+               @Override
+               protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
+                       gotMessage.set(sskKeypair.getName());
+               }
+
+               @Override
+               protected void consumePeer(Peer peer) {
+                       gotMessage.set(peer.getName());
+               }
+
+               @Override
+               protected void consumeEndListPeers(EndListPeers endListPeers) {
+                       gotMessage.set(endListPeers.getName());
+               }
+
+               @Override
+               protected void consumePeerNote(PeerNote peerNote) {
+                       gotMessage.set(peerNote.getName());
+               }
+
+               @Override
+               protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
+                       gotMessage.set(endListPeerNotes.getName());
+               }
+
+               @Override
+               protected void consumePeerRemoved(PeerRemoved peerRemoved) {
+                       gotMessage.set(peerRemoved.getName());
+               }
+
+               @Override
+               protected void consumeNodeData(NodeData nodeData) {
+                       gotMessage.set(nodeData.getName());
+               }
+
+               @Override
+               protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
+                       gotMessage.set(testDDAReply.getName());
+               }
+
+               @Override
+               protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
+                       gotMessage.set(testDDAComplete.getName());
+               }
+
+               @Override
+               protected void consumePersistentGet(PersistentGet persistentGet) {
+                       gotMessage.set(persistentGet.getName());
+               }
+
+               @Override
+               protected void consumePersistentPut(PersistentPut persistentPut) {
+                       gotMessage.set(persistentPut.getName());
+               }
+
+               @Override
+               protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
+                       gotMessage.set(endListPersistentRequests.getName());
+               }
+
+               @Override
+               protected void consumeURIGenerated(URIGenerated uriGenerated) {
+                       gotMessage.set(uriGenerated.getName());
+               }
+
+               @Override
+               protected void consumeDataFound(DataFound dataFound) {
+                       gotMessage.set(dataFound.getName());
+               }
+
+               @Override
+               protected void consumeAllData(AllData allData) {
+                       gotMessage.set(allData.getName());
+               }
+
+               @Override
+               protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+                       gotMessage.set(simpleProgress.getName());
+               }
+
+               @Override
+               protected void consumeStartedCompression(StartedCompression startedCompression) {
+                       gotMessage.set(startedCompression.getName());
+               }
+
+               @Override
+               protected void consumeFinishedCompression(FinishedCompression finishedCompression) {
+                       gotMessage.set(finishedCompression.getName());
+               }
+
+               @Override
+               protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
+                       gotMessage.set(unknownPeerNoteType.getName());
+               }
+
+               @Override
+               protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
+                       gotMessage.set(unknownNodeIdentifier.getName());
+               }
+
+               @Override
+               protected void consumeConfigData(ConfigData configData) {
+                       gotMessage.set(configData.getName());
+               }
+
+               @Override
+               protected void consumeGetFailed(GetFailed getFailed) {
+                       gotMessage.set(getFailed.getName());
+               }
+
+               @Override
+               protected void consumePutFailed(PutFailed putFailed) {
+                       gotMessage.set(putFailed.getName());
+               }
+
+               @Override
+               protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
+                       gotMessage.set(identifierCollision.getName());
+               }
+
+               @Override
+               protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) {
+                       gotMessage.set(persistentPutDir.getName());
+               }
+
+               @Override
+               protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
+                       gotMessage.set(persistentRequestRemoved.getName());
+               }
+
+               @Override
+               protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
+                       gotMessage.set(subscribedUSKUpdate.getName());
+               }
+
+               @Override
+               protected void consumePluginInfo(PluginInfo pluginInfo) {
+                       gotMessage.set(pluginInfo.getName());
+               }
+
+               @Override
+               protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) {
+                       gotMessage.set(fcpPluginReply.getName());
+               }
+
+               @Override
+               protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) {
+                       gotMessage.set(persistentRequestModified.getName());
+               }
+
+               @Override
+               protected void consumePutSuccessful(PutSuccessful putSuccessful) {
+                       gotMessage.set(putSuccessful.getName());
+               }
+
+               @Override
+               protected void consumePutFetchable(PutFetchable putFetchable) {
+                       gotMessage.set(putFetchable.getName());
+               }
+
+               @Override
+               protected void consumeSentFeed(SentFeed sentFeed) {
+                       gotMessage.set(sentFeed.getName());
+               }
+
+               @Override
+               protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) {
+                       gotMessage.set(receivedBookmarkFeed.getName());
+               }
+
+               @Override
+               protected void consumeProtocolError(ProtocolError protocolError) {
+                       gotMessage.set(protocolError.getName());
+               }
+
+               @Override
+               protected void consumeUnknownMessage(FcpMessage fcpMessage) {
+                       gotMessage.set(fcpMessage.getName());
+               }
+
+       }
+
+}
diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java
deleted file mode 100644 (file)
index 9300ee8..0000000
+++ /dev/null
@@ -1,595 +0,0 @@
-package net.pterodactylus.fcp.quelaton;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import net.pterodactylus.fcp.AllData;
-import net.pterodactylus.fcp.BaseMessage;
-import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
-import net.pterodactylus.fcp.ConfigData;
-import net.pterodactylus.fcp.DataFound;
-import net.pterodactylus.fcp.EndListPeerNotes;
-import net.pterodactylus.fcp.EndListPeers;
-import net.pterodactylus.fcp.EndListPersistentRequests;
-import net.pterodactylus.fcp.FCPPluginReply;
-import net.pterodactylus.fcp.FcpConnection;
-import net.pterodactylus.fcp.FcpMessage;
-import net.pterodactylus.fcp.FinishedCompression;
-import net.pterodactylus.fcp.GetFailed;
-import net.pterodactylus.fcp.IdentifierCollision;
-import net.pterodactylus.fcp.NodeData;
-import net.pterodactylus.fcp.NodeHello;
-import net.pterodactylus.fcp.Peer;
-import net.pterodactylus.fcp.PeerNote;
-import net.pterodactylus.fcp.PeerRemoved;
-import net.pterodactylus.fcp.PersistentGet;
-import net.pterodactylus.fcp.PersistentPut;
-import net.pterodactylus.fcp.PersistentPutDir;
-import net.pterodactylus.fcp.PersistentRequestModified;
-import net.pterodactylus.fcp.PersistentRequestRemoved;
-import net.pterodactylus.fcp.PluginInfo;
-import net.pterodactylus.fcp.ProtocolError;
-import net.pterodactylus.fcp.PutFailed;
-import net.pterodactylus.fcp.PutFetchable;
-import net.pterodactylus.fcp.PutSuccessful;
-import net.pterodactylus.fcp.ReceivedBookmarkFeed;
-import net.pterodactylus.fcp.SSKKeypair;
-import net.pterodactylus.fcp.SentFeed;
-import net.pterodactylus.fcp.SimpleProgress;
-import net.pterodactylus.fcp.StartedCompression;
-import net.pterodactylus.fcp.SubscribedUSKUpdate;
-import net.pterodactylus.fcp.TestDDAComplete;
-import net.pterodactylus.fcp.TestDDAReply;
-import net.pterodactylus.fcp.URIGenerated;
-import net.pterodactylus.fcp.UnknownNodeIdentifier;
-import net.pterodactylus.fcp.UnknownPeerNoteType;
-
-import org.junit.Test;
-
-/**
- * Unit test for {@link FcpReplySequence}.
- *
- * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
- */
-public class FcpReplySequenceTest {
-
-       private final FcpConnection fcpConnection = mock(FcpConnection.class);
-       private final ExecutorService executorService = Executors.newSingleThreadExecutor();
-       private final TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection);
-       private final FcpMessage fcpMessage = new FcpMessage("Test");
-
-       @Test
-       public void canSendMessage() throws IOException, ExecutionException, InterruptedException {
-               FcpReplySequence replySequence = createBasicReplySequence();
-               replySequence.send(fcpMessage).get();
-               verify(fcpConnection).sendMessage(fcpMessage);
-       }
-
-       private FcpReplySequence createBasicReplySequence() {
-               return new FcpReplySequence(executorService, fcpConnection) {
-                               @Override
-                               protected boolean isFinished() {
-                                       return true;
-                               }
-                       };
-       }
-
-       @Test
-       public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
-               FcpReplySequence replySequence = createBasicReplySequence();
-               replySequence.send(fcpMessage);
-               verify(fcpConnection).addFcpListener(replySequence);
-       }
-
-       @Test
-       public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
-               FcpReplySequence replySequence = createBasicReplySequence();
-               replySequence.send(fcpMessage);
-               replySequence.close();
-               verify(fcpConnection).removeFcpListener(replySequence);
-       }
-
-       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, Class<M> messageClass, MessageCreator<M> messageCreator) throws IOException, InterruptedException, ExecutionException {
-               waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName())));
-       }
-
-       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, M message) throws IOException, InterruptedException, ExecutionException {
-               replySequence.setExpectedMessage(message.getName());
-               Future<Boolean> result = replySequence.send(fcpMessage);
-               messageReceiver.receiveMessage(fcpConnection, message);
-               assertThat(result.get(), is(true));
-       }
-
-       private <M extends BaseMessage> M createMessage(Class<M> messageClass, MessageCreator<M> messageCreator) {
-               return messageCreator.create(new FcpMessage(messageClass.getSimpleName()));
-       }
-
-       private interface MessageCreator<M extends BaseMessage> {
-
-               M create(FcpMessage fcpMessage);
-
-       }
-
-       @Test
-       public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
-               waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new);
-       }
-
-       @Test(expected = ExecutionException.class)
-       public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
-               replySequence.setExpectedMessage("");
-               Future<Boolean> result = replySequence.send(fcpMessage);
-               replySequence.receivedCloseConnectionDuplicateClientName(fcpConnection,
-                       new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
-               result.get();
-       }
-
-       @Test
-       public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new);
-       }
-
-       @Test
-       public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPeer, Peer.class, Peer::new);
-       }
-
-       @Test
-       public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedEndListPeers, EndListPeers.class, EndListPeers::new);
-       }
-
-       @Test
-       public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPeerNote, PeerNote.class, PeerNote::new);
-       }
-
-       @Test
-       public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new);
-       }
-
-       @Test
-       public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new);
-       }
-
-       @Test
-       public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedNodeData, new NodeData(
-                       new FcpMessage("NodeData").put("ark.pubURI", "")
-                                       .put("ark.number", "0")
-                                       .put("auth.negTypes", "")
-                                       .put("version", "0,0,0,0")
-                                       .put("lastGoodVersion", "0,0,0,0")));
-       }
-
-       @Test
-       public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new);
-       }
-
-       @Test
-       public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new);
-       }
-
-       @Test
-       public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPersistentGet, PersistentGet.class, PersistentGet::new);
-       }
-
-       @Test
-       public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPersistentPut, PersistentPut.class, PersistentPut::new);
-       }
-
-       @Test
-       public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new);
-       }
-
-       @Test
-       public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedURIGenerated, URIGenerated.class, URIGenerated::new);
-       }
-
-       @Test
-       public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedDataFound, DataFound.class, DataFound::new);
-       }
-
-       @Test
-       public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedAllData, new AllData(new FcpMessage("AllData"), null));
-       }
-
-       @Test
-       public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new);
-       }
-
-       @Test
-       public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedStartedCompression, StartedCompression.class, StartedCompression::new);
-       }
-
-       @Test
-       public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new);
-       }
-
-       @Test
-       public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new);
-       }
-
-       @Test
-       public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new);
-       }
-
-       @Test
-       public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedConfigData, ConfigData.class, ConfigData::new);
-       }
-
-       @Test
-       public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedGetFailed, GetFailed.class, GetFailed::new);
-       }
-
-       @Test
-       public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPutFailed, PutFailed.class, PutFailed::new);
-       }
-
-       @Test
-       public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new);
-       }
-
-       @Test
-       public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new);
-       }
-
-       @Test
-       public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new);
-       }
-
-       @Test
-       public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new);
-       }
-
-       @Test
-       public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPluginInfo, PluginInfo.class, PluginInfo::new);
-       }
-
-       @Test
-       public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
-       }
-
-       @Test
-       public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new);
-       }
-
-       @Test
-       public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new);
-       }
-
-       @Test
-       public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedPutFetchable, PutFetchable.class, PutFetchable::new);
-       }
-
-       @Test
-       public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedSentFeed, SentFeed.class, SentFeed::new);
-       }
-
-       @Test
-       public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new);
-       }
-
-       @Test
-       public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replySequence::receivedProtocolError, ProtocolError.class, ProtocolError::new);
-       }
-
-       @Test
-       public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
-               replySequence.setExpectedMessage("SomeFcpMessage");
-               Future<Boolean> result = replySequence.send(fcpMessage);
-               replySequence.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage"));
-               assertThat(result.get(), is(true));
-       }
-
-       @Test
-       public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
-               TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection) {
-                       private final AtomicBoolean gotPutFailed = new AtomicBoolean();
-                       private final AtomicBoolean gotGetFailed = new AtomicBoolean();
-
-                       @Override
-                       protected boolean isFinished() {
-                               return gotPutFailed.get() && gotGetFailed.get();
-                       }
-
-                       @Override
-                       protected Boolean getResult() {
-                               return isFinished();
-                       }
-
-                       @Override
-                       protected void consumePutFailed(PutFailed putFailed) {
-                               gotPutFailed.set(true);
-                       }
-
-                       @Override
-                       protected void consumeGetFailed(GetFailed getFailed) {
-                               gotGetFailed.set(true);
-                       }
-               };
-               Future<?> result = replySequence.send(fcpMessage);
-               assertThat(result.isDone(), is(false));
-               replySequence.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
-               assertThat(result.isDone(), is(false));
-               replySequence.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
-               assertThat(result.get(), is(true));
-       }
-
-       @Test
-       public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
-               replySequence.setExpectedMessage("none");
-               Future<Boolean> result = replySequence.send(fcpMessage);
-               Throwable throwable = new Throwable();
-               replySequence.connectionClosed(fcpConnection, throwable);
-               try {
-                       result.get();
-               } catch (ExecutionException e) {
-                       Throwable t = e;
-                       while (t.getCause() != null) {
-                               t = t.getCause();
-                       }
-                       assertThat(t, sameInstance(throwable));
-               }
-       }
-
-       @FunctionalInterface
-       private interface MessageReceiver<M> {
-
-               void receiveMessage(FcpConnection fcpConnection, M message);
-
-       }
-
-       private static class TestFcpReplySequence extends FcpReplySequence<Boolean> {
-
-               private final AtomicReference<String> gotMessage = new AtomicReference<>();
-               private final AtomicReference<String> expectedMessage = new AtomicReference<>();
-
-               public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
-                       super(executorService, fcpConnection);
-               }
-
-               public void setExpectedMessage(String expectedMessage) {
-                       this.expectedMessage.set(expectedMessage);
-               }
-
-               @Override
-               protected boolean isFinished() {
-                       return getResult();
-               }
-
-               @Override
-               protected Boolean getResult() {
-                       return expectedMessage.get().equals(gotMessage.get());
-               }
-
-               @Override
-               protected void consumeNodeHello(NodeHello nodeHello) {
-                       gotMessage.set(nodeHello.getName());
-               }
-
-               @Override
-               protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
-                       gotMessage.set(sskKeypair.getName());
-               }
-
-               @Override
-               protected void consumePeer(Peer peer) {
-                       gotMessage.set(peer.getName());
-               }
-
-               @Override
-               protected void consumeEndListPeers(EndListPeers endListPeers) {
-                       gotMessage.set(endListPeers.getName());
-               }
-
-               @Override
-               protected void consumePeerNote(PeerNote peerNote) {
-                       gotMessage.set(peerNote.getName());
-               }
-
-               @Override
-               protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
-                       gotMessage.set(endListPeerNotes.getName());
-               }
-
-               @Override
-               protected void consumePeerRemoved(PeerRemoved peerRemoved) {
-                       gotMessage.set(peerRemoved.getName());
-               }
-
-               @Override
-               protected void consumeNodeData(NodeData nodeData) {
-                       gotMessage.set(nodeData.getName());
-               }
-
-               @Override
-               protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
-                       gotMessage.set(testDDAReply.getName());
-               }
-
-               @Override
-               protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
-                       gotMessage.set(testDDAComplete.getName());
-               }
-
-               @Override
-               protected void consumePersistentGet(PersistentGet persistentGet) {
-                       gotMessage.set(persistentGet.getName());
-               }
-
-               @Override
-               protected void consumePersistentPut(PersistentPut persistentPut) {
-                       gotMessage.set(persistentPut.getName());
-               }
-
-               @Override
-               protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
-                       gotMessage.set(endListPersistentRequests.getName());
-               }
-
-               @Override
-               protected void consumeURIGenerated(URIGenerated uriGenerated) {
-                       gotMessage.set(uriGenerated.getName());
-               }
-
-               @Override
-               protected void consumeDataFound(DataFound dataFound) {
-                       gotMessage.set(dataFound.getName());
-               }
-
-               @Override
-               protected void consumeAllData(AllData allData) {
-                       gotMessage.set(allData.getName());
-               }
-
-               @Override
-               protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
-                       gotMessage.set(simpleProgress.getName());
-               }
-
-               @Override
-               protected void consumeStartedCompression(StartedCompression startedCompression) {
-                       gotMessage.set(startedCompression.getName());
-               }
-
-               @Override
-               protected void consumeFinishedCompression(FinishedCompression finishedCompression) {
-                       gotMessage.set(finishedCompression.getName());
-               }
-
-               @Override
-               protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
-                       gotMessage.set(unknownPeerNoteType.getName());
-               }
-
-               @Override
-               protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
-                       gotMessage.set(unknownNodeIdentifier.getName());
-               }
-
-               @Override
-               protected void consumeConfigData(ConfigData configData) {
-                       gotMessage.set(configData.getName());
-               }
-
-               @Override
-               protected void consumeGetFailed(GetFailed getFailed) {
-                       gotMessage.set(getFailed.getName());
-               }
-
-               @Override
-               protected void consumePutFailed(PutFailed putFailed) {
-                       gotMessage.set(putFailed.getName());
-               }
-
-               @Override
-               protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
-                       gotMessage.set(identifierCollision.getName());
-               }
-
-               @Override
-               protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) {
-                       gotMessage.set(persistentPutDir.getName());
-               }
-
-               @Override
-               protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
-                       gotMessage.set(persistentRequestRemoved.getName());
-               }
-
-               @Override
-               protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
-                       gotMessage.set(subscribedUSKUpdate.getName());
-               }
-
-               @Override
-               protected void consumePluginInfo(PluginInfo pluginInfo) {
-                       gotMessage.set(pluginInfo.getName());
-               }
-
-               @Override
-               protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) {
-                       gotMessage.set(fcpPluginReply.getName());
-               }
-
-               @Override
-               protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) {
-                       gotMessage.set(persistentRequestModified.getName());
-               }
-
-               @Override
-               protected void consumePutSuccessful(PutSuccessful putSuccessful) {
-                       gotMessage.set(putSuccessful.getName());
-               }
-
-               @Override
-               protected void consumePutFetchable(PutFetchable putFetchable) {
-                       gotMessage.set(putFetchable.getName());
-               }
-
-               @Override
-               protected void consumeSentFeed(SentFeed sentFeed) {
-                       gotMessage.set(sentFeed.getName());
-               }
-
-               @Override
-               protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) {
-                       gotMessage.set(receivedBookmarkFeed.getName());
-               }
-
-               @Override
-               protected void consumeProtocolError(ProtocolError protocolError) {
-                       gotMessage.set(protocolError.getName());
-               }
-
-               @Override
-               protected void consumeUnknownMessage(FcpMessage fcpMessage) {
-                       gotMessage.set(fcpMessage.getName());
-               }
-
-       }
-
-}