Rework FcpReplySequence into an abstract base class
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sun, 5 Jul 2015 20:12:23 +0000 (22:12 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sun, 5 Jul 2015 20:12:23 +0000 (22:12 +0200)
src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java

index 603b150..3a33730 100644 (file)
@@ -3,7 +3,6 @@ package net.pterodactylus.fcp.quelaton;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -25,8 +24,6 @@ import net.pterodactylus.fcp.Priority;
 import net.pterodactylus.fcp.ReturnType;
 import net.pterodactylus.fcp.SSKKeypair;
 
-import com.google.common.io.ByteStreams;
-
 /**
  * Default {@link FcpClient} implementation.
  *
@@ -60,16 +57,25 @@ public class DefaultFcpClient implements FcpClient {
        private FcpConnection createConnection() throws IOException {
                FcpConnection connection = new FcpConnection(hostname, port);
                connection.connect();
-               AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
-               AtomicBoolean receivedClosed = new AtomicBoolean();
-               FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
-               nodeHelloSequence
-                               .handle(NodeHello.class)
-                               .with((nodeHello) -> receivedNodeHello.set(nodeHello));
-               nodeHelloSequence
-                               .handle(CloseConnectionDuplicateClientName.class)
-                               .with((closeConnection) -> receivedClosed.set(true));
-               nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
+               FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
+                       private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
+                       private final AtomicBoolean receivedClosed = new AtomicBoolean();
+                       @Override
+                       protected boolean isFinished() {
+                               return receivedNodeHello.get() != null || receivedClosed.get();
+                       }
+
+                       @Override
+                       protected void consumeNodeHello(NodeHello nodeHello) {
+                               receivedNodeHello.set(nodeHello);
+                       }
+
+                       @Override
+                       protected void consumeCloseConnectionDuplicateClientName(
+                               CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+                               receivedClosed.set(true);
+                       }
+               };
                ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
                try {
                        nodeHelloSequence.send(clientHello).get();
@@ -91,31 +97,25 @@ public class DefaultFcpClient implements FcpClient {
                public Future<FcpKeyPair> execute() {
                        return threadPool.submit(() -> {
                                connect();
-                               Sequence sequence = new Sequence();
-                               FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
-                               replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
-                               replySequence.waitFor(sequence::isFinished);
-                               replySequence.send(new GenerateSSK()).get();
-                               return sequence.getKeyPair();
-                       });
-               }
-
-               private class Sequence {
-
-                       private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
+                               return new FcpReplySequence<FcpKeyPair>(threadPool, fcpConnection.get()) {
+                                       private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
 
-                       public void handleSSKKeypair(SSKKeypair sskKeypair) {
-                               keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
-                       }
-
-                       public boolean isFinished() {
-                               return keyPair.get() != null;
-                       }
+                                       @Override
+                                       protected boolean isFinished() {
+                                               return keyPair.get() != null;
+                                       }
 
-                       public FcpKeyPair getKeyPair() {
-                               return keyPair.get();
-                       }
+                                       @Override
+                                       protected FcpKeyPair getResult() {
+                                               return keyPair.get();
+                                       }
 
+                                       @Override
+                                       protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
+                                               keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
+                                       }
+                               }.send(new GenerateSSK()).get();
+                       });
                }
 
        }
@@ -179,11 +179,6 @@ public class DefaultFcpClient implements FcpClient {
 
                @Override
                public Future<Optional<Data>> uri(String uri) {
-                       return threadPool.submit(() -> execute(uri));
-               }
-
-               private Optional<Data> execute(String uri) throws IOException, ExecutionException, InterruptedException {
-                       DefaultFcpClient.this.connect();
                        ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
                        if (ignoreDataStore) {
                                clientGet.setIgnoreDataStore(true);
@@ -203,91 +198,74 @@ public class DefaultFcpClient implements FcpClient {
                        if (global) {
                                clientGet.setGlobal(true);
                        }
-                       try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) {
-                               Sequence sequence = new Sequence(identifier);
-                               replySequence.handle(AllData.class).with(sequence::allData);
-                               replySequence.handle(GetFailed.class).with(sequence::getFailed);
-                               replySequence.handleClose().with(sequence::disconnect);
-                               replySequence.waitFor(sequence::isFinished);
-                               replySequence.send(clientGet).get();
-                               return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty();
-                       }
-               }
-
-               private class Sequence {
-
-                       private final AtomicBoolean finished = new AtomicBoolean();
-                       private final AtomicBoolean failed = new AtomicBoolean();
-
-                       private final String identifier;
-
-                       private String contentType;
-                       private long dataLength;
-                       private InputStream payload;
-
-                       private Sequence(String identifier) {
-                               this.identifier = identifier;
-                       }
+                       return threadPool.submit(() -> {
+                               connect();
+                               FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, fcpConnection.get()) {
+                                       private final AtomicBoolean finished = new AtomicBoolean();
+                                       private final AtomicBoolean failed = new AtomicBoolean();
 
-                       public boolean isFinished() {
-                               return finished.get() || failed.get();
-                       }
+                                       private final String identifier = ClientGetCommandImpl.this.identifier;
 
-                       public boolean isSuccessful() {
-                               return !failed.get();
-                       }
+                                       private String contentType;
+                                       private long dataLength;
+                                       private InputStream payload;
 
-                       public Data getData() {
-                               return new Data() {
                                        @Override
-                                       public String getMimeType() {
-                                               synchronized (Sequence.this) {
-                                                       return contentType;
-                                               }
+                                       protected boolean isFinished() {
+                                               return finished.get() || failed.get();
                                        }
 
                                        @Override
-                                       public long size() {
-                                               synchronized (Sequence.this) {
-                                                       return dataLength;
-                                               }
+                                       protected Optional<Data> getResult() {
+                                               return failed.get() ? Optional.empty() : Optional.of(new Data() {
+                                                       @Override
+                                                       public String getMimeType() {
+                                                               return contentType;
+                                                       }
+
+                                                       @Override
+                                                       public long size() {
+                                                               return dataLength;
+                                                       }
+
+                                                       @Override
+                                                       public InputStream getInputStream() {
+                                                               return payload;
+                                                       }
+                                               });
                                        }
 
                                        @Override
-                                       public InputStream getInputStream() {
-                                               synchronized (Sequence.this) {
-                                                       return payload;
+                                       protected void consumeAllData(AllData allData) {
+                                               if (allData.getIdentifier().equals(identifier)) {
+                                                       synchronized (this) {
+                                                               contentType = allData.getContentType();
+                                                               dataLength = allData.getDataLength();
+                                                               try {
+                                                                       payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
+                                                                       finished.set(true);
+                                                               } catch (IOException e) {
+                                                                       // TODO – logging
+                                                                       failed.set(true);
+                                                               }
+                                                       }
                                                }
                                        }
-                               };
-                       }
 
-                       public void allData(AllData allData) {
-                               if (allData.getIdentifier().equals(identifier)) {
-                                       synchronized (this) {
-                                               contentType = allData.getContentType();
-                                               dataLength = allData.getDataLength();
-                                               try {
-                                                       payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
-                                                       finished.set(true);
-                                               } catch (IOException e) {
-                                                       // TODO – logging
+                                       @Override
+                                       protected void consumeGetFailed(GetFailed getFailed) {
+                                               if (getFailed.getIdentifier().equals(identifier)) {
                                                        failed.set(true);
                                                }
                                        }
-                               }
-                       }
-
-                       public void getFailed(GetFailed getFailed) {
-                               if (getFailed.getIdentifier().equals(identifier)) {
-                                       failed.set(true);
-                               }
-                       }
-
-                       public void disconnect(Throwable t) {
-                               failed.set(true);
-                       }
 
+                                       @Override
+                                       protected void consumeConnectionClosed(Throwable throwable) {
+                                               failed.set(true);
+                                       }
+                               };
+                               return replySequence.send(clientGet).get();
+                       });
                }
 
        }
index 0c54d7c..25db847 100644 (file)
@@ -1,17 +1,11 @@
 package net.pterodactylus.fcp.quelaton;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
 import net.pterodactylus.fcp.AllData;
-import net.pterodactylus.fcp.BaseMessage;
 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
 import net.pterodactylus.fcp.ConfigData;
 import net.pterodactylus.fcp.DataFound;
@@ -57,312 +51,343 @@ import net.pterodactylus.fcp.UnknownPeerNoteType;
  *
  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
  */
-public class FcpReplySequence implements AutoCloseable, FcpListener {
+public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
 
+       private final Object syncObject = new Object();
        private final ExecutorService executorService;
        private final FcpConnection fcpConnection;
-       private final Map<Class<? extends BaseMessage>, Consumer<BaseMessage>> expectedMessageActions = new HashMap<>();
-       private final List<Consumer<FcpMessage>> unknownMessageHandlers = new ArrayList<>();
-       private final List<Consumer<Throwable>> closeHandlers = new ArrayList<>();
-       private Supplier<Boolean> endPredicate;
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
                this.executorService = executorService;
                this.fcpConnection = fcpConnection;
        }
 
-       public <M extends BaseMessage> $1<M> handle(Class<M> messageClass) {
-               return new $1<>(messageClass);
-       }
-
-       public class $1<M extends BaseMessage> {
-
-               private Class<M> messageClass;
-
-               private $1(Class<M> messageClass) {
-                       this.messageClass = messageClass;
-               }
-
-               public FcpReplySequence with(Consumer<M> action) {
-                       expectedMessageActions.put(messageClass, (Consumer<BaseMessage>) action);
-                       return FcpReplySequence.this;
-               }
-
-       }
-
-       public $2 handleUnknown() {
-               return new $2();
-       }
-
-       public class $2 {
-
-               public FcpReplySequence with(Consumer<FcpMessage> consumer) {
-                       unknownMessageHandlers.add(consumer);
-                       return FcpReplySequence.this;
-               }
-
-       }
-
-       public $3 handleClose() {
-               return new $3();
-       }
+       protected abstract boolean isFinished();
 
-       public class $3 {
+       public Future<R> send(FcpMessage fcpMessage) throws IOException {
+               try {
+               fcpConnection.addFcpListener(this);
 
-               public FcpReplySequence with(Consumer<Throwable> consumer) {
-                       closeHandlers.add(consumer);
-                       return FcpReplySequence.this;
+               } catch (Throwable throwable) {
+                       throwable.printStackTrace();
                }
-
-       }
-
-       public void waitFor(Supplier<Boolean> endPredicate) {
-               this.endPredicate = endPredicate;
-       }
-
-       public Future<?> send(FcpMessage fcpMessage) throws IOException {
-               fcpConnection.addFcpListener(this);
                fcpConnection.sendMessage(fcpMessage);
                return executorService.submit(() -> {
-                       synchronized (endPredicate) {
-                               while (!endPredicate.get()) {
-                                       endPredicate.wait();
+                       synchronized (syncObject) {
+                               while (!isFinished()) {
+                                       syncObject.wait();
                                }
                        }
-                       return null;
+                       return getResult();
                });
        }
 
+       protected R getResult() {
+               return null;
+       }
+
        @Override
        public void close() {
                fcpConnection.removeFcpListener(this);
        }
 
-       private <M extends BaseMessage> void consume(Class<M> fcpMessageClass, BaseMessage fcpMessage) {
-               if (expectedMessageActions.containsKey(fcpMessageClass)) {
-                       expectedMessageActions.get(fcpMessageClass).accept(fcpMessage);
-               }
-               synchronized (endPredicate) {
-                       endPredicate.notifyAll();
+       private <M> void consume(Consumer<M> consumer,  M message) {
+               consumer.accept(message);
+               synchronized (syncObject) {
+                       syncObject.notifyAll();
                }
        }
 
        private void consumeUnknown(FcpMessage fcpMessage) {
-               for (Consumer<FcpMessage> unknownMessageHandler : unknownMessageHandlers) {
-                       unknownMessageHandler.accept(fcpMessage);
-               }
-               synchronized (endPredicate) {
-                       endPredicate.notifyAll();
+               consumeUnknownMessage(fcpMessage);
+               synchronized (syncObject) {
+                       syncObject.notifyAll();
                }
        }
 
        private void consumeClose(Throwable throwable) {
-               for (Consumer<Throwable> closeHandler : closeHandlers) {
-                       closeHandler.accept(throwable);
-               }
-               synchronized (endPredicate) {
-                       endPredicate.notifyAll();
+               consumeConnectionClosed(throwable);
+               synchronized (syncObject) {
+                       syncObject.notifyAll();
                }
        }
 
        @Override
-       public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
-               consume(NodeHello.class, nodeHello);
+       public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
+               consume(this::consumeNodeHello, nodeHello);
        }
 
+       protected void consumeNodeHello(NodeHello nodeHello) { }
+
        @Override
-       public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
+       public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
                        CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-               consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName);
+               consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
        }
 
+       protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
+
        @Override
-       public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
-               consume(SSKKeypair.class, sskKeypair);
+       public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
+               consume(this::consumeSSKKeypair, sskKeypair);
        }
 
+       protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
+
        @Override
-       public void receivedPeer(FcpConnection fcpConnection, Peer peer) {
-               consume(Peer.class, peer);
+       public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
+               consume(this::consumePeer, peer);
        }
 
+       protected void consumePeer(Peer peer) { }
+
        @Override
-       public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
-               consume(EndListPeers.class, endListPeers);
+       public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
+               consume(this::consumeEndListPeers, endListPeers);
        }
 
+       protected void consumeEndListPeers(EndListPeers endListPeers) { }
+
        @Override
-       public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
-               consume(PeerNote.class, peerNote);
+       public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
+               consume(this::consumePeerNote, peerNote);
        }
 
+       protected void consumePeerNote(PeerNote peerNote) { }
+
        @Override
-       public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
-               consume(EndListPeerNotes.class, endListPeerNotes);
+       public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
+               consume(this::consumeEndListPeerNotes, endListPeerNotes);
        }
 
+       protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
+
        @Override
-       public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
-               consume(PeerRemoved.class, peerRemoved);
+       public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
+               consume(this::consumePeerRemoved, peerRemoved);
        }
 
+       protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
+
        @Override
-       public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
-               consume(NodeData.class, nodeData);
+       public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
+               consume(this::consumeNodeData, nodeData);
        }
 
