1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.util.concurrent.ExecutorService;
5 import java.util.function.Consumer;
7 import net.pterodactylus.fcp.AllData;
8 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
9 import net.pterodactylus.fcp.ConfigData;
10 import net.pterodactylus.fcp.DataFound;
11 import net.pterodactylus.fcp.EndListPeerNotes;
12 import net.pterodactylus.fcp.EndListPeers;
13 import net.pterodactylus.fcp.EndListPersistentRequests;
14 import net.pterodactylus.fcp.FCPPluginReply;
15 import net.pterodactylus.fcp.FcpConnection;
16 import net.pterodactylus.fcp.FcpListener;
17 import net.pterodactylus.fcp.FcpMessage;
18 import net.pterodactylus.fcp.FinishedCompression;
19 import net.pterodactylus.fcp.GetFailed;
20 import net.pterodactylus.fcp.IdentifierCollision;
21 import net.pterodactylus.fcp.NodeData;
22 import net.pterodactylus.fcp.NodeHello;
23 import net.pterodactylus.fcp.Peer;
24 import net.pterodactylus.fcp.PeerNote;
25 import net.pterodactylus.fcp.PeerRemoved;
26 import net.pterodactylus.fcp.PersistentGet;
27 import net.pterodactylus.fcp.PersistentPut;
28 import net.pterodactylus.fcp.PersistentPutDir;
29 import net.pterodactylus.fcp.PersistentRequestModified;
30 import net.pterodactylus.fcp.PersistentRequestRemoved;
31 import net.pterodactylus.fcp.PluginInfo;
32 import net.pterodactylus.fcp.ProtocolError;
33 import net.pterodactylus.fcp.PutFailed;
34 import net.pterodactylus.fcp.PutFetchable;
35 import net.pterodactylus.fcp.PutSuccessful;
36 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
37 import net.pterodactylus.fcp.SSKKeypair;
38 import net.pterodactylus.fcp.SentFeed;
39 import net.pterodactylus.fcp.SimpleProgress;
40 import net.pterodactylus.fcp.StartedCompression;
41 import net.pterodactylus.fcp.SubscribedUSKUpdate;
42 import net.pterodactylus.fcp.TestDDAComplete;
43 import net.pterodactylus.fcp.TestDDAReply;
44 import net.pterodactylus.fcp.URIGenerated;
45 import net.pterodactylus.fcp.UnknownNodeIdentifier;
46 import net.pterodactylus.fcp.UnknownPeerNoteType;
48 import com.google.common.util.concurrent.ListenableFuture;
49 import com.google.common.util.concurrent.ListeningExecutorService;
50 import com.google.common.util.concurrent.MoreExecutors;
53 * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
55 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
57 public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
59 private final Object syncObject = new Object();
60 private final ListeningExecutorService executorService;
61 private final FcpConnection fcpConnection;
63 public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
64 this.executorService = MoreExecutors.listeningDecorator(executorService);
65 this.fcpConnection = fcpConnection;
68 protected abstract boolean isFinished();
70 public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
72 fcpConnection.addFcpListener(this);
74 } catch (Throwable throwable) {
75 throwable.printStackTrace();
77 fcpConnection.sendMessage(fcpMessage);
78 return executorService.submit(() -> {
79 synchronized (syncObject) {
80 while (!isFinished()) {
88 protected R getResult() {
94 fcpConnection.removeFcpListener(this);
97 private <M> void consume(Consumer<M> consumer, M message) {
98 consumer.accept(message);
99 synchronized (syncObject) {
100 syncObject.notifyAll();
104 private void consumeUnknown(FcpMessage fcpMessage) {
105 consumeUnknownMessage(fcpMessage);
106 synchronized (syncObject) {
107 syncObject.notifyAll();
111 private void consumeClose(Throwable throwable) {
112 consumeConnectionClosed(throwable);
113 synchronized (syncObject) {
114 syncObject.notifyAll();
119 public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
120 consume(this::consumeNodeHello, nodeHello);
123 protected void consumeNodeHello(NodeHello nodeHello) { }
126 public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
127 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
128 consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
131 protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
134 public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
135 consume(this::consumeSSKKeypair, sskKeypair);
138 protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
141 public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
142 consume(this::consumePeer, peer);
145 protected void consumePeer(Peer peer) { }
148 public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
149 consume(this::consumeEndListPeers, endListPeers);
152 protected void consumeEndListPeers(EndListPeers endListPeers) { }
155 public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
156 consume(this::consumePeerNote, peerNote);
159 protected void consumePeerNote(PeerNote peerNote) { }
162 public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
163 consume(this::consumeEndListPeerNotes, endListPeerNotes);
166 protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
169 public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
170 consume(this::consumePeerRemoved, peerRemoved);
173 protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
176 public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
177 consume(this::consumeNodeData, nodeData);
180 protected void consumeNodeData(NodeData nodeData) { }
183 public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
184 consume(this::consumeTestDDAReply, testDDAReply);
187 protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
190 public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
191 consume(this::consumeTestDDAComplete, testDDAComplete);
194 protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
197 public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
198 consume(this::consumePersistentGet, persistentGet);
201 protected void consumePersistentGet(PersistentGet persistentGet) { }
204 public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
205 consume(this::consumePersistentPut, persistentPut);
208 protected void consumePersistentPut(PersistentPut persistentPut) { }
211 public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
212 EndListPersistentRequests endListPersistentRequests) {
213 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
216 protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
219 public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
220 consume(this::consumeURIGenerated, uriGenerated);
223 protected void consumeURIGenerated(URIGenerated uriGenerated) { }
226 public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
227 consume(this::consumeDataFound, dataFound);
230 protected void consumeDataFound(DataFound dataFound) { }
233 public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
234 consume(this::consumeAllData, allData);
237 protected void consumeAllData(AllData allData) { }
240 public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
241 consume(this::consumeSimpleProgress, simpleProgress);
244 protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
247 public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
248 consume(this::consumeStartedCompression, startedCompression);
251 protected void consumeStartedCompression(StartedCompression startedCompression) { }
254 public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
255 consume(this::consumeFinishedCompression, finishedCompression);
258 protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
261 public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
262 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
265 protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
268 public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
269 UnknownNodeIdentifier unknownNodeIdentifier) {
270 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
273 protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
276 public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
277 consume(this::consumeConfigData, configData);
280 protected void consumeConfigData(ConfigData configData) { }
283 public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
284 consume(this::consumeGetFailed, getFailed);
287 protected void consumeGetFailed(GetFailed getFailed) { }
290 public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
291 consume(this::consumePutFailed, putFailed);
294 protected void consumePutFailed(PutFailed putFailed) { }
297 public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
298 consume(this::consumeIdentifierCollision, identifierCollision);
301 protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
304 public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
305 consume(this::consumePersistentPutDir, persistentPutDir);
308 protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
311 public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
312 PersistentRequestRemoved persistentRequestRemoved) {
313 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
316 protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
319 public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
320 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
323 protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
326 public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
327 consume(this::consumePluginInfo, pluginInfo);
330 protected void consumePluginInfo(PluginInfo pluginInfo) { }
333 public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
334 consume(this::consumeFCPPluginReply, fcpPluginReply);
337 protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
340 public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
341 PersistentRequestModified persistentRequestModified) {
342 consume(this::consumePersistentRequestModified, persistentRequestModified);
345 protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
348 public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
349 consume(this::consumePutSuccessful, putSuccessful);
352 protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
355 public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
356 consume(this::consumePutFetchable, putFetchable);
359 protected void consumePutFetchable(PutFetchable putFetchable) { }
362 public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
363 consume(this::consumeSentFeed, sentFeed);
366 protected void consumeSentFeed(SentFeed sentFeed) { }
369 public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
370 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
373 protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
376 public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
377 consume(this::consumeProtocolError, protocolError);
380 protected void consumeProtocolError(ProtocolError protocolError) { }
383 public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
384 consumeUnknown(fcpMessage);
387 protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
390 public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
391 consumeClose(throwable);
394 protected void consumeConnectionClosed(Throwable throwable) { }