Rework FcpReplySequence into an abstract base class
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / FcpReplySequenceTest.java
index fb849fe..07864ae 100644 (file)
@@ -12,7 +12,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 
 import net.pterodactylus.fcp.AllData;
 import net.pterodactylus.fcp.BaseMessage;
@@ -66,317 +65,531 @@ public class FcpReplySequenceTest {
 
        private final FcpConnection fcpConnection = mock(FcpConnection.class);
        private final ExecutorService executorService = Executors.newSingleThreadExecutor();
-       private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection);
+       private final TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection);
        private final FcpMessage fcpMessage = new FcpMessage("Test");
 
        @Test
        public void canSendMessage() throws IOException {
-               replyWaiter.send(fcpMessage);
+               FcpReplySequence replySequence = createBasicReplySequence();
+               replySequence.send(fcpMessage);
                verify(fcpConnection).sendMessage(fcpMessage);
        }
 
+       private FcpReplySequence createBasicReplySequence() {
+               return new FcpReplySequence(executorService, fcpConnection) {
+                               @Override
+                               protected boolean isFinished() {
+                                       return true;
+                               }
+                       };
+       }
+
        @Test
        public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
-               replyWaiter.send(fcpMessage);
-               verify(fcpConnection).addFcpListener(replyWaiter);
+               FcpReplySequence replySequence = createBasicReplySequence();
+               replySequence.send(fcpMessage);
+               verify(fcpConnection).addFcpListener(replySequence);
        }
 
        @Test
        public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
-               replyWaiter.send(fcpMessage);
-               replyWaiter.close();
-               verify(fcpConnection).removeFcpListener(replyWaiter);
+               FcpReplySequence replySequence = createBasicReplySequence();
+               replySequence.send(fcpMessage);
+               replySequence.close();
+               verify(fcpConnection).removeFcpListener(replySequence);
        }
 
-       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver,
-                       Class<M> messageClass, Supplier<M> message) throws IOException, InterruptedException, ExecutionException {
-               AtomicBoolean gotMessage = setupMessage(messageClass);
-               Future<?> result = replyWaiter.send(fcpMessage);
-               sendMessage(messageReceiver, message.get());
-               result.get();
-               assertThat(gotMessage.get(), is(true));
+       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, Class<M> messageClass, MessageCreator<M> messageCreator) throws IOException, InterruptedException, ExecutionException {
+               waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName())));
        }
 
-       private <M extends BaseMessage> void sendMessage(MessageReceiver<M> messageReceiver, M message) {
-               messageReceiver.receive(fcpConnection, message);
+       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, M message) throws IOException, InterruptedException, ExecutionException {
+               replySequence.setExpectedMessage(message.getName());
+               Future<Boolean> result = replySequence.send(fcpMessage);
+               messageReceiver.receiveMessage(fcpConnection, message);
+               assertThat(result.get(), is(true));
        }
 
-       private interface MessageReceiver<M extends BaseMessage> {
-
-               void receive(FcpConnection fcpConnection, M message);
+       private <M extends BaseMessage> M createMessage(Class<M> messageClass, MessageCreator<M> messageCreator) {
+               return messageCreator.create(new FcpMessage(messageClass.getSimpleName()));
        }
 
-       private <M extends BaseMessage> AtomicBoolean setupMessage(Class<M> messageClass) {
-               AtomicBoolean gotMessage = new AtomicBoolean();
-               replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true));
-               replyWaiter.waitFor(() -> gotMessage.get());
-               return gotMessage;
+       private interface MessageCreator<M extends BaseMessage> {
+
+               M create(FcpMessage fcpMessage);
+
        }
 
        @Test
        public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
-               waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class,
-                               () -> new NodeHello(new FcpMessage("NodeHello")));
+               waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new);
        }
 
        @Test