+       protected void consumeNodeData(NodeData nodeData) { }
+
        @Override
-       public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
-               consume(TestDDAReply.class, testDDAReply);
+       public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
+               consume(this::consumeTestDDAReply, testDDAReply);
        }
 
+       protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
+
        @Override
-       public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
-               consume(TestDDAComplete.class, testDDAComplete);
+       public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
+               consume(this::consumeTestDDAComplete, testDDAComplete);
        }
 
+       protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
+
        @Override
-       public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
-               consume(PersistentGet.class, persistentGet);
+       public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
+               consume(this::consumePersistentGet, persistentGet);
        }
 
+       protected void consumePersistentGet(PersistentGet persistentGet) { }
+
        @Override
-       public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
-               consume(PersistentPut.class, persistentPut);
+       public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
+               consume(this::consumePersistentPut, persistentPut);
        }
 
+       protected void consumePersistentPut(PersistentPut persistentPut) { }
+
        @Override
-       public void receivedEndListPersistentRequests(FcpConnection fcpConnection,
+       public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
                        EndListPersistentRequests endListPersistentRequests) {
-               consume(EndListPersistentRequests.class, endListPersistentRequests);
+               consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
        }
 
+       protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
+
        @Override
-       public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
-               consume(URIGenerated.class, uriGenerated);
+       public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
+               consume(this::consumeURIGenerated, uriGenerated);
        }
 
