import com.google.common.util.concurrent.MoreExecutors;
/**
- * Default {@link AddPeerCommand} implementation based on {@link FcpReplySequence}.
+ * Default {@link AddPeerCommand} implementation based on {@link FcpDialog}.
*
* @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
*/
}
}
- private class AddPeerSequence extends FcpReplySequence<Optional<Peer>> {
+ private class AddPeerSequence extends FcpDialog<Optional<Peer>> {
private final AtomicBoolean finished = new AtomicBoolean();
private final AtomicReference<Peer> peer = new AtomicReference<>();
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import net.pterodactylus.fcp.AllData;
import net.pterodactylus.fcp.ClientGet;
-import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.FcpUtils.TempInputStream;
import net.pterodactylus.fcp.GetFailed;
import net.pterodactylus.fcp.Priority;
import net.pterodactylus.fcp.ReturnType;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
private Optional<Data> execute(String uri) throws InterruptedException, ExecutionException, IOException {
ClientGet clientGet = createClientGetCommand(uri);
- try (ClientGetReplySequence clientGetReplySequence = new ClientGetReplySequence()) {
- return clientGetReplySequence.send(clientGet).get();
+ try (ClientGetDialog clientGetDialog = new ClientGetDialog()) {
+ return clientGetDialog.send(clientGet).get();
}
}
return clientGet;
}
- private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
+ private class ClientGetDialog extends FcpDialog<Optional<Data>> {
private final AtomicBoolean finished = new AtomicBoolean();
private final AtomicBoolean failed = new AtomicBoolean();
private long dataLength;
private InputStream payload;
- public ClientGetReplySequence() throws IOException {
+ public ClientGetDialog() throws IOException {
super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
}
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.pterodactylus.fcp.ClientHello;
-import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
import net.pterodactylus.fcp.FcpConnection;
import net.pterodactylus.fcp.NodeHello;
import com.google.common.util.concurrent.MoreExecutors;
/**
- * Internal <code>ClientHello</code> implementation based on {@link FcpReplySequence}.
+ * Internal <code>ClientHello</code> implementation based on {@link FcpDialog}.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
FcpConnection connection = new FcpConnection(hostname, port);
connection.connect();
ClientHello clientHello = new ClientHello(clientName.get(), "2.0");
- try (ClientHelloReplySequence nodeHelloSequence = new ClientHelloReplySequence(connection)) {
+ try (ClientHelloDialog nodeHelloSequence = new ClientHelloDialog(connection)) {
if (nodeHelloSequence.send(clientHello).get()) {
return connection;
}
throw new IOException(String.format("Could not connect to %s:%d.", hostname, port));
}
- private class ClientHelloReplySequence extends FcpReplySequence<Boolean> {
+ private class ClientHelloDialog extends FcpDialog<Boolean> {
private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
- public ClientHelloReplySequence(FcpConnection connection) {
+ public ClientHelloDialog(FcpConnection connection) {
super(ClientHelloImpl.this.threadPool, connection);
}
import com.google.common.util.concurrent.MoreExecutors;
/**
- * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}.
+ * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
private Optional<Key> execute(String uri) throws InterruptedException, ExecutionException, IOException {
String identifier = new RandomIdentifierGenerator().generate();
ClientPut clientPut = createClientPutCommand(uri, identifier);
- try (ClientPutReplySequence clientPutReplySequence = new ClientPutReplySequence()) {
- return clientPutReplySequence.send(clientPut).get();
+ try (ClientPutDialog clientPutDialog = new ClientPutDialog()) {
+ return clientPutDialog.send(clientPut).get();
}
}
return clientPut;
}
- private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
+ private class ClientPutDialog extends FcpDialog<Optional<Key>> {
private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
private final AtomicReference<String> directory = new AtomicReference<>();
private final AtomicReference<Key> finalKey = new AtomicReference<>();
private final AtomicBoolean putFinished = new AtomicBoolean();
- public ClientPutReplySequence() throws IOException {
+ public ClientPutDialog() throws IOException {
super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.ConfigData;
+import net.pterodactylus.fcp.DataFound;
+import net.pterodactylus.fcp.EndListPeerNotes;
+import net.pterodactylus.fcp.EndListPeers;
+import net.pterodactylus.fcp.EndListPersistentRequests;
+import net.pterodactylus.fcp.FCPPluginReply;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpListener;
+import net.pterodactylus.fcp.FcpMessage;
+import net.pterodactylus.fcp.FinishedCompression;
+import net.pterodactylus.fcp.GetFailed;
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.NodeData;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Peer;
+import net.pterodactylus.fcp.PeerNote;
+import net.pterodactylus.fcp.PeerRemoved;
+import net.pterodactylus.fcp.PersistentGet;
+import net.pterodactylus.fcp.PersistentPut;
+import net.pterodactylus.fcp.PersistentPutDir;
+import net.pterodactylus.fcp.PersistentRequestModified;
+import net.pterodactylus.fcp.PersistentRequestRemoved;
+import net.pterodactylus.fcp.PluginInfo;
+import net.pterodactylus.fcp.ProtocolError;
+import net.pterodactylus.fcp.PutFailed;
+import net.pterodactylus.fcp.PutFetchable;
+import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.ReceivedBookmarkFeed;
+import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.SentFeed;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.URIGenerated;
+import net.pterodactylus.fcp.UnknownNodeIdentifier;
+import net.pterodactylus.fcp.UnknownPeerNoteType;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public abstract class FcpDialog<R> implements AutoCloseable, FcpListener {
+
+ private final Object syncObject = new Object();
+ private final ListeningExecutorService executorService;
+ private final FcpConnection fcpConnection;
+ private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
+ private final AtomicReference<String> identifier = new AtomicReference<>();
+ private final AtomicBoolean connectionClosed = new AtomicBoolean();
+ private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
+
+ public FcpDialog(ExecutorService executorService, FcpConnection fcpConnection) {
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ this.fcpConnection = fcpConnection;
+ }
+
+ protected void setIdentifier(String identifier) {
+ this.identifier.set(identifier);
+ }
+
+ protected abstract boolean isFinished();
+
+ public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
+ setIdentifier(fcpMessage.getField("Identifier"));
+ fcpConnection.addFcpListener(this);
+ messages.add(fcpMessage);
+ return executorService.submit(() -> {
+ synchronized (syncObject) {
+ while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
+ while (messages.peek() != null) {
+ FcpMessage message = messages.poll();
+ fcpConnection.sendMessage(message);
+ }
+ if (isFinished() || connectionClosed.get()) {
+ continue;
+ }
+ syncObject.wait();
+ }
+ }
+ Throwable throwable = connectionFailureReason.get();
+ if (throwable != null) {
+ throw new ExecutionException(throwable);
+ }
+ return getResult();
+ });
+ }
+
+ protected void sendMessage(FcpMessage fcpMessage) {
+ messages.add(fcpMessage);
+ notifySyncObject();
+ }
+
+ private void notifySyncObject() {
+ synchronized (syncObject) {
+ syncObject.notifyAll();
+ }
+ }
+
+ protected R getResult() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ fcpConnection.removeFcpListener(this);
+ }
+
+ private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
+ consume(consumer, message, "Identifier");
+ }
+
+ private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
+ String identifier) {
+ if (Objects.equals(message.getField(identifier), this.identifier.get())) {
+ consumeAlways(consumer, message);
+ }
+ }
+
+ private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
+ consumer.accept(message);
+ notifySyncObject();
+ }
+
+ private void consumeUnknown(FcpMessage fcpMessage) {
+ consumeUnknownMessage(fcpMessage);
+ notifySyncObject();
+ }
+
+ private void consumeClose(Throwable throwable) {
+ connectionFailureReason.set(throwable);
+ connectionClosed.set(true);
+ notifySyncObject();
+ }
+
+ @Override
+ public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
+ consume(this::consumeNodeHello, nodeHello);
+ }
+
+ protected void consumeNodeHello(NodeHello nodeHello) { }
+
+ @Override
+ public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
+ CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+ connectionFailureReason.set(new IOException("duplicate client name"));
+ connectionClosed.set(true);
+ notifySyncObject();
+ }
+
+ @Override
+ public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
+ consume(this::consumeSSKKeypair, sskKeypair);
+ }
+
+ protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
+
+ @Override
+ public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
+ consume(this::consumePeer, peer);
+ }
+
+ protected void consumePeer(Peer peer) { }
+
+ @Override
+ public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
+ consume(this::consumeEndListPeers, endListPeers);
+ }
+
+ protected void consumeEndListPeers(EndListPeers endListPeers) { }
+
+ @Override
+ public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
+ consume(this::consumePeerNote, peerNote);
+ }
+
+ protected void consumePeerNote(PeerNote peerNote) { }
+
+ @Override
+ public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
+ consume(this::consumeEndListPeerNotes, endListPeerNotes);
+ }
+
+ protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
+
+ @Override
+ public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
+ consume(this::consumePeerRemoved, peerRemoved);
+ }
+
+ protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
+
+ @Override
+ public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
+ consume(this::consumeNodeData, nodeData);
+ }
+
+ protected void consumeNodeData(NodeData nodeData) { }
+
+ @Override
+ public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
+ consume(this::consumeTestDDAReply, testDDAReply, "Directory");
+ }
+
+ protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
+
+ @Override
+ public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
+ consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
+ }
+
+ protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
+
+ @Override
+ public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
+ consume(this::consumePersistentGet, persistentGet);
+ }
+
+ protected void consumePersistentGet(PersistentGet persistentGet) { }
+
+ @Override
+ public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
+ consume(this::consumePersistentPut, persistentPut);
+ }
+
+ protected void consumePersistentPut(PersistentPut persistentPut) { }
+
+ @Override
+ public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
+ EndListPersistentRequests endListPersistentRequests) {
+ consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
+ }
+
+ protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
+
+ @Override
+ public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
+ consume(this::consumeURIGenerated, uriGenerated);
+ }
+
+ protected void consumeURIGenerated(URIGenerated uriGenerated) { }
+
+ @Override
+ public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
+ consume(this::consumeDataFound, dataFound);
+ }
+
+ protected void consumeDataFound(DataFound dataFound) { }
+
+ @Override
+ public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
+ consume(this::consumeAllData, allData);
+ }
+
+ protected void consumeAllData(AllData allData) { }
+
+ @Override
+ public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
+ consume(this::consumeSimpleProgress, simpleProgress);
+ }
+
+ protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
+
+ @Override
+ public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
+ consume(this::consumeStartedCompression, startedCompression);
+ }
+
+ protected void consumeStartedCompression(StartedCompression startedCompression) { }
+
+ @Override
+ public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
+ consume(this::consumeFinishedCompression, finishedCompression);
+ }
+
+ protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
+
+ @Override
+ public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
+ consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
+ }
+
+ protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
+
+ @Override
+ public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
+ UnknownNodeIdentifier unknownNodeIdentifier) {
+ consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
+ }
+
+ protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
+
+ @Override
+ public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
+ consume(this::consumeConfigData, configData);
+ }
+
+ protected void consumeConfigData(ConfigData configData) { }
+
+ @Override
+ public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
+ consume(this::consumeGetFailed, getFailed);
+ }
+
+ protected void consumeGetFailed(GetFailed getFailed) { }
+
+ @Override
+ public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
+ consume(this::consumePutFailed, putFailed);
+ }
+
+ protected void consumePutFailed(PutFailed putFailed) { }
+
+ @Override
+ public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
+ consume(this::consumeIdentifierCollision, identifierCollision);
+ }
+
+ protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
+
+ @Override
+ public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
+ consume(this::consumePersistentPutDir, persistentPutDir);
+ }
+
+ protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
+
+ @Override
+ public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
+ PersistentRequestRemoved persistentRequestRemoved) {
+ consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
+ }
+
+ protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
+
+ @Override
+ public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+ consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
+ }
+
+ protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
+
+ @Override
+ public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
+ consume(this::consumePluginInfo, pluginInfo);
+ }
+
+ protected void consumePluginInfo(PluginInfo pluginInfo) { }
+
+ @Override
+ public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
+ consume(this::consumeFCPPluginReply, fcpPluginReply);
+ }
+
+ protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
+
+ @Override
+ public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
+ PersistentRequestModified persistentRequestModified) {
+ consume(this::consumePersistentRequestModified, persistentRequestModified);
+ }
+
+ protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
+
+ @Override
+ public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
+ consume(this::consumePutSuccessful, putSuccessful);
+ }
+
+ protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
+
+ @Override
+ public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
+ consume(this::consumePutFetchable, putFetchable);
+ }
+
+ protected void consumePutFetchable(PutFetchable putFetchable) { }
+
+ @Override
+ public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
+ consume(this::consumeSentFeed, sentFeed);
+ }
+
+ protected void consumeSentFeed(SentFeed sentFeed) { }
+
+ @Override
+ public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
+ consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
+ }
+
+ protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
+
+ @Override
+ public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
+ consume(this::consumeProtocolError, protocolError);
+ }
+
+ protected void consumeProtocolError(ProtocolError protocolError) { }
+
+ @Override
+ public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+ consumeUnknown(fcpMessage);
+ }
+
+ protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
+
+ @Override
+ public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
+ consumeClose(throwable);
+ }
+
+}
+++ /dev/null
-package net.pterodactylus.fcp.quelaton;
-
-import java.io.IOException;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-
-import net.pterodactylus.fcp.AllData;
-import net.pterodactylus.fcp.BaseMessage;
-import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
-import net.pterodactylus.fcp.ConfigData;
-import net.pterodactylus.fcp.DataFound;
-import net.pterodactylus.fcp.EndListPeerNotes;
-import net.pterodactylus.fcp.EndListPeers;
-import net.pterodactylus.fcp.EndListPersistentRequests;
-import net.pterodactylus.fcp.FCPPluginReply;
-import net.pterodactylus.fcp.FcpConnection;
-import net.pterodactylus.fcp.FcpListener;
-import net.pterodactylus.fcp.FcpMessage;
-import net.pterodactylus.fcp.FinishedCompression;
-import net.pterodactylus.fcp.GetFailed;
-import net.pterodactylus.fcp.IdentifierCollision;
-import net.pterodactylus.fcp.NodeData;
-import net.pterodactylus.fcp.NodeHello;
-import net.pterodactylus.fcp.Peer;
-import net.pterodactylus.fcp.PeerNote;
-import net.pterodactylus.fcp.PeerRemoved;
-import net.pterodactylus.fcp.PersistentGet;
-import net.pterodactylus.fcp.PersistentPut;
-import net.pterodactylus.fcp.PersistentPutDir;
-import net.pterodactylus.fcp.PersistentRequestModified;
-import net.pterodactylus.fcp.PersistentRequestRemoved;
-import net.pterodactylus.fcp.PluginInfo;
-import net.pterodactylus.fcp.ProtocolError;
-import net.pterodactylus.fcp.PutFailed;
-import net.pterodactylus.fcp.PutFetchable;
-import net.pterodactylus.fcp.PutSuccessful;
-import net.pterodactylus.fcp.ReceivedBookmarkFeed;
-import net.pterodactylus.fcp.SSKKeypair;
-import net.pterodactylus.fcp.SentFeed;
-import net.pterodactylus.fcp.SimpleProgress;
-import net.pterodactylus.fcp.StartedCompression;
-import net.pterodactylus.fcp.SubscribedUSKUpdate;
-import net.pterodactylus.fcp.TestDDAComplete;
-import net.pterodactylus.fcp.TestDDAReply;
-import net.pterodactylus.fcp.URIGenerated;
-import net.pterodactylus.fcp.UnknownNodeIdentifier;
-import net.pterodactylus.fcp.UnknownPeerNoteType;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/**
- * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
- *
- * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
- */
-public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
-
- private final Object syncObject = new Object();
- private final ListeningExecutorService executorService;
- private final FcpConnection fcpConnection;
- private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
- private final AtomicReference<String> identifier = new AtomicReference<>();
- private final AtomicBoolean connectionClosed = new AtomicBoolean();
- private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
-
- public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
- this.executorService = MoreExecutors.listeningDecorator(executorService);
- this.fcpConnection = fcpConnection;
- }
-
- protected void setIdentifier(String identifier) {
- this.identifier.set(identifier);
- }
-
- protected abstract boolean isFinished();
-
- public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
- setIdentifier(fcpMessage.getField("Identifier"));
- fcpConnection.addFcpListener(this);
- messages.add(fcpMessage);
- return executorService.submit(() -> {
- synchronized (syncObject) {
- while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
- while (messages.peek() != null) {
- FcpMessage message = messages.poll();
- fcpConnection.sendMessage(message);
- }
- if (isFinished() || connectionClosed.get()) {
- continue;
- }
- syncObject.wait();
- }
- }
- Throwable throwable = connectionFailureReason.get();
- if (throwable != null) {
- throw new ExecutionException(throwable);
- }
- return getResult();
- });
- }
-
- protected void sendMessage(FcpMessage fcpMessage) {
- messages.add(fcpMessage);
- notifySyncObject();
- }
-
- private void notifySyncObject() {
- synchronized (syncObject) {
- syncObject.notifyAll();
- }
- }
-
- protected R getResult() {
- return null;
- }
-
- @Override
- public void close() {
- fcpConnection.removeFcpListener(this);
- }
-
- private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
- consume(consumer, message, "Identifier");
- }
-
- private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
- String identifier) {
- if (Objects.equals(message.getField(identifier), this.identifier.get())) {
- consumeAlways(consumer, message);
- }
- }
-
- private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
- consumer.accept(message);
- notifySyncObject();
- }
-
- private void consumeUnknown(FcpMessage fcpMessage) {
- consumeUnknownMessage(fcpMessage);
- notifySyncObject();
- }
-
- private void consumeClose(Throwable throwable) {
- connectionFailureReason.set(throwable);
- connectionClosed.set(true);
- notifySyncObject();
- }
-
- @Override
- public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
- consume(this::consumeNodeHello, nodeHello);
- }
-
- protected void consumeNodeHello(NodeHello nodeHello) { }
-
- @Override
- public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
- CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
- connectionFailureReason.set(new IOException("duplicate client name"));
- connectionClosed.set(true);
- notifySyncObject();
- }
-
- @Override
- public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
- consume(this::consumeSSKKeypair, sskKeypair);
- }
-
- protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
-
- @Override
- public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
- consume(this::consumePeer, peer);
- }
-
- protected void consumePeer(Peer peer) { }
-
- @Override
- public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
- consume(this::consumeEndListPeers, endListPeers);
- }
-
- protected void consumeEndListPeers(EndListPeers endListPeers) { }
-
- @Override
- public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
- consume(this::consumePeerNote, peerNote);
- }
-
- protected void consumePeerNote(PeerNote peerNote) { }
-
- @Override
- public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
- consume(this::consumeEndListPeerNotes, endListPeerNotes);
- }
-
- protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
-
- @Override
- public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
- consume(this::consumePeerRemoved, peerRemoved);
- }
-
- protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
-
- @Override
- public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
- consume(this::consumeNodeData, nodeData);
- }
-
- protected void consumeNodeData(NodeData nodeData) { }
-
- @Override
- public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
- consume(this::consumeTestDDAReply, testDDAReply, "Directory");
- }
-
- protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
-
- @Override
- public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
- consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
- }
-
- protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
-
- @Override
- public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
- consume(this::consumePersistentGet, persistentGet);
- }
-
- protected void consumePersistentGet(PersistentGet persistentGet) { }
-
- @Override
- public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
- consume(this::consumePersistentPut, persistentPut);
- }
-
- protected void consumePersistentPut(PersistentPut persistentPut) { }
-
- @Override
- public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
- EndListPersistentRequests endListPersistentRequests) {
- consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
- }
-
- protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
-
- @Override
- public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
- consume(this::consumeURIGenerated, uriGenerated);
- }
-
- protected void consumeURIGenerated(URIGenerated uriGenerated) { }
-
- @Override
- public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
- consume(this::consumeDataFound, dataFound);
- }
-
- protected void consumeDataFound(DataFound dataFound) { }
-
- @Override
- public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
- consume(this::consumeAllData, allData);
- }
-
- protected void consumeAllData(AllData allData) { }
-
- @Override
- public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
- consume(this::consumeSimpleProgress, simpleProgress);
- }
-
- protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
-
- @Override
- public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
- consume(this::consumeStartedCompression, startedCompression);
- }
-
- protected void consumeStartedCompression(StartedCompression startedCompression) { }
-
- @Override
- public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
- consume(this::consumeFinishedCompression, finishedCompression);
- }
-
- protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
-
- @Override
- public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
- consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
- }
-
- protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
-
- @Override
- public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
- UnknownNodeIdentifier unknownNodeIdentifier) {
- consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
- }
-
- protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
-
- @Override
- public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
- consume(this::consumeConfigData, configData);
- }
-
- protected void consumeConfigData(ConfigData configData) { }
-
- @Override
- public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
- consume(this::consumeGetFailed, getFailed);
- }
-
- protected void consumeGetFailed(GetFailed getFailed) { }
-
- @Override
- public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
- consume(this::consumePutFailed, putFailed);
- }
-
- protected void consumePutFailed(PutFailed putFailed) { }
-
- @Override
- public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
- consume(this::consumeIdentifierCollision, identifierCollision);
- }
-
- protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
-
- @Override
- public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
- consume(this::consumePersistentPutDir, persistentPutDir);
- }
-
- protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
-
- @Override
- public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
- PersistentRequestRemoved persistentRequestRemoved) {
- consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
- }
-
- protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
-
- @Override
- public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
- consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
- }
-
- protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
-
- @Override
- public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
- consume(this::consumePluginInfo, pluginInfo);
- }
-
- protected void consumePluginInfo(PluginInfo pluginInfo) { }
-
- @Override
- public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
- consume(this::consumeFCPPluginReply, fcpPluginReply);
- }
-
- protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
-
- @Override
- public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
- PersistentRequestModified persistentRequestModified) {
- consume(this::consumePersistentRequestModified, persistentRequestModified);
- }
-
- protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
-
- @Override
- public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
- consume(this::consumePutSuccessful, putSuccessful);
- }
-
- protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
-
- @Override
- public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
- consume(this::consumePutFetchable, putFetchable);
- }
-
- protected void consumePutFetchable(PutFetchable putFetchable) { }
-
- @Override
- public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
- consume(this::consumeSentFeed, sentFeed);
- }
-
- protected void consumeSentFeed(SentFeed sentFeed) { }
-
- @Override
- public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
- consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
- }
-
- protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
-
- @Override
- public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
- consume(this::consumeProtocolError, protocolError);
- }
-
- protected void consumeProtocolError(ProtocolError protocolError) { }
-
- @Override
- public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
- consumeUnknown(fcpMessage);
- }
-
- protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
-
- @Override
- public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
- consumeClose(throwable);
- }
-
-}
}
private FcpKeyPair executeSequence() throws InterruptedException, ExecutionException, IOException {
- try (FcpKeyPairReplySequence fcpKeyPairReplySequence = new FcpKeyPairReplySequence()) {
- return fcpKeyPairReplySequence.send(new GenerateSSK()).get();
+ try (FcpKeyPairDialog fcpKeyPairDialog = new FcpKeyPairDialog()) {
+ return fcpKeyPairDialog.send(new GenerateSSK()).get();
}
}
- private class FcpKeyPairReplySequence extends FcpReplySequence<FcpKeyPair> {
+ private class FcpKeyPairDialog extends FcpDialog<FcpKeyPair> {
private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
- public FcpKeyPairReplySequence() throws IOException {
+ public FcpKeyPairDialog() throws IOException {
super(GenerateKeypairCommandImpl.this.threadPool, GenerateKeypairCommandImpl.this.connectionSupplier.get());
}
import com.google.common.util.concurrent.MoreExecutors;
/**
- * Default {@link GetNodeCommandImpl} implementation based on {@link FcpReplySequence}.
+ * Default {@link GetNodeCommandImpl} implementation based on {@link FcpDialog}.
*
* @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
*/
private NodeData executeSequence() throws InterruptedException, ExecutionException, IOException {
GetNode getNode = new GetNode(new RandomIdentifierGenerator().generate(), giveOpennetRef.get(),
includePrivate.get(), includeVolatile.get());
- try (GetNodeReplySequence getNodeReplySequence = new GetNodeReplySequence()) {
- return getNodeReplySequence.send(getNode).get();
+ try (GetNodeDialog getNodeDialog = new GetNodeDialog()) {
+ return getNodeDialog.send(getNode).get();
}
}
- private class GetNodeReplySequence extends FcpReplySequence<NodeData> {
+ private class GetNodeDialog extends FcpDialog<NodeData> {
private final AtomicReference<NodeData> nodeData = new AtomicReference<>();
- public GetNodeReplySequence() throws IOException {
+ public GetNodeDialog() throws IOException {
super(threadPool, connectionSupplier.get());
}
import com.google.common.util.concurrent.ListeningExecutorService;
/**
- * Default {@link ListPeerCommand} implementation based on {@link FcpReplySequence}.
+ * Default {@link ListPeerCommand} implementation based on {@link FcpDialog}.
*
* @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
*/
}
}
- private class ListPeerSequence extends FcpReplySequence<Peer> {
+ private class ListPeerSequence extends FcpDialog<Peer> {
private final AtomicBoolean finished = new AtomicBoolean();
private final AtomicReference<Peer> peer = new AtomicReference<>();
import com.google.common.util.concurrent.MoreExecutors;
/**
- * Default {@link ListPeersCommand} implementation based on {@link FcpReplySequence}.
+ * Default {@link ListPeersCommand} implementation based on {@link FcpDialog}.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
private Collection<Peer> executeSequence() throws InterruptedException, ExecutionException, IOException {
String identifier = new RandomIdentifierGenerator().generate();
ListPeers listPeers = new ListPeers(identifier, includeMetadata.get(), includeVolatile.get());
- try (ListPeersReplySequence listPeersReplySequence = new ListPeersReplySequence()) {
- return listPeersReplySequence.send(listPeers).get();
+ try (ListPeersDialog listPeersDialog = new ListPeersDialog()) {
+ return listPeersDialog.send(listPeers).get();
}
}
- private class ListPeersReplySequence extends FcpReplySequence<Collection<Peer>> {
+ private class ListPeersDialog extends FcpDialog<Collection<Peer>> {
private final Collection<Peer> peers = new HashSet<>();
private final AtomicBoolean finished = new AtomicBoolean(false);
- public ListPeersReplySequence() throws IOException {
+ public ListPeersDialog() throws IOException {
super(threadPool, connectionSupplier.get());
}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.ConfigData;
+import net.pterodactylus.fcp.DataFound;
+import net.pterodactylus.fcp.EndListPeerNotes;
+import net.pterodactylus.fcp.EndListPeers;
+import net.pterodactylus.fcp.EndListPersistentRequests;
+import net.pterodactylus.fcp.FCPPluginReply;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpMessage;
+import net.pterodactylus.fcp.FinishedCompression;
+import net.pterodactylus.fcp.GetFailed;
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.NodeData;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Peer;
+import net.pterodactylus.fcp.PeerNote;
+import net.pterodactylus.fcp.PeerRemoved;
+import net.pterodactylus.fcp.PersistentGet;
+import net.pterodactylus.fcp.PersistentPut;
+import net.pterodactylus.fcp.PersistentPutDir;
+import net.pterodactylus.fcp.PersistentRequestModified;
+import net.pterodactylus.fcp.PersistentRequestRemoved;
+import net.pterodactylus.fcp.PluginInfo;
+import net.pterodactylus.fcp.ProtocolError;
+import net.pterodactylus.fcp.PutFailed;
+import net.pterodactylus.fcp.PutFetchable;
+import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.ReceivedBookmarkFeed;
+import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.SentFeed;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.URIGenerated;
+import net.pterodactylus.fcp.UnknownNodeIdentifier;
+import net.pterodactylus.fcp.UnknownPeerNoteType;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link FcpDialog}.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FcpDialogTest {
+
+ private final FcpConnection fcpConnection = mock(FcpConnection.class);
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private final TestFcpDialog dialog = new TestFcpDialog(executorService, fcpConnection);
+ private final FcpMessage fcpMessage = new FcpMessage("Test");
+
+ @Test
+ public void canSendMessage() throws IOException, ExecutionException, InterruptedException {
+ FcpDialog dialog = createBasicDialog();
+ dialog.send(fcpMessage).get();
+ verify(fcpConnection).sendMessage(fcpMessage);
+ }
+
+ private FcpDialog createBasicDialog() {
+ return new FcpDialog(executorService, fcpConnection) {
+ @Override
+ protected boolean isFinished() {
+ return true;
+ }
+ };
+ }
+
+ @Test
+ public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
+ FcpDialog dialog = createBasicDialog();
+ dialog.send(fcpMessage);
+ verify(fcpConnection).addFcpListener(dialog);
+ }
+
+ @Test
+ public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
+ FcpDialog dialog = createBasicDialog();
+ dialog.send(fcpMessage);
+ dialog.close();
+ verify(fcpConnection).removeFcpListener(dialog);
+ }
+
+ private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, Class<M> messageClass, MessageCreator<M> messageCreator) throws IOException, InterruptedException, ExecutionException {
+ waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName())));
+ }
+
+ private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, M message) throws IOException, InterruptedException, ExecutionException {
+ dialog.setExpectedMessage(message.getName());
+ Future<Boolean> result = dialog.send(fcpMessage);
+ messageReceiver.receiveMessage(fcpConnection, message);
+ assertThat(result.get(), is(true));
+ }
+
+ private <M extends BaseMessage> M createMessage(Class<M> messageClass, MessageCreator<M> messageCreator) {
+ return messageCreator.create(new FcpMessage(messageClass.getSimpleName()));
+ }
+
+ private interface MessageCreator<M extends BaseMessage> {
+
+ M create(FcpMessage fcpMessage);
+
+ }
+
+ @Test
+ public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
+ waitForASpecificMessage(dialog::receivedNodeHello, NodeHello.class, NodeHello::new);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
+ dialog.setExpectedMessage("");
+ Future<Boolean> result = dialog.send(fcpMessage);
+ dialog.receivedCloseConnectionDuplicateClientName(fcpConnection,
+ new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+ result.get();
+ }
+
+ @Test
+ public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new);
+ }
+
+ @Test
+ public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPeer, Peer.class, Peer::new);
+ }
+
+ @Test
+ public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedEndListPeers, EndListPeers.class, EndListPeers::new);
+ }
+
+ @Test
+ public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPeerNote, PeerNote.class, PeerNote::new);
+ }
+
+ @Test
+ public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new);
+ }
+
+ @Test
+ public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new);
+ }
+
+ @Test
+ public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedNodeData, new NodeData(
+ new FcpMessage("NodeData").put("ark.pubURI", "")
+ .put("ark.number", "0")
+ .put("auth.negTypes", "")
+ .put("version", "0,0,0,0")
+ .put("lastGoodVersion", "0,0,0,0")));
+ }
+
+ @Test
+ public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new);
+ }
+
+ @Test
+ public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new);
+ }
+
+ @Test
+ public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPersistentGet, PersistentGet.class, PersistentGet::new);
+ }
+
+ @Test
+ public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPersistentPut, PersistentPut.class, PersistentPut::new);
+ }
+
+ @Test
+ public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new);
+ }
+
+ @Test
+ public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedURIGenerated, URIGenerated.class, URIGenerated::new);
+ }
+
+ @Test
+ public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedDataFound, DataFound.class, DataFound::new);
+ }
+
+ @Test
+ public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedAllData, new AllData(new FcpMessage("AllData"), null));
+ }
+
+ @Test
+ public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new);
+ }
+
+ @Test
+ public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedStartedCompression, StartedCompression.class, StartedCompression::new);
+ }
+
+ @Test
+ public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new);
+ }
+
+ @Test
+ public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new);
+ }
+
+ @Test
+ public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new);
+ }
+
+ @Test
+ public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedConfigData, ConfigData.class, ConfigData::new);
+ }
+
+ @Test
+ public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedGetFailed, GetFailed.class, GetFailed::new);
+ }
+
+ @Test
+ public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPutFailed, PutFailed.class, PutFailed::new);
+ }
+
+ @Test
+ public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new);
+ }
+
+ @Test
+ public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new);
+ }
+
+ @Test
+ public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new);
+ }
+
+ @Test
+ public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new);
+ }
+
+ @Test
+ public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPluginInfo, PluginInfo.class, PluginInfo::new);
+ }
+
+ @Test
+ public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
+ }
+
+ @Test
+ public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new);
+ }
+
+ @Test
+ public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new);
+ }
+
+ @Test
+ public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedPutFetchable, PutFetchable.class, PutFetchable::new);
+ }
+
+ @Test
+ public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedSentFeed, SentFeed.class, SentFeed::new);
+ }
+
+ @Test
+ public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new);
+ }
+
+ @Test
+ public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(dialog::receivedProtocolError, ProtocolError.class, ProtocolError::new);
+ }
+
+ @Test
+ public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
+ dialog.setExpectedMessage("SomeFcpMessage");
+ Future<Boolean> result = dialog.send(fcpMessage);
+ dialog.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage"));
+ assertThat(result.get(), is(true));
+ }
+
+ @Test
+ public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
+ TestFcpDialog testFcpDialog = new TestFcpDialog(executorService, fcpConnection) {
+ private final AtomicBoolean gotPutFailed = new AtomicBoolean();
+ private final AtomicBoolean gotGetFailed = new AtomicBoolean();
+
+ @Override
+ protected boolean isFinished() {
+ return gotPutFailed.get() && gotGetFailed.get();
+ }
+
+ @Override
+ protected Boolean getResult() {
+ return isFinished();
+ }
+
+ @Override
+ protected void consumePutFailed(PutFailed putFailed) {
+ gotPutFailed.set(true);
+ }
+
+ @Override
+ protected void consumeGetFailed(GetFailed getFailed) {
+ gotGetFailed.set(true);
+ }
+ };
+ Future<?> result = testFcpDialog.send(fcpMessage);
+ assertThat(result.isDone(), is(false));
+ testFcpDialog.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
+ assertThat(result.isDone(), is(false));
+ testFcpDialog.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
+ assertThat(result.get(), is(true));
+ }
+
+ @Test
+ public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
+ dialog.setExpectedMessage("none");
+ Future<Boolean> result = dialog.send(fcpMessage);
+ Throwable throwable = new Throwable();
+ dialog.connectionClosed(fcpConnection, throwable);
+ try {
+ result.get();
+ } catch (ExecutionException e) {
+ Throwable t = e;
+ while (t.getCause() != null) {
+ t = t.getCause();
+ }
+ assertThat(t, sameInstance(throwable));
+ }
+ }
+
+ @FunctionalInterface
+ private interface MessageReceiver<M> {
+
+ void receiveMessage(FcpConnection fcpConnection, M message);
+
+ }
+
+ private static class TestFcpDialog extends FcpDialog<Boolean> {
+
+ private final AtomicReference<String> gotMessage = new AtomicReference<>();
+ private final AtomicReference<String> expectedMessage = new AtomicReference<>();
+
+ public TestFcpDialog(ExecutorService executorService, FcpConnection fcpConnection) {
+ super(executorService, fcpConnection);
+ }
+
+ public void setExpectedMessage(String expectedMessage) {
+ this.expectedMessage.set(expectedMessage);
+ }
+
+ @Override
+ protected boolean isFinished() {
+ return getResult();
+ }
+
+ @Override
+ protected Boolean getResult() {
+ return expectedMessage.get().equals(gotMessage.get());
+ }
+
+ @Override
+ protected void consumeNodeHello(NodeHello nodeHello) {
+ gotMessage.set(nodeHello.getName());
+ }
+
+ @Override
+ protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
+ gotMessage.set(sskKeypair.getName());
+ }
+
+ @Override
+ protected void consumePeer(Peer peer) {
+ gotMessage.set(peer.getName());
+ }
+
+ @Override
+ protected void consumeEndListPeers(EndListPeers endListPeers) {
+ gotMessage.set(endListPeers.getName());
+ }
+
+ @Override
+ protected void consumePeerNote(PeerNote peerNote) {
+ gotMessage.set(peerNote.getName());
+ }
+
+ @Override
+ protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
+ gotMessage.set(endListPeerNotes.getName());
+ }
+
+ @Override
+ protected void consumePeerRemoved(PeerRemoved peerRemoved) {
+ gotMessage.set(peerRemoved.getName());
+ }
+
+ @Override
+ protected void consumeNodeData(NodeData nodeData) {
+ gotMessage.set(nodeData.getName());
+ }
+
+ @Override
+ protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
+ gotMessage.set(testDDAReply.getName());
+ }
+
+ @Override
+ protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
+ gotMessage.set(testDDAComplete.getName());
+ }
+
+ @Override
+ protected void consumePersistentGet(PersistentGet persistentGet) {
+ gotMessage.set(persistentGet.getName());
+ }
+
+ @Override
+ protected void consumePersistentPut(PersistentPut persistentPut) {
+ gotMessage.set(persistentPut.getName());
+ }
+
+ @Override
+ protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
+ gotMessage.set(endListPersistentRequests.getName());
+ }
+
+ @Override
+ protected void consumeURIGenerated(URIGenerated uriGenerated) {
+ gotMessage.set(uriGenerated.getName());
+ }
+
+ @Override
+ protected void consumeDataFound(DataFound dataFound) {
+ gotMessage.set(dataFound.getName());
+ }
+
+ @Override
+ protected void consumeAllData(AllData allData) {
+ gotMessage.set(allData.getName());
+ }
+
+ @Override
+ protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+ gotMessage.set(simpleProgress.getName());
+ }
+
+ @Override
+ protected void consumeStartedCompression(StartedCompression startedCompression) {
+ gotMessage.set(startedCompression.getName());
+ }
+
+ @Override
+ protected void consumeFinishedCompression(FinishedCompression finishedCompression) {
+ gotMessage.set(finishedCompression.getName());
+ }
+
+ @Override
+ protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
+ gotMessage.set(unknownPeerNoteType.getName());
+ }
+
+ @Override
+ protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
+ gotMessage.set(unknownNodeIdentifier.getName());
+ }
+
+ @Override
+ protected void consumeConfigData(ConfigData configData) {
+ gotMessage.set(configData.getName());
+ }
+
+ @Override
+ protected void consumeGetFailed(GetFailed getFailed) {
+ gotMessage.set(getFailed.getName());
+ }
+
+ @Override
+ protected void consumePutFailed(PutFailed putFailed) {
+ gotMessage.set(putFailed.getName());
+ }
+
+ @Override
+ protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
+ gotMessage.set(identifierCollision.getName());
+ }
+
+ @Override
+ protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) {
+ gotMessage.set(persistentPutDir.getName());
+ }
+
+ @Override
+ protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
+ gotMessage.set(persistentRequestRemoved.getName());
+ }
+
+ @Override
+ protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
+ gotMessage.set(subscribedUSKUpdate.getName());
+ }
+
+ @Override
+ protected void consumePluginInfo(PluginInfo pluginInfo) {
+ gotMessage.set(pluginInfo.getName());
+ }
+
+ @Override
+ protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) {
+ gotMessage.set(fcpPluginReply.getName());
+ }
+
+ @Override
+ protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) {
+ gotMessage.set(persistentRequestModified.getName());
+ }
+
+ @Override
+ protected void consumePutSuccessful(PutSuccessful putSuccessful) {
+ gotMessage.set(putSuccessful.getName());
+ }
+
+ @Override
+ protected void consumePutFetchable(PutFetchable putFetchable) {
+ gotMessage.set(putFetchable.getName());
+ }
+
+ @Override
+ protected void consumeSentFeed(SentFeed sentFeed) {
+ gotMessage.set(sentFeed.getName());
+ }
+
+ @Override
+ protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) {
+ gotMessage.set(receivedBookmarkFeed.getName());
+ }
+
+ @Override
+ protected void consumeProtocolError(ProtocolError protocolError) {
+ gotMessage.set(protocolError.getName());
+ }
+
+ @Override
+ protected void consumeUnknownMessage(FcpMessage fcpMessage) {
+ gotMessage.set(fcpMessage.getName());
+ }
+
+ }
+
+}
+++ /dev/null
-package net.pterodactylus.fcp.quelaton;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import net.pterodactylus.fcp.AllData;
-import net.pterodactylus.fcp.BaseMessage;
-import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
-import net.pterodactylus.fcp.ConfigData;
-import net.pterodactylus.fcp.DataFound;
-import net.pterodactylus.fcp.EndListPeerNotes;
-import net.pterodactylus.fcp.EndListPeers;
-import net.pterodactylus.fcp.EndListPersistentRequests;
-import net.pterodactylus.fcp.FCPPluginReply;
-import net.pterodactylus.fcp.FcpConnection;
-import net.pterodactylus.fcp.FcpMessage;
-import net.pterodactylus.fcp.FinishedCompression;
-import net.pterodactylus.fcp.GetFailed;
-import net.pterodactylus.fcp.IdentifierCollision;
-import net.pterodactylus.fcp.NodeData;
-import net.pterodactylus.fcp.NodeHello;
-import net.pterodactylus.fcp.Peer;
-import net.pterodactylus.fcp.PeerNote;
-import net.pterodactylus.fcp.PeerRemoved;
-import net.pterodactylus.fcp.PersistentGet;
-import net.pterodactylus.fcp.PersistentPut;
-import net.pterodactylus.fcp.PersistentPutDir;
-import net.pterodactylus.fcp.PersistentRequestModified;
-import net.pterodactylus.fcp.PersistentRequestRemoved;
-import net.pterodactylus.fcp.PluginInfo;
-import net.pterodactylus.fcp.ProtocolError;
-import net.pterodactylus.fcp.PutFailed;
-import net.pterodactylus.fcp.PutFetchable;
-import net.pterodactylus.fcp.PutSuccessful;
-import net.pterodactylus.fcp.ReceivedBookmarkFeed;
-import net.pterodactylus.fcp.SSKKeypair;
-import net.pterodactylus.fcp.SentFeed;
-import net.pterodactylus.fcp.SimpleProgress;
-import net.pterodactylus.fcp.StartedCompression;
-import net.pterodactylus.fcp.SubscribedUSKUpdate;
-import net.pterodactylus.fcp.TestDDAComplete;
-import net.pterodactylus.fcp.TestDDAReply;
-import net.pterodactylus.fcp.URIGenerated;
-import net.pterodactylus.fcp.UnknownNodeIdentifier;
-import net.pterodactylus.fcp.UnknownPeerNoteType;
-
-import org.junit.Test;
-
-/**
- * Unit test for {@link FcpReplySequence}.
- *
- * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
- */
-public class FcpReplySequenceTest {
-
- private final FcpConnection fcpConnection = mock(FcpConnection.class);
- private final ExecutorService executorService = Executors.newSingleThreadExecutor();
- private final TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection);
- private final FcpMessage fcpMessage = new FcpMessage("Test");
-
- @Test
- public void canSendMessage() throws IOException, ExecutionException, InterruptedException {
- FcpReplySequence replySequence = createBasicReplySequence();
- replySequence.send(fcpMessage).get();
- verify(fcpConnection).sendMessage(fcpMessage);
- }
-
- private FcpReplySequence createBasicReplySequence() {
- return new FcpReplySequence(executorService, fcpConnection) {
- @Override
- protected boolean isFinished() {
- return true;
- }
- };
- }
-
- @Test
- public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
- FcpReplySequence replySequence = createBasicReplySequence();
- replySequence.send(fcpMessage);
- verify(fcpConnection).addFcpListener(replySequence);
- }
-
- @Test
- public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
- FcpReplySequence replySequence = createBasicReplySequence();
- replySequence.send(fcpMessage);
- replySequence.close();
- verify(fcpConnection).removeFcpListener(replySequence);
- }
-
- private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, Class<M> messageClass, MessageCreator<M> messageCreator) throws IOException, InterruptedException, ExecutionException {
- waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName())));
- }
-
- private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, M message) throws IOException, InterruptedException, ExecutionException {
- replySequence.setExpectedMessage(message.getName());
- Future<Boolean> result = replySequence.send(fcpMessage);
- messageReceiver.receiveMessage(fcpConnection, message);
- assertThat(result.get(), is(true));
- }
-
- private <M extends BaseMessage> M createMessage(Class<M> messageClass, MessageCreator<M> messageCreator) {
- return messageCreator.create(new FcpMessage(messageClass.getSimpleName()));
- }
-
- private interface MessageCreator<M extends BaseMessage> {
-
- M create(FcpMessage fcpMessage);
-
- }
-
- @Test
- public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
- waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new);
- }
-
- @Test(expected = ExecutionException.class)
- public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
- replySequence.setExpectedMessage("");
- Future<Boolean> result = replySequence.send(fcpMessage);
- replySequence.receivedCloseConnectionDuplicateClientName(fcpConnection,
- new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
- result.get();
- }
-
- @Test
- public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new);
- }
-
- @Test
- public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPeer, Peer.class, Peer::new);
- }
-
- @Test
- public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedEndListPeers, EndListPeers.class, EndListPeers::new);
- }
-
- @Test
- public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPeerNote, PeerNote.class, PeerNote::new);
- }
-
- @Test
- public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new);
- }
-
- @Test
- public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new);
- }
-
- @Test
- public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedNodeData, new NodeData(
- new FcpMessage("NodeData").put("ark.pubURI", "")
- .put("ark.number", "0")
- .put("auth.negTypes", "")
- .put("version", "0,0,0,0")
- .put("lastGoodVersion", "0,0,0,0")));
- }
-
- @Test
- public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new);
- }
-
- @Test
- public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new);
- }
-
- @Test
- public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPersistentGet, PersistentGet.class, PersistentGet::new);
- }
-
- @Test
- public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPersistentPut, PersistentPut.class, PersistentPut::new);
- }
-
- @Test
- public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new);
- }
-
- @Test
- public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedURIGenerated, URIGenerated.class, URIGenerated::new);
- }
-
- @Test
- public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedDataFound, DataFound.class, DataFound::new);
- }
-
- @Test
- public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedAllData, new AllData(new FcpMessage("AllData"), null));
- }
-
- @Test
- public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new);
- }
-
- @Test
- public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedStartedCompression, StartedCompression.class, StartedCompression::new);
- }
-
- @Test
- public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new);
- }
-
- @Test
- public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new);
- }
-
- @Test
- public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new);
- }
-
- @Test
- public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedConfigData, ConfigData.class, ConfigData::new);
- }
-
- @Test
- public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedGetFailed, GetFailed.class, GetFailed::new);
- }
-
- @Test
- public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPutFailed, PutFailed.class, PutFailed::new);
- }
-
- @Test
- public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new);
- }
-
- @Test
- public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new);
- }
-
- @Test
- public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new);
- }
-
- @Test
- public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new);
- }
-
- @Test
- public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPluginInfo, PluginInfo.class, PluginInfo::new);
- }
-
- @Test
- public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
- }
-
- @Test
- public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new);
- }
-
- @Test
- public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new);
- }
-
- @Test
- public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedPutFetchable, PutFetchable.class, PutFetchable::new);
- }
-
- @Test
- public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedSentFeed, SentFeed.class, SentFeed::new);
- }
-
- @Test
- public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new);
- }
-
- @Test
- public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replySequence::receivedProtocolError, ProtocolError.class, ProtocolError::new);
- }
-
- @Test
- public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
- replySequence.setExpectedMessage("SomeFcpMessage");
- Future<Boolean> result = replySequence.send(fcpMessage);
- replySequence.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage"));
- assertThat(result.get(), is(true));
- }
-
- @Test
- public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
- TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection) {
- private final AtomicBoolean gotPutFailed = new AtomicBoolean();
- private final AtomicBoolean gotGetFailed = new AtomicBoolean();
-
- @Override
- protected boolean isFinished() {
- return gotPutFailed.get() && gotGetFailed.get();
- }
-
- @Override
- protected Boolean getResult() {
- return isFinished();
- }
-
- @Override
- protected void consumePutFailed(PutFailed putFailed) {
- gotPutFailed.set(true);
- }
-
- @Override
- protected void consumeGetFailed(GetFailed getFailed) {
- gotGetFailed.set(true);
- }
- };
- Future<?> result = replySequence.send(fcpMessage);
- assertThat(result.isDone(), is(false));
- replySequence.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
- assertThat(result.isDone(), is(false));
- replySequence.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
- assertThat(result.get(), is(true));
- }
-
- @Test
- public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
- replySequence.setExpectedMessage("none");
- Future<Boolean> result = replySequence.send(fcpMessage);
- Throwable throwable = new Throwable();
- replySequence.connectionClosed(fcpConnection, throwable);
- try {
- result.get();
- } catch (ExecutionException e) {
- Throwable t = e;
- while (t.getCause() != null) {
- t = t.getCause();
- }
- assertThat(t, sameInstance(throwable));
- }
- }
-
- @FunctionalInterface
- private interface MessageReceiver<M> {
-
- void receiveMessage(FcpConnection fcpConnection, M message);
-
- }
-
- private static class TestFcpReplySequence extends FcpReplySequence<Boolean> {
-
- private final AtomicReference<String> gotMessage = new AtomicReference<>();
- private final AtomicReference<String> expectedMessage = new AtomicReference<>();
-
- public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
- super(executorService, fcpConnection);
- }
-
- public void setExpectedMessage(String expectedMessage) {
- this.expectedMessage.set(expectedMessage);
- }
-
- @Override
- protected boolean isFinished() {
- return getResult();
- }
-
- @Override
- protected Boolean getResult() {
- return expectedMessage.get().equals(gotMessage.get());
- }
-
- @Override
- protected void consumeNodeHello(NodeHello nodeHello) {
- gotMessage.set(nodeHello.getName());
- }
-
- @Override
- protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
- gotMessage.set(sskKeypair.getName());
- }
-
- @Override
- protected void consumePeer(Peer peer) {
- gotMessage.set(peer.getName());
- }
-
- @Override
- protected void consumeEndListPeers(EndListPeers endListPeers) {
- gotMessage.set(endListPeers.getName());
- }
-
- @Override
- protected void consumePeerNote(PeerNote peerNote) {
- gotMessage.set(peerNote.getName());
- }
-
- @Override
- protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
- gotMessage.set(endListPeerNotes.getName());
- }
-
- @Override
- protected void consumePeerRemoved(PeerRemoved peerRemoved) {
- gotMessage.set(peerRemoved.getName());
- }
-
- @Override
- protected void consumeNodeData(NodeData nodeData) {
- gotMessage.set(nodeData.getName());
- }
-
- @Override
- protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
- gotMessage.set(testDDAReply.getName());
- }
-
- @Override
- protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
- gotMessage.set(testDDAComplete.getName());
- }
-
- @Override
- protected void consumePersistentGet(PersistentGet persistentGet) {
- gotMessage.set(persistentGet.getName());
- }
-
- @Override
- protected void consumePersistentPut(PersistentPut persistentPut) {
- gotMessage.set(persistentPut.getName());
- }
-
- @Override
- protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
- gotMessage.set(endListPersistentRequests.getName());
- }
-
- @Override
- protected void consumeURIGenerated(URIGenerated uriGenerated) {
- gotMessage.set(uriGenerated.getName());
- }
-
- @Override
- protected void consumeDataFound(DataFound dataFound) {
- gotMessage.set(dataFound.getName());
- }
-
- @Override
- protected void consumeAllData(AllData allData) {
- gotMessage.set(allData.getName());
- }
-
- @Override
- protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
- gotMessage.set(simpleProgress.getName());
- }
-
- @Override
- protected void consumeStartedCompression(StartedCompression startedCompression) {
- gotMessage.set(startedCompression.getName());
- }
-
- @Override
- protected void consumeFinishedCompression(FinishedCompression finishedCompression) {
- gotMessage.set(finishedCompression.getName());
- }
-
- @Override
- protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
- gotMessage.set(unknownPeerNoteType.getName());
- }
-
- @Override
- protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
- gotMessage.set(unknownNodeIdentifier.getName());
- }
-
- @Override
- protected void consumeConfigData(ConfigData configData) {
- gotMessage.set(configData.getName());
- }
-
- @Override
- protected void consumeGetFailed(GetFailed getFailed) {
- gotMessage.set(getFailed.getName());
- }
-
- @Override
- protected void consumePutFailed(PutFailed putFailed) {
- gotMessage.set(putFailed.getName());
- }
-
- @Override
- protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
- gotMessage.set(identifierCollision.getName());
- }
-
- @Override
- protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) {
- gotMessage.set(persistentPutDir.getName());
- }
-
- @Override
- protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
- gotMessage.set(persistentRequestRemoved.getName());
- }
-
- @Override
- protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
- gotMessage.set(subscribedUSKUpdate.getName());
- }
-
- @Override
- protected void consumePluginInfo(PluginInfo pluginInfo) {
- gotMessage.set(pluginInfo.getName());
- }
-
- @Override
- protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) {
- gotMessage.set(fcpPluginReply.getName());
- }
-
- @Override
- protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) {
- gotMessage.set(persistentRequestModified.getName());
- }
-
- @Override
- protected void consumePutSuccessful(PutSuccessful putSuccessful) {
- gotMessage.set(putSuccessful.getName());
- }
-
- @Override
- protected void consumePutFetchable(PutFetchable putFetchable) {
- gotMessage.set(putFetchable.getName());
- }
-
- @Override
- protected void consumeSentFeed(SentFeed sentFeed) {
- gotMessage.set(sentFeed.getName());
- }
-
- @Override
- protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) {
- gotMessage.set(receivedBookmarkFeed.getName());
- }
-
- @Override
- protected void consumeProtocolError(ProtocolError protocolError) {
- gotMessage.set(protocolError.getName());
- }
-
- @Override
- protected void consumeUnknownMessage(FcpMessage fcpMessage) {
- gotMessage.set(fcpMessage.getName());
- }
-
- }
-
-}