-       public void waitingForConnectionClosedDuplicateClientNameWorks()
-       throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName,
-                               CloseConnectionDuplicateClientName.class,
-                               () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+       public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
+               waitForASpecificMessage( replySequence::receivedCloseConnectionDuplicateClientName, CloseConnectionDuplicateClientName.class, CloseConnectionDuplicateClientName::new);
        }
 
        @Test
        public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class,
-                               () -> new SSKKeypair(new FcpMessage("SSKKeypair")));
+               waitForASpecificMessage(replySequence::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new);
        }
 
        @Test
        public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer")));
+               waitForASpecificMessage(replySequence::receivedPeer, Peer.class, Peer::new);
        }
 
        @Test
        public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class,
-                               () -> new EndListPeers(new FcpMessage("EndListPeers")));
+               waitForASpecificMessage(replySequence::receivedEndListPeers, EndListPeers.class, EndListPeers::new);
        }
 
        @Test
        public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class,
-                               () -> new PeerNote(new FcpMessage("PeerNote")));
+               waitForASpecificMessage(replySequence::receivedPeerNote, PeerNote.class, PeerNote::new);
        }
 
        @Test
        public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class,
-                               () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes")));
+               waitForASpecificMessage(replySequence::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new);
        }
 
        @Test
        public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class,
-                               () -> new PeerRemoved(new FcpMessage("PeerRemoved")));
+               waitForASpecificMessage(replySequence::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new);
        }
 
        @Test
        public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class,
-                               () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "")
-                                               .put(
-                                                               "ark.number", "0")
-                                               .put("auth.negTypes", "")
-                                               .put("version", "0,0,0,0")
-                                               .put("lastGoodVersion", "0,0,0,0")));
+               waitForASpecificMessage(replySequence::receivedNodeData, new NodeData(
+                       new FcpMessage("NodeData").put("ark.pubURI", "")
+                                       .put("ark.number", "0")
+                                       .put("auth.negTypes", "")
+                                       .put("version", "0,0,0,0")
+                                       .put("lastGoodVersion", "0,0,0,0")));
        }
 
        @Test
        public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class,
-                               () -> new TestDDAReply(new FcpMessage("TestDDAReply")));
+               waitForASpecificMessage(replySequence::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new);
        }
 
        @Test
        public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class,
-                               () -> new TestDDAComplete(new FcpMessage("TestDDAComplete")));
+               waitForASpecificMessage(replySequence::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new);
        }
 
        @Test
        public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class,
-                               () -> new PersistentGet(new FcpMessage("PersistentGet")));
+               waitForASpecificMessage(replySequence::receivedPersistentGet, PersistentGet.class, PersistentGet::new);
        }
 
        @Test
        public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class,
-                               () -> new PersistentPut(new FcpMessage("PersistentPut")));
+               waitForASpecificMessage(replySequence::receivedPersistentPut, PersistentPut.class, PersistentPut::new);
        }
 
        @Test
        public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class,
-                               () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests")));
+               waitForASpecificMessage(replySequence::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new);
        }
 
        @Test
        public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class,
-                               () -> new URIGenerated(new FcpMessage("URIGenerated")));
+               waitForASpecificMessage(replySequence::receivedURIGenerated, URIGenerated.class, URIGenerated::new);
        }
 
        @Test
        public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class,
-                               () -> new DataFound(new FcpMessage("DataFound")));
+               waitForASpecificMessage(replySequence::receivedDataFound, DataFound.class, DataFound::new);
        }
 
        @Test
        public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class,
-                               () -> new AllData(new FcpMessage("AllData"), null));
+               waitForASpecificMessage(replySequence::receivedAllData, new AllData(new FcpMessage("AllData"), null));
        }
 
        @Test
        public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class,
-                               () -> new SimpleProgress(new FcpMessage("SimpleProgress")));
+               waitForASpecificMessage(replySequence::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new);
        }
 
        @Test
        public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class,
-                               () -> new StartedCompression(new FcpMessage("StartedCompression")));
+               waitForASpecificMessage(replySequence::receivedStartedCompression, StartedCompression.class, StartedCompression::new);
        }
 
        @Test
        public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class,