+       protected void consumeURIGenerated(URIGenerated uriGenerated) { }
+
        @Override
-       public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
-               consume(DataFound.class, dataFound);
+       public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
+               consume(this::consumeDataFound, dataFound);
        }
 
+       protected void consumeDataFound(DataFound dataFound) { }
+
        @Override
-       public void receivedAllData(FcpConnection fcpConnection, AllData allData) {
-               consume(AllData.class, allData);
+       public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
+               consume(this::consumeAllData, allData);
        }
 
+       protected void consumeAllData(AllData allData) { }
+
        @Override
-       public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
-               consume(SimpleProgress.class, simpleProgress);
+       public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
+               consume(this::consumeSimpleProgress, simpleProgress);
        }
 
+       protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
+
        @Override
-       public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
-               consume(StartedCompression.class, startedCompression);
+       public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
+               consume(this::consumeStartedCompression, startedCompression);
        }
 
+       protected void consumeStartedCompression(StartedCompression startedCompression) { }
+
        @Override
-       public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
-               consume(FinishedCompression.class, finishedCompression);
+       public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
+               consume(this::consumeFinishedCompression, finishedCompression);
        }
 
+       protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
+
        @Override
-       public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
-               consume(UnknownPeerNoteType.class, unknownPeerNoteType);
+       public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
+               consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
        }
 
