1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.HashMap;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Future;
10 import java.util.function.Consumer;
11 import java.util.function.Supplier;
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.BaseMessage;
15 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
16 import net.pterodactylus.fcp.ConfigData;
17 import net.pterodactylus.fcp.DataFound;
18 import net.pterodactylus.fcp.EndListPeerNotes;
19 import net.pterodactylus.fcp.EndListPeers;
20 import net.pterodactylus.fcp.EndListPersistentRequests;
21 import net.pterodactylus.fcp.FCPPluginReply;
22 import net.pterodactylus.fcp.FcpConnection;
23 import net.pterodactylus.fcp.FcpListener;
24 import net.pterodactylus.fcp.FcpMessage;
25 import net.pterodactylus.fcp.FinishedCompression;
26 import net.pterodactylus.fcp.GetFailed;
27 import net.pterodactylus.fcp.IdentifierCollision;
28 import net.pterodactylus.fcp.NodeData;
29 import net.pterodactylus.fcp.NodeHello;
30 import net.pterodactylus.fcp.Peer;
31 import net.pterodactylus.fcp.PeerNote;
32 import net.pterodactylus.fcp.PeerRemoved;
33 import net.pterodactylus.fcp.PersistentGet;
34 import net.pterodactylus.fcp.PersistentPut;
35 import net.pterodactylus.fcp.PersistentPutDir;
36 import net.pterodactylus.fcp.PersistentRequestModified;
37 import net.pterodactylus.fcp.PersistentRequestRemoved;
38 import net.pterodactylus.fcp.PluginInfo;
39 import net.pterodactylus.fcp.ProtocolError;
40 import net.pterodactylus.fcp.PutFailed;
41 import net.pterodactylus.fcp.PutFetchable;
42 import net.pterodactylus.fcp.PutSuccessful;
43 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
44 import net.pterodactylus.fcp.SSKKeypair;
45 import net.pterodactylus.fcp.SentFeed;
46 import net.pterodactylus.fcp.SimpleProgress;
47 import net.pterodactylus.fcp.StartedCompression;
48 import net.pterodactylus.fcp.SubscribedUSKUpdate;
49 import net.pterodactylus.fcp.TestDDAComplete;
50 import net.pterodactylus.fcp.TestDDAReply;
51 import net.pterodactylus.fcp.URIGenerated;
52 import net.pterodactylus.fcp.UnknownNodeIdentifier;
53 import net.pterodactylus.fcp.UnknownPeerNoteType;
56 * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
58 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
60 public class FcpReplySequence implements AutoCloseable, FcpListener {
62 private final ExecutorService executorService;
63 private final FcpConnection fcpConnection;
64 private final Map<Class<? extends BaseMessage>, Consumer<BaseMessage>> expectedMessageActions = new HashMap<>();
65 private final List<Consumer<FcpMessage>> unknownMessageHandlers = new ArrayList<>();
66 private final List<Consumer<Throwable>> closeHandlers = new ArrayList<>();
67 private Supplier<Boolean> endPredicate;
69 public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
70 this.executorService = executorService;
71 this.fcpConnection = fcpConnection;
74 public <M extends BaseMessage> $1<M> handle(Class<M> messageClass) {
75 return new $1<>(messageClass);
78 public class $1<M extends BaseMessage> {
80 private Class<M> messageClass;
82 private $1(Class<M> messageClass) {
83 this.messageClass = messageClass;
86 public FcpReplySequence with(Consumer<M> action) {
87 expectedMessageActions.put(messageClass, (Consumer<BaseMessage>) action);
88 return FcpReplySequence.this;
93 public $2 handleUnknown() {
99 public FcpReplySequence with(Consumer<FcpMessage> consumer) {
100 unknownMessageHandlers.add(consumer);
101 return FcpReplySequence.this;
106 public $3 handleClose() {
112 public FcpReplySequence with(Consumer<Throwable> consumer) {
113 closeHandlers.add(consumer);
114 return FcpReplySequence.this;
119 public void waitFor(Supplier<Boolean> endPredicate) {
120 this.endPredicate = endPredicate;
123 public Future<?> send(FcpMessage fcpMessage) throws IOException {
124 fcpConnection.addFcpListener(this);
125 fcpConnection.sendMessage(fcpMessage);
126 return executorService.submit(() -> {
127 synchronized (endPredicate) {
128 while (!endPredicate.get()) {
137 public void close() {
138 fcpConnection.removeFcpListener(this);
141 private <M extends BaseMessage> void consume(Class<M> fcpMessageClass, BaseMessage fcpMessage) {
142 if (expectedMessageActions.containsKey(fcpMessageClass)) {
143 expectedMessageActions.get(fcpMessageClass).accept(fcpMessage);
145 synchronized (endPredicate) {
146 endPredicate.notifyAll();
150 private void consumeUnknown(FcpMessage fcpMessage) {
151 for (Consumer<FcpMessage> unknownMessageHandler : unknownMessageHandlers) {
152 unknownMessageHandler.accept(fcpMessage);
154 synchronized (endPredicate) {
155 endPredicate.notifyAll();
159 private void consumeClose(Throwable throwable) {
160 for (Consumer<Throwable> closeHandler : closeHandlers) {
161 closeHandler.accept(throwable);
163 synchronized (endPredicate) {
164 endPredicate.notifyAll();
169 public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
170 consume(NodeHello.class, nodeHello);
174 public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
175 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
176 consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName);
180 public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
181 consume(SSKKeypair.class, sskKeypair);
185 public void receivedPeer(FcpConnection fcpConnection, Peer peer) {
186 consume(Peer.class, peer);
190 public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
191 consume(EndListPeers.class, endListPeers);
195 public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
196 consume(PeerNote.class, peerNote);
200 public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
201 consume(EndListPeerNotes.class, endListPeerNotes);
205 public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
206 consume(PeerRemoved.class, peerRemoved);
210 public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
211 consume(NodeData.class, nodeData);
215 public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
216 consume(TestDDAReply.class, testDDAReply);
220 public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
221 consume(TestDDAComplete.class, testDDAComplete);
225 public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
226 consume(PersistentGet.class, persistentGet);
230 public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
231 consume(PersistentPut.class, persistentPut);
235 public void receivedEndListPersistentRequests(FcpConnection fcpConnection,
236 EndListPersistentRequests endListPersistentRequests) {
237 consume(EndListPersistentRequests.class, endListPersistentRequests);
241 public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
242 consume(URIGenerated.class, uriGenerated);
246 public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
247 consume(DataFound.class, dataFound);
251 public void receivedAllData(FcpConnection fcpConnection, AllData allData) {
252 consume(AllData.class, allData);
256 public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
257 consume(SimpleProgress.class, simpleProgress);
261 public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
262 consume(StartedCompression.class, startedCompression);
266 public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
267 consume(FinishedCompression.class, finishedCompression);
271 public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
272 consume(UnknownPeerNoteType.class, unknownPeerNoteType);
276 public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
277 UnknownNodeIdentifier unknownNodeIdentifier) {
278 consume(UnknownNodeIdentifier.class, unknownNodeIdentifier);
282 public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
283 consume(ConfigData.class, configData);
287 public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
288 consume(GetFailed.class, getFailed);
292 public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
293 consume(PutFailed.class, putFailed);
297 public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
298 consume(IdentifierCollision.class, identifierCollision);
302 public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
303 consume(PersistentPutDir.class, persistentPutDir);
307 public void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
308 PersistentRequestRemoved persistentRequestRemoved) {
309 consume(PersistentRequestRemoved.class, persistentRequestRemoved);
313 public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
314 consume(SubscribedUSKUpdate.class, subscribedUSKUpdate);
318 public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
319 consume(PluginInfo.class, pluginInfo);
323 public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
324 consume(FCPPluginReply.class, fcpPluginReply);
328 public void receivedPersistentRequestModified(FcpConnection fcpConnection,
329 PersistentRequestModified persistentRequestModified) {
330 consume(PersistentRequestModified.class, persistentRequestModified);
334 public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
335 consume(PutSuccessful.class, putSuccessful);
339 public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
340 consume(PutFetchable.class, putFetchable);
344 public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
345 consume(SentFeed.class, sentFeed);
349 public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
350 consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed);
354 public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
355 consume(ProtocolError.class, protocolError);
359 public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
360 consumeUnknown(fcpMessage);
364 public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
365 consumeClose(throwable);