-                               () -> new FinishedCompression(new FcpMessage("FinishedCompression")));
+               waitForASpecificMessage(replySequence::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new);
        }
 
        @Test
        public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class,
-                               () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType")));
+               waitForASpecificMessage(replySequence::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new);
        }
 
        @Test
        public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class,
-                               () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier")));
+               waitForASpecificMessage(replySequence::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new);
        }
 
        @Test
        public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class,
-                               () -> new ConfigData(new FcpMessage("ConfigData")));
+               waitForASpecificMessage(replySequence::receivedConfigData, ConfigData.class, ConfigData::new);
        }
 
        @Test
        public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class,
-                               () -> new GetFailed(new FcpMessage("GetFailed")));
+               waitForASpecificMessage(replySequence::receivedGetFailed, GetFailed.class, GetFailed::new);
        }
 
        @Test
        public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class,
-                               () -> new PutFailed(new FcpMessage("PutFailed")));
+               waitForASpecificMessage(replySequence::receivedPutFailed, PutFailed.class, PutFailed::new);
        }
 
        @Test
        public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class,
-                               () -> new IdentifierCollision(new FcpMessage("IdentifierCollision")));
+               waitForASpecificMessage(replySequence::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new);
        }
 
        @Test
        public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class,
-                               () -> new PersistentPutDir(new FcpMessage("PersistentPutDir")));
+               waitForASpecificMessage(replySequence::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new);
        }
 
        @Test
        public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class,
-                               () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved")));
+               waitForASpecificMessage(replySequence::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new);
        }
 
        @Test
        public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class,
-                               () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate")));
+               waitForASpecificMessage(replySequence::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new);
        }
 
        @Test
        public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class,
-                               () -> new PluginInfo(new FcpMessage("PluginInfo")));
+               waitForASpecificMessage(replySequence::receivedPluginInfo, PluginInfo.class, PluginInfo::new);
        }
 
        @Test
        public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class,
-                               () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
+               waitForASpecificMessage(replySequence::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
        }
 
        @Test
        public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class,
-                               () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified")));
+               waitForASpecificMessage(replySequence::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new);
        }
 
        @Test
        public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class,
-                               () -> new PutSuccessful(new FcpMessage("PutSuccessful")));
+               waitForASpecificMessage(replySequence::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new);
        }
 
        @Test
        public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class,
-                               () -> new PutFetchable(new FcpMessage("PutFetchable")));
+               waitForASpecificMessage(replySequence::receivedPutFetchable, PutFetchable.class, PutFetchable::new);
        }
 
        @Test
        public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class,
-                               () -> new SentFeed(new FcpMessage("SentFeed")));
+               waitForASpecificMessage(replySequence::receivedSentFeed, SentFeed.class, SentFeed::new);
        }
 
        @Test
        public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class,
-                               () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed")));
+               waitForASpecificMessage(replySequence::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new);
        }
 
        @Test
        public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class,
-                               () -> new ProtocolError(new FcpMessage("ProtocolError")));
+               waitForASpecificMessage(replySequence::receivedProtocolError, ProtocolError.class, ProtocolError::new);
        }
 
        @Test
        public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
-               AtomicReference<FcpMessage> receivedMessage = new AtomicReference<>();
-               replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message));
-               replyWaiter.waitFor(() -> receivedMessage.get() != null);
-               Future<?> result = replyWaiter.send(fcpMessage);
-               replyWaiter.receivedMessage(fcpConnection, fcpMessage);
-               result.get();
-               assertThat(receivedMessage.get(), is(fcpMessage));
+               replySequence.setExpectedMessage("SomeFcpMessage");
+               Future<Boolean> result = replySequence.send(fcpMessage);
+               replySequence.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage"));
+               assertThat(result.get(), is(true));
        }
 
        @Test
        public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