+       protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
+
        @Override
-       public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
+       public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
                        UnknownNodeIdentifier unknownNodeIdentifier) {
-               consume(UnknownNodeIdentifier.class, unknownNodeIdentifier);
+               consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
        }
 
+       protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
+
        @Override
-       public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
-               consume(ConfigData.class, configData);
+       public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
+               consume(this::consumeConfigData, configData);
        }
 
+       protected void consumeConfigData(ConfigData configData) { }
+
        @Override
-       public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
-               consume(GetFailed.class, getFailed);
+       public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
+               consume(this::consumeGetFailed, getFailed);
        }
 
+       protected void consumeGetFailed(GetFailed getFailed) { }
+
        @Override
-       public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
-               consume(PutFailed.class, putFailed);
+       public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
+               consume(this::consumePutFailed, putFailed);
        }
 
+       protected void consumePutFailed(PutFailed putFailed) { }
+
        @Override
-       public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
-               consume(IdentifierCollision.class, identifierCollision);
+       public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
+               consume(this::consumeIdentifierCollision, identifierCollision);
        }
 
+       protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
+
        @Override
-       public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
-               consume(PersistentPutDir.class, persistentPutDir);
+       public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
+               consume(this::consumePersistentPutDir, persistentPutDir);
        }
 
+       protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
+
        @Override
-       public void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
+       public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
                        PersistentRequestRemoved persistentRequestRemoved) {
-               consume(PersistentRequestRemoved.class, persistentRequestRemoved);
+               consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
        }
 
+       protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
+
        @Override
-       public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
-               consume(SubscribedUSKUpdate.class, subscribedUSKUpdate);
+       public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+               consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
        }
 
+       protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
+
        @Override
-       public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
-               consume(PluginInfo.class, pluginInfo);
+       public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
+               consume(this::consumePluginInfo, pluginInfo);
        }
 
+       protected void consumePluginInfo(PluginInfo pluginInfo) { }
+
        @Override
-       public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
-               consume(FCPPluginReply.class, fcpPluginReply);
+       public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
+               consume(this::consumeFCPPluginReply, fcpPluginReply);
        }
 
+       protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
+
        @Override
-       public void receivedPersistentRequestModified(FcpConnection fcpConnection,
+       public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
                        PersistentRequestModified persistentRequestModified) {
-               consume(PersistentRequestModified.class, persistentRequestModified);
+               consume(this::consumePersistentRequestModified, persistentRequestModified);
        }
 
+       protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
+
        @Override
-       public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
-               consume(PutSuccessful.class, putSuccessful);
+       public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
+               consume(this::consumePutSuccessful, putSuccessful);
        }
 
+       protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
+
        @Override
-       public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
-               consume(PutFetchable.class, putFetchable);
+       public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
+               consume(this::consumePutFetchable, putFetchable);
        }
 
+       protected void consumePutFetchable(PutFetchable putFetchable) { }
+
        @Override
-       public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
-               consume(SentFeed.class, sentFeed);
+       public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
+               consume(this::consumeSentFeed, sentFeed);
        }
 
+       protected void consumeSentFeed(SentFeed sentFeed) { }
+
        @Override
