1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.util.Objects;
5 import java.util.Queue;
6 import java.util.concurrent.ConcurrentLinkedQueue;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Consumer;
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.BaseMessage;
15 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
16 import net.pterodactylus.fcp.ConfigData;
17 import net.pterodactylus.fcp.DataFound;
18 import net.pterodactylus.fcp.EndListPeerNotes;
19 import net.pterodactylus.fcp.EndListPeers;
20 import net.pterodactylus.fcp.EndListPersistentRequests;
21 import net.pterodactylus.fcp.FCPPluginReply;
22 import net.pterodactylus.fcp.FcpConnection;
23 import net.pterodactylus.fcp.FcpListener;
24 import net.pterodactylus.fcp.FcpMessage;
25 import net.pterodactylus.fcp.FinishedCompression;
26 import net.pterodactylus.fcp.GetFailed;
27 import net.pterodactylus.fcp.IdentifierCollision;
28 import net.pterodactylus.fcp.NodeData;
29 import net.pterodactylus.fcp.NodeHello;
30 import net.pterodactylus.fcp.Peer;
31 import net.pterodactylus.fcp.PeerNote;
32 import net.pterodactylus.fcp.PeerRemoved;
33 import net.pterodactylus.fcp.PersistentGet;
34 import net.pterodactylus.fcp.PersistentPut;
35 import net.pterodactylus.fcp.PersistentPutDir;
36 import net.pterodactylus.fcp.PersistentRequestModified;
37 import net.pterodactylus.fcp.PersistentRequestRemoved;
38 import net.pterodactylus.fcp.PluginInfo;
39 import net.pterodactylus.fcp.PluginRemoved;
40 import net.pterodactylus.fcp.ProtocolError;
41 import net.pterodactylus.fcp.PutFailed;
42 import net.pterodactylus.fcp.PutFetchable;
43 import net.pterodactylus.fcp.PutSuccessful;
44 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
45 import net.pterodactylus.fcp.SSKKeypair;
46 import net.pterodactylus.fcp.SentFeed;
47 import net.pterodactylus.fcp.SimpleProgress;
48 import net.pterodactylus.fcp.StartedCompression;
49 import net.pterodactylus.fcp.SubscribedUSK;
50 import net.pterodactylus.fcp.SubscribedUSKUpdate;
51 import net.pterodactylus.fcp.TestDDAComplete;
52 import net.pterodactylus.fcp.TestDDAReply;
53 import net.pterodactylus.fcp.URIGenerated;
54 import net.pterodactylus.fcp.UnknownNodeIdentifier;
55 import net.pterodactylus.fcp.UnknownPeerNoteType;
57 import com.google.common.util.concurrent.ListenableFuture;
58 import com.google.common.util.concurrent.ListeningExecutorService;
59 import com.google.common.util.concurrent.MoreExecutors;
62 * An FCP dialog enables you to conveniently wait for a specific set of FCP replies.
64 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
66 public abstract class FcpDialog<R> implements AutoCloseable, FcpListener {
68 private final Object syncObject = new Object();
69 private final ListeningExecutorService executorService;
70 private final FcpConnection fcpConnection;
71 private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
72 private final AtomicReference<String> identifier = new AtomicReference<>();
73 private final AtomicBoolean connectionClosed = new AtomicBoolean();
74 private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
75 private final AtomicBoolean finished = new AtomicBoolean();
76 private final AtomicReference<R> result = new AtomicReference<>();
78 public FcpDialog(ExecutorService executorService, FcpConnection fcpConnection, R initialResult) {
79 this.executorService = MoreExecutors.listeningDecorator(executorService);
80 this.fcpConnection = fcpConnection;
81 result.set(initialResult);
84 protected void setIdentifier(String identifier) {
85 this.identifier.set(identifier);
88 public final boolean isFinished() {
89 return finished.get();
92 protected final void finish() {
96 protected final void setResult(R result) {
97 this.result.set(result);
101 public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
102 setIdentifier(fcpMessage.getField("Identifier"));
103 fcpConnection.addFcpListener(this);
104 messages.add(fcpMessage);
105 return executorService.submit(() -> {
106 synchronized (syncObject) {
107 while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
108 while (messages.peek() != null) {
109 FcpMessage message = messages.poll();
110 fcpConnection.sendMessage(message);
112 if (isFinished() || connectionClosed.get()) {
118 Throwable throwable = connectionFailureReason.get();
119 if (throwable != null) {
120 throw new ExecutionException(throwable);
126 protected void sendMessage(FcpMessage fcpMessage) {
127 messages.add(fcpMessage);
131 private void notifySyncObject() {
132 synchronized (syncObject) {
133 syncObject.notifyAll();
137 protected final R getResult() {
142 public void close() {
143 fcpConnection.removeFcpListener(this);
146 private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
147 consume(consumer, message, "Identifier");
150 private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
152 if (Objects.equals(message.getField(identifier), this.identifier.get())) {
153 consumeAlways(consumer, message);
157 private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
158 consumer.accept(message);
162 private void consumeUnknown(FcpMessage fcpMessage) {
163 consumeUnknownMessage(fcpMessage);
167 private void consumeClose(Throwable throwable) {
168 connectionFailureReason.set(throwable);
169 connectionClosed.set(true);
174 public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
175 consume(this::consumeNodeHello, nodeHello);
178 protected void consumeNodeHello(NodeHello nodeHello) { }
181 public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
182 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
183 connectionFailureReason.set(new IOException("duplicate client name"));
184 connectionClosed.set(true);
189 public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
190 consume(this::consumeSSKKeypair, sskKeypair);
193 protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
196 public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
197 consume(this::consumePeer, peer);
200 protected void consumePeer(Peer peer) { }
203 public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
204 consume(this::consumeEndListPeers, endListPeers);
207 protected void consumeEndListPeers(EndListPeers endListPeers) { }
210 public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
211 consume(this::consumePeerNote, peerNote);
214 protected void consumePeerNote(PeerNote peerNote) { }
217 public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
218 consume(this::consumeEndListPeerNotes, endListPeerNotes);
221 protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
224 public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
225 consume(this::consumePeerRemoved, peerRemoved);
228 protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
231 public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
232 consume(this::consumeNodeData, nodeData);
235 protected void consumeNodeData(NodeData nodeData) { }
238 public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
239 consume(this::consumeTestDDAReply, testDDAReply, "Directory");
242 protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
245 public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
246 consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
249 protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
252 public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
253 consume(this::consumePersistentGet, persistentGet);
256 protected void consumePersistentGet(PersistentGet persistentGet) { }
259 public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
260 consume(this::consumePersistentPut, persistentPut);
263 protected void consumePersistentPut(PersistentPut persistentPut) { }
266 public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
267 EndListPersistentRequests endListPersistentRequests) {
268 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
271 protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
274 public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
275 consume(this::consumeURIGenerated, uriGenerated);
278 protected void consumeURIGenerated(URIGenerated uriGenerated) { }
281 public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
282 consume(this::consumeDataFound, dataFound);
285 protected void consumeDataFound(DataFound dataFound) { }
288 public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
289 consume(this::consumeAllData, allData);
292 protected void consumeAllData(AllData allData) { }
295 public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
296 consume(this::consumeSimpleProgress, simpleProgress);
299 protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
302 public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
303 consume(this::consumeStartedCompression, startedCompression);
306 protected void consumeStartedCompression(StartedCompression startedCompression) { }
309 public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
310 consume(this::consumeFinishedCompression, finishedCompression);
313 protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
316 public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
317 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
320 protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
323 public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
324 UnknownNodeIdentifier unknownNodeIdentifier) {
325 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
328 protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
331 public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
332 consume(this::consumeConfigData, configData);
335 protected void consumeConfigData(ConfigData configData) { }
338 public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
339 consume(this::consumeGetFailed, getFailed);
342 protected void consumeGetFailed(GetFailed getFailed) { }
345 public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
346 consume(this::consumePutFailed, putFailed);
349 protected void consumePutFailed(PutFailed putFailed) { }
352 public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
353 consume(this::consumeIdentifierCollision, identifierCollision);
356 protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
359 public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
360 consume(this::consumePersistentPutDir, persistentPutDir);
363 protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
366 public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
367 PersistentRequestRemoved persistentRequestRemoved) {
368 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
371 protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
374 public final void receivedSubscribedUSK(FcpConnection fcpConnection, SubscribedUSK subscribedUSK) {
375 consume(this::consumeSubscribedUSK, subscribedUSK);
378 protected void consumeSubscribedUSK(SubscribedUSK subscribedUSK) { }
381 public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
382 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
385 protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
388 public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
389 consume(this::consumePluginInfo, pluginInfo);
392 protected void consumePluginInfo(PluginInfo pluginInfo) { }
395 public final void receivedPluginRemoved(FcpConnection fcpConnection, PluginRemoved pluginRemoved) {
396 consume(this::consumePluginRemoved, pluginRemoved);
399 protected void consumePluginRemoved(PluginRemoved pluginRemoved) { }
402 public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
403 consume(this::consumeFCPPluginReply, fcpPluginReply);
406 protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
409 public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
410 PersistentRequestModified persistentRequestModified) {
411 consume(this::consumePersistentRequestModified, persistentRequestModified);
414 protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
417 public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
418 consume(this::consumePutSuccessful, putSuccessful);
421 protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
424 public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
425 consume(this::consumePutFetchable, putFetchable);
428 protected void consumePutFetchable(PutFetchable putFetchable) { }
431 public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
432 consume(this::consumeSentFeed, sentFeed);
435 protected void consumeSentFeed(SentFeed sentFeed) { }
438 public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
439 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
442 protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
445 public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
446 consume(this::consumeProtocolError, protocolError);
449 protected void consumeProtocolError(ProtocolError protocolError) { }
452 public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
453 consumeUnknown(fcpMessage);
456 protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
459 public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
460 consumeClose(throwable);