-               AtomicBoolean gotPutFailed = new AtomicBoolean();
-               replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true));
-               AtomicBoolean gotGetFailed = new AtomicBoolean();
-               replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true));
-               replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get());
-               Future<?> result = replyWaiter.send(fcpMessage);
+               TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection) {
+                       private final AtomicBoolean gotPutFailed = new AtomicBoolean();
+                       private final AtomicBoolean gotGetFailed = new AtomicBoolean();
+
+                       @Override
+                       protected boolean isFinished() {
+                               return gotPutFailed.get() && gotGetFailed.get();
+                       }
+
+                       @Override
+                       protected Boolean getResult() {
+                               return isFinished();
+                       }
+
+                       @Override
+                       protected void consumePutFailed(PutFailed putFailed) {
+                               gotPutFailed.set(true);
+                       }
+
+                       @Override
+                       protected void consumeGetFailed(GetFailed getFailed) {
+                               gotGetFailed.set(true);
+                       }
+               };
+               Future<?> result = replySequence.send(fcpMessage);
                assertThat(result.isDone(), is(false));
-               replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
+               replySequence.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
                assertThat(result.isDone(), is(false));
-               replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
-               result.get();
+               replySequence.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
+               assertThat(result.get(), is(true));
        }
 
        @Test
        public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
-               AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
-               replyWaiter.handleClose().with((e) -> receivedThrowable.set(e));
-               replyWaiter.waitFor(() -> receivedThrowable.get() != null);
-               Future<?> result = replyWaiter.send(fcpMessage);
+               replySequence.setExpectedMessage("ConnectionClosed");
+               Future<Boolean> result = replySequence.send(fcpMessage);
                Throwable throwable = new Throwable();