-       public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
-               consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed);
+       public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
+               consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
        }
 
+       protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
+
        @Override
-       public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
-               consume(ProtocolError.class, protocolError);
+       public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
+               consume(this::consumeProtocolError, protocolError);
        }
 
+       protected void consumeProtocolError(ProtocolError protocolError) { }
+
        @Override
-       public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+       public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
                consumeUnknown(fcpMessage);
        }
 
+       protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
+
        @Override
-       public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
+       public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
                consumeClose(throwable);
        }
 
+       protected void consumeConnectionClosed(Throwable throwable) { }
+
 }
index fb849fe..07864ae 100644 (file)
@@ -12,7 +12,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 
 import net.pterodactylus.fcp.AllData;
 import net.pterodactylus.fcp.BaseMessage;
@@ -66,317 +65,531 @@ public class FcpReplySequenceTest {
 
        private final FcpConnection fcpConnection = mock(FcpConnection.class);
        private final ExecutorService executorService = Executors.newSingleThreadExecutor();
-       private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection);
+       private final TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection);
        private final FcpMessage fcpMessage = new FcpMessage("Test");
 
        @Test
        public void canSendMessage() throws IOException {
-               replyWaiter.send(fcpMessage);
+               FcpReplySequence replySequence = createBasicReplySequence();
+               replySequence.send(fcpMessage);
                verify(fcpConnection).sendMessage(fcpMessage);
        }
 
+       private FcpReplySequence createBasicReplySequence() {
+               return new FcpReplySequence(executorService, fcpConnection) {
+                               @Override
+                               protected boolean isFinished() {
+                                       return true;
+                               }
+                       };
+       }
+
        @Test
        public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
-               replyWaiter.send(fcpMessage);
-               verify(fcpConnection).addFcpListener(replyWaiter);
+               FcpReplySequence replySequence = createBasicReplySequence();
+               replySequence.send(fcpMessage);
+               verify(fcpConnection).addFcpListener(replySequence);
        }
 
        @Test
        public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
-               replyWaiter.send(fcpMessage);
-               replyWaiter.close();
-               verify(fcpConnection).removeFcpListener(replyWaiter);
+               FcpReplySequence replySequence = createBasicReplySequence();
+               replySequence.send(fcpMessage);
+               replySequence.close();
+               verify(fcpConnection).removeFcpListener(replySequence);
        }
 
-       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver,
-                       Class<M> messageClass, Supplier<M> message) throws IOException, InterruptedException, ExecutionException {
-               AtomicBoolean gotMessage = setupMessage(messageClass);
-               Future<?> result = replyWaiter.send(fcpMessage);
-               sendMessage(messageReceiver, message.get());
-               result.get();
-               assertThat(gotMessage.get(), is(true));
+       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, Class<M> messageClass, MessageCreator<M> messageCreator) throws IOException, InterruptedException, ExecutionException {
+               waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName())));
        }
 
-       private <M extends BaseMessage> void sendMessage(MessageReceiver<M> messageReceiver, M message) {
-               messageReceiver.receive(fcpConnection, message);
+       private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, M message) throws IOException, InterruptedException, ExecutionException {
+               replySequence.setExpectedMessage(message.getName());
+               Future<Boolean> result = replySequence.send(fcpMessage);
+               messageReceiver.receiveMessage(fcpConnection, message);
+               assertThat(result.get(), is(true));
        }
 
-       private interface MessageReceiver<M extends BaseMessage> {
-
-               void receive(FcpConnection fcpConnection, M message);
+       private <M extends BaseMessage> M createMessage(Class<M> messageClass, MessageCreator<M> messageCreator) {
+               return messageCreator.create(new FcpMessage(messageClass.getSimpleName()));
        }
 
-       private <M extends BaseMessage> AtomicBoolean setupMessage(Class<M> messageClass) {
-               AtomicBoolean gotMessage = new AtomicBoolean();
-               replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true));
-               replyWaiter.waitFor(() -> gotMessage.get());
-               return gotMessage;
+       private interface MessageCreator<M extends BaseMessage> {
+
+               M create(FcpMessage fcpMessage);
+
        }
 
        @Test
        public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
-               waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class,
-                               () -> new NodeHello(new FcpMessage("NodeHello")));
+               waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new);
        }
 
        @Test
-       public void waitingForConnectionClosedDuplicateClientNameWorks()
-       throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName,
-                               CloseConnectionDuplicateClientName.class,
-                               () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+       public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
+               waitForASpecificMessage( replySequence::receivedCloseConnectionDuplicateClientName, CloseConnectionDuplicateClientName.class, CloseConnectionDuplicateClientName::new);
        }
 
        @Test
        public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class,
-                               () -> new SSKKeypair(new FcpMessage("SSKKeypair")));
+               waitForASpecificMessage(replySequence::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new);
        }
 
        @Test
        public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer")));
+               waitForASpecificMessage(replySequence::receivedPeer, Peer.class, Peer::new);
        }
 
        @Test
        public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class,
