From: David ‘Bombe’ Roden Date: Sat, 11 Jul 2015 18:33:22 +0000 (+0200) Subject: Rename FcpReplySequence to FcpDialog X-Git-Url: https://git.pterodactylus.net/?a=commitdiff_plain;h=9f8674f7ad4b179b3a2cac9a24f1dc6152548bc1;p=jFCPlib.git Rename FcpReplySequence to FcpDialog --- diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/AddPeerCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/AddPeerCommandImpl.java index 5dfef6f..e1f0d5b 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/AddPeerCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/AddPeerCommandImpl.java @@ -19,7 +19,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; /** - * Default {@link AddPeerCommand} implementation based on {@link FcpReplySequence}. + * Default {@link AddPeerCommand} implementation based on {@link FcpDialog}. * * @author David ‘Bombe’ Roden */ @@ -72,7 +72,7 @@ public class AddPeerCommandImpl implements AddPeerCommand { } } - private class AddPeerSequence extends FcpReplySequence> { + private class AddPeerSequence extends FcpDialog> { private final AtomicBoolean finished = new AtomicBoolean(); private final AtomicReference peer = new AtomicReference<>(); diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java index 55d3e1c..0c93fc3 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java @@ -3,21 +3,17 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; import java.io.InputStream; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import net.pterodactylus.fcp.AllData; import net.pterodactylus.fcp.ClientGet; -import net.pterodactylus.fcp.FcpMessage; import net.pterodactylus.fcp.FcpUtils.TempInputStream; import net.pterodactylus.fcp.GetFailed; import net.pterodactylus.fcp.Priority; import net.pterodactylus.fcp.ReturnType; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -86,8 +82,8 @@ class ClientGetCommandImpl implements ClientGetCommand { private Optional execute(String uri) throws InterruptedException, ExecutionException, IOException { ClientGet clientGet = createClientGetCommand(uri); - try (ClientGetReplySequence clientGetReplySequence = new ClientGetReplySequence()) { - return clientGetReplySequence.send(clientGet).get(); + try (ClientGetDialog clientGetDialog = new ClientGetDialog()) { + return clientGetDialog.send(clientGet).get(); } } @@ -115,7 +111,7 @@ class ClientGetCommandImpl implements ClientGetCommand { return clientGet; } - private class ClientGetReplySequence extends FcpReplySequence> { + private class ClientGetDialog extends FcpDialog> { private final AtomicBoolean finished = new AtomicBoolean(); private final AtomicBoolean failed = new AtomicBoolean(); @@ -124,7 +120,7 @@ class ClientGetCommandImpl implements ClientGetCommand { private long dataLength; private InputStream payload; - public ClientGetReplySequence() throws IOException { + public ClientGetDialog() throws IOException { super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get()); } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java index 4e3b51c..3ded685 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientHelloImpl.java @@ -3,11 +3,9 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import net.pterodactylus.fcp.ClientHello; -import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; import net.pterodactylus.fcp.NodeHello; @@ -16,7 +14,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; /** - * Internal ClientHello implementation based on {@link FcpReplySequence}. + * Internal ClientHello implementation based on {@link FcpDialog}. * * @author David ‘Bombe’ Roden */ @@ -46,7 +44,7 @@ public class ClientHelloImpl { FcpConnection connection = new FcpConnection(hostname, port); connection.connect(); ClientHello clientHello = new ClientHello(clientName.get(), "2.0"); - try (ClientHelloReplySequence nodeHelloSequence = new ClientHelloReplySequence(connection)) { + try (ClientHelloDialog nodeHelloSequence = new ClientHelloDialog(connection)) { if (nodeHelloSequence.send(clientHello).get()) { return connection; } @@ -58,11 +56,11 @@ public class ClientHelloImpl { throw new IOException(String.format("Could not connect to %s:%d.", hostname, port)); } - private class ClientHelloReplySequence extends FcpReplySequence { + private class ClientHelloDialog extends FcpDialog { private final AtomicReference receivedNodeHello = new AtomicReference<>(); - public ClientHelloReplySequence(FcpConnection connection) { + public ClientHelloDialog(FcpConnection connection) { super(ClientHelloImpl.this.threadPool, connection); } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java index df345f3..4f8ddde 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java @@ -29,7 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; /** - * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}. + * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}. * * @author David ‘Bombe’ Roden */ @@ -84,8 +84,8 @@ class ClientPutCommandImpl implements ClientPutCommand { private Optional execute(String uri) throws InterruptedException, ExecutionException, IOException { String identifier = new RandomIdentifierGenerator().generate(); ClientPut clientPut = createClientPutCommand(uri, identifier); - try (ClientPutReplySequence clientPutReplySequence = new ClientPutReplySequence()) { - return clientPutReplySequence.send(clientPut).get(); + try (ClientPutDialog clientPutDialog = new ClientPutDialog()) { + return clientPutDialog.send(clientPut).get(); } } @@ -123,14 +123,14 @@ class ClientPutCommandImpl implements ClientPutCommand { return clientPut; } - private class ClientPutReplySequence extends FcpReplySequence> { + private class ClientPutDialog extends FcpDialog> { private final AtomicReference originalClientPut = new AtomicReference<>(); private final AtomicReference directory = new AtomicReference<>(); private final AtomicReference finalKey = new AtomicReference<>(); private final AtomicBoolean putFinished = new AtomicBoolean(); - public ClientPutReplySequence() throws IOException { + public ClientPutDialog() throws IOException { super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get()); } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpDialog.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpDialog.java new file mode 100644 index 0000000..2aa003c --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpDialog.java @@ -0,0 +1,433 @@ +package net.pterodactylus.fcp.quelaton; + +import java.io.IOException; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.BaseMessage; +import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; +import net.pterodactylus.fcp.ConfigData; +import net.pterodactylus.fcp.DataFound; +import net.pterodactylus.fcp.EndListPeerNotes; +import net.pterodactylus.fcp.EndListPeers; +import net.pterodactylus.fcp.EndListPersistentRequests; +import net.pterodactylus.fcp.FCPPluginReply; +import net.pterodactylus.fcp.FcpConnection; +import net.pterodactylus.fcp.FcpListener; +import net.pterodactylus.fcp.FcpMessage; +import net.pterodactylus.fcp.FinishedCompression; +import net.pterodactylus.fcp.GetFailed; +import net.pterodactylus.fcp.IdentifierCollision; +import net.pterodactylus.fcp.NodeData; +import net.pterodactylus.fcp.NodeHello; +import net.pterodactylus.fcp.Peer; +import net.pterodactylus.fcp.PeerNote; +import net.pterodactylus.fcp.PeerRemoved; +import net.pterodactylus.fcp.PersistentGet; +import net.pterodactylus.fcp.PersistentPut; +import net.pterodactylus.fcp.PersistentPutDir; +import net.pterodactylus.fcp.PersistentRequestModified; +import net.pterodactylus.fcp.PersistentRequestRemoved; +import net.pterodactylus.fcp.PluginInfo; +import net.pterodactylus.fcp.ProtocolError; +import net.pterodactylus.fcp.PutFailed; +import net.pterodactylus.fcp.PutFetchable; +import net.pterodactylus.fcp.PutSuccessful; +import net.pterodactylus.fcp.ReceivedBookmarkFeed; +import net.pterodactylus.fcp.SSKKeypair; +import net.pterodactylus.fcp.SentFeed; +import net.pterodactylus.fcp.SimpleProgress; +import net.pterodactylus.fcp.StartedCompression; +import net.pterodactylus.fcp.SubscribedUSKUpdate; +import net.pterodactylus.fcp.TestDDAComplete; +import net.pterodactylus.fcp.TestDDAReply; +import net.pterodactylus.fcp.URIGenerated; +import net.pterodactylus.fcp.UnknownNodeIdentifier; +import net.pterodactylus.fcp.UnknownPeerNoteType; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies. + * + * @author David ‘Bombe’ Roden + */ +public abstract class FcpDialog implements AutoCloseable, FcpListener { + + private final Object syncObject = new Object(); + private final ListeningExecutorService executorService; + private final FcpConnection fcpConnection; + private final Queue messages = new ConcurrentLinkedQueue<>(); + private final AtomicReference identifier = new AtomicReference<>(); + private final AtomicBoolean connectionClosed = new AtomicBoolean(); + private final AtomicReference connectionFailureReason = new AtomicReference<>(); + + public FcpDialog(ExecutorService executorService, FcpConnection fcpConnection) { + this.executorService = MoreExecutors.listeningDecorator(executorService); + this.fcpConnection = fcpConnection; + } + + protected void setIdentifier(String identifier) { + this.identifier.set(identifier); + } + + protected abstract boolean isFinished(); + + public ListenableFuture send(FcpMessage fcpMessage) throws IOException { + setIdentifier(fcpMessage.getField("Identifier")); + fcpConnection.addFcpListener(this); + messages.add(fcpMessage); + return executorService.submit(() -> { + synchronized (syncObject) { + while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) { + while (messages.peek() != null) { + FcpMessage message = messages.poll(); + fcpConnection.sendMessage(message); + } + if (isFinished() || connectionClosed.get()) { + continue; + } + syncObject.wait(); + } + } + Throwable throwable = connectionFailureReason.get(); + if (throwable != null) { + throw new ExecutionException(throwable); + } + return getResult(); + }); + } + + protected void sendMessage(FcpMessage fcpMessage) { + messages.add(fcpMessage); + notifySyncObject(); + } + + private void notifySyncObject() { + synchronized (syncObject) { + syncObject.notifyAll(); + } + } + + protected R getResult() { + return null; + } + + @Override + public void close() { + fcpConnection.removeFcpListener(this); + } + + private void consume(Consumer consumer, M message) { + consume(consumer, message, "Identifier"); + } + + private void consume(Consumer consumer, M message, + String identifier) { + if (Objects.equals(message.getField(identifier), this.identifier.get())) { + consumeAlways(consumer, message); + } + } + + private void consumeAlways(Consumer consumer, M message) { + consumer.accept(message); + notifySyncObject(); + } + + private void consumeUnknown(FcpMessage fcpMessage) { + consumeUnknownMessage(fcpMessage); + notifySyncObject(); + } + + private void consumeClose(Throwable throwable) { + connectionFailureReason.set(throwable); + connectionClosed.set(true); + notifySyncObject(); + } + + @Override + public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) { + consume(this::consumeNodeHello, nodeHello); + } + + protected void consumeNodeHello(NodeHello nodeHello) { } + + @Override + public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, + CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { + connectionFailureReason.set(new IOException("duplicate client name")); + connectionClosed.set(true); + notifySyncObject(); + } + + @Override + public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { + consume(this::consumeSSKKeypair, sskKeypair); + } + + protected void consumeSSKKeypair(SSKKeypair sskKeypair) { } + + @Override + public final void receivedPeer(FcpConnection fcpConnection, Peer peer) { + consume(this::consumePeer, peer); + } + + protected void consumePeer(Peer peer) { } + + @Override + public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) { + consume(this::consumeEndListPeers, endListPeers); + } + + protected void consumeEndListPeers(EndListPeers endListPeers) { } + + @Override + public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) { + consume(this::consumePeerNote, peerNote); + } + + protected void consumePeerNote(PeerNote peerNote) { } + + @Override + public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) { + consume(this::consumeEndListPeerNotes, endListPeerNotes); + } + + protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { } + + @Override + public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) { + consume(this::consumePeerRemoved, peerRemoved); + } + + protected void consumePeerRemoved(PeerRemoved peerRemoved) { } + + @Override + public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) { + consume(this::consumeNodeData, nodeData); + } + + protected void consumeNodeData(NodeData nodeData) { } + + @Override + public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { + consume(this::consumeTestDDAReply, testDDAReply, "Directory"); + } + + protected void consumeTestDDAReply(TestDDAReply testDDAReply) { } + + @Override + public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { + consume(this::consumeTestDDAComplete, testDDAComplete, "Directory"); + } + + protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { } + + @Override + public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) { + consume(this::consumePersistentGet, persistentGet); + } + + protected void consumePersistentGet(PersistentGet persistentGet) { } + + @Override + public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) { + consume(this::consumePersistentPut, persistentPut); + } + + protected void consumePersistentPut(PersistentPut persistentPut) { } + + @Override + public final void receivedEndListPersistentRequests(FcpConnection fcpConnection, + EndListPersistentRequests endListPersistentRequests) { + consume(this::consumeEndListPersistentRequests, endListPersistentRequests); + } + + protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { } + + @Override + public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) { + consume(this::consumeURIGenerated, uriGenerated); + } + + protected void consumeURIGenerated(URIGenerated uriGenerated) { } + + @Override + public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) { + consume(this::consumeDataFound, dataFound); + } + + protected void consumeDataFound(DataFound dataFound) { } + + @Override + public final void receivedAllData(FcpConnection fcpConnection, AllData allData) { + consume(this::consumeAllData, allData); + } + + protected void consumeAllData(AllData allData) { } + + @Override + public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) { + consume(this::consumeSimpleProgress, simpleProgress); + } + + protected void consumeSimpleProgress(SimpleProgress simpleProgress) { } + + @Override + public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) { + consume(this::consumeStartedCompression, startedCompression); + } + + protected void consumeStartedCompression(StartedCompression startedCompression) { } + + @Override + public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) { + consume(this::consumeFinishedCompression, finishedCompression); + } + + protected void consumeFinishedCompression(FinishedCompression finishedCompression) { } + + @Override + public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) { + consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType); + } + + protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { } + + @Override + public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection, + UnknownNodeIdentifier unknownNodeIdentifier) { + consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier); + } + + protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { } + + @Override + public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) { + consume(this::consumeConfigData, configData); + } + + protected void consumeConfigData(ConfigData configData) { } + + @Override + public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) { + consume(this::consumeGetFailed, getFailed); + } + + protected void consumeGetFailed(GetFailed getFailed) { } + + @Override + public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) { + consume(this::consumePutFailed, putFailed); + } + + protected void consumePutFailed(PutFailed putFailed) { } + + @Override + public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) { + consume(this::consumeIdentifierCollision, identifierCollision); + } + + protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { } + + @Override + public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) { + consume(this::consumePersistentPutDir, persistentPutDir); + } + + protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { } + + @Override + public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection, + PersistentRequestRemoved persistentRequestRemoved) { + consume(this::consumePersistentRequestRemoved, persistentRequestRemoved); + } + + protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { } + + @Override + public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) { + consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate); + } + + protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { } + + @Override + public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) { + consume(this::consumePluginInfo, pluginInfo); + } + + protected void consumePluginInfo(PluginInfo pluginInfo) { } + + @Override + public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) { + consume(this::consumeFCPPluginReply, fcpPluginReply); + } + + protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { } + + @Override + public final void receivedPersistentRequestModified(FcpConnection fcpConnection, + PersistentRequestModified persistentRequestModified) { + consume(this::consumePersistentRequestModified, persistentRequestModified); + } + + protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { } + + @Override + public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) { + consume(this::consumePutSuccessful, putSuccessful); + } + + protected void consumePutSuccessful(PutSuccessful putSuccessful) { } + + @Override + public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) { + consume(this::consumePutFetchable, putFetchable); + } + + protected void consumePutFetchable(PutFetchable putFetchable) { } + + @Override + public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) { + consume(this::consumeSentFeed, sentFeed); + } + + protected void consumeSentFeed(SentFeed sentFeed) { } + + @Override + public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) { + consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed); + } + + protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { } + + @Override + public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) { + consume(this::consumeProtocolError, protocolError); + } + + protected void consumeProtocolError(ProtocolError protocolError) { } + + @Override + public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) { + consumeUnknown(fcpMessage); + } + + protected void consumeUnknownMessage(FcpMessage fcpMessage) { } + + @Override + public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) { + consumeClose(throwable); + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java deleted file mode 100644 index 9b44b9a..0000000 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ /dev/null @@ -1,433 +0,0 @@ -package net.pterodactylus.fcp.quelaton; - -import java.io.IOException; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; - -import net.pterodactylus.fcp.AllData; -import net.pterodactylus.fcp.BaseMessage; -import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; -import net.pterodactylus.fcp.ConfigData; -import net.pterodactylus.fcp.DataFound; -import net.pterodactylus.fcp.EndListPeerNotes; -import net.pterodactylus.fcp.EndListPeers; -import net.pterodactylus.fcp.EndListPersistentRequests; -import net.pterodactylus.fcp.FCPPluginReply; -import net.pterodactylus.fcp.FcpConnection; -import net.pterodactylus.fcp.FcpListener; -import net.pterodactylus.fcp.FcpMessage; -import net.pterodactylus.fcp.FinishedCompression; -import net.pterodactylus.fcp.GetFailed; -import net.pterodactylus.fcp.IdentifierCollision; -import net.pterodactylus.fcp.NodeData; -import net.pterodactylus.fcp.NodeHello; -import net.pterodactylus.fcp.Peer; -import net.pterodactylus.fcp.PeerNote; -import net.pterodactylus.fcp.PeerRemoved; -import net.pterodactylus.fcp.PersistentGet; -import net.pterodactylus.fcp.PersistentPut; -import net.pterodactylus.fcp.PersistentPutDir; -import net.pterodactylus.fcp.PersistentRequestModified; -import net.pterodactylus.fcp.PersistentRequestRemoved; -import net.pterodactylus.fcp.PluginInfo; -import net.pterodactylus.fcp.ProtocolError; -import net.pterodactylus.fcp.PutFailed; -import net.pterodactylus.fcp.PutFetchable; -import net.pterodactylus.fcp.PutSuccessful; -import net.pterodactylus.fcp.ReceivedBookmarkFeed; -import net.pterodactylus.fcp.SSKKeypair; -import net.pterodactylus.fcp.SentFeed; -import net.pterodactylus.fcp.SimpleProgress; -import net.pterodactylus.fcp.StartedCompression; -import net.pterodactylus.fcp.SubscribedUSKUpdate; -import net.pterodactylus.fcp.TestDDAComplete; -import net.pterodactylus.fcp.TestDDAReply; -import net.pterodactylus.fcp.URIGenerated; -import net.pterodactylus.fcp.UnknownNodeIdentifier; -import net.pterodactylus.fcp.UnknownPeerNoteType; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -/** - * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies. - * - * @author David ‘Bombe’ Roden - */ -public abstract class FcpReplySequence implements AutoCloseable, FcpListener { - - private final Object syncObject = new Object(); - private final ListeningExecutorService executorService; - private final FcpConnection fcpConnection; - private final Queue messages = new ConcurrentLinkedQueue<>(); - private final AtomicReference identifier = new AtomicReference<>(); - private final AtomicBoolean connectionClosed = new AtomicBoolean(); - private final AtomicReference connectionFailureReason = new AtomicReference<>(); - - public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { - this.executorService = MoreExecutors.listeningDecorator(executorService); - this.fcpConnection = fcpConnection; - } - - protected void setIdentifier(String identifier) { - this.identifier.set(identifier); - } - - protected abstract boolean isFinished(); - - public ListenableFuture send(FcpMessage fcpMessage) throws IOException { - setIdentifier(fcpMessage.getField("Identifier")); - fcpConnection.addFcpListener(this); - messages.add(fcpMessage); - return executorService.submit(() -> { - synchronized (syncObject) { - while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) { - while (messages.peek() != null) { - FcpMessage message = messages.poll(); - fcpConnection.sendMessage(message); - } - if (isFinished() || connectionClosed.get()) { - continue; - } - syncObject.wait(); - } - } - Throwable throwable = connectionFailureReason.get(); - if (throwable != null) { - throw new ExecutionException(throwable); - } - return getResult(); - }); - } - - protected void sendMessage(FcpMessage fcpMessage) { - messages.add(fcpMessage); - notifySyncObject(); - } - - private void notifySyncObject() { - synchronized (syncObject) { - syncObject.notifyAll(); - } - } - - protected R getResult() { - return null; - } - - @Override - public void close() { - fcpConnection.removeFcpListener(this); - } - - private void consume(Consumer consumer, M message) { - consume(consumer, message, "Identifier"); - } - - private void consume(Consumer consumer, M message, - String identifier) { - if (Objects.equals(message.getField(identifier), this.identifier.get())) { - consumeAlways(consumer, message); - } - } - - private void consumeAlways(Consumer consumer, M message) { - consumer.accept(message); - notifySyncObject(); - } - - private void consumeUnknown(FcpMessage fcpMessage) { - consumeUnknownMessage(fcpMessage); - notifySyncObject(); - } - - private void consumeClose(Throwable throwable) { - connectionFailureReason.set(throwable); - connectionClosed.set(true); - notifySyncObject(); - } - - @Override - public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) { - consume(this::consumeNodeHello, nodeHello); - } - - protected void consumeNodeHello(NodeHello nodeHello) { } - - @Override - public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, - CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - connectionFailureReason.set(new IOException("duplicate client name")); - connectionClosed.set(true); - notifySyncObject(); - } - - @Override - public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { - consume(this::consumeSSKKeypair, sskKeypair); - } - - protected void consumeSSKKeypair(SSKKeypair sskKeypair) { } - - @Override - public final void receivedPeer(FcpConnection fcpConnection, Peer peer) { - consume(this::consumePeer, peer); - } - - protected void consumePeer(Peer peer) { } - - @Override - public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) { - consume(this::consumeEndListPeers, endListPeers); - } - - protected void consumeEndListPeers(EndListPeers endListPeers) { } - - @Override - public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) { - consume(this::consumePeerNote, peerNote); - } - - protected void consumePeerNote(PeerNote peerNote) { } - - @Override - public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) { - consume(this::consumeEndListPeerNotes, endListPeerNotes); - } - - protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { } - - @Override - public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) { - consume(this::consumePeerRemoved, peerRemoved); - } - - protected void consumePeerRemoved(PeerRemoved peerRemoved) { } - - @Override - public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) { - consume(this::consumeNodeData, nodeData); - } - - protected void consumeNodeData(NodeData nodeData) { } - - @Override - public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { - consume(this::consumeTestDDAReply, testDDAReply, "Directory"); - } - - protected void consumeTestDDAReply(TestDDAReply testDDAReply) { } - - @Override - public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { - consume(this::consumeTestDDAComplete, testDDAComplete, "Directory"); - } - - protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { } - - @Override - public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) { - consume(this::consumePersistentGet, persistentGet); - } - - protected void consumePersistentGet(PersistentGet persistentGet) { } - - @Override - public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) { - consume(this::consumePersistentPut, persistentPut); - } - - protected void consumePersistentPut(PersistentPut persistentPut) { } - - @Override - public final void receivedEndListPersistentRequests(FcpConnection fcpConnection, - EndListPersistentRequests endListPersistentRequests) { - consume(this::consumeEndListPersistentRequests, endListPersistentRequests); - } - - protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { } - - @Override - public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) { - consume(this::consumeURIGenerated, uriGenerated); - } - - protected void consumeURIGenerated(URIGenerated uriGenerated) { } - - @Override - public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) { - consume(this::consumeDataFound, dataFound); - } - - protected void consumeDataFound(DataFound dataFound) { } - - @Override - public final void receivedAllData(FcpConnection fcpConnection, AllData allData) { - consume(this::consumeAllData, allData); - } - - protected void consumeAllData(AllData allData) { } - - @Override - public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) { - consume(this::consumeSimpleProgress, simpleProgress); - } - - protected void consumeSimpleProgress(SimpleProgress simpleProgress) { } - - @Override - public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) { - consume(this::consumeStartedCompression, startedCompression); - } - - protected void consumeStartedCompression(StartedCompression startedCompression) { } - - @Override - public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) { - consume(this::consumeFinishedCompression, finishedCompression); - } - - protected void consumeFinishedCompression(FinishedCompression finishedCompression) { } - - @Override - public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) { - consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType); - } - - protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { } - - @Override - public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection, - UnknownNodeIdentifier unknownNodeIdentifier) { - consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier); - } - - protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { } - - @Override - public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) { - consume(this::consumeConfigData, configData); - } - - protected void consumeConfigData(ConfigData configData) { } - - @Override - public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) { - consume(this::consumeGetFailed, getFailed); - } - - protected void consumeGetFailed(GetFailed getFailed) { } - - @Override - public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) { - consume(this::consumePutFailed, putFailed); - } - - protected void consumePutFailed(PutFailed putFailed) { } - - @Override - public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) { - consume(this::consumeIdentifierCollision, identifierCollision); - } - - protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { } - - @Override - public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) { - consume(this::consumePersistentPutDir, persistentPutDir); - } - - protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { } - - @Override - public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection, - PersistentRequestRemoved persistentRequestRemoved) { - consume(this::consumePersistentRequestRemoved, persistentRequestRemoved); - } - - protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { } - - @Override - public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) { - consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate); - } - - protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { } - - @Override - public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) { - consume(this::consumePluginInfo, pluginInfo); - } - - protected void consumePluginInfo(PluginInfo pluginInfo) { } - - @Override - public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) { - consume(this::consumeFCPPluginReply, fcpPluginReply); - } - - protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { } - - @Override - public final void receivedPersistentRequestModified(FcpConnection fcpConnection, - PersistentRequestModified persistentRequestModified) { - consume(this::consumePersistentRequestModified, persistentRequestModified); - } - - protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { } - - @Override - public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) { - consume(this::consumePutSuccessful, putSuccessful); - } - - protected void consumePutSuccessful(PutSuccessful putSuccessful) { } - - @Override - public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) { - consume(this::consumePutFetchable, putFetchable); - } - - protected void consumePutFetchable(PutFetchable putFetchable) { } - - @Override - public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) { - consume(this::consumeSentFeed, sentFeed); - } - - protected void consumeSentFeed(SentFeed sentFeed) { } - - @Override - public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) { - consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed); - } - - protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { } - - @Override - public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) { - consume(this::consumeProtocolError, protocolError); - } - - protected void consumeProtocolError(ProtocolError protocolError) { } - - @Override - public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) { - consumeUnknown(fcpMessage); - } - - protected void consumeUnknownMessage(FcpMessage fcpMessage) { } - - @Override - public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) { - consumeClose(throwable); - } - -} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommandImpl.java index 564fa71..5f8cd5b 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommandImpl.java @@ -34,16 +34,16 @@ class GenerateKeypairCommandImpl implements GenerateKeypairCommand { } private FcpKeyPair executeSequence() throws InterruptedException, ExecutionException, IOException { - try (FcpKeyPairReplySequence fcpKeyPairReplySequence = new FcpKeyPairReplySequence()) { - return fcpKeyPairReplySequence.send(new GenerateSSK()).get(); + try (FcpKeyPairDialog fcpKeyPairDialog = new FcpKeyPairDialog()) { + return fcpKeyPairDialog.send(new GenerateSSK()).get(); } } - private class FcpKeyPairReplySequence extends FcpReplySequence { + private class FcpKeyPairDialog extends FcpDialog { private AtomicReference keyPair = new AtomicReference<>(); - public FcpKeyPairReplySequence() throws IOException { + public FcpKeyPairDialog() throws IOException { super(GenerateKeypairCommandImpl.this.threadPool, GenerateKeypairCommandImpl.this.connectionSupplier.get()); } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/GetNodeCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/GetNodeCommandImpl.java index 2447179..025cddc 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/GetNodeCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/GetNodeCommandImpl.java @@ -14,7 +14,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; /** - * Default {@link GetNodeCommandImpl} implementation based on {@link FcpReplySequence}. + * Default {@link GetNodeCommandImpl} implementation based on {@link FcpDialog}. * * @author David ‘Bombe’ Roden */ @@ -57,16 +57,16 @@ public class GetNodeCommandImpl implements GetNodeCommand { private NodeData executeSequence() throws InterruptedException, ExecutionException, IOException { GetNode getNode = new GetNode(new RandomIdentifierGenerator().generate(), giveOpennetRef.get(), includePrivate.get(), includeVolatile.get()); - try (GetNodeReplySequence getNodeReplySequence = new GetNodeReplySequence()) { - return getNodeReplySequence.send(getNode).get(); + try (GetNodeDialog getNodeDialog = new GetNodeDialog()) { + return getNodeDialog.send(getNode).get(); } } - private class GetNodeReplySequence extends FcpReplySequence { + private class GetNodeDialog extends FcpDialog { private final AtomicReference nodeData = new AtomicReference<>(); - public GetNodeReplySequence() throws IOException { + public GetNodeDialog() throws IOException { super(threadPool, connectionSupplier.get()); } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ListPeerCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ListPeerCommandImpl.java index dd91133..44648c3 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ListPeerCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ListPeerCommandImpl.java @@ -14,7 +14,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; /** - * Default {@link ListPeerCommand} implementation based on {@link FcpReplySequence}. + * Default {@link ListPeerCommand} implementation based on {@link FcpDialog}. * * @author David ‘Bombe’ Roden */ @@ -58,7 +58,7 @@ public class ListPeerCommandImpl implements ListPeerCommand { } } - private class ListPeerSequence extends FcpReplySequence { + private class ListPeerSequence extends FcpDialog { private final AtomicBoolean finished = new AtomicBoolean(); private final AtomicReference peer = new AtomicReference<>(); diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ListPeersCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ListPeersCommandImpl.java index 3d3baed..921c0fa 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ListPeersCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ListPeersCommandImpl.java @@ -16,7 +16,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; /** - * Default {@link ListPeersCommand} implementation based on {@link FcpReplySequence}. + * Default {@link ListPeersCommand} implementation based on {@link FcpDialog}. * * @author David ‘Bombe’ Roden */ @@ -52,17 +52,17 @@ public class ListPeersCommandImpl implements ListPeersCommand { private Collection executeSequence() throws InterruptedException, ExecutionException, IOException { String identifier = new RandomIdentifierGenerator().generate(); ListPeers listPeers = new ListPeers(identifier, includeMetadata.get(), includeVolatile.get()); - try (ListPeersReplySequence listPeersReplySequence = new ListPeersReplySequence()) { - return listPeersReplySequence.send(listPeers).get(); + try (ListPeersDialog listPeersDialog = new ListPeersDialog()) { + return listPeersDialog.send(listPeers).get(); } } - private class ListPeersReplySequence extends FcpReplySequence> { + private class ListPeersDialog extends FcpDialog> { private final Collection peers = new HashSet<>(); private final AtomicBoolean finished = new AtomicBoolean(false); - public ListPeersReplySequence() throws IOException { + public ListPeersDialog() throws IOException { super(threadPool, connectionSupplier.get()); } diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpDialogTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpDialogTest.java new file mode 100644 index 0000000..ee04e6f --- /dev/null +++ b/src/test/java/net/pterodactylus/fcp/quelaton/FcpDialogTest.java @@ -0,0 +1,594 @@ +package net.pterodactylus.fcp.quelaton; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.BaseMessage; +import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; +import net.pterodactylus.fcp.ConfigData; +import net.pterodactylus.fcp.DataFound; +import net.pterodactylus.fcp.EndListPeerNotes; +import net.pterodactylus.fcp.EndListPeers; +import net.pterodactylus.fcp.EndListPersistentRequests; +import net.pterodactylus.fcp.FCPPluginReply; +import net.pterodactylus.fcp.FcpConnection; +import net.pterodactylus.fcp.FcpMessage; +import net.pterodactylus.fcp.FinishedCompression; +import net.pterodactylus.fcp.GetFailed; +import net.pterodactylus.fcp.IdentifierCollision; +import net.pterodactylus.fcp.NodeData; +import net.pterodactylus.fcp.NodeHello; +import net.pterodactylus.fcp.Peer; +import net.pterodactylus.fcp.PeerNote; +import net.pterodactylus.fcp.PeerRemoved; +import net.pterodactylus.fcp.PersistentGet; +import net.pterodactylus.fcp.PersistentPut; +import net.pterodactylus.fcp.PersistentPutDir; +import net.pterodactylus.fcp.PersistentRequestModified; +import net.pterodactylus.fcp.PersistentRequestRemoved; +import net.pterodactylus.fcp.PluginInfo; +import net.pterodactylus.fcp.ProtocolError; +import net.pterodactylus.fcp.PutFailed; +import net.pterodactylus.fcp.PutFetchable; +import net.pterodactylus.fcp.PutSuccessful; +import net.pterodactylus.fcp.ReceivedBookmarkFeed; +import net.pterodactylus.fcp.SSKKeypair; +import net.pterodactylus.fcp.SentFeed; +import net.pterodactylus.fcp.SimpleProgress; +import net.pterodactylus.fcp.StartedCompression; +import net.pterodactylus.fcp.SubscribedUSKUpdate; +import net.pterodactylus.fcp.TestDDAComplete; +import net.pterodactylus.fcp.TestDDAReply; +import net.pterodactylus.fcp.URIGenerated; +import net.pterodactylus.fcp.UnknownNodeIdentifier; +import net.pterodactylus.fcp.UnknownPeerNoteType; + +import org.junit.Test; + +/** + * Unit test for {@link FcpDialog}. + * + * @author David ‘Bombe’ Roden + */ +public class FcpDialogTest { + + private final FcpConnection fcpConnection = mock(FcpConnection.class); + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final TestFcpDialog dialog = new TestFcpDialog(executorService, fcpConnection); + private final FcpMessage fcpMessage = new FcpMessage("Test"); + + @Test + public void canSendMessage() throws IOException, ExecutionException, InterruptedException { + FcpDialog dialog = createBasicDialog(); + dialog.send(fcpMessage).get(); + verify(fcpConnection).sendMessage(fcpMessage); + } + + private FcpDialog createBasicDialog() { + return new FcpDialog(executorService, fcpConnection) { + @Override + protected boolean isFinished() { + return true; + } + }; + } + + @Test + public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException { + FcpDialog dialog = createBasicDialog(); + dialog.send(fcpMessage); + verify(fcpConnection).addFcpListener(dialog); + } + + @Test + public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException { + FcpDialog dialog = createBasicDialog(); + dialog.send(fcpMessage); + dialog.close(); + verify(fcpConnection).removeFcpListener(dialog); + } + + private void waitForASpecificMessage(MessageReceiver messageReceiver, Class messageClass, MessageCreator messageCreator) throws IOException, InterruptedException, ExecutionException { + waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName()))); + } + + private void waitForASpecificMessage(MessageReceiver messageReceiver, M message) throws IOException, InterruptedException, ExecutionException { + dialog.setExpectedMessage(message.getName()); + Future result = dialog.send(fcpMessage); + messageReceiver.receiveMessage(fcpConnection, message); + assertThat(result.get(), is(true)); + } + + private M createMessage(Class messageClass, MessageCreator messageCreator) { + return messageCreator.create(new FcpMessage(messageClass.getSimpleName())); + } + + private interface MessageCreator { + + M create(FcpMessage fcpMessage); + + } + + @Test + public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException { + waitForASpecificMessage(dialog::receivedNodeHello, NodeHello.class, NodeHello::new); + } + + @Test(expected = ExecutionException.class) + public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException { + dialog.setExpectedMessage(""); + Future result = dialog.send(fcpMessage); + dialog.receivedCloseConnectionDuplicateClientName(fcpConnection, + new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName"))); + result.get(); + } + + @Test + public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new); + } + + @Test + public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPeer, Peer.class, Peer::new); + } + + @Test + public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedEndListPeers, EndListPeers.class, EndListPeers::new); + } + + @Test + public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPeerNote, PeerNote.class, PeerNote::new); + } + + @Test + public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new); + } + + @Test + public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new); + } + + @Test + public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedNodeData, new NodeData( + new FcpMessage("NodeData").put("ark.pubURI", "") + .put("ark.number", "0") + .put("auth.negTypes", "") + .put("version", "0,0,0,0") + .put("lastGoodVersion", "0,0,0,0"))); + } + + @Test + public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new); + } + + @Test + public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new); + } + + @Test + public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPersistentGet, PersistentGet.class, PersistentGet::new); + } + + @Test + public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPersistentPut, PersistentPut.class, PersistentPut::new); + } + + @Test + public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new); + } + + @Test + public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedURIGenerated, URIGenerated.class, URIGenerated::new); + } + + @Test + public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedDataFound, DataFound.class, DataFound::new); + } + + @Test + public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedAllData, new AllData(new FcpMessage("AllData"), null)); + } + + @Test + public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new); + } + + @Test + public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedStartedCompression, StartedCompression.class, StartedCompression::new); + } + + @Test + public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new); + } + + @Test + public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new); + } + + @Test + public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new); + } + + @Test + public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedConfigData, ConfigData.class, ConfigData::new); + } + + @Test + public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedGetFailed, GetFailed.class, GetFailed::new); + } + + @Test + public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPutFailed, PutFailed.class, PutFailed::new); + } + + @Test + public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new); + } + + @Test + public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new); + } + + @Test + public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new); + } + + @Test + public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new); + } + + @Test + public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPluginInfo, PluginInfo.class, PluginInfo::new); + } + + @Test + public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null)); + } + + @Test + public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new); + } + + @Test + public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new); + } + + @Test + public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedPutFetchable, PutFetchable.class, PutFetchable::new); + } + + @Test + public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedSentFeed, SentFeed.class, SentFeed::new); + } + + @Test + public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new); + } + + @Test + public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(dialog::receivedProtocolError, ProtocolError.class, ProtocolError::new); + } + + @Test + public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException { + dialog.setExpectedMessage("SomeFcpMessage"); + Future result = dialog.send(fcpMessage); + dialog.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage")); + assertThat(result.get(), is(true)); + } + + @Test + public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException { + TestFcpDialog testFcpDialog = new TestFcpDialog(executorService, fcpConnection) { + private final AtomicBoolean gotPutFailed = new AtomicBoolean(); + private final AtomicBoolean gotGetFailed = new AtomicBoolean(); + + @Override + protected boolean isFinished() { + return gotPutFailed.get() && gotGetFailed.get(); + } + + @Override + protected Boolean getResult() { + return isFinished(); + } + + @Override + protected void consumePutFailed(PutFailed putFailed) { + gotPutFailed.set(true); + } + + @Override + protected void consumeGetFailed(GetFailed getFailed) { + gotGetFailed.set(true); + } + }; + Future result = testFcpDialog.send(fcpMessage); + assertThat(result.isDone(), is(false)); + testFcpDialog.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed"))); + assertThat(result.isDone(), is(false)); + testFcpDialog.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed"))); + assertThat(result.get(), is(true)); + } + + @Test + public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException { + dialog.setExpectedMessage("none"); + Future result = dialog.send(fcpMessage); + Throwable throwable = new Throwable(); + dialog.connectionClosed(fcpConnection, throwable); + try { + result.get(); + } catch (ExecutionException e) { + Throwable t = e; + while (t.getCause() != null) { + t = t.getCause(); + } + assertThat(t, sameInstance(throwable)); + } + } + + @FunctionalInterface + private interface MessageReceiver { + + void receiveMessage(FcpConnection fcpConnection, M message); + + } + + private static class TestFcpDialog extends FcpDialog { + + private final AtomicReference gotMessage = new AtomicReference<>(); + private final AtomicReference expectedMessage = new AtomicReference<>(); + + public TestFcpDialog(ExecutorService executorService, FcpConnection fcpConnection) { + super(executorService, fcpConnection); + } + + public void setExpectedMessage(String expectedMessage) { + this.expectedMessage.set(expectedMessage); + } + + @Override + protected boolean isFinished() { + return getResult(); + } + + @Override + protected Boolean getResult() { + return expectedMessage.get().equals(gotMessage.get()); + } + + @Override + protected void consumeNodeHello(NodeHello nodeHello) { + gotMessage.set(nodeHello.getName()); + } + + @Override + protected void consumeSSKKeypair(SSKKeypair sskKeypair) { + gotMessage.set(sskKeypair.getName()); + } + + @Override + protected void consumePeer(Peer peer) { + gotMessage.set(peer.getName()); + } + + @Override + protected void consumeEndListPeers(EndListPeers endListPeers) { + gotMessage.set(endListPeers.getName()); + } + + @Override + protected void consumePeerNote(PeerNote peerNote) { + gotMessage.set(peerNote.getName()); + } + + @Override + protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { + gotMessage.set(endListPeerNotes.getName()); + } + + @Override + protected void consumePeerRemoved(PeerRemoved peerRemoved) { + gotMessage.set(peerRemoved.getName()); + } + + @Override + protected void consumeNodeData(NodeData nodeData) { + gotMessage.set(nodeData.getName()); + } + + @Override + protected void consumeTestDDAReply(TestDDAReply testDDAReply) { + gotMessage.set(testDDAReply.getName()); + } + + @Override + protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { + gotMessage.set(testDDAComplete.getName()); + } + + @Override + protected void consumePersistentGet(PersistentGet persistentGet) { + gotMessage.set(persistentGet.getName()); + } + + @Override + protected void consumePersistentPut(PersistentPut persistentPut) { + gotMessage.set(persistentPut.getName()); + } + + @Override + protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { + gotMessage.set(endListPersistentRequests.getName()); + } + + @Override + protected void consumeURIGenerated(URIGenerated uriGenerated) { + gotMessage.set(uriGenerated.getName()); + } + + @Override + protected void consumeDataFound(DataFound dataFound) { + gotMessage.set(dataFound.getName()); + } + + @Override + protected void consumeAllData(AllData allData) { + gotMessage.set(allData.getName()); + } + + @Override + protected void consumeSimpleProgress(SimpleProgress simpleProgress) { + gotMessage.set(simpleProgress.getName()); + } + + @Override + protected void consumeStartedCompression(StartedCompression startedCompression) { + gotMessage.set(startedCompression.getName()); + } + + @Override + protected void consumeFinishedCompression(FinishedCompression finishedCompression) { + gotMessage.set(finishedCompression.getName()); + } + + @Override + protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { + gotMessage.set(unknownPeerNoteType.getName()); + } + + @Override + protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { + gotMessage.set(unknownNodeIdentifier.getName()); + } + + @Override + protected void consumeConfigData(ConfigData configData) { + gotMessage.set(configData.getName()); + } + + @Override + protected void consumeGetFailed(GetFailed getFailed) { + gotMessage.set(getFailed.getName()); + } + + @Override + protected void consumePutFailed(PutFailed putFailed) { + gotMessage.set(putFailed.getName()); + } + + @Override + protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { + gotMessage.set(identifierCollision.getName()); + } + + @Override + protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { + gotMessage.set(persistentPutDir.getName()); + } + + @Override + protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { + gotMessage.set(persistentRequestRemoved.getName()); + } + + @Override + protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { + gotMessage.set(subscribedUSKUpdate.getName()); + } + + @Override + protected void consumePluginInfo(PluginInfo pluginInfo) { + gotMessage.set(pluginInfo.getName()); + } + + @Override + protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { + gotMessage.set(fcpPluginReply.getName()); + } + + @Override + protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { + gotMessage.set(persistentRequestModified.getName()); + } + + @Override + protected void consumePutSuccessful(PutSuccessful putSuccessful) { + gotMessage.set(putSuccessful.getName()); + } + + @Override + protected void consumePutFetchable(PutFetchable putFetchable) { + gotMessage.set(putFetchable.getName()); + } + + @Override + protected void consumeSentFeed(SentFeed sentFeed) { + gotMessage.set(sentFeed.getName()); + } + + @Override + protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { + gotMessage.set(receivedBookmarkFeed.getName()); + } + + @Override + protected void consumeProtocolError(ProtocolError protocolError) { + gotMessage.set(protocolError.getName()); + } + + @Override + protected void consumeUnknownMessage(FcpMessage fcpMessage) { + gotMessage.set(fcpMessage.getName()); + } + + } + +} diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java deleted file mode 100644 index 9300ee8..0000000 --- a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java +++ /dev/null @@ -1,595 +0,0 @@ -package net.pterodactylus.fcp.quelaton; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import net.pterodactylus.fcp.AllData; -import net.pterodactylus.fcp.BaseMessage; -import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; -import net.pterodactylus.fcp.ConfigData; -import net.pterodactylus.fcp.DataFound; -import net.pterodactylus.fcp.EndListPeerNotes; -import net.pterodactylus.fcp.EndListPeers; -import net.pterodactylus.fcp.EndListPersistentRequests; -import net.pterodactylus.fcp.FCPPluginReply; -import net.pterodactylus.fcp.FcpConnection; -import net.pterodactylus.fcp.FcpMessage; -import net.pterodactylus.fcp.FinishedCompression; -import net.pterodactylus.fcp.GetFailed; -import net.pterodactylus.fcp.IdentifierCollision; -import net.pterodactylus.fcp.NodeData; -import net.pterodactylus.fcp.NodeHello; -import net.pterodactylus.fcp.Peer; -import net.pterodactylus.fcp.PeerNote; -import net.pterodactylus.fcp.PeerRemoved; -import net.pterodactylus.fcp.PersistentGet; -import net.pterodactylus.fcp.PersistentPut; -import net.pterodactylus.fcp.PersistentPutDir; -import net.pterodactylus.fcp.PersistentRequestModified; -import net.pterodactylus.fcp.PersistentRequestRemoved; -import net.pterodactylus.fcp.PluginInfo; -import net.pterodactylus.fcp.ProtocolError; -import net.pterodactylus.fcp.PutFailed; -import net.pterodactylus.fcp.PutFetchable; -import net.pterodactylus.fcp.PutSuccessful; -import net.pterodactylus.fcp.ReceivedBookmarkFeed; -import net.pterodactylus.fcp.SSKKeypair; -import net.pterodactylus.fcp.SentFeed; -import net.pterodactylus.fcp.SimpleProgress; -import net.pterodactylus.fcp.StartedCompression; -import net.pterodactylus.fcp.SubscribedUSKUpdate; -import net.pterodactylus.fcp.TestDDAComplete; -import net.pterodactylus.fcp.TestDDAReply; -import net.pterodactylus.fcp.URIGenerated; -import net.pterodactylus.fcp.UnknownNodeIdentifier; -import net.pterodactylus.fcp.UnknownPeerNoteType; - -import org.junit.Test; - -/** - * Unit test for {@link FcpReplySequence}. - * - * @author David ‘Bombe’ Roden - */ -public class FcpReplySequenceTest { - - private final FcpConnection fcpConnection = mock(FcpConnection.class); - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); - private final TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection); - private final FcpMessage fcpMessage = new FcpMessage("Test"); - - @Test - public void canSendMessage() throws IOException, ExecutionException, InterruptedException { - FcpReplySequence replySequence = createBasicReplySequence(); - replySequence.send(fcpMessage).get(); - verify(fcpConnection).sendMessage(fcpMessage); - } - - private FcpReplySequence createBasicReplySequence() { - return new FcpReplySequence(executorService, fcpConnection) { - @Override - protected boolean isFinished() { - return true; - } - }; - } - - @Test - public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException { - FcpReplySequence replySequence = createBasicReplySequence(); - replySequence.send(fcpMessage); - verify(fcpConnection).addFcpListener(replySequence); - } - - @Test - public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException { - FcpReplySequence replySequence = createBasicReplySequence(); - replySequence.send(fcpMessage); - replySequence.close(); - verify(fcpConnection).removeFcpListener(replySequence); - } - - private void waitForASpecificMessage(MessageReceiver messageReceiver, Class messageClass, MessageCreator messageCreator) throws IOException, InterruptedException, ExecutionException { - waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName()))); - } - - private void waitForASpecificMessage(MessageReceiver messageReceiver, M message) throws IOException, InterruptedException, ExecutionException { - replySequence.setExpectedMessage(message.getName()); - Future result = replySequence.send(fcpMessage); - messageReceiver.receiveMessage(fcpConnection, message); - assertThat(result.get(), is(true)); - } - - private M createMessage(Class messageClass, MessageCreator messageCreator) { - return messageCreator.create(new FcpMessage(messageClass.getSimpleName())); - } - - private interface MessageCreator { - - M create(FcpMessage fcpMessage); - - } - - @Test - public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException { - waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new); - } - - @Test(expected = ExecutionException.class) - public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException { - replySequence.setExpectedMessage(""); - Future result = replySequence.send(fcpMessage); - replySequence.receivedCloseConnectionDuplicateClientName(fcpConnection, - new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName"))); - result.get(); - } - - @Test - public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new); - } - - @Test - public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPeer, Peer.class, Peer::new); - } - - @Test - public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedEndListPeers, EndListPeers.class, EndListPeers::new); - } - - @Test - public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPeerNote, PeerNote.class, PeerNote::new); - } - - @Test - public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new); - } - - @Test - public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new); - } - - @Test - public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedNodeData, new NodeData( - new FcpMessage("NodeData").put("ark.pubURI", "") - .put("ark.number", "0") - .put("auth.negTypes", "") - .put("version", "0,0,0,0") - .put("lastGoodVersion", "0,0,0,0"))); - } - - @Test - public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new); - } - - @Test - public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new); - } - - @Test - public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPersistentGet, PersistentGet.class, PersistentGet::new); - } - - @Test - public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPersistentPut, PersistentPut.class, PersistentPut::new); - } - - @Test - public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new); - } - - @Test - public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedURIGenerated, URIGenerated.class, URIGenerated::new); - } - - @Test - public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedDataFound, DataFound.class, DataFound::new); - } - - @Test - public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedAllData, new AllData(new FcpMessage("AllData"), null)); - } - - @Test - public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new); - } - - @Test - public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedStartedCompression, StartedCompression.class, StartedCompression::new); - } - - @Test - public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new); - } - - @Test - public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new); - } - - @Test - public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new); - } - - @Test - public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedConfigData, ConfigData.class, ConfigData::new); - } - - @Test - public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedGetFailed, GetFailed.class, GetFailed::new); - } - - @Test - public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPutFailed, PutFailed.class, PutFailed::new); - } - - @Test - public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new); - } - - @Test - public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new); - } - - @Test - public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new); - } - - @Test - public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new); - } - - @Test - public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPluginInfo, PluginInfo.class, PluginInfo::new); - } - - @Test - public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null)); - } - - @Test - public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new); - } - - @Test - public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new); - } - - @Test - public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedPutFetchable, PutFetchable.class, PutFetchable::new); - } - - @Test - public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedSentFeed, SentFeed.class, SentFeed::new); - } - - @Test - public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new); - } - - @Test - public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException { - waitForASpecificMessage(replySequence::receivedProtocolError, ProtocolError.class, ProtocolError::new); - } - - @Test - public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException { - replySequence.setExpectedMessage("SomeFcpMessage"); - Future result = replySequence.send(fcpMessage); - replySequence.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage")); - assertThat(result.get(), is(true)); - } - - @Test - public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException { - TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection) { - private final AtomicBoolean gotPutFailed = new AtomicBoolean(); - private final AtomicBoolean gotGetFailed = new AtomicBoolean(); - - @Override - protected boolean isFinished() { - return gotPutFailed.get() && gotGetFailed.get(); - } - - @Override - protected Boolean getResult() { - return isFinished(); - } - - @Override - protected void consumePutFailed(PutFailed putFailed) { - gotPutFailed.set(true); - } - - @Override - protected void consumeGetFailed(GetFailed getFailed) { - gotGetFailed.set(true); - } - }; - Future result = replySequence.send(fcpMessage); - assertThat(result.isDone(), is(false)); - replySequence.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed"))); - assertThat(result.isDone(), is(false)); - replySequence.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed"))); - assertThat(result.get(), is(true)); - } - - @Test - public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException { - replySequence.setExpectedMessage("none"); - Future result = replySequence.send(fcpMessage); - Throwable throwable = new Throwable(); - replySequence.connectionClosed(fcpConnection, throwable); - try { - result.get(); - } catch (ExecutionException e) { - Throwable t = e; - while (t.getCause() != null) { - t = t.getCause(); - } - assertThat(t, sameInstance(throwable)); - } - } - - @FunctionalInterface - private interface MessageReceiver { - - void receiveMessage(FcpConnection fcpConnection, M message); - - } - - private static class TestFcpReplySequence extends FcpReplySequence { - - private final AtomicReference gotMessage = new AtomicReference<>(); - private final AtomicReference expectedMessage = new AtomicReference<>(); - - public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { - super(executorService, fcpConnection); - } - - public void setExpectedMessage(String expectedMessage) { - this.expectedMessage.set(expectedMessage); - } - - @Override - protected boolean isFinished() { - return getResult(); - } - - @Override - protected Boolean getResult() { - return expectedMessage.get().equals(gotMessage.get()); - } - - @Override - protected void consumeNodeHello(NodeHello nodeHello) { - gotMessage.set(nodeHello.getName()); - } - - @Override - protected void consumeSSKKeypair(SSKKeypair sskKeypair) { - gotMessage.set(sskKeypair.getName()); - } - - @Override - protected void consumePeer(Peer peer) { - gotMessage.set(peer.getName()); - } - - @Override - protected void consumeEndListPeers(EndListPeers endListPeers) { - gotMessage.set(endListPeers.getName()); - } - - @Override - protected void consumePeerNote(PeerNote peerNote) { - gotMessage.set(peerNote.getName()); - } - - @Override - protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { - gotMessage.set(endListPeerNotes.getName()); - } - - @Override - protected void consumePeerRemoved(PeerRemoved peerRemoved) { - gotMessage.set(peerRemoved.getName()); - } - - @Override - protected void consumeNodeData(NodeData nodeData) { - gotMessage.set(nodeData.getName()); - } - - @Override - protected void consumeTestDDAReply(TestDDAReply testDDAReply) { - gotMessage.set(testDDAReply.getName()); - } - - @Override - protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { - gotMessage.set(testDDAComplete.getName()); - } - - @Override - protected void consumePersistentGet(PersistentGet persistentGet) { - gotMessage.set(persistentGet.getName()); - } - - @Override - protected void consumePersistentPut(PersistentPut persistentPut) { - gotMessage.set(persistentPut.getName()); - } - - @Override - protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { - gotMessage.set(endListPersistentRequests.getName()); - } - - @Override - protected void consumeURIGenerated(URIGenerated uriGenerated) { - gotMessage.set(uriGenerated.getName()); - } - - @Override - protected void consumeDataFound(DataFound dataFound) { - gotMessage.set(dataFound.getName()); - } - - @Override - protected void consumeAllData(AllData allData) { - gotMessage.set(allData.getName()); - } - - @Override - protected void consumeSimpleProgress(SimpleProgress simpleProgress) { - gotMessage.set(simpleProgress.getName()); - } - - @Override - protected void consumeStartedCompression(StartedCompression startedCompression) { - gotMessage.set(startedCompression.getName()); - } - - @Override - protected void consumeFinishedCompression(FinishedCompression finishedCompression) { - gotMessage.set(finishedCompression.getName()); - } - - @Override - protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { - gotMessage.set(unknownPeerNoteType.getName()); - } - - @Override - protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { - gotMessage.set(unknownNodeIdentifier.getName()); - } - - @Override - protected void consumeConfigData(ConfigData configData) { - gotMessage.set(configData.getName()); - } - - @Override - protected void consumeGetFailed(GetFailed getFailed) { - gotMessage.set(getFailed.getName()); - } - - @Override - protected void consumePutFailed(PutFailed putFailed) { - gotMessage.set(putFailed.getName()); - } - - @Override - protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { - gotMessage.set(identifierCollision.getName()); - } - - @Override - protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { - gotMessage.set(persistentPutDir.getName()); - } - - @Override - protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { - gotMessage.set(persistentRequestRemoved.getName()); - } - - @Override - protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { - gotMessage.set(subscribedUSKUpdate.getName()); - } - - @Override - protected void consumePluginInfo(PluginInfo pluginInfo) { - gotMessage.set(pluginInfo.getName()); - } - - @Override - protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { - gotMessage.set(fcpPluginReply.getName()); - } - - @Override - protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { - gotMessage.set(persistentRequestModified.getName()); - } - - @Override - protected void consumePutSuccessful(PutSuccessful putSuccessful) { - gotMessage.set(putSuccessful.getName()); - } - - @Override - protected void consumePutFetchable(PutFetchable putFetchable) { - gotMessage.set(putFetchable.getName()); - } - - @Override - protected void consumeSentFeed(SentFeed sentFeed) { - gotMessage.set(sentFeed.getName()); - } - - @Override - protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { - gotMessage.set(receivedBookmarkFeed.getName()); - } - - @Override - protected void consumeProtocolError(ProtocolError protocolError) { - gotMessage.set(protocolError.getName()); - } - - @Override - protected void consumeUnknownMessage(FcpMessage fcpMessage) { - gotMessage.set(fcpMessage.getName()); - } - - } - -}