-               replyWaiter.connectionClosed(fcpConnection, throwable);
-               result.get();
-               assertThat(receivedThrowable.get(), is(throwable));
+               replySequence.connectionClosed(fcpConnection, throwable);
+               assertThat(result.get(), is(true));
+               assertThat(replySequence.receivedThrowable.get(), is(throwable));
+       }
+
+       @FunctionalInterface
+       private interface MessageReceiver<M> {
+
+               void receiveMessage(FcpConnection fcpConnection, M message);
+
+       }
+
+       private static class TestFcpReplySequence extends FcpReplySequence<Boolean> {
+
+               private final AtomicReference<String> gotMessage = new AtomicReference<>();
+               private final AtomicReference<String> expectedMessage = new AtomicReference<>();
+               private final AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
+
+               public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
+                       super(executorService, fcpConnection);
+               }
+
+               public void setExpectedMessage(String expectedMessage) {
+                       this.expectedMessage.set(expectedMessage);
+               }
+
+               @Override
+               protected boolean isFinished() {
+                       return getResult();
+               }
+
+               @Override
+               protected Boolean getResult() {
+                       return expectedMessage.get().equals(gotMessage.get());
+               }
+
+               @Override
+               protected void consumeNodeHello(NodeHello nodeHello) {
+                       gotMessage.set(nodeHello.getName());
+               }
+
+               @Override
+               protected void consumeCloseConnectionDuplicateClientName(
+                       CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+                       gotMessage.set(closeConnectionDuplicateClientName.getName());
+               }
+
+               @Override
+               protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
+                       gotMessage.set(sskKeypair.getName());
+               }
+
+               @Override
+               protected void consumePeer(Peer peer) {
+                       gotMessage.set(peer.getName());
+               }
+
+               @Override
+               protected void consumeEndListPeers(EndListPeers endListPeers) {
+                       gotMessage.set(endListPeers.getName());
+               }
+
+               @Override
+               protected void consumePeerNote(PeerNote peerNote) {
+                       gotMessage.set(peerNote.getName());
+               }
+
+               @Override
+               protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
+                       gotMessage.set(endListPeerNotes.getName());
+               }
+
+               @Override
+               protected void consumePeerRemoved(PeerRemoved peerRemoved) {
+                       gotMessage.set(peerRemoved.getName());
+               }
+
+               @Override
+               protected void consumeNodeData(NodeData nodeData) {
+                       gotMessage.set(nodeData.getName());
+               }
+
+               @Override
+               protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
+                       gotMessage.set(testDDAReply.getName());
+               }
+
+               @Override
+               protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
+                       gotMessage.set(testDDAComplete.getName());
+               }
+
+               @Override
+               protected void consumePersistentGet(PersistentGet persistentGet) {
+                       gotMessage.set(persistentGet.getName());
+               }
+
+               @Override
+               protected void consumePersistentPut(PersistentPut persistentPut) {
+                       gotMessage.set(persistentPut.getName());
+               }
+
+               @Override
+               protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
+                       gotMessage.set(endListPersistentRequests.getName());
+               }
+
+               @Override
+               protected void consumeURIGenerated(URIGenerated uriGenerated) {
+                       gotMessage.set(uriGenerated.getName());
+               }
+
+               @Override
+               protected void consumeDataFound(DataFound dataFound) {
+                       gotMessage.set(dataFound.getName());
+               }
+
+               @Override
+               protected void consumeAllData(AllData allData) {
+                       gotMessage.set(allData.getName());
+               }
+
+               @Override
+               protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+                       gotMessage.set(simpleProgress.getName());
+               }
+
+               @Override
+               protected void consumeStartedCompression(StartedCompression startedCompression) {
+                       gotMessage.set(startedCompression.getName());
+               }
+
+               @Override
+               protected void consumeFinishedCompression(FinishedCompression finishedCompression) {
+                       gotMessage.set(finishedCompression.getName());
+               }
+
+               @Override
+               protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
+                       gotMessage.set(unknownPeerNoteType.getName());
+               }
+
+               @Override
+               protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
+                       gotMessage.set(unknownNodeIdentifier.getName());
+               }
+
+               @Override
+               protected void consumeConfigData(ConfigData configData) {
+                       gotMessage.set(configData.getName());
+               }
+
+               @Override
+               protected void consumeGetFailed(GetFailed getFailed) {
+                       gotMessage.set(getFailed.getName());
+               }
+
+               @Override
+               protected void consumePutFailed(PutFailed putFailed) {
+                       gotMessage.set(putFailed.getName());
+               }
+
+               @Override
+               protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
+                       gotMessage.set(identifierCollision.getName());
+               }
+
+               @Override
+               protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) {
+                       gotMessage.set(persistentPutDir.getName());
+               }
+
+               @Override
+               protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
+                       gotMessage.set(persistentRequestRemoved.getName());
+               }
+
+               @Override
+               protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
+                       gotMessage.set(subscribedUSKUpdate.getName());
+               }
+
+               @Override
+               protected void consumePluginInfo(PluginInfo pluginInfo) {
+                       gotMessage.set(pluginInfo.getName());
+               }
+
+               @Override
+               protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) {
+                       gotMessage.set(fcpPluginReply.getName());
+               }
+
+               @Override
+               protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) {
+                       gotMessage.set(persistentRequestModified.getName());
+               }
+
+               @Override
+               protected void consumePutSuccessful(PutSuccessful putSuccessful) {
+                       gotMessage.set(putSuccessful.getName());
+               }
+
+               @Override
+               protected void consumePutFetchable(PutFetchable putFetchable) {
+                       gotMessage.set(putFetchable.getName());
+               }
+
+               @Override
+               protected void consumeSentFeed(SentFeed sentFeed) {
+                       gotMessage.set(sentFeed.getName());
+               }
+
+               @Override
+               protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) {
+                       gotMessage.set(receivedBookmarkFeed.getName());
+               }
+
+               @Override
+               protected void consumeProtocolError(ProtocolError protocolError) {
+                       gotMessage.set(protocolError.getName());
+               }
+
+               @Override
+               protected void consumeUnknownMessage(FcpMessage fcpMessage) {
+                       gotMessage.set(fcpMessage.getName());
+               }
+
+               @Override
+               protected void consumeConnectionClosed(Throwable throwable) {
+                       receivedThrowable.set(throwable);
+                       gotMessage.set("ConnectionClosed");
+               }
+
        }
 
 }