import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import net.pterodactylus.fcp.ReturnType;
import net.pterodactylus.fcp.SSKKeypair;
-import com.google.common.io.ByteStreams;
-
/**
* Default {@link FcpClient} implementation.
*
private FcpConnection createConnection() throws IOException {
FcpConnection connection = new FcpConnection(hostname, port);
connection.connect();
- AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
- AtomicBoolean receivedClosed = new AtomicBoolean();
- FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
- nodeHelloSequence
- .handle(NodeHello.class)
- .with((nodeHello) -> receivedNodeHello.set(nodeHello));
- nodeHelloSequence
- .handle(CloseConnectionDuplicateClientName.class)
- .with((closeConnection) -> receivedClosed.set(true));
- nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
+ FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
+ private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
+ private final AtomicBoolean receivedClosed = new AtomicBoolean();
+ @Override
+ protected boolean isFinished() {
+ return receivedNodeHello.get() != null || receivedClosed.get();
+ }
+
+ @Override
+ protected void consumeNodeHello(NodeHello nodeHello) {
+ receivedNodeHello.set(nodeHello);
+ }
+
+ @Override
+ protected void consumeCloseConnectionDuplicateClientName(
+ CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+ receivedClosed.set(true);
+ }
+ };
ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
try {
nodeHelloSequence.send(clientHello).get();
public Future<FcpKeyPair> execute() {
return threadPool.submit(() -> {
connect();
- Sequence sequence = new Sequence();
- FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
- replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
- replySequence.waitFor(sequence::isFinished);
- replySequence.send(new GenerateSSK()).get();
- return sequence.getKeyPair();
- });
- }
-
- private class Sequence {
-
- private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
+ return new FcpReplySequence<FcpKeyPair>(threadPool, fcpConnection.get()) {
+ private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
- public void handleSSKKeypair(SSKKeypair sskKeypair) {
- keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
- }
-
- public boolean isFinished() {
- return keyPair.get() != null;
- }
+ @Override
+ protected boolean isFinished() {
+ return keyPair.get() != null;
+ }
- public FcpKeyPair getKeyPair() {
- return keyPair.get();
- }
+ @Override
+ protected FcpKeyPair getResult() {
+ return keyPair.get();
+ }
+ @Override
+ protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
+ keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
+ }
+ }.send(new GenerateSSK()).get();
+ });
}
}
@Override
public Future<Optional<Data>> uri(String uri) {
- return threadPool.submit(() -> execute(uri));
- }
-
- private Optional<Data> execute(String uri) throws IOException, ExecutionException, InterruptedException {
- DefaultFcpClient.this.connect();
ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
if (ignoreDataStore) {
clientGet.setIgnoreDataStore(true);
if (global) {
clientGet.setGlobal(true);
}
- try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) {
- Sequence sequence = new Sequence(identifier);
- replySequence.handle(AllData.class).with(sequence::allData);
- replySequence.handle(GetFailed.class).with(sequence::getFailed);
- replySequence.handleClose().with(sequence::disconnect);
- replySequence.waitFor(sequence::isFinished);
- replySequence.send(clientGet).get();
- return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty();
- }
- }
-
- private class Sequence {
-
- private final AtomicBoolean finished = new AtomicBoolean();
- private final AtomicBoolean failed = new AtomicBoolean();
-
- private final String identifier;
-
- private String contentType;
- private long dataLength;
- private InputStream payload;
-
- private Sequence(String identifier) {
- this.identifier = identifier;
- }
+ return threadPool.submit(() -> {
+ connect();
+ FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, fcpConnection.get()) {
+ private final AtomicBoolean finished = new AtomicBoolean();
+ private final AtomicBoolean failed = new AtomicBoolean();
- public boolean isFinished() {
- return finished.get() || failed.get();
- }
+ private final String identifier = ClientGetCommandImpl.this.identifier;
- public boolean isSuccessful() {
- return !failed.get();
- }
+ private String contentType;
+ private long dataLength;
+ private InputStream payload;
- public Data getData() {
- return new Data() {
@Override
- public String getMimeType() {
- synchronized (Sequence.this) {
- return contentType;
- }
+ protected boolean isFinished() {
+ return finished.get() || failed.get();
}
@Override
- public long size() {
- synchronized (Sequence.this) {
- return dataLength;
- }
+ protected Optional<Data> getResult() {
+ return failed.get() ? Optional.empty() : Optional.of(new Data() {
+ @Override
+ public String getMimeType() {
+ return contentType;
+ }
+
+ @Override
+ public long size() {
+ return dataLength;
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return payload;
+ }
+ });
}
@Override
- public InputStream getInputStream() {
- synchronized (Sequence.this) {
- return payload;
+ protected void consumeAllData(AllData allData) {
+ if (allData.getIdentifier().equals(identifier)) {
+ synchronized (this) {
+ contentType = allData.getContentType();
+ dataLength = allData.getDataLength();
+ try {
+ payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
+ finished.set(true);
+ } catch (IOException e) {
+ // TODO – logging
+ failed.set(true);
+ }
+ }
}
}
- };
- }
- public void allData(AllData allData) {
- if (allData.getIdentifier().equals(identifier)) {
- synchronized (this) {
- contentType = allData.getContentType();
- dataLength = allData.getDataLength();
- try {
- payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
- finished.set(true);
- } catch (IOException e) {
- // TODO – logging
+ @Override
+ protected void consumeGetFailed(GetFailed getFailed) {
+ if (getFailed.getIdentifier().equals(identifier)) {
failed.set(true);
}
}
- }
- }
-
- public void getFailed(GetFailed getFailed) {
- if (getFailed.getIdentifier().equals(identifier)) {
- failed.set(true);
- }
- }
-
- public void disconnect(Throwable t) {
- failed.set(true);
- }
+ @Override
+ protected void consumeConnectionClosed(Throwable throwable) {
+ failed.set(true);
+ }
+ };
+ return replySequence.send(clientGet).get();
+ });
}
}
package net.pterodactylus.fcp.quelaton;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import net.pterodactylus.fcp.AllData;
-import net.pterodactylus.fcp.BaseMessage;
import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
import net.pterodactylus.fcp.ConfigData;
import net.pterodactylus.fcp.DataFound;
*
* @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
*/
-public class FcpReplySequence implements AutoCloseable, FcpListener {
+public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
+ private final Object syncObject = new Object();
private final ExecutorService executorService;
private final FcpConnection fcpConnection;
- private final Map<Class<? extends BaseMessage>, Consumer<BaseMessage>> expectedMessageActions = new HashMap<>();
- private final List<Consumer<FcpMessage>> unknownMessageHandlers = new ArrayList<>();
- private final List<Consumer<Throwable>> closeHandlers = new ArrayList<>();
- private Supplier<Boolean> endPredicate;
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
this.executorService = executorService;
this.fcpConnection = fcpConnection;
}
- public <M extends BaseMessage> $1<M> handle(Class<M> messageClass) {
- return new $1<>(messageClass);
- }
-
- public class $1<M extends BaseMessage> {
-
- private Class<M> messageClass;
-
- private $1(Class<M> messageClass) {
- this.messageClass = messageClass;
- }
-
- public FcpReplySequence with(Consumer<M> action) {
- expectedMessageActions.put(messageClass, (Consumer<BaseMessage>) action);
- return FcpReplySequence.this;
- }
-
- }
-
- public $2 handleUnknown() {
- return new $2();
- }
-
- public class $2 {
-
- public FcpReplySequence with(Consumer<FcpMessage> consumer) {
- unknownMessageHandlers.add(consumer);
- return FcpReplySequence.this;
- }
-
- }
-
- public $3 handleClose() {
- return new $3();
- }
+ protected abstract boolean isFinished();
- public class $3 {
+ public Future<R> send(FcpMessage fcpMessage) throws IOException {
+ try {
+ fcpConnection.addFcpListener(this);
- public FcpReplySequence with(Consumer<Throwable> consumer) {
- closeHandlers.add(consumer);
- return FcpReplySequence.this;
+ } catch (Throwable throwable) {
+ throwable.printStackTrace();
}
-
- }
-
- public void waitFor(Supplier<Boolean> endPredicate) {
- this.endPredicate = endPredicate;
- }
-
- public Future<?> send(FcpMessage fcpMessage) throws IOException {
- fcpConnection.addFcpListener(this);
fcpConnection.sendMessage(fcpMessage);
return executorService.submit(() -> {
- synchronized (endPredicate) {
- while (!endPredicate.get()) {
- endPredicate.wait();
+ synchronized (syncObject) {
+ while (!isFinished()) {
+ syncObject.wait();
}
}
- return null;
+ return getResult();
});
}
+ protected R getResult() {
+ return null;
+ }
+
@Override
public void close() {
fcpConnection.removeFcpListener(this);
}
- private <M extends BaseMessage> void consume(Class<M> fcpMessageClass, BaseMessage fcpMessage) {
- if (expectedMessageActions.containsKey(fcpMessageClass)) {
- expectedMessageActions.get(fcpMessageClass).accept(fcpMessage);
- }
- synchronized (endPredicate) {
- endPredicate.notifyAll();
+ private <M> void consume(Consumer<M> consumer, M message) {
+ consumer.accept(message);
+ synchronized (syncObject) {
+ syncObject.notifyAll();
}
}
private void consumeUnknown(FcpMessage fcpMessage) {
- for (Consumer<FcpMessage> unknownMessageHandler : unknownMessageHandlers) {
- unknownMessageHandler.accept(fcpMessage);
- }
- synchronized (endPredicate) {
- endPredicate.notifyAll();
+ consumeUnknownMessage(fcpMessage);
+ synchronized (syncObject) {
+ syncObject.notifyAll();
}
}
private void consumeClose(Throwable throwable) {
- for (Consumer<Throwable> closeHandler : closeHandlers) {
- closeHandler.accept(throwable);
- }
- synchronized (endPredicate) {
- endPredicate.notifyAll();
+ consumeConnectionClosed(throwable);
+ synchronized (syncObject) {
+ syncObject.notifyAll();
}
}
@Override
- public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
- consume(NodeHello.class, nodeHello);
+ public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
+ consume(this::consumeNodeHello, nodeHello);
}
+ protected void consumeNodeHello(NodeHello nodeHello) { }
+
@Override
- public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
+ public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
- consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName);
+ consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
}
+ protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
+
@Override
- public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
- consume(SSKKeypair.class, sskKeypair);
+ public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
+ consume(this::consumeSSKKeypair, sskKeypair);
}
+ protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
+
@Override
- public void receivedPeer(FcpConnection fcpConnection, Peer peer) {
- consume(Peer.class, peer);
+ public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
+ consume(this::consumePeer, peer);
}
+ protected void consumePeer(Peer peer) { }
+
@Override
- public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
- consume(EndListPeers.class, endListPeers);
+ public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
+ consume(this::consumeEndListPeers, endListPeers);
}
+ protected void consumeEndListPeers(EndListPeers endListPeers) { }
+
@Override
- public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
- consume(PeerNote.class, peerNote);
+ public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
+ consume(this::consumePeerNote, peerNote);
}
+ protected void consumePeerNote(PeerNote peerNote) { }
+
@Override
- public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
- consume(EndListPeerNotes.class, endListPeerNotes);
+ public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
+ consume(this::consumeEndListPeerNotes, endListPeerNotes);
}
+ protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
+
@Override
- public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
- consume(PeerRemoved.class, peerRemoved);
+ public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
+ consume(this::consumePeerRemoved, peerRemoved);
}
+ protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
+
@Override
- public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
- consume(NodeData.class, nodeData);
+ public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
+ consume(this::consumeNodeData, nodeData);
}
+ protected void consumeNodeData(NodeData nodeData) { }
+
@Override
- public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
- consume(TestDDAReply.class, testDDAReply);
+ public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
+ consume(this::consumeTestDDAReply, testDDAReply);
}
+ protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
+
@Override
- public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
- consume(TestDDAComplete.class, testDDAComplete);
+ public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
+ consume(this::consumeTestDDAComplete, testDDAComplete);
}
+ protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
+
@Override
- public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
- consume(PersistentGet.class, persistentGet);
+ public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
+ consume(this::consumePersistentGet, persistentGet);
}
+ protected void consumePersistentGet(PersistentGet persistentGet) { }
+
@Override
- public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
- consume(PersistentPut.class, persistentPut);
+ public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
+ consume(this::consumePersistentPut, persistentPut);
}
+ protected void consumePersistentPut(PersistentPut persistentPut) { }
+
@Override
- public void receivedEndListPersistentRequests(FcpConnection fcpConnection,
+ public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
EndListPersistentRequests endListPersistentRequests) {
- consume(EndListPersistentRequests.class, endListPersistentRequests);
+ consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
}
+ protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
+
@Override
- public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
- consume(URIGenerated.class, uriGenerated);
+ public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
+ consume(this::consumeURIGenerated, uriGenerated);
}
+ protected void consumeURIGenerated(URIGenerated uriGenerated) { }
+
@Override
- public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
- consume(DataFound.class, dataFound);
+ public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
+ consume(this::consumeDataFound, dataFound);
}
+ protected void consumeDataFound(DataFound dataFound) { }
+
@Override
- public void receivedAllData(FcpConnection fcpConnection, AllData allData) {
- consume(AllData.class, allData);
+ public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
+ consume(this::consumeAllData, allData);
}
+ protected void consumeAllData(AllData allData) { }
+
@Override
- public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
- consume(SimpleProgress.class, simpleProgress);
+ public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
+ consume(this::consumeSimpleProgress, simpleProgress);
}
+ protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
+
@Override
- public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
- consume(StartedCompression.class, startedCompression);
+ public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
+ consume(this::consumeStartedCompression, startedCompression);
}
+ protected void consumeStartedCompression(StartedCompression startedCompression) { }
+
@Override
- public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
- consume(FinishedCompression.class, finishedCompression);
+ public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
+ consume(this::consumeFinishedCompression, finishedCompression);
}
+ protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
+
@Override
- public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
- consume(UnknownPeerNoteType.class, unknownPeerNoteType);
+ public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
+ consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
}
+ protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
+
@Override
- public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
+ public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
UnknownNodeIdentifier unknownNodeIdentifier) {
- consume(UnknownNodeIdentifier.class, unknownNodeIdentifier);
+ consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
}
+ protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
+
@Override
- public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
- consume(ConfigData.class, configData);
+ public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
+ consume(this::consumeConfigData, configData);
}
+ protected void consumeConfigData(ConfigData configData) { }
+
@Override
- public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
- consume(GetFailed.class, getFailed);
+ public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
+ consume(this::consumeGetFailed, getFailed);
}
+ protected void consumeGetFailed(GetFailed getFailed) { }
+
@Override
- public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
- consume(PutFailed.class, putFailed);
+ public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
+ consume(this::consumePutFailed, putFailed);
}
+ protected void consumePutFailed(PutFailed putFailed) { }
+
@Override
- public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
- consume(IdentifierCollision.class, identifierCollision);
+ public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
+ consume(this::consumeIdentifierCollision, identifierCollision);
}
+ protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
+
@Override
- public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
- consume(PersistentPutDir.class, persistentPutDir);
+ public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
+ consume(this::consumePersistentPutDir, persistentPutDir);
}
+ protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
+
@Override
- public void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
+ public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
PersistentRequestRemoved persistentRequestRemoved) {
- consume(PersistentRequestRemoved.class, persistentRequestRemoved);
+ consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
}
+ protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
+
@Override
- public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
- consume(SubscribedUSKUpdate.class, subscribedUSKUpdate);
+ public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+ consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
}
+ protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
+
@Override
- public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
- consume(PluginInfo.class, pluginInfo);
+ public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
+ consume(this::consumePluginInfo, pluginInfo);
}
+ protected void consumePluginInfo(PluginInfo pluginInfo) { }
+
@Override
- public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
- consume(FCPPluginReply.class, fcpPluginReply);
+ public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
+ consume(this::consumeFCPPluginReply, fcpPluginReply);
}
+ protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
+
@Override
- public void receivedPersistentRequestModified(FcpConnection fcpConnection,
+ public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
PersistentRequestModified persistentRequestModified) {
- consume(PersistentRequestModified.class, persistentRequestModified);
+ consume(this::consumePersistentRequestModified, persistentRequestModified);
}
+ protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
+
@Override
- public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
- consume(PutSuccessful.class, putSuccessful);
+ public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
+ consume(this::consumePutSuccessful, putSuccessful);
}
+ protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
+
@Override
- public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
- consume(PutFetchable.class, putFetchable);
+ public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
+ consume(this::consumePutFetchable, putFetchable);
}
+ protected void consumePutFetchable(PutFetchable putFetchable) { }
+
@Override
- public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
- consume(SentFeed.class, sentFeed);
+ public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
+ consume(this::consumeSentFeed, sentFeed);
}
+ protected void consumeSentFeed(SentFeed sentFeed) { }
+
@Override
- public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
- consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed);
+ public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
+ consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
}
+ protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
+
@Override
- public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
- consume(ProtocolError.class, protocolError);
+ public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
+ consume(this::consumeProtocolError, protocolError);
}
+ protected void consumeProtocolError(ProtocolError protocolError) { }
+
@Override
- public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+ public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
consumeUnknown(fcpMessage);
}
+ protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
+
@Override
- public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
+ public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
consumeClose(throwable);
}
+ protected void consumeConnectionClosed(Throwable throwable) { }
+
}
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
import net.pterodactylus.fcp.AllData;
import net.pterodactylus.fcp.BaseMessage;
private final FcpConnection fcpConnection = mock(FcpConnection.class);
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
- private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection);
+ private final TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection);
private final FcpMessage fcpMessage = new FcpMessage("Test");
@Test
public void canSendMessage() throws IOException {
- replyWaiter.send(fcpMessage);
+ FcpReplySequence replySequence = createBasicReplySequence();
+ replySequence.send(fcpMessage);
verify(fcpConnection).sendMessage(fcpMessage);
}
+ private FcpReplySequence createBasicReplySequence() {
+ return new FcpReplySequence(executorService, fcpConnection) {
+ @Override
+ protected boolean isFinished() {
+ return true;
+ }
+ };
+ }
+
@Test
public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
- replyWaiter.send(fcpMessage);
- verify(fcpConnection).addFcpListener(replyWaiter);
+ FcpReplySequence replySequence = createBasicReplySequence();
+ replySequence.send(fcpMessage);
+ verify(fcpConnection).addFcpListener(replySequence);
}
@Test
public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
- replyWaiter.send(fcpMessage);
- replyWaiter.close();
- verify(fcpConnection).removeFcpListener(replyWaiter);
+ FcpReplySequence replySequence = createBasicReplySequence();
+ replySequence.send(fcpMessage);
+ replySequence.close();
+ verify(fcpConnection).removeFcpListener(replySequence);
}
- private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver,
- Class<M> messageClass, Supplier<M> message) throws IOException, InterruptedException, ExecutionException {
- AtomicBoolean gotMessage = setupMessage(messageClass);
- Future<?> result = replyWaiter.send(fcpMessage);
- sendMessage(messageReceiver, message.get());
- result.get();
- assertThat(gotMessage.get(), is(true));
+ private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, Class<M> messageClass, MessageCreator<M> messageCreator) throws IOException, InterruptedException, ExecutionException {
+ waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName())));
}
- private <M extends BaseMessage> void sendMessage(MessageReceiver<M> messageReceiver, M message) {
- messageReceiver.receive(fcpConnection, message);
+ private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, M message) throws IOException, InterruptedException, ExecutionException {
+ replySequence.setExpectedMessage(message.getName());
+ Future<Boolean> result = replySequence.send(fcpMessage);
+ messageReceiver.receiveMessage(fcpConnection, message);
+ assertThat(result.get(), is(true));
}
- private interface MessageReceiver<M extends BaseMessage> {
-
- void receive(FcpConnection fcpConnection, M message);
+ private <M extends BaseMessage> M createMessage(Class<M> messageClass, MessageCreator<M> messageCreator) {
+ return messageCreator.create(new FcpMessage(messageClass.getSimpleName()));
}
- private <M extends BaseMessage> AtomicBoolean setupMessage(Class<M> messageClass) {
- AtomicBoolean gotMessage = new AtomicBoolean();
- replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true));
- replyWaiter.waitFor(() -> gotMessage.get());
- return gotMessage;
+ private interface MessageCreator<M extends BaseMessage> {
+
+ M create(FcpMessage fcpMessage);
+
}
@Test
public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
- waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class,
- () -> new NodeHello(new FcpMessage("NodeHello")));
+ waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new);
}
@Test
- public void waitingForConnectionClosedDuplicateClientNameWorks()
- throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName,
- CloseConnectionDuplicateClientName.class,
- () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+ public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
+ waitForASpecificMessage( replySequence::receivedCloseConnectionDuplicateClientName, CloseConnectionDuplicateClientName.class, CloseConnectionDuplicateClientName::new);
}
@Test
public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class,
- () -> new SSKKeypair(new FcpMessage("SSKKeypair")));
+ waitForASpecificMessage(replySequence::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new);
}
@Test
public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer")));
+ waitForASpecificMessage(replySequence::receivedPeer, Peer.class, Peer::new);
}
@Test
public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class,
- () -> new EndListPeers(new FcpMessage("EndListPeers")));
+ waitForASpecificMessage(replySequence::receivedEndListPeers, EndListPeers.class, EndListPeers::new);
}
@Test
public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class,
- () -> new PeerNote(new FcpMessage("PeerNote")));
+ waitForASpecificMessage(replySequence::receivedPeerNote, PeerNote.class, PeerNote::new);
}
@Test
public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class,
- () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes")));
+ waitForASpecificMessage(replySequence::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new);
}
@Test
public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class,
- () -> new PeerRemoved(new FcpMessage("PeerRemoved")));
+ waitForASpecificMessage(replySequence::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new);
}
@Test
public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class,
- () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "")
- .put(
- "ark.number", "0")
- .put("auth.negTypes", "")
- .put("version", "0,0,0,0")
- .put("lastGoodVersion", "0,0,0,0")));
+ waitForASpecificMessage(replySequence::receivedNodeData, new NodeData(
+ new FcpMessage("NodeData").put("ark.pubURI", "")
+ .put("ark.number", "0")
+ .put("auth.negTypes", "")
+ .put("version", "0,0,0,0")
+ .put("lastGoodVersion", "0,0,0,0")));
}
@Test
public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class,
- () -> new TestDDAReply(new FcpMessage("TestDDAReply")));
+ waitForASpecificMessage(replySequence::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new);
}
@Test
public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class,
- () -> new TestDDAComplete(new FcpMessage("TestDDAComplete")));
+ waitForASpecificMessage(replySequence::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new);
}
@Test
public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class,
- () -> new PersistentGet(new FcpMessage("PersistentGet")));
+ waitForASpecificMessage(replySequence::receivedPersistentGet, PersistentGet.class, PersistentGet::new);
}
@Test
public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class,
- () -> new PersistentPut(new FcpMessage("PersistentPut")));
+ waitForASpecificMessage(replySequence::receivedPersistentPut, PersistentPut.class, PersistentPut::new);
}
@Test
public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class,
- () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests")));
+ waitForASpecificMessage(replySequence::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new);
}
@Test
public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class,
- () -> new URIGenerated(new FcpMessage("URIGenerated")));
+ waitForASpecificMessage(replySequence::receivedURIGenerated, URIGenerated.class, URIGenerated::new);
}
@Test
public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class,
- () -> new DataFound(new FcpMessage("DataFound")));
+ waitForASpecificMessage(replySequence::receivedDataFound, DataFound.class, DataFound::new);
}
@Test
public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class,
- () -> new AllData(new FcpMessage("AllData"), null));
+ waitForASpecificMessage(replySequence::receivedAllData, new AllData(new FcpMessage("AllData"), null));
}
@Test
public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class,
- () -> new SimpleProgress(new FcpMessage("SimpleProgress")));
+ waitForASpecificMessage(replySequence::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new);
}
@Test
public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class,
- () -> new StartedCompression(new FcpMessage("StartedCompression")));
+ waitForASpecificMessage(replySequence::receivedStartedCompression, StartedCompression.class, StartedCompression::new);
}
@Test
public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class,
- () -> new FinishedCompression(new FcpMessage("FinishedCompression")));
+ waitForASpecificMessage(replySequence::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new);
}
@Test
public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class,
- () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType")));
+ waitForASpecificMessage(replySequence::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new);
}
@Test
public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class,
- () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier")));
+ waitForASpecificMessage(replySequence::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new);
}
@Test
public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class,
- () -> new ConfigData(new FcpMessage("ConfigData")));
+ waitForASpecificMessage(replySequence::receivedConfigData, ConfigData.class, ConfigData::new);
}
@Test
public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class,
- () -> new GetFailed(new FcpMessage("GetFailed")));
+ waitForASpecificMessage(replySequence::receivedGetFailed, GetFailed.class, GetFailed::new);
}
@Test
public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class,
- () -> new PutFailed(new FcpMessage("PutFailed")));
+ waitForASpecificMessage(replySequence::receivedPutFailed, PutFailed.class, PutFailed::new);
}
@Test
public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class,
- () -> new IdentifierCollision(new FcpMessage("IdentifierCollision")));
+ waitForASpecificMessage(replySequence::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new);
}
@Test
public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class,
- () -> new PersistentPutDir(new FcpMessage("PersistentPutDir")));
+ waitForASpecificMessage(replySequence::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new);
}
@Test
public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class,
- () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved")));
+ waitForASpecificMessage(replySequence::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new);
}
@Test
public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class,
- () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate")));
+ waitForASpecificMessage(replySequence::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new);
}
@Test
public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class,
- () -> new PluginInfo(new FcpMessage("PluginInfo")));
+ waitForASpecificMessage(replySequence::receivedPluginInfo, PluginInfo.class, PluginInfo::new);
}
@Test
public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class,
- () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
+ waitForASpecificMessage(replySequence::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
}
@Test
public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class,
- () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified")));
+ waitForASpecificMessage(replySequence::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new);
}
@Test
public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class,
- () -> new PutSuccessful(new FcpMessage("PutSuccessful")));
+ waitForASpecificMessage(replySequence::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new);
}
@Test
public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class,
- () -> new PutFetchable(new FcpMessage("PutFetchable")));
+ waitForASpecificMessage(replySequence::receivedPutFetchable, PutFetchable.class, PutFetchable::new);
}
@Test
public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class,
- () -> new SentFeed(new FcpMessage("SentFeed")));
+ waitForASpecificMessage(replySequence::receivedSentFeed, SentFeed.class, SentFeed::new);
}
@Test
public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class,
- () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed")));
+ waitForASpecificMessage(replySequence::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new);
}
@Test
public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
- waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class,
- () -> new ProtocolError(new FcpMessage("ProtocolError")));
+ waitForASpecificMessage(replySequence::receivedProtocolError, ProtocolError.class, ProtocolError::new);
}
@Test
public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
- AtomicReference<FcpMessage> receivedMessage = new AtomicReference<>();
- replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message));
- replyWaiter.waitFor(() -> receivedMessage.get() != null);
- Future<?> result = replyWaiter.send(fcpMessage);
- replyWaiter.receivedMessage(fcpConnection, fcpMessage);
- result.get();
- assertThat(receivedMessage.get(), is(fcpMessage));
+ replySequence.setExpectedMessage("SomeFcpMessage");
+ Future<Boolean> result = replySequence.send(fcpMessage);
+ replySequence.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage"));
+ assertThat(result.get(), is(true));
}
@Test
public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
- AtomicBoolean gotPutFailed = new AtomicBoolean();
- replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true));
- AtomicBoolean gotGetFailed = new AtomicBoolean();
- replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true));
- replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get());
- Future<?> result = replyWaiter.send(fcpMessage);
+ TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection) {
+ private final AtomicBoolean gotPutFailed = new AtomicBoolean();
+ private final AtomicBoolean gotGetFailed = new AtomicBoolean();
+
+ @Override
+ protected boolean isFinished() {
+ return gotPutFailed.get() && gotGetFailed.get();
+ }
+
+ @Override
+ protected Boolean getResult() {
+ return isFinished();
+ }
+
+ @Override
+ protected void consumePutFailed(PutFailed putFailed) {
+ gotPutFailed.set(true);
+ }
+
+ @Override
+ protected void consumeGetFailed(GetFailed getFailed) {
+ gotGetFailed.set(true);
+ }
+ };
+ Future<?> result = replySequence.send(fcpMessage);
assertThat(result.isDone(), is(false));
- replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
+ replySequence.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
assertThat(result.isDone(), is(false));
- replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
- result.get();
+ replySequence.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
+ assertThat(result.get(), is(true));
}
@Test
public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
- AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
- replyWaiter.handleClose().with((e) -> receivedThrowable.set(e));
- replyWaiter.waitFor(() -> receivedThrowable.get() != null);
- Future<?> result = replyWaiter.send(fcpMessage);
+ replySequence.setExpectedMessage("ConnectionClosed");
+ Future<Boolean> result = replySequence.send(fcpMessage);
Throwable throwable = new Throwable();
- replyWaiter.connectionClosed(fcpConnection, throwable);
- result.get();
- assertThat(receivedThrowable.get(), is(throwable));
+ replySequence.connectionClosed(fcpConnection, throwable);
+ assertThat(result.get(), is(true));
+ assertThat(replySequence.receivedThrowable.get(), is(throwable));
+ }
+
+ @FunctionalInterface
+ private interface MessageReceiver<M> {
+
+ void receiveMessage(FcpConnection fcpConnection, M message);
+
+ }
+
+ private static class TestFcpReplySequence extends FcpReplySequence<Boolean> {
+
+ private final AtomicReference<String> gotMessage = new AtomicReference<>();
+ private final AtomicReference<String> expectedMessage = new AtomicReference<>();
+ private final AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
+
+ public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
+ super(executorService, fcpConnection);
+ }
+
+ public void setExpectedMessage(String expectedMessage) {
+ this.expectedMessage.set(expectedMessage);
+ }
+
+ @Override
+ protected boolean isFinished() {
+ return getResult();
+ }
+
+ @Override
+ protected Boolean getResult() {
+ return expectedMessage.get().equals(gotMessage.get());
+ }
+
+ @Override
+ protected void consumeNodeHello(NodeHello nodeHello) {
+ gotMessage.set(nodeHello.getName());
+ }
+
+ @Override
+ protected void consumeCloseConnectionDuplicateClientName(
+ CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+ gotMessage.set(closeConnectionDuplicateClientName.getName());
+ }
+
+ @Override
+ protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
+ gotMessage.set(sskKeypair.getName());
+ }
+
+ @Override
+ protected void consumePeer(Peer peer) {
+ gotMessage.set(peer.getName());
+ }
+
+ @Override
+ protected void consumeEndListPeers(EndListPeers endListPeers) {
+ gotMessage.set(endListPeers.getName());
+ }
+
+ @Override
+ protected void consumePeerNote(PeerNote peerNote) {
+ gotMessage.set(peerNote.getName());
+ }
+
+ @Override
+ protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
+ gotMessage.set(endListPeerNotes.getName());
+ }
+
+ @Override
+ protected void consumePeerRemoved(PeerRemoved peerRemoved) {
+ gotMessage.set(peerRemoved.getName());
+ }
+
+ @Override
+ protected void consumeNodeData(NodeData nodeData) {
+ gotMessage.set(nodeData.getName());
+ }
+
+ @Override
+ protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
+ gotMessage.set(testDDAReply.getName());
+ }
+
+ @Override
+ protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
+ gotMessage.set(testDDAComplete.getName());
+ }
+
+ @Override
+ protected void consumePersistentGet(PersistentGet persistentGet) {
+ gotMessage.set(persistentGet.getName());
+ }
+
+ @Override
+ protected void consumePersistentPut(PersistentPut persistentPut) {
+ gotMessage.set(persistentPut.getName());
+ }
+
+ @Override
+ protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
+ gotMessage.set(endListPersistentRequests.getName());
+ }
+
+ @Override
+ protected void consumeURIGenerated(URIGenerated uriGenerated) {
+ gotMessage.set(uriGenerated.getName());
+ }
+
+ @Override
+ protected void consumeDataFound(DataFound dataFound) {
+ gotMessage.set(dataFound.getName());
+ }
+
+ @Override
+ protected void consumeAllData(AllData allData) {
+ gotMessage.set(allData.getName());
+ }
+
+ @Override
+ protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+ gotMessage.set(simpleProgress.getName());
+ }
+
+ @Override
+ protected void consumeStartedCompression(StartedCompression startedCompression) {
+ gotMessage.set(startedCompression.getName());
+ }
+
+ @Override
+ protected void consumeFinishedCompression(FinishedCompression finishedCompression) {
+ gotMessage.set(finishedCompression.getName());
+ }
+
+ @Override
+ protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
+ gotMessage.set(unknownPeerNoteType.getName());
+ }
+
+ @Override
+ protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
+ gotMessage.set(unknownNodeIdentifier.getName());
+ }
+
+ @Override
+ protected void consumeConfigData(ConfigData configData) {
+ gotMessage.set(configData.getName());
+ }
+
+ @Override
+ protected void consumeGetFailed(GetFailed getFailed) {
+ gotMessage.set(getFailed.getName());
+ }
+
+ @Override
+ protected void consumePutFailed(PutFailed putFailed) {
+ gotMessage.set(putFailed.getName());
+ }
+
+ @Override
+ protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
+ gotMessage.set(identifierCollision.getName());
+ }
+
+ @Override
+ protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) {
+ gotMessage.set(persistentPutDir.getName());
+ }
+
+ @Override
+ protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
+ gotMessage.set(persistentRequestRemoved.getName());
+ }
+
+ @Override
+ protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
+ gotMessage.set(subscribedUSKUpdate.getName());
+ }
+
+ @Override
+ protected void consumePluginInfo(PluginInfo pluginInfo) {
+ gotMessage.set(pluginInfo.getName());
+ }
+
+ @Override
+ protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) {
+ gotMessage.set(fcpPluginReply.getName());
+ }
+
+ @Override
+ protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) {
+ gotMessage.set(persistentRequestModified.getName());
+ }
+
+ @Override
+ protected void consumePutSuccessful(PutSuccessful putSuccessful) {
+ gotMessage.set(putSuccessful.getName());
+ }
+
+ @Override
+ protected void consumePutFetchable(PutFetchable putFetchable) {
+ gotMessage.set(putFetchable.getName());
+ }
+
+ @Override
+ protected void consumeSentFeed(SentFeed sentFeed) {
+ gotMessage.set(sentFeed.getName());
+ }
+
+ @Override
+ protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) {
+ gotMessage.set(receivedBookmarkFeed.getName());
+ }
+
+ @Override
+ protected void consumeProtocolError(ProtocolError protocolError) {
+ gotMessage.set(protocolError.getName());
+ }
+
+ @Override
+ protected void consumeUnknownMessage(FcpMessage fcpMessage) {
+ gotMessage.set(fcpMessage.getName());
+ }
+
+ @Override
+ protected void consumeConnectionClosed(Throwable throwable) {
+ receivedThrowable.set(throwable);
+ gotMessage.set("ConnectionClosed");
+ }
+
}
}