From: David ‘Bombe’ Roden Date: Sun, 5 Jul 2015 20:12:23 +0000 (+0200) Subject: Rework FcpReplySequence into an abstract base class X-Git-Url: https://git.pterodactylus.net/?a=commitdiff_plain;h=28e284e354ca3825675010f36c2b16b11c5f70ef;p=jFCPlib.git Rework FcpReplySequence into an abstract base class --- diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 603b150..3a33730 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -3,7 +3,6 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; import java.io.InputStream; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -25,8 +24,6 @@ import net.pterodactylus.fcp.Priority; import net.pterodactylus.fcp.ReturnType; import net.pterodactylus.fcp.SSKKeypair; -import com.google.common.io.ByteStreams; - /** * Default {@link FcpClient} implementation. * @@ -60,16 +57,25 @@ public class DefaultFcpClient implements FcpClient { private FcpConnection createConnection() throws IOException { FcpConnection connection = new FcpConnection(hostname, port); connection.connect(); - AtomicReference receivedNodeHello = new AtomicReference<>(); - AtomicBoolean receivedClosed = new AtomicBoolean(); - FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection); - nodeHelloSequence - .handle(NodeHello.class) - .with((nodeHello) -> receivedNodeHello.set(nodeHello)); - nodeHelloSequence - .handle(CloseConnectionDuplicateClientName.class) - .with((closeConnection) -> receivedClosed.set(true)); - nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get()); + FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection) { + private final AtomicReference receivedNodeHello = new AtomicReference<>(); + private final AtomicBoolean receivedClosed = new AtomicBoolean(); + @Override + protected boolean isFinished() { + return receivedNodeHello.get() != null || receivedClosed.get(); + } + + @Override + protected void consumeNodeHello(NodeHello nodeHello) { + receivedNodeHello.set(nodeHello); + } + + @Override + protected void consumeCloseConnectionDuplicateClientName( + CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { + receivedClosed.set(true); + } + }; ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get()); try { nodeHelloSequence.send(clientHello).get(); @@ -91,31 +97,25 @@ public class DefaultFcpClient implements FcpClient { public Future execute() { return threadPool.submit(() -> { connect(); - Sequence sequence = new Sequence(); - FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get()); - replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair); - replySequence.waitFor(sequence::isFinished); - replySequence.send(new GenerateSSK()).get(); - return sequence.getKeyPair(); - }); - } - - private class Sequence { - - private AtomicReference keyPair = new AtomicReference<>(); + return new FcpReplySequence(threadPool, fcpConnection.get()) { + private AtomicReference keyPair = new AtomicReference<>(); - public void handleSSKKeypair(SSKKeypair sskKeypair) { - keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI())); - } - - public boolean isFinished() { - return keyPair.get() != null; - } + @Override + protected boolean isFinished() { + return keyPair.get() != null; + } - public FcpKeyPair getKeyPair() { - return keyPair.get(); - } + @Override + protected FcpKeyPair getResult() { + return keyPair.get(); + } + @Override + protected void consumeSSKKeypair(SSKKeypair sskKeypair) { + keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI())); + } + }.send(new GenerateSSK()).get(); + }); } } @@ -179,11 +179,6 @@ public class DefaultFcpClient implements FcpClient { @Override public Future> uri(String uri) { - return threadPool.submit(() -> execute(uri)); - } - - private Optional execute(String uri) throws IOException, ExecutionException, InterruptedException { - DefaultFcpClient.this.connect(); ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); if (ignoreDataStore) { clientGet.setIgnoreDataStore(true); @@ -203,91 +198,74 @@ public class DefaultFcpClient implements FcpClient { if (global) { clientGet.setGlobal(true); } - try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) { - Sequence sequence = new Sequence(identifier); - replySequence.handle(AllData.class).with(sequence::allData); - replySequence.handle(GetFailed.class).with(sequence::getFailed); - replySequence.handleClose().with(sequence::disconnect); - replySequence.waitFor(sequence::isFinished); - replySequence.send(clientGet).get(); - return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty(); - } - } - - private class Sequence { - - private final AtomicBoolean finished = new AtomicBoolean(); - private final AtomicBoolean failed = new AtomicBoolean(); - - private final String identifier; - - private String contentType; - private long dataLength; - private InputStream payload; - - private Sequence(String identifier) { - this.identifier = identifier; - } + return threadPool.submit(() -> { + connect(); + FcpReplySequence> replySequence = new FcpReplySequence>(threadPool, fcpConnection.get()) { + private final AtomicBoolean finished = new AtomicBoolean(); + private final AtomicBoolean failed = new AtomicBoolean(); - public boolean isFinished() { - return finished.get() || failed.get(); - } + private final String identifier = ClientGetCommandImpl.this.identifier; - public boolean isSuccessful() { - return !failed.get(); - } + private String contentType; + private long dataLength; + private InputStream payload; - public Data getData() { - return new Data() { @Override - public String getMimeType() { - synchronized (Sequence.this) { - return contentType; - } + protected boolean isFinished() { + return finished.get() || failed.get(); } @Override - public long size() { - synchronized (Sequence.this) { - return dataLength; - } + protected Optional getResult() { + return failed.get() ? Optional.empty() : Optional.of(new Data() { + @Override + public String getMimeType() { + return contentType; + } + + @Override + public long size() { + return dataLength; + } + + @Override + public InputStream getInputStream() { + return payload; + } + }); } @Override - public InputStream getInputStream() { - synchronized (Sequence.this) { - return payload; + protected void consumeAllData(AllData allData) { + if (allData.getIdentifier().equals(identifier)) { + synchronized (this) { + contentType = allData.getContentType(); + dataLength = allData.getDataLength(); + try { + payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); + finished.set(true); + } catch (IOException e) { + // TODO – logging + failed.set(true); + } + } } } - }; - } - public void allData(AllData allData) { - if (allData.getIdentifier().equals(identifier)) { - synchronized (this) { - contentType = allData.getContentType(); - dataLength = allData.getDataLength(); - try { - payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); - finished.set(true); - } catch (IOException e) { - // TODO – logging + @Override + protected void consumeGetFailed(GetFailed getFailed) { + if (getFailed.getIdentifier().equals(identifier)) { failed.set(true); } } - } - } - - public void getFailed(GetFailed getFailed) { - if (getFailed.getIdentifier().equals(identifier)) { - failed.set(true); - } - } - - public void disconnect(Throwable t) { - failed.set(true); - } + @Override + protected void consumeConnectionClosed(Throwable throwable) { + failed.set(true); + } + }; + return replySequence.send(clientGet).get(); + }); } } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java index 0c54d7c..25db847 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -1,17 +1,11 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.function.Consumer; -import java.util.function.Supplier; import net.pterodactylus.fcp.AllData; -import net.pterodactylus.fcp.BaseMessage; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.ConfigData; import net.pterodactylus.fcp.DataFound; @@ -57,312 +51,343 @@ import net.pterodactylus.fcp.UnknownPeerNoteType; * * @author David ‘Bombe’ Roden */ -public class FcpReplySequence implements AutoCloseable, FcpListener { +public abstract class FcpReplySequence implements AutoCloseable, FcpListener { + private final Object syncObject = new Object(); private final ExecutorService executorService; private final FcpConnection fcpConnection; - private final Map, Consumer> expectedMessageActions = new HashMap<>(); - private final List> unknownMessageHandlers = new ArrayList<>(); - private final List> closeHandlers = new ArrayList<>(); - private Supplier endPredicate; public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { this.executorService = executorService; this.fcpConnection = fcpConnection; } - public $1 handle(Class messageClass) { - return new $1<>(messageClass); - } - - public class $1 { - - private Class messageClass; - - private $1(Class messageClass) { - this.messageClass = messageClass; - } - - public FcpReplySequence with(Consumer action) { - expectedMessageActions.put(messageClass, (Consumer) action); - return FcpReplySequence.this; - } - - } - - public $2 handleUnknown() { - return new $2(); - } - - public class $2 { - - public FcpReplySequence with(Consumer consumer) { - unknownMessageHandlers.add(consumer); - return FcpReplySequence.this; - } - - } - - public $3 handleClose() { - return new $3(); - } + protected abstract boolean isFinished(); - public class $3 { + public Future send(FcpMessage fcpMessage) throws IOException { + try { + fcpConnection.addFcpListener(this); - public FcpReplySequence with(Consumer consumer) { - closeHandlers.add(consumer); - return FcpReplySequence.this; + } catch (Throwable throwable) { + throwable.printStackTrace(); } - - } - - public void waitFor(Supplier endPredicate) { - this.endPredicate = endPredicate; - } - - public Future send(FcpMessage fcpMessage) throws IOException { - fcpConnection.addFcpListener(this); fcpConnection.sendMessage(fcpMessage); return executorService.submit(() -> { - synchronized (endPredicate) { - while (!endPredicate.get()) { - endPredicate.wait(); + synchronized (syncObject) { + while (!isFinished()) { + syncObject.wait(); } } - return null; + return getResult(); }); } + protected R getResult() { + return null; + } + @Override public void close() { fcpConnection.removeFcpListener(this); } - private void consume(Class fcpMessageClass, BaseMessage fcpMessage) { - if (expectedMessageActions.containsKey(fcpMessageClass)) { - expectedMessageActions.get(fcpMessageClass).accept(fcpMessage); - } - synchronized (endPredicate) { - endPredicate.notifyAll(); + private void consume(Consumer consumer, M message) { + consumer.accept(message); + synchronized (syncObject) { + syncObject.notifyAll(); } } private void consumeUnknown(FcpMessage fcpMessage) { - for (Consumer unknownMessageHandler : unknownMessageHandlers) { - unknownMessageHandler.accept(fcpMessage); - } - synchronized (endPredicate) { - endPredicate.notifyAll(); + consumeUnknownMessage(fcpMessage); + synchronized (syncObject) { + syncObject.notifyAll(); } } private void consumeClose(Throwable throwable) { - for (Consumer closeHandler : closeHandlers) { - closeHandler.accept(throwable); - } - synchronized (endPredicate) { - endPredicate.notifyAll(); + consumeConnectionClosed(throwable); + synchronized (syncObject) { + syncObject.notifyAll(); } } @Override - public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) { - consume(NodeHello.class, nodeHello); + public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) { + consume(this::consumeNodeHello, nodeHello); } + protected void consumeNodeHello(NodeHello nodeHello) { } + @Override - public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, + public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName); + consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName); } + protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { } + @Override - public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { - consume(SSKKeypair.class, sskKeypair); + public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { + consume(this::consumeSSKKeypair, sskKeypair); } + protected void consumeSSKKeypair(SSKKeypair sskKeypair) { } + @Override - public void receivedPeer(FcpConnection fcpConnection, Peer peer) { - consume(Peer.class, peer); + public final void receivedPeer(FcpConnection fcpConnection, Peer peer) { + consume(this::consumePeer, peer); } + protected void consumePeer(Peer peer) { } + @Override - public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) { - consume(EndListPeers.class, endListPeers); + public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) { + consume(this::consumeEndListPeers, endListPeers); } + protected void consumeEndListPeers(EndListPeers endListPeers) { } + @Override - public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) { - consume(PeerNote.class, peerNote); + public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) { + consume(this::consumePeerNote, peerNote); } + protected void consumePeerNote(PeerNote peerNote) { } + @Override - public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) { - consume(EndListPeerNotes.class, endListPeerNotes); + public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) { + consume(this::consumeEndListPeerNotes, endListPeerNotes); } + protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { } + @Override - public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) { - consume(PeerRemoved.class, peerRemoved); + public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) { + consume(this::consumePeerRemoved, peerRemoved); } + protected void consumePeerRemoved(PeerRemoved peerRemoved) { } + @Override - public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) { - consume(NodeData.class, nodeData); + public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) { + consume(this::consumeNodeData, nodeData); } + protected void consumeNodeData(NodeData nodeData) { } + @Override - public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { - consume(TestDDAReply.class, testDDAReply); + public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { + consume(this::consumeTestDDAReply, testDDAReply); } + protected void consumeTestDDAReply(TestDDAReply testDDAReply) { } + @Override - public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { - consume(TestDDAComplete.class, testDDAComplete); + public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { + consume(this::consumeTestDDAComplete, testDDAComplete); } + protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { } + @Override - public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) { - consume(PersistentGet.class, persistentGet); + public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) { + consume(this::consumePersistentGet, persistentGet); } + protected void consumePersistentGet(PersistentGet persistentGet) { } + @Override - public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) { - consume(PersistentPut.class, persistentPut); + public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) { + consume(this::consumePersistentPut, persistentPut); } + protected void consumePersistentPut(PersistentPut persistentPut) { } + @Override - public void receivedEndListPersistentRequests(FcpConnection fcpConnection, + public final void receivedEndListPersistentRequests(FcpConnection fcpConnection, EndListPersistentRequests endListPersistentRequests) { - consume(EndListPersistentRequests.class, endListPersistentRequests); + consume(this::consumeEndListPersistentRequests, endListPersistentRequests); } + protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { } + @Override - public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) { - consume(URIGenerated.class, uriGenerated); + public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) { + consume(this::consumeURIGenerated, uriGenerated); } + protected void consumeURIGenerated(URIGenerated uriGenerated) { } + @Override - public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) { - consume(DataFound.class, dataFound); + public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) { + consume(this::consumeDataFound, dataFound); } + protected void consumeDataFound(DataFound dataFound) { } + @Override - public void receivedAllData(FcpConnection fcpConnection, AllData allData) { - consume(AllData.class, allData); + public final void receivedAllData(FcpConnection fcpConnection, AllData allData) { + consume(this::consumeAllData, allData); } + protected void consumeAllData(AllData allData) { } + @Override - public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) { - consume(SimpleProgress.class, simpleProgress); + public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) { + consume(this::consumeSimpleProgress, simpleProgress); } + protected void consumeSimpleProgress(SimpleProgress simpleProgress) { } + @Override - public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) { - consume(StartedCompression.class, startedCompression); + public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) { + consume(this::consumeStartedCompression, startedCompression); } + protected void consumeStartedCompression(StartedCompression startedCompression) { } + @Override - public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) { - consume(FinishedCompression.class, finishedCompression); + public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) { + consume(this::consumeFinishedCompression, finishedCompression); } + protected void consumeFinishedCompression(FinishedCompression finishedCompression) { } + @Override - public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) { - consume(UnknownPeerNoteType.class, unknownPeerNoteType); + public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) { + consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType); } + protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { } + @Override - public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection, + public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection, UnknownNodeIdentifier unknownNodeIdentifier) { - consume(UnknownNodeIdentifier.class, unknownNodeIdentifier); + consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier); } + protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { } + @Override - public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) { - consume(ConfigData.class, configData); + public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) { + consume(this::consumeConfigData, configData); } + protected void consumeConfigData(ConfigData configData) { } + @Override - public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) { - consume(GetFailed.class, getFailed); + public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) { + consume(this::consumeGetFailed, getFailed); } + protected void consumeGetFailed(GetFailed getFailed) { } + @Override - public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) { - consume(PutFailed.class, putFailed); + public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) { + consume(this::consumePutFailed, putFailed); } + protected void consumePutFailed(PutFailed putFailed) { } + @Override - public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) { - consume(IdentifierCollision.class, identifierCollision); + public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) { + consume(this::consumeIdentifierCollision, identifierCollision); } + protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { } + @Override - public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) { - consume(PersistentPutDir.class, persistentPutDir); + public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) { + consume(this::consumePersistentPutDir, persistentPutDir); } + protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { } + @Override - public void receivedPersistentRequestRemoved(FcpConnection fcpConnection, + public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection, PersistentRequestRemoved persistentRequestRemoved) { - consume(PersistentRequestRemoved.class, persistentRequestRemoved); + consume(this::consumePersistentRequestRemoved, persistentRequestRemoved); } + protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { } + @Override - public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) { - consume(SubscribedUSKUpdate.class, subscribedUSKUpdate); + public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) { + consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate); } + protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { } + @Override - public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) { - consume(PluginInfo.class, pluginInfo); + public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) { + consume(this::consumePluginInfo, pluginInfo); } + protected void consumePluginInfo(PluginInfo pluginInfo) { } + @Override - public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) { - consume(FCPPluginReply.class, fcpPluginReply); + public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) { + consume(this::consumeFCPPluginReply, fcpPluginReply); } + protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { } + @Override - public void receivedPersistentRequestModified(FcpConnection fcpConnection, + public final void receivedPersistentRequestModified(FcpConnection fcpConnection, PersistentRequestModified persistentRequestModified) { - consume(PersistentRequestModified.class, persistentRequestModified); + consume(this::consumePersistentRequestModified, persistentRequestModified); } + protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { } + @Override - public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) { - consume(PutSuccessful.class, putSuccessful); + public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) { + consume(this::consumePutSuccessful, putSuccessful); } + protected void consumePutSuccessful(PutSuccessful putSuccessful) { } + @Override - public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) { - consume(PutFetchable.class, putFetchable); + public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) { + consume(this::consumePutFetchable, putFetchable); } + protected void consumePutFetchable(PutFetchable putFetchable) { } + @Override - public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) { - consume(SentFeed.class, sentFeed); + public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) { + consume(this::consumeSentFeed, sentFeed); } + protected void consumeSentFeed(SentFeed sentFeed) { } + @Override - public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) { - consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed); + public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) { + consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed); } + protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { } + @Override - public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) { - consume(ProtocolError.class, protocolError); + public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) { + consume(this::consumeProtocolError, protocolError); } + protected void consumeProtocolError(ProtocolError protocolError) { } + @Override - public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) { + public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) { consumeUnknown(fcpMessage); } + protected void consumeUnknownMessage(FcpMessage fcpMessage) { } + @Override - public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) { + public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) { consumeClose(throwable); } + protected void consumeConnectionClosed(Throwable throwable) { } + } diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java index fb849fe..07864ae 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java @@ -12,7 +12,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import net.pterodactylus.fcp.AllData; import net.pterodactylus.fcp.BaseMessage; @@ -66,317 +65,531 @@ public class FcpReplySequenceTest { private final FcpConnection fcpConnection = mock(FcpConnection.class); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); - private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection); + private final TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection); private final FcpMessage fcpMessage = new FcpMessage("Test"); @Test public void canSendMessage() throws IOException { - replyWaiter.send(fcpMessage); + FcpReplySequence replySequence = createBasicReplySequence(); + replySequence.send(fcpMessage); verify(fcpConnection).sendMessage(fcpMessage); } + private FcpReplySequence createBasicReplySequence() { + return new FcpReplySequence(executorService, fcpConnection) { + @Override + protected boolean isFinished() { + return true; + } + }; + } + @Test public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException { - replyWaiter.send(fcpMessage); - verify(fcpConnection).addFcpListener(replyWaiter); + FcpReplySequence replySequence = createBasicReplySequence(); + replySequence.send(fcpMessage); + verify(fcpConnection).addFcpListener(replySequence); } @Test public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException { - replyWaiter.send(fcpMessage); - replyWaiter.close(); - verify(fcpConnection).removeFcpListener(replyWaiter); + FcpReplySequence replySequence = createBasicReplySequence(); + replySequence.send(fcpMessage); + replySequence.close(); + verify(fcpConnection).removeFcpListener(replySequence); } - private void waitForASpecificMessage(MessageReceiver messageReceiver, - Class messageClass, Supplier message) throws IOException, InterruptedException, ExecutionException { - AtomicBoolean gotMessage = setupMessage(messageClass); - Future result = replyWaiter.send(fcpMessage); - sendMessage(messageReceiver, message.get()); - result.get(); - assertThat(gotMessage.get(), is(true)); + private void waitForASpecificMessage(MessageReceiver messageReceiver, Class messageClass, MessageCreator messageCreator) throws IOException, InterruptedException, ExecutionException { + waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName()))); } - private void sendMessage(MessageReceiver messageReceiver, M message) { - messageReceiver.receive(fcpConnection, message); + private void waitForASpecificMessage(MessageReceiver messageReceiver, M message) throws IOException, InterruptedException, ExecutionException { + replySequence.setExpectedMessage(message.getName()); + Future result = replySequence.send(fcpMessage); + messageReceiver.receiveMessage(fcpConnection, message); + assertThat(result.get(), is(true)); } - private interface MessageReceiver { - - void receive(FcpConnection fcpConnection, M message); + private M createMessage(Class messageClass, MessageCreator messageCreator) { + return messageCreator.create(new FcpMessage(messageClass.getSimpleName())); } - private AtomicBoolean setupMessage(Class messageClass) { - AtomicBoolean gotMessage = new AtomicBoolean(); - replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true)); - replyWaiter.waitFor(() -> gotMessage.get()); - return gotMessage; + private interface MessageCreator { + + M create(FcpMessage fcpMessage); + } @Test public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException { - waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class, - () -> new NodeHello(new FcpMessage("NodeHello"))); + waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new); } @Test - public void waitingForConnectionClosedDuplicateClientNameWorks() - throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName, - CloseConnectionDuplicateClientName.class, - () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName"))); + public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException { + waitForASpecificMessage( replySequence::receivedCloseConnectionDuplicateClientName, CloseConnectionDuplicateClientName.class, CloseConnectionDuplicateClientName::new); } @Test public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class, - () -> new SSKKeypair(new FcpMessage("SSKKeypair"))); + waitForASpecificMessage(replySequence::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new); } @Test public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer"))); + waitForASpecificMessage(replySequence::receivedPeer, Peer.class, Peer::new); } @Test public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class, - () -> new EndListPeers(new FcpMessage("EndListPeers"))); + waitForASpecificMessage(replySequence::receivedEndListPeers, EndListPeers.class, EndListPeers::new); } @Test public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class, - () -> new PeerNote(new FcpMessage("PeerNote"))); + waitForASpecificMessage(replySequence::receivedPeerNote, PeerNote.class, PeerNote::new); } @Test public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class, - () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes"))); + waitForASpecificMessage(replySequence::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new); } @Test public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class, - () -> new PeerRemoved(new FcpMessage("PeerRemoved"))); + waitForASpecificMessage(replySequence::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new); } @Test public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class, - () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "") - .put( - "ark.number", "0") - .put("auth.negTypes", "") - .put("version", "0,0,0,0") - .put("lastGoodVersion", "0,0,0,0"))); + waitForASpecificMessage(replySequence::receivedNodeData, new NodeData( + new FcpMessage("NodeData").put("ark.pubURI", "") + .put("ark.number", "0") + .put("auth.negTypes", "") + .put("version", "0,0,0,0") + .put("lastGoodVersion", "0,0,0,0"))); } @Test public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class, - () -> new TestDDAReply(new FcpMessage("TestDDAReply"))); + waitForASpecificMessage(replySequence::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new); } @Test public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class, - () -> new TestDDAComplete(new FcpMessage("TestDDAComplete"))); + waitForASpecificMessage(replySequence::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new); } @Test public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class, - () -> new PersistentGet(new FcpMessage("PersistentGet"))); + waitForASpecificMessage(replySequence::receivedPersistentGet, PersistentGet.class, PersistentGet::new); } @Test public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class, - () -> new PersistentPut(new FcpMessage("PersistentPut"))); + waitForASpecificMessage(replySequence::receivedPersistentPut, PersistentPut.class, PersistentPut::new); } @Test public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class, - () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests"))); + waitForASpecificMessage(replySequence::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new); } @Test public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class, - () -> new URIGenerated(new FcpMessage("URIGenerated"))); + waitForASpecificMessage(replySequence::receivedURIGenerated, URIGenerated.class, URIGenerated::new); } @Test public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class, - () -> new DataFound(new FcpMessage("DataFound"))); + waitForASpecificMessage(replySequence::receivedDataFound, DataFound.class, DataFound::new); } @Test public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class, - () -> new AllData(new FcpMessage("AllData"), null)); + waitForASpecificMessage(replySequence::receivedAllData, new AllData(new FcpMessage("AllData"), null)); } @Test public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class, - () -> new SimpleProgress(new FcpMessage("SimpleProgress"))); + waitForASpecificMessage(replySequence::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new); } @Test public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class, - () -> new StartedCompression(new FcpMessage("StartedCompression"))); + waitForASpecificMessage(replySequence::receivedStartedCompression, StartedCompression.class, StartedCompression::new); } @Test public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class, - () -> new FinishedCompression(new FcpMessage("FinishedCompression"))); + waitForASpecificMessage(replySequence::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new); } @Test public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, - () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType"))); + waitForASpecificMessage(replySequence::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new); } @Test public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, - () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier"))); + waitForASpecificMessage(replySequence::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new); } @Test public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class, - () -> new ConfigData(new FcpMessage("ConfigData"))); + waitForASpecificMessage(replySequence::receivedConfigData, ConfigData.class, ConfigData::new); } @Test public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class, - () -> new GetFailed(new FcpMessage("GetFailed"))); + waitForASpecificMessage(replySequence::receivedGetFailed, GetFailed.class, GetFailed::new); } @Test public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class, - () -> new PutFailed(new FcpMessage("PutFailed"))); + waitForASpecificMessage(replySequence::receivedPutFailed, PutFailed.class, PutFailed::new); } @Test public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class, - () -> new IdentifierCollision(new FcpMessage("IdentifierCollision"))); + waitForASpecificMessage(replySequence::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new); } @Test public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class, - () -> new PersistentPutDir(new FcpMessage("PersistentPutDir"))); + waitForASpecificMessage(replySequence::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new); } @Test public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, - () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved"))); + waitForASpecificMessage(replySequence::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new); } @Test public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, - () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate"))); + waitForASpecificMessage(replySequence::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new); } @Test public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class, - () -> new PluginInfo(new FcpMessage("PluginInfo"))); + waitForASpecificMessage(replySequence::receivedPluginInfo, PluginInfo.class, PluginInfo::new); } @Test public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class, - () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null)); + waitForASpecificMessage(replySequence::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null)); } @Test public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class, - () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified"))); + waitForASpecificMessage(replySequence::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new); } @Test public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class, - () -> new PutSuccessful(new FcpMessage("PutSuccessful"))); + waitForASpecificMessage(replySequence::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new); } @Test public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class, - () -> new PutFetchable(new FcpMessage("PutFetchable"))); + waitForASpecificMessage(replySequence::receivedPutFetchable, PutFetchable.class, PutFetchable::new); } @Test public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class, - () -> new SentFeed(new FcpMessage("SentFeed"))); + waitForASpecificMessage(replySequence::receivedSentFeed, SentFeed.class, SentFeed::new); } @Test public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class, - () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed"))); + waitForASpecificMessage(replySequence::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new); } @Test public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class, - () -> new ProtocolError(new FcpMessage("ProtocolError"))); + waitForASpecificMessage(replySequence::receivedProtocolError, ProtocolError.class, ProtocolError::new); } @Test public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException { - AtomicReference receivedMessage = new AtomicReference<>(); - replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message)); - replyWaiter.waitFor(() -> receivedMessage.get() != null); - Future result = replyWaiter.send(fcpMessage); - replyWaiter.receivedMessage(fcpConnection, fcpMessage); - result.get(); - assertThat(receivedMessage.get(), is(fcpMessage)); + replySequence.setExpectedMessage("SomeFcpMessage"); + Future result = replySequence.send(fcpMessage); + replySequence.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage")); + assertThat(result.get(), is(true)); } @Test public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException { - AtomicBoolean gotPutFailed = new AtomicBoolean(); - replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true)); - AtomicBoolean gotGetFailed = new AtomicBoolean(); - replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true)); - replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get()); - Future result = replyWaiter.send(fcpMessage); + TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection) { + private final AtomicBoolean gotPutFailed = new AtomicBoolean(); + private final AtomicBoolean gotGetFailed = new AtomicBoolean(); + + @Override + protected boolean isFinished() { + return gotPutFailed.get() && gotGetFailed.get(); + } + + @Override + protected Boolean getResult() { + return isFinished(); + } + + @Override + protected void consumePutFailed(PutFailed putFailed) { + gotPutFailed.set(true); + } + + @Override + protected void consumeGetFailed(GetFailed getFailed) { + gotGetFailed.set(true); + } + }; + Future result = replySequence.send(fcpMessage); assertThat(result.isDone(), is(false)); - replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed"))); + replySequence.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed"))); assertThat(result.isDone(), is(false)); - replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed"))); - result.get(); + replySequence.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed"))); + assertThat(result.get(), is(true)); } @Test public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException { - AtomicReference receivedThrowable = new AtomicReference<>(); - replyWaiter.handleClose().with((e) -> receivedThrowable.set(e)); - replyWaiter.waitFor(() -> receivedThrowable.get() != null); - Future result = replyWaiter.send(fcpMessage); + replySequence.setExpectedMessage("ConnectionClosed"); + Future result = replySequence.send(fcpMessage); Throwable throwable = new Throwable(); - replyWaiter.connectionClosed(fcpConnection, throwable); - result.get(); - assertThat(receivedThrowable.get(), is(throwable)); + replySequence.connectionClosed(fcpConnection, throwable); + assertThat(result.get(), is(true)); + assertThat(replySequence.receivedThrowable.get(), is(throwable)); + } + + @FunctionalInterface + private interface MessageReceiver { + + void receiveMessage(FcpConnection fcpConnection, M message); + + } + + private static class TestFcpReplySequence extends FcpReplySequence { + + private final AtomicReference gotMessage = new AtomicReference<>(); + private final AtomicReference expectedMessage = new AtomicReference<>(); + private final AtomicReference receivedThrowable = new AtomicReference<>(); + + public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { + super(executorService, fcpConnection); + } + + public void setExpectedMessage(String expectedMessage) { + this.expectedMessage.set(expectedMessage); + } + + @Override + protected boolean isFinished() { + return getResult(); + } + + @Override + protected Boolean getResult() { + return expectedMessage.get().equals(gotMessage.get()); + } + + @Override + protected void consumeNodeHello(NodeHello nodeHello) { + gotMessage.set(nodeHello.getName()); + } + + @Override + protected void consumeCloseConnectionDuplicateClientName( + CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { + gotMessage.set(closeConnectionDuplicateClientName.getName()); + } + + @Override + protected void consumeSSKKeypair(SSKKeypair sskKeypair) { + gotMessage.set(sskKeypair.getName()); + } + + @Override + protected void consumePeer(Peer peer) { + gotMessage.set(peer.getName()); + } + + @Override + protected void consumeEndListPeers(EndListPeers endListPeers) { + gotMessage.set(endListPeers.getName()); + } + + @Override + protected void consumePeerNote(PeerNote peerNote) { + gotMessage.set(peerNote.getName()); + } + + @Override + protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { + gotMessage.set(endListPeerNotes.getName()); + } + + @Override + protected void consumePeerRemoved(PeerRemoved peerRemoved) { + gotMessage.set(peerRemoved.getName()); + } + + @Override + protected void consumeNodeData(NodeData nodeData) { + gotMessage.set(nodeData.getName()); + } + + @Override + protected void consumeTestDDAReply(TestDDAReply testDDAReply) { + gotMessage.set(testDDAReply.getName()); + } + + @Override + protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { + gotMessage.set(testDDAComplete.getName()); + } + + @Override + protected void consumePersistentGet(PersistentGet persistentGet) { + gotMessage.set(persistentGet.getName()); + } + + @Override + protected void consumePersistentPut(PersistentPut persistentPut) { + gotMessage.set(persistentPut.getName()); + } + + @Override + protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { + gotMessage.set(endListPersistentRequests.getName()); + } + + @Override + protected void consumeURIGenerated(URIGenerated uriGenerated) { + gotMessage.set(uriGenerated.getName()); + } + + @Override + protected void consumeDataFound(DataFound dataFound) { + gotMessage.set(dataFound.getName()); + } + + @Override + protected void consumeAllData(AllData allData) { + gotMessage.set(allData.getName()); + } + + @Override + protected void consumeSimpleProgress(SimpleProgress simpleProgress) { + gotMessage.set(simpleProgress.getName()); + } + + @Override + protected void consumeStartedCompression(StartedCompression startedCompression) { + gotMessage.set(startedCompression.getName()); + } + + @Override + protected void consumeFinishedCompression(FinishedCompression finishedCompression) { + gotMessage.set(finishedCompression.getName()); + } + + @Override + protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { + gotMessage.set(unknownPeerNoteType.getName()); + } + + @Override + protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { + gotMessage.set(unknownNodeIdentifier.getName()); + } + + @Override + protected void consumeConfigData(ConfigData configData) { + gotMessage.set(configData.getName()); + } + + @Override + protected void consumeGetFailed(GetFailed getFailed) { + gotMessage.set(getFailed.getName()); + } + + @Override + protected void consumePutFailed(PutFailed putFailed) { + gotMessage.set(putFailed.getName()); + } + + @Override + protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { + gotMessage.set(identifierCollision.getName()); + } + + @Override + protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { + gotMessage.set(persistentPutDir.getName()); + } + + @Override + protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { + gotMessage.set(persistentRequestRemoved.getName()); + } + + @Override + protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { + gotMessage.set(subscribedUSKUpdate.getName()); + } + + @Override + protected void consumePluginInfo(PluginInfo pluginInfo) { + gotMessage.set(pluginInfo.getName()); + } + + @Override + protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { + gotMessage.set(fcpPluginReply.getName()); + } + + @Override + protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { + gotMessage.set(persistentRequestModified.getName()); + } + + @Override + protected void consumePutSuccessful(PutSuccessful putSuccessful) { + gotMessage.set(putSuccessful.getName()); + } + + @Override + protected void consumePutFetchable(PutFetchable putFetchable) { + gotMessage.set(putFetchable.getName()); + } + + @Override + protected void consumeSentFeed(SentFeed sentFeed) { + gotMessage.set(sentFeed.getName()); + } + + @Override + protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { + gotMessage.set(receivedBookmarkFeed.getName()); + } + + @Override + protected void consumeProtocolError(ProtocolError protocolError) { + gotMessage.set(protocolError.getName()); + } + + @Override + protected void consumeUnknownMessage(FcpMessage fcpMessage) { + gotMessage.set(fcpMessage.getName()); + } + + @Override + protected void consumeConnectionClosed(Throwable throwable) { + receivedThrowable.set(throwable); + gotMessage.set("ConnectionClosed"); + } + } }