package net.pterodactylus.fcp.quelaton;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import net.pterodactylus.fcp.AllData;
-import net.pterodactylus.fcp.BaseMessage;
import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
import net.pterodactylus.fcp.ConfigData;
import net.pterodactylus.fcp.DataFound;
*
* @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
*/
-public class FcpReplySequence implements AutoCloseable, FcpListener {
+public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
+ private final Object syncObject = new Object();
private final ExecutorService executorService;
private final FcpConnection fcpConnection;
- private final Map<Class<? extends BaseMessage>, Consumer<BaseMessage>> expectedMessageActions = new HashMap<>();
- private final List<Consumer<FcpMessage>> unknownMessageHandlers = new ArrayList<>();
- private final List<Consumer<Throwable>> closeHandlers = new ArrayList<>();
- private Supplier<Boolean> endPredicate;
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
this.executorService = executorService;
this.fcpConnection = fcpConnection;
}
- public <M extends BaseMessage> $1<M> handle(Class<M> messageClass) {
- return new $1<>(messageClass);
- }
-
- public class $1<M extends BaseMessage> {
-
- private Class<M> messageClass;
-
- private $1(Class<M> messageClass) {
- this.messageClass = messageClass;
- }
-
- public FcpReplySequence with(Consumer<M> action) {
- expectedMessageActions.put(messageClass, (Consumer<BaseMessage>) action);
- return FcpReplySequence.this;
- }
-
- }
-
- public $2 handleUnknown() {
- return new $2();
- }
-
- public class $2 {
-
- public FcpReplySequence with(Consumer<FcpMessage> consumer) {
- unknownMessageHandlers.add(consumer);
- return FcpReplySequence.this;
- }
-
- }
-
- public $3 handleClose() {
- return new $3();
- }
+ protected abstract boolean isFinished();
- public class $3 {
+ public Future<R> send(FcpMessage fcpMessage) throws IOException {
+ try {
+ fcpConnection.addFcpListener(this);
- public FcpReplySequence with(Consumer<Throwable> consumer) {
- closeHandlers.add(consumer);
- return FcpReplySequence.this;
+ } catch (Throwable throwable) {
+ throwable.printStackTrace();
}
-
- }
-
- public void waitFor(Supplier<Boolean> endPredicate) {
- this.endPredicate = endPredicate;
- }
-
- public Future<?> send(FcpMessage fcpMessage) throws IOException {
- fcpConnection.addFcpListener(this);
fcpConnection.sendMessage(fcpMessage);
return executorService.submit(() -> {
- synchronized (endPredicate) {
- while (!endPredicate.get()) {
- endPredicate.wait();
+ synchronized (syncObject) {
+ while (!isFinished()) {
+ syncObject.wait();
}
}
- return null;
+ return getResult();
});
}
+ protected R getResult() {
+ return null;
+ }
+
@Override
public void close() {
fcpConnection.removeFcpListener(this);
}
- private <M extends BaseMessage> void consume(Class<M> fcpMessageClass, BaseMessage fcpMessage) {
- if (expectedMessageActions.containsKey(fcpMessageClass)) {
- expectedMessageActions.get(fcpMessageClass).accept(fcpMessage);
- }
- synchronized (endPredicate) {
- endPredicate.notifyAll();
+ private <M> void consume(Consumer<M> consumer, M message) {
+ consumer.accept(message);
+ synchronized (syncObject) {
+ syncObject.notifyAll();
}
}
private void consumeUnknown(FcpMessage fcpMessage) {
- for (Consumer<FcpMessage> unknownMessageHandler : unknownMessageHandlers) {
- unknownMessageHandler.accept(fcpMessage);
- }
- synchronized (endPredicate) {
- endPredicate.notifyAll();
+ consumeUnknownMessage(fcpMessage);
+ synchronized (syncObject) {
+ syncObject.notifyAll();
}
}
private void consumeClose(Throwable throwable) {
- for (Consumer<Throwable> closeHandler : closeHandlers) {
- closeHandler.accept(throwable);
- }
- synchronized (endPredicate) {
- endPredicate.notifyAll();
+ consumeConnectionClosed(throwable);
+ synchronized (syncObject) {
+ syncObject.notifyAll();
}
}
@Override
- public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
- consume(NodeHello.class, nodeHello);
+ public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
+ consume(this::consumeNodeHello, nodeHello);
}
+ protected void consumeNodeHello(NodeHello nodeHello) { }
+
@Override
- public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
+ public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
- consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName);
+ consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
}
+ protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
+
@Override
- public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
- consume(SSKKeypair.class, sskKeypair);
+ public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
+ consume(this::consumeSSKKeypair, sskKeypair);
}
+ protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
+
@Override
- public void receivedPeer(FcpConnection fcpConnection, Peer peer) {
- consume(Peer.class, peer);
+ public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
+ consume(this::consumePeer, peer);
}
+ protected void consumePeer(Peer peer) { }
+
@Override
- public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
- consume(EndListPeers.class, endListPeers);
+ public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
+ consume(this::consumeEndListPeers, endListPeers);
}
+ protected void consumeEndListPeers(EndListPeers endListPeers) { }
+
@Override
- public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
- consume(PeerNote.class, peerNote);
+ public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
+ consume(this::consumePeerNote, peerNote);
}
+ protected void consumePeerNote(PeerNote peerNote) { }
+
@Override
- public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
- consume(EndListPeerNotes.class, endListPeerNotes);
+ public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
+ consume(this::consumeEndListPeerNotes, endListPeerNotes);
}
+ protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
+
@Override
- public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
- consume(PeerRemoved.class, peerRemoved);
+ public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
+ consume(this::consumePeerRemoved, peerRemoved);
}
+ protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
+
@Override
- public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
- consume(NodeData.class, nodeData);
+ public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
+ consume(this::consumeNodeData, nodeData);
}
+ protected void consumeNodeData(NodeData nodeData) { }
+
@Override
- public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
- consume(TestDDAReply.class, testDDAReply);
+ public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
+ consume(this::consumeTestDDAReply, testDDAReply);
}
+ protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
+
@Override
- public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
- consume(TestDDAComplete.class, testDDAComplete);
+ public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
+ consume(this::consumeTestDDAComplete, testDDAComplete);
}
+ protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
+
@Override
- public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
- consume(PersistentGet.class, persistentGet);
+ public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
+ consume(this::consumePersistentGet, persistentGet);
}
+ protected void consumePersistentGet(PersistentGet persistentGet) { }
+
@Override
- public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
- consume(PersistentPut.class, persistentPut);
+ public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
+ consume(this::consumePersistentPut, persistentPut);
}
+ protected void consumePersistentPut(PersistentPut persistentPut) { }
+
@Override
- public void receivedEndListPersistentRequests(FcpConnection fcpConnection,
+ public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
EndListPersistentRequests endListPersistentRequests) {
- consume(EndListPersistentRequests.class, endListPersistentRequests);
+ consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
}
+ protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
+
@Override
- public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
- consume(URIGenerated.class, uriGenerated);
+ public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
+ consume(this::consumeURIGenerated, uriGenerated);
}
+ protected void consumeURIGenerated(URIGenerated uriGenerated) { }
+
@Override
- public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
- consume(DataFound.class, dataFound);
+ public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
+ consume(this::consumeDataFound, dataFound);
}
+ protected void consumeDataFound(DataFound dataFound) { }
+
@Override
- public void receivedAllData(FcpConnection fcpConnection, AllData allData) {
- consume(AllData.class, allData);
+ public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
+ consume(this::consumeAllData, allData);
}
+ protected void consumeAllData(AllData allData) { }
+
@Override
- public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
- consume(SimpleProgress.class, simpleProgress);
+ public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
+ consume(this::consumeSimpleProgress, simpleProgress);
}
+ protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
+
@Override
- public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
- consume(StartedCompression.class, startedCompression);
+ public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
+ consume(this::consumeStartedCompression, startedCompression);
}
+ protected void consumeStartedCompression(StartedCompression startedCompression) { }
+
@Override
- public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
- consume(FinishedCompression.class, finishedCompression);
+ public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
+ consume(this::consumeFinishedCompression, finishedCompression);
}
+ protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
+
@Override
- public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
- consume(UnknownPeerNoteType.class, unknownPeerNoteType);
+ public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
+ consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
}
+ protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
+
@Override
- public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
+ public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
UnknownNodeIdentifier unknownNodeIdentifier) {
- consume(UnknownNodeIdentifier.class, unknownNodeIdentifier);
+ consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
}
+ protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
+
@Override
- public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
- consume(ConfigData.class, configData);
+ public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
+ consume(this::consumeConfigData, configData);
}
+ protected void consumeConfigData(ConfigData configData) { }
+
@Override
- public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
- consume(GetFailed.class, getFailed);
+ public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
+ consume(this::consumeGetFailed, getFailed);
}
+ protected void consumeGetFailed(GetFailed getFailed) { }
+
@Override
- public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
- consume(PutFailed.class, putFailed);
+ public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
+ consume(this::consumePutFailed, putFailed);
}
+ protected void consumePutFailed(PutFailed putFailed) { }
+
@Override
- public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
- consume(IdentifierCollision.class, identifierCollision);
+ public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
+ consume(this::consumeIdentifierCollision, identifierCollision);
}
+ protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
+
@Override
- public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
- consume(PersistentPutDir.class, persistentPutDir);
+ public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
+ consume(this::consumePersistentPutDir, persistentPutDir);
}
+ protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
+
@Override
- public void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
+ public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
PersistentRequestRemoved persistentRequestRemoved) {
- consume(PersistentRequestRemoved.class, persistentRequestRemoved);
+ consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
}
+ protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
+
@Override
- public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
- consume(SubscribedUSKUpdate.class, subscribedUSKUpdate);
+ public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+ consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
}
+ protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
+
@Override
- public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
- consume(PluginInfo.class, pluginInfo);
+ public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
+ consume(this::consumePluginInfo, pluginInfo);
}
+ protected void consumePluginInfo(PluginInfo pluginInfo) { }
+
@Override
- public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
- consume(FCPPluginReply.class, fcpPluginReply);
+ public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
+ consume(this::consumeFCPPluginReply, fcpPluginReply);
}
+ protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
+
@Override
- public void receivedPersistentRequestModified(FcpConnection fcpConnection,
+ public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
PersistentRequestModified persistentRequestModified) {
- consume(PersistentRequestModified.class, persistentRequestModified);
+ consume(this::consumePersistentRequestModified, persistentRequestModified);
}
+ protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
+
@Override
- public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
- consume(PutSuccessful.class, putSuccessful);
+ public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
+ consume(this::consumePutSuccessful, putSuccessful);
}
+ protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
+
@Override
- public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
- consume(PutFetchable.class, putFetchable);
+ public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
+ consume(this::consumePutFetchable, putFetchable);
}
+ protected void consumePutFetchable(PutFetchable putFetchable) { }
+
@Override
- public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
- consume(SentFeed.class, sentFeed);
+ public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
+ consume(this::consumeSentFeed, sentFeed);
}
+ protected void consumeSentFeed(SentFeed sentFeed) { }
+
@Override
- public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
- consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed);
+ public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
+ consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
}
+ protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
+
@Override
- public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
- consume(ProtocolError.class, protocolError);
+ public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
+ consume(this::consumeProtocolError, protocolError);
}
+ protected void consumeProtocolError(ProtocolError protocolError) { }
+
@Override
- public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+ public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
consumeUnknown(fcpMessage);
}
+ protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
+
@Override
- public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
+ public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
consumeClose(throwable);
}
+ protected void consumeConnectionClosed(Throwable throwable) { }
+
}