-                               () -> new EndListPeers(new FcpMessage("EndListPeers")));
+               waitForASpecificMessage(replySequence::receivedEndListPeers, EndListPeers.class, EndListPeers::new);
        }
 
        @Test
        public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class,
-                               () -> new PeerNote(new FcpMessage("PeerNote")));
+               waitForASpecificMessage(replySequence::receivedPeerNote, PeerNote.class, PeerNote::new);
        }
 
        @Test
        public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class,
-                               () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes")));
+               waitForASpecificMessage(replySequence::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new);
        }
 
        @Test
        public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class,
-                               () -> new PeerRemoved(new FcpMessage("PeerRemoved")));
+               waitForASpecificMessage(replySequence::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new);
        }
 
        @Test
        public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class,
-                               () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "")
-                                               .put(
-                                                               "ark.number", "0")
-                                               .put("auth.negTypes", "")
-                                               .put("version", "0,0,0,0")
-                                               .put("lastGoodVersion", "0,0,0,0")));
+               waitForASpecificMessage(replySequence::receivedNodeData, new NodeData(
+                       new FcpMessage("NodeData").put("ark.pubURI", "")
+                                       .put("ark.number", "0")
+                                       .put("auth.negTypes", "")
+                                       .put("version", "0,0,0,0")
+                                       .put("lastGoodVersion", "0,0,0,0")));
        }
 
        @Test
        public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class,
-                               () -> new TestDDAReply(new FcpMessage("TestDDAReply")));
+               waitForASpecificMessage(replySequence::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new);
        }
 
        @Test
        public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class,
-                               () -> new TestDDAComplete(new FcpMessage("TestDDAComplete")));
+               waitForASpecificMessage(replySequence::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new);
        }
 
        @Test
        public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class,
-                               () -> new PersistentGet(new FcpMessage("PersistentGet")));
+               waitForASpecificMessage(replySequence::receivedPersistentGet, PersistentGet.class, PersistentGet::new);
        }
 
        @Test
        public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class,
-                               () -> new PersistentPut(new FcpMessage("PersistentPut")));
+               waitForASpecificMessage(replySequence::receivedPersistentPut, PersistentPut.class, PersistentPut::new);
        }
 
        @Test
        public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class,
-                               () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests")));
+               waitForASpecificMessage(replySequence::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new);
        }
 
        @Test
        public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class,
-                               () -> new URIGenerated(new FcpMessage("URIGenerated")));
+               waitForASpecificMessage(replySequence::receivedURIGenerated, URIGenerated.class, URIGenerated::new);
        }
 
        @Test
        public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class,
-                               () -> new DataFound(new FcpMessage("DataFound")));
+               waitForASpecificMessage(replySequence::receivedDataFound, DataFound.class, DataFound::new);
        }
 
        @Test
        public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class,
-                               () -> new AllData(new FcpMessage("AllData"), null));
+               waitForASpecificMessage(replySequence::receivedAllData, new AllData(new FcpMessage("AllData"), null));
        }
 
        @Test
        public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class,
-                               () -> new SimpleProgress(new FcpMessage("SimpleProgress")));
+               waitForASpecificMessage(replySequence::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new);
        }
 
        @Test
        public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class,
-                               () -> new StartedCompression(new FcpMessage("StartedCompression")));
+               waitForASpecificMessage(replySequence::receivedStartedCompression, StartedCompression.class, StartedCompression::new);
        }
 
        @Test
        public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class,
-                               () -> new FinishedCompression(new FcpMessage("FinishedCompression")));
+               waitForASpecificMessage(replySequence::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new);
        }
 
        @Test
        public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class,
-                               () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType")));
+               waitForASpecificMessage(replySequence::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new);
        }
 
        @Test
        public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class,
-                               () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier")));
+               waitForASpecificMessage(replySequence::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new);
        }
 
        @Test
        public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class,
-                               () -> new ConfigData(new FcpMessage("ConfigData")));
+               waitForASpecificMessage(replySequence::receivedConfigData, ConfigData.class, ConfigData::new);
        }
 
        @Test
        public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class,
-                               () -> new GetFailed(new FcpMessage("GetFailed")));
+               waitForASpecificMessage(replySequence::receivedGetFailed, GetFailed.class, GetFailed::new);
        }
 
        @Test
        public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class,
-                               () -> new PutFailed(new FcpMessage("PutFailed")));
+               waitForASpecificMessage(replySequence::receivedPutFailed, PutFailed.class, PutFailed::new);
        }
 
        @Test
        public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class,
-                               () -> new IdentifierCollision(new FcpMessage("IdentifierCollision")));
+               waitForASpecificMessage(replySequence::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new);
        }
 
        @Test
        public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class,
-                               () -> new PersistentPutDir(new FcpMessage("PersistentPutDir")));
+               waitForASpecificMessage(replySequence::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new);
        }
 
        @Test
        public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class,
-                               () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved")));
+               waitForASpecificMessage(replySequence::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new);
        }
 
        @Test
        public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class,
-                               () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate")));
+               waitForASpecificMessage(replySequence::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new);
        }
 
        @Test
        public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class,
