5be102c5b91ad0b92fdc813f9971e07c2b640c72
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpDialog.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.util.Objects;
5 import java.util.Queue;
6 import java.util.concurrent.ConcurrentLinkedQueue;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Consumer;
12
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.BaseMessage;
15 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
16 import net.pterodactylus.fcp.ConfigData;
17 import net.pterodactylus.fcp.DataFound;
18 import net.pterodactylus.fcp.EndListPeerNotes;
19 import net.pterodactylus.fcp.EndListPeers;
20 import net.pterodactylus.fcp.EndListPersistentRequests;
21 import net.pterodactylus.fcp.FCPPluginReply;
22 import net.pterodactylus.fcp.FcpConnection;
23 import net.pterodactylus.fcp.FcpListener;
24 import net.pterodactylus.fcp.FcpMessage;
25 import net.pterodactylus.fcp.FinishedCompression;
26 import net.pterodactylus.fcp.GetFailed;
27 import net.pterodactylus.fcp.IdentifierCollision;
28 import net.pterodactylus.fcp.NodeData;
29 import net.pterodactylus.fcp.NodeHello;
30 import net.pterodactylus.fcp.Peer;
31 import net.pterodactylus.fcp.PeerNote;
32 import net.pterodactylus.fcp.PeerRemoved;
33 import net.pterodactylus.fcp.PersistentGet;
34 import net.pterodactylus.fcp.PersistentPut;
35 import net.pterodactylus.fcp.PersistentPutDir;
36 import net.pterodactylus.fcp.PersistentRequestModified;
37 import net.pterodactylus.fcp.PersistentRequestRemoved;
38 import net.pterodactylus.fcp.PluginInfo;
39 import net.pterodactylus.fcp.PluginRemoved;
40 import net.pterodactylus.fcp.ProtocolError;
41 import net.pterodactylus.fcp.PutFailed;
42 import net.pterodactylus.fcp.PutFetchable;
43 import net.pterodactylus.fcp.PutSuccessful;
44 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
45 import net.pterodactylus.fcp.SSKKeypair;
46 import net.pterodactylus.fcp.SentFeed;
47 import net.pterodactylus.fcp.SimpleProgress;
48 import net.pterodactylus.fcp.StartedCompression;
49 import net.pterodactylus.fcp.SubscribedUSKUpdate;
50 import net.pterodactylus.fcp.TestDDAComplete;
51 import net.pterodactylus.fcp.TestDDAReply;
52 import net.pterodactylus.fcp.URIGenerated;
53 import net.pterodactylus.fcp.UnknownNodeIdentifier;
54 import net.pterodactylus.fcp.UnknownPeerNoteType;
55
56 import com.google.common.util.concurrent.ListenableFuture;
57 import com.google.common.util.concurrent.ListeningExecutorService;
58 import com.google.common.util.concurrent.MoreExecutors;
59
60 /**
61  * An FCP dialog enables you to conveniently wait for a specific set of FCP replies.
62  *
63  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
64  */
65 public abstract class FcpDialog<R> implements AutoCloseable, FcpListener {
66
67         private final Object syncObject = new Object();
68         private final ListeningExecutorService executorService;
69         private final FcpConnection fcpConnection;
70         private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
71         private final AtomicReference<String> identifier = new AtomicReference<>();
72         private final AtomicBoolean connectionClosed = new AtomicBoolean();
73         private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
74
75         public FcpDialog(ExecutorService executorService, FcpConnection fcpConnection) {
76                 this.executorService = MoreExecutors.listeningDecorator(executorService);
77                 this.fcpConnection = fcpConnection;
78         }
79
80         protected void setIdentifier(String identifier) {
81                 this.identifier.set(identifier);
82         }
83
84         protected abstract boolean isFinished();
85
86         public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
87                 setIdentifier(fcpMessage.getField("Identifier"));
88                 fcpConnection.addFcpListener(this);
89                 messages.add(fcpMessage);
90                 return executorService.submit(() -> {
91                         synchronized (syncObject) {
92                                 while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
93                                         while (messages.peek() != null) {
94                                                 FcpMessage message = messages.poll();
95                                                 fcpConnection.sendMessage(message);
96                                         }
97                                         if (isFinished() || connectionClosed.get()) {
98                                                 continue;
99                                         }
100                                         syncObject.wait();
101                                 }
102                         }
103                         Throwable throwable = connectionFailureReason.get();
104                         if (throwable != null) {
105                                 throw new ExecutionException(throwable);
106                         }
107                         return getResult();
108                 });
109         }
110
111         protected void sendMessage(FcpMessage fcpMessage) {
112                 messages.add(fcpMessage);
113                 notifySyncObject();
114         }
115
116         private void notifySyncObject() {
117                 synchronized (syncObject) {
118                         syncObject.notifyAll();
119                 }
120         }
121
122         protected R getResult() {
123                 return null;
124         }
125
126         @Override
127         public void close() {
128                 fcpConnection.removeFcpListener(this);
129         }
130
131         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
132                 consume(consumer, message, "Identifier");
133         }
134
135         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
136                         String identifier) {
137                 if (Objects.equals(message.getField(identifier), this.identifier.get())) {
138                         consumeAlways(consumer, message);
139                 }
140         }
141
142         private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
143                 consumer.accept(message);
144                 notifySyncObject();
145         }
146
147         private void consumeUnknown(FcpMessage fcpMessage) {
148                 consumeUnknownMessage(fcpMessage);
149                 notifySyncObject();
150         }
151
152         private void consumeClose(Throwable throwable) {
153                 connectionFailureReason.set(throwable);
154                 connectionClosed.set(true);
155                 notifySyncObject();
156         }
157
158         @Override
159         public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
160                 consume(this::consumeNodeHello, nodeHello);
161         }
162
163         protected void consumeNodeHello(NodeHello nodeHello) { }
164
165         @Override
166         public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
167                         CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
168                 connectionFailureReason.set(new IOException("duplicate client name"));
169                 connectionClosed.set(true);
170                 notifySyncObject();
171         }
172
173         @Override
174         public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
175                 consume(this::consumeSSKKeypair, sskKeypair);
176         }
177
178         protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
179
180         @Override
181         public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
182                 consume(this::consumePeer, peer);
183         }
184
185         protected void consumePeer(Peer peer) { }
186
187         @Override
188         public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
189                 consume(this::consumeEndListPeers, endListPeers);
190         }
191
192         protected void consumeEndListPeers(EndListPeers endListPeers) { }
193
194         @Override
195         public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
196                 consume(this::consumePeerNote, peerNote);
197         }
198
199         protected void consumePeerNote(PeerNote peerNote) { }
200
201         @Override
202         public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
203                 consume(this::consumeEndListPeerNotes, endListPeerNotes);
204         }
205
206         protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
207
208         @Override
209         public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
210                 consume(this::consumePeerRemoved, peerRemoved);
211         }
212
213         protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
214
215         @Override
216         public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
217                 consume(this::consumeNodeData, nodeData);
218         }
219
220         protected void consumeNodeData(NodeData nodeData) { }
221
222         @Override
223         public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
224                 consume(this::consumeTestDDAReply, testDDAReply, "Directory");
225         }
226
227         protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
228
229         @Override
230         public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
231                 consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
232         }
233
234         protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
235
236         @Override
237         public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
238                 consume(this::consumePersistentGet, persistentGet);
239         }
240
241         protected void consumePersistentGet(PersistentGet persistentGet) { }
242
243         @Override
244         public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
245                 consume(this::consumePersistentPut, persistentPut);
246         }
247
248         protected void consumePersistentPut(PersistentPut persistentPut) { }
249
250         @Override
251         public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
252                         EndListPersistentRequests endListPersistentRequests) {
253                 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
254         }
255
256         protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
257
258         @Override
259         public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
260                 consume(this::consumeURIGenerated, uriGenerated);
261         }
262
263         protected void consumeURIGenerated(URIGenerated uriGenerated) { }
264
265         @Override
266         public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
267                 consume(this::consumeDataFound, dataFound);
268         }
269
270         protected void consumeDataFound(DataFound dataFound) { }
271
272         @Override
273         public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
274                 consume(this::consumeAllData, allData);
275         }
276
277         protected void consumeAllData(AllData allData) { }
278
279         @Override
280         public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
281                 consume(this::consumeSimpleProgress, simpleProgress);
282         }
283
284         protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
285
286         @Override
287         public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
288                 consume(this::consumeStartedCompression, startedCompression);
289         }
290
291         protected void consumeStartedCompression(StartedCompression startedCompression) { }
292
293         @Override
294         public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
295                 consume(this::consumeFinishedCompression, finishedCompression);
296         }
297
298         protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
299
300         @Override
301         public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
302                 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
303         }
304
305         protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
306
307         @Override
308         public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
309                         UnknownNodeIdentifier unknownNodeIdentifier) {
310                 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
311         }
312
313         protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
314
315         @Override
316         public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
317                 consume(this::consumeConfigData, configData);
318         }
319
320         protected void consumeConfigData(ConfigData configData) { }
321
322         @Override
323         public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
324                 consume(this::consumeGetFailed, getFailed);
325         }
326
327         protected void consumeGetFailed(GetFailed getFailed) { }
328
329         @Override
330         public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
331                 consume(this::consumePutFailed, putFailed);
332         }
333
334         protected void consumePutFailed(PutFailed putFailed) { }
335
336         @Override
337         public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
338                 consume(this::consumeIdentifierCollision, identifierCollision);
339         }
340
341         protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
342
343         @Override
344         public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
345                 consume(this::consumePersistentPutDir, persistentPutDir);
346         }
347
348         protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
349
350         @Override
351         public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
352                         PersistentRequestRemoved persistentRequestRemoved) {
353                 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
354         }
355
356         protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
357
358         @Override
359         public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
360                 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
361         }
362
363         protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
364
365         @Override
366         public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
367                 consume(this::consumePluginInfo, pluginInfo);
368         }
369
370         protected void consumePluginInfo(PluginInfo pluginInfo) { }
371
372         @Override
373         public final void receivedPluginRemoved(FcpConnection fcpConnection, PluginRemoved pluginRemoved) {
374                 consume(this::consumePluginRemoved, pluginRemoved);
375         }
376
377         protected void consumePluginRemoved(PluginRemoved pluginRemoved) { }
378
379         @Override
380         public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
381                 consume(this::consumeFCPPluginReply, fcpPluginReply);
382         }
383
384         protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
385
386         @Override
387         public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
388                         PersistentRequestModified persistentRequestModified) {
389                 consume(this::consumePersistentRequestModified, persistentRequestModified);
390         }
391
392         protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
393
394         @Override
395         public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
396                 consume(this::consumePutSuccessful, putSuccessful);
397         }
398
399         protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
400
401         @Override
402         public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
403                 consume(this::consumePutFetchable, putFetchable);
404         }
405
406         protected void consumePutFetchable(PutFetchable putFetchable) { }
407
408         @Override
409         public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
410                 consume(this::consumeSentFeed, sentFeed);
411         }
412
413         protected void consumeSentFeed(SentFeed sentFeed) { }
414
415         @Override
416         public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
417                 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
418         }
419
420         protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
421
422         @Override
423         public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
424                 consume(this::consumeProtocolError, protocolError);
425         }
426
427         protected void consumeProtocolError(ProtocolError protocolError) { }
428
429         @Override
430         public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
431                 consumeUnknown(fcpMessage);
432         }
433
434         protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
435
436         @Override
437         public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
438                 consumeClose(throwable);
439         }
440
441 }