1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.util.Objects;
5 import java.util.Queue;
6 import java.util.concurrent.ConcurrentLinkedQueue;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicReference;
10 import java.util.function.Consumer;
12 import net.pterodactylus.fcp.AllData;
13 import net.pterodactylus.fcp.BaseMessage;
14 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
15 import net.pterodactylus.fcp.ConfigData;
16 import net.pterodactylus.fcp.DataFound;
17 import net.pterodactylus.fcp.EndListPeerNotes;
18 import net.pterodactylus.fcp.EndListPeers;
19 import net.pterodactylus.fcp.EndListPersistentRequests;
20 import net.pterodactylus.fcp.FCPPluginReply;
21 import net.pterodactylus.fcp.FcpConnection;
22 import net.pterodactylus.fcp.FcpListener;
23 import net.pterodactylus.fcp.FcpMessage;
24 import net.pterodactylus.fcp.FinishedCompression;
25 import net.pterodactylus.fcp.GetFailed;
26 import net.pterodactylus.fcp.IdentifierCollision;
27 import net.pterodactylus.fcp.NodeData;
28 import net.pterodactylus.fcp.NodeHello;
29 import net.pterodactylus.fcp.Peer;
30 import net.pterodactylus.fcp.PeerNote;
31 import net.pterodactylus.fcp.PeerRemoved;
32 import net.pterodactylus.fcp.PersistentGet;
33 import net.pterodactylus.fcp.PersistentPut;
34 import net.pterodactylus.fcp.PersistentPutDir;
35 import net.pterodactylus.fcp.PersistentRequestModified;
36 import net.pterodactylus.fcp.PersistentRequestRemoved;
37 import net.pterodactylus.fcp.PluginInfo;
38 import net.pterodactylus.fcp.ProtocolError;
39 import net.pterodactylus.fcp.PutFailed;
40 import net.pterodactylus.fcp.PutFetchable;
41 import net.pterodactylus.fcp.PutSuccessful;
42 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
43 import net.pterodactylus.fcp.SSKKeypair;
44 import net.pterodactylus.fcp.SentFeed;
45 import net.pterodactylus.fcp.SimpleProgress;
46 import net.pterodactylus.fcp.StartedCompression;
47 import net.pterodactylus.fcp.SubscribedUSKUpdate;
48 import net.pterodactylus.fcp.TestDDAComplete;
49 import net.pterodactylus.fcp.TestDDAReply;
50 import net.pterodactylus.fcp.URIGenerated;
51 import net.pterodactylus.fcp.UnknownNodeIdentifier;
52 import net.pterodactylus.fcp.UnknownPeerNoteType;
54 import com.google.common.util.concurrent.ListenableFuture;
55 import com.google.common.util.concurrent.ListeningExecutorService;
56 import com.google.common.util.concurrent.MoreExecutors;
59 * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
61 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
63 public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
65 private final Object syncObject = new Object();
66 private final ListeningExecutorService executorService;
67 private final FcpConnection fcpConnection;
68 private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
69 private final AtomicReference<String> identifier = new AtomicReference<>();
70 private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
72 public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
73 this.executorService = MoreExecutors.listeningDecorator(executorService);
74 this.fcpConnection = fcpConnection;
77 protected void setIdentifier(String identifier) {
78 this.identifier.set(identifier);
81 protected abstract boolean isFinished();
83 public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
84 setIdentifier(fcpMessage.getField("Identifier"));
85 fcpConnection.addFcpListener(this);
86 messages.add(fcpMessage);
87 return executorService.submit(() -> {
88 synchronized (syncObject) {
89 while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) {
90 while (messages.peek() != null) {
91 FcpMessage message = messages.poll();
92 fcpConnection.sendMessage(message);
94 if (isFinished() || (connectionFailureReason.get() != null)) {
100 Throwable throwable = connectionFailureReason.get();
101 if (throwable != null) {
102 throw new ExecutionException(throwable);
108 protected void sendMessage(FcpMessage fcpMessage) {
109 messages.add(fcpMessage);
113 private void notifySyncObject() {
114 synchronized (syncObject) {
115 syncObject.notifyAll();
119 protected R getResult() {
124 public void close() {
125 fcpConnection.removeFcpListener(this);
128 private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
129 consume(consumer, message, "Identifier");
132 private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
134 if (Objects.equals(message.getField(identifier), this.identifier.get())) {
135 consumeAlways(consumer, message);
139 private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
140 consumer.accept(message);
144 private void consumeUnknown(FcpMessage fcpMessage) {
145 if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) {
146 consumeUnknownMessage(fcpMessage);
151 private void consumeClose(Throwable throwable) {
152 connectionFailureReason.set(throwable);
157 public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
158 consume(this::consumeNodeHello, nodeHello);
161 protected void consumeNodeHello(NodeHello nodeHello) { }
164 public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
165 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
166 consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
169 protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
172 public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
173 consume(this::consumeSSKKeypair, sskKeypair);
176 protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
179 public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
180 consume(this::consumePeer, peer);
183 protected void consumePeer(Peer peer) { }
186 public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
187 consume(this::consumeEndListPeers, endListPeers);
190 protected void consumeEndListPeers(EndListPeers endListPeers) { }
193 public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
194 consume(this::consumePeerNote, peerNote);
197 protected void consumePeerNote(PeerNote peerNote) { }
200 public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
201 consume(this::consumeEndListPeerNotes, endListPeerNotes);
204 protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
207 public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
208 consume(this::consumePeerRemoved, peerRemoved);
211 protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
214 public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
215 consume(this::consumeNodeData, nodeData);
218 protected void consumeNodeData(NodeData nodeData) { }
221 public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
222 consume(this::consumeTestDDAReply, testDDAReply, "Directory");
225 protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
228 public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
229 consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
232 protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
235 public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
236 consume(this::consumePersistentGet, persistentGet);
239 protected void consumePersistentGet(PersistentGet persistentGet) { }
242 public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
243 consume(this::consumePersistentPut, persistentPut);
246 protected void consumePersistentPut(PersistentPut persistentPut) { }
249 public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
250 EndListPersistentRequests endListPersistentRequests) {
251 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
254 protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
257 public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
258 consume(this::consumeURIGenerated, uriGenerated);
261 protected void consumeURIGenerated(URIGenerated uriGenerated) { }
264 public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
265 consume(this::consumeDataFound, dataFound);
268 protected void consumeDataFound(DataFound dataFound) { }
271 public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
272 consume(this::consumeAllData, allData);
275 protected void consumeAllData(AllData allData) { }
278 public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
279 consume(this::consumeSimpleProgress, simpleProgress);
282 protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
285 public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
286 consume(this::consumeStartedCompression, startedCompression);
289 protected void consumeStartedCompression(StartedCompression startedCompression) { }
292 public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
293 consume(this::consumeFinishedCompression, finishedCompression);
296 protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
299 public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
300 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
303 protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
306 public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
307 UnknownNodeIdentifier unknownNodeIdentifier) {
308 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
311 protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
314 public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
315 consume(this::consumeConfigData, configData);
318 protected void consumeConfigData(ConfigData configData) { }
321 public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
322 consume(this::consumeGetFailed, getFailed);
325 protected void consumeGetFailed(GetFailed getFailed) { }
328 public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
329 consume(this::consumePutFailed, putFailed);
332 protected void consumePutFailed(PutFailed putFailed) { }
335 public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
336 consume(this::consumeIdentifierCollision, identifierCollision);
339 protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
342 public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
343 consume(this::consumePersistentPutDir, persistentPutDir);
346 protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
349 public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
350 PersistentRequestRemoved persistentRequestRemoved) {
351 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
354 protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
357 public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
358 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
361 protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
364 public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
365 consume(this::consumePluginInfo, pluginInfo);
368 protected void consumePluginInfo(PluginInfo pluginInfo) { }
371 public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
372 consume(this::consumeFCPPluginReply, fcpPluginReply);
375 protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
378 public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
379 PersistentRequestModified persistentRequestModified) {
380 consume(this::consumePersistentRequestModified, persistentRequestModified);
383 protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
386 public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
387 consume(this::consumePutSuccessful, putSuccessful);
390 protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
393 public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
394 consume(this::consumePutFetchable, putFetchable);
397 protected void consumePutFetchable(PutFetchable putFetchable) { }
400 public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
401 consume(this::consumeSentFeed, sentFeed);
404 protected void consumeSentFeed(SentFeed sentFeed) { }
407 public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
408 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
411 protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
414 public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
415 consume(this::consumeProtocolError, protocolError);
418 protected void consumeProtocolError(ProtocolError protocolError) { }
421 public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
422 consumeUnknown(fcpMessage);
425 protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
428 public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
429 consumeClose(throwable);