-                               () -> new PluginInfo(new FcpMessage("PluginInfo")));
+               waitForASpecificMessage(replySequence::receivedPluginInfo, PluginInfo.class, PluginInfo::new);
        }
 
        @Test
        public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class,
-                               () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
+               waitForASpecificMessage(replySequence::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
        }
 
        @Test
        public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class,
-                               () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified")));
+               waitForASpecificMessage(replySequence::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new);
        }
 
        @Test
        public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class,
-                               () -> new PutSuccessful(new FcpMessage("PutSuccessful")));
+               waitForASpecificMessage(replySequence::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new);
        }
 
        @Test
        public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class,
-                               () -> new PutFetchable(new FcpMessage("PutFetchable")));
+               waitForASpecificMessage(replySequence::receivedPutFetchable, PutFetchable.class, PutFetchable::new);
        }
 
        @Test
        public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class,
-                               () -> new SentFeed(new FcpMessage("SentFeed")));
+               waitForASpecificMessage(replySequence::receivedSentFeed, SentFeed.class, SentFeed::new);
        }
 
        @Test
        public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class,
-                               () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed")));
+               waitForASpecificMessage(replySequence::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new);
        }
 
        @Test
        public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
-               waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class,
-                               () -> new ProtocolError(new FcpMessage("ProtocolError")));
+               waitForASpecificMessage(replySequence::receivedProtocolError, ProtocolError.class, ProtocolError::new);
        }
 
        @Test
        public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
-               AtomicReference<FcpMessage> receivedMessage = new AtomicReference<>();
-               replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message));
-               replyWaiter.waitFor(() -> receivedMessage.get() != null);
-               Future<?> result = replyWaiter.send(fcpMessage);
-               replyWaiter.receivedMessage(fcpConnection, fcpMessage);
-               result.get();
-               assertThat(receivedMessage.get(), is(fcpMessage));
+               replySequence.setExpectedMessage("SomeFcpMessage");
+               Future<Boolean> result = replySequence.send(fcpMessage);
+               replySequence.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage"));
+               assertThat(result.get(), is(true));
        }
 
        @Test
        public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
-               AtomicBoolean gotPutFailed = new AtomicBoolean();
-               replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true));
-               AtomicBoolean gotGetFailed = new AtomicBoolean();
-               replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true));
-               replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get());
-               Future<?> result = replyWaiter.send(fcpMessage);
+               TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection) {
+                       private final AtomicBoolean gotPutFailed = new AtomicBoolean();
+                       private final AtomicBoolean gotGetFailed = new AtomicBoolean();
+
+                       @Override
+                       protected boolean isFinished() {
+                               return gotPutFailed.get() && gotGetFailed.get();
+                       }
+
+                       @Override
+                       protected Boolean getResult() {
+                               return isFinished();
+                       }
+
+                       @Override
+                       protected void consumePutFailed(PutFailed putFailed) {
+                               gotPutFailed.set(true);
+                       }
+
+                       @Override
+                       protected void consumeGetFailed(GetFailed getFailed) {
+                               gotGetFailed.set(true);
+                       }
+               };
+               Future<?> result = replySequence.send(fcpMessage);
                assertThat(result.isDone(), is(false));
-               replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
+               replySequence.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
                assertThat(result.isDone(), is(false));
-               replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
-               result.get();
+               replySequence.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
+               assertThat(result.get(), is(true));
        }
 
        @Test
        public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
-               AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
-               replyWaiter.handleClose().with((e) -> receivedThrowable.set(e));
-               replyWaiter.waitFor(() -> receivedThrowable.get() != null);
-               Future<?> result = replyWaiter.send(fcpMessage);
+               replySequence.setExpectedMessage("ConnectionClosed");
+               Future<Boolean> result = replySequence.send(fcpMessage);
                Throwable throwable = new Throwable();
-               replyWaiter.connectionClosed(fcpConnection, throwable);
-               result.get();
-               assertThat(receivedThrowable.get(), is(throwable));
+               replySequence.connectionClosed(fcpConnection, throwable);
+               assertThat(result.get(), is(true));
+               assertThat(replySequence.receivedThrowable.get(), is(throwable));
+       }
+
+       @FunctionalInterface
+       private interface MessageReceiver<M> {
+
+               void receiveMessage(FcpConnection fcpConnection, M message);
+
+       }
+
+       private static class TestFcpReplySequence extends FcpReplySequence<Boolean> {
+
+               private final AtomicReference<String> gotMessage = new AtomicReference<>();
+               private final AtomicReference<String> expectedMessage = new AtomicReference<>();
+               private final AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
+
+               public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
+                       super(executorService, fcpConnection);
+               }
+
+               public void setExpectedMessage(String expectedMessage) {
+                       this.expectedMessage.set(expectedMessage);
+               }
+
+               @Override
+               protected boolean isFinished() {
+                       return getResult();
+               }
+
+               @Override
+               protected Boolean getResult() {
+                       return expectedMessage.get().equals(gotMessage.get());
+               }
+
+               @Override
+               protected void consumeNodeHello(NodeHello nodeHello) {
+                       gotMessage.set(nodeHello.getName());
+               }
+
+               @Override
+               protected void consumeCloseConnectionDuplicateClientName(
+                       CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+                       gotMessage.set(closeConnectionDuplicateClientName.getName());
+               }
+
+               @Override
+               protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
+                       gotMessage.set(sskKeypair.getName());
+               }
+
+               @Override
+               protected void consumePeer(Peer peer) {
+                       gotMessage.set(peer.getName());
+               }
+
+               @Override
+               protected void consumeEndListPeers(EndListPeers endListPeers) {
+                       gotMessage.set(endListPeers.getName());
+               }
+
+               @Override
+               protected void consumePeerNote(PeerNote peerNote) {
+                       gotMessage.set(peerNote.getName());
+               }
+
+               @Override
+               protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
+                       gotMessage.set(endListPeerNotes.getName());
+               }
+
+               @Override
+               protected void consumePeerRemoved(PeerRemoved peerRemoved) {
+                       gotMessage.set(peerRemoved.getName());
+               }
+
+               @Override
+               protected void consumeNodeData(NodeData nodeData) {
+                       gotMessage.set(nodeData.getName());
+               }
+
+               @Override
+               protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
+                       gotMessage.set(testDDAReply.getName());
+               }
+
+               @Override
+               protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
+                       gotMessage.set(testDDAComplete.getName());
+               }
+
+               @Override
+               protected void consumePersistentGet(PersistentGet persistentGet) {
+                       gotMessage.set(persistentGet.getName());
+               }
+
+               @Override
+               protected void consumePersistentPut(PersistentPut persistentPut) {
+                       gotMessage.set(persistentPut.getName());
+               }
+
+               @Override
+               protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
+                       gotMessage.set(endListPersistentRequests.getName());
+               }
+
+               @Override
+               protected void consumeURIGenerated(URIGenerated uriGenerated) {
+                       gotMessage.set(uriGenerated.getName());
+               }
+
+               @Override
+               protected void consumeDataFound(DataFound dataFound) {
+                       gotMessage.set(dataFound.getName());
+               }
+
+               @Override
+               protected void consumeAllData(AllData allData) {
+                       gotMessage.set(allData.getName());
+               }
+
+               @Override
+               protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+                       gotMessage.set(simpleProgress.getName());
+               }
+
+               @Override
+               protected void consumeStartedCompression(StartedCompression startedCompression) {
+                       gotMessage.set(startedCompression.getName());
+               }
+
+               @Override
+               protected void consumeFinishedCompression(FinishedCompression finishedCompression) {
+                       gotMessage.set(finishedCompression.getName());
+               }
+
+               @Override
+               protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
+                       gotMessage.set(unknownPeerNoteType.getName());
+               }
+
+               @Override
+               protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
+                       gotMessage.set(unknownNodeIdentifier.getName());
+               }
+
+               @Override
+               protected void consumeConfigData(ConfigData configData) {
+                       gotMessage.set(configData.getName());
+               }
+
+               @Override
+               protected void consumeGetFailed(GetFailed getFailed) {
+                       gotMessage.set(getFailed.getName());
+               }
+
+               @Override
+               protected void consumePutFailed(PutFailed putFailed) {
+                       gotMessage.set(putFailed.getName());
+               }
+
+               @Override
+               protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
+                       gotMessage.set(identifierCollision.getName());
+               }
+
+               @Override
+               protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) {
+                       gotMessage.set(persistentPutDir.getName());
+               }
+
+               @Override
+               protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
+                       gotMessage.set(persistentRequestRemoved.getName());
+               }
+
+               @Override
+               protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
+                       gotMessage.set(subscribedUSKUpdate.getName());
+               }
+
+               @Override
+               protected void consumePluginInfo(PluginInfo pluginInfo) {
+                       gotMessage.set(pluginInfo.getName());
+               }
+
+               @Override
+               protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) {
+                       gotMessage.set(fcpPluginReply.getName());
+               }
+
+               @Override
+               protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) {
+                       gotMessage.set(persistentRequestModified.getName());
+               }
+
+               @Override
+               protected void consumePutSuccessful(PutSuccessful putSuccessful) {
+                       gotMessage.set(putSuccessful.getName());
+               }
+
+               @Override
+               protected void consumePutFetchable(PutFetchable putFetchable) {
+                       gotMessage.set(putFetchable.getName());
+               }
+
+               @Override
+               protected void consumeSentFeed(SentFeed sentFeed) {
+                       gotMessage.set(sentFeed.getName());
+               }
+
+               @Override
+               protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) {
+                       gotMessage.set(receivedBookmarkFeed.getName());
+               }
+
+               @Override
+               protected void consumeProtocolError(ProtocolError protocolError) {
+                       gotMessage.set(protocolError.getName());
+               }
+
+               @Override
+               protected void consumeUnknownMessage(FcpMessage fcpMessage) {
+                       gotMessage.set(fcpMessage.getName());
+               }
+
+               @Override
+               protected void consumeConnectionClosed(Throwable throwable) {
+                       receivedThrowable.set(throwable);
+                       gotMessage.set("ConnectionClosed");
+               }
+
        }
 
 }