Refactor FCP dialog
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpDialog.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.util.Objects;
5 import java.util.Queue;
6 import java.util.concurrent.ConcurrentLinkedQueue;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Consumer;
12
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.BaseMessage;
15 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
16 import net.pterodactylus.fcp.ConfigData;
17 import net.pterodactylus.fcp.DataFound;
18 import net.pterodactylus.fcp.EndListPeerNotes;
19 import net.pterodactylus.fcp.EndListPeers;
20 import net.pterodactylus.fcp.EndListPersistentRequests;
21 import net.pterodactylus.fcp.FCPPluginReply;
22 import net.pterodactylus.fcp.FcpConnection;
23 import net.pterodactylus.fcp.FcpListener;
24 import net.pterodactylus.fcp.FcpMessage;
25 import net.pterodactylus.fcp.FinishedCompression;
26 import net.pterodactylus.fcp.GetFailed;
27 import net.pterodactylus.fcp.IdentifierCollision;
28 import net.pterodactylus.fcp.NodeData;
29 import net.pterodactylus.fcp.NodeHello;
30 import net.pterodactylus.fcp.Peer;
31 import net.pterodactylus.fcp.PeerNote;
32 import net.pterodactylus.fcp.PeerRemoved;
33 import net.pterodactylus.fcp.PersistentGet;
34 import net.pterodactylus.fcp.PersistentPut;
35 import net.pterodactylus.fcp.PersistentPutDir;
36 import net.pterodactylus.fcp.PersistentRequestModified;
37 import net.pterodactylus.fcp.PersistentRequestRemoved;
38 import net.pterodactylus.fcp.PluginInfo;
39 import net.pterodactylus.fcp.PluginRemoved;
40 import net.pterodactylus.fcp.ProtocolError;
41 import net.pterodactylus.fcp.PutFailed;
42 import net.pterodactylus.fcp.PutFetchable;
43 import net.pterodactylus.fcp.PutSuccessful;
44 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
45 import net.pterodactylus.fcp.SSKKeypair;
46 import net.pterodactylus.fcp.SentFeed;
47 import net.pterodactylus.fcp.SimpleProgress;
48 import net.pterodactylus.fcp.StartedCompression;
49 import net.pterodactylus.fcp.SubscribedUSK;
50 import net.pterodactylus.fcp.SubscribedUSKUpdate;
51 import net.pterodactylus.fcp.TestDDAComplete;
52 import net.pterodactylus.fcp.TestDDAReply;
53 import net.pterodactylus.fcp.URIGenerated;
54 import net.pterodactylus.fcp.UnknownNodeIdentifier;
55 import net.pterodactylus.fcp.UnknownPeerNoteType;
56
57 import com.google.common.util.concurrent.ListenableFuture;
58 import com.google.common.util.concurrent.ListeningExecutorService;
59 import com.google.common.util.concurrent.MoreExecutors;
60
61 /**
62  * An FCP dialog enables you to conveniently wait for a specific set of FCP replies.
63  *
64  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
65  */
66 public abstract class FcpDialog<R> implements AutoCloseable, FcpListener {
67
68         private final Object syncObject = new Object();
69         private final ListeningExecutorService executorService;
70         private final FcpConnection fcpConnection;
71         private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
72         private final AtomicReference<String> identifier = new AtomicReference<>();
73         private final AtomicBoolean connectionClosed = new AtomicBoolean();
74         private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
75         private final AtomicBoolean finished = new AtomicBoolean();
76         private final AtomicReference<R> result = new AtomicReference<>();
77
78         public FcpDialog(ExecutorService executorService, FcpConnection fcpConnection, R initialResult) {
79                 this.executorService = MoreExecutors.listeningDecorator(executorService);
80                 this.fcpConnection = fcpConnection;
81                 result.set(initialResult);
82         }
83
84         protected void setIdentifier(String identifier) {
85                 this.identifier.set(identifier);
86         }
87
88         public final boolean isFinished() {
89                 return finished.get();
90         }
91
92         protected final void finish() {
93                 finished.set(true);
94         }
95
96         protected final void setResult(R result) {
97                 this.result.set(result);
98                 finish();
99         }
100
101         public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
102                 setIdentifier(fcpMessage.getField("Identifier"));
103                 fcpConnection.addFcpListener(this);
104                 messages.add(fcpMessage);
105                 return executorService.submit(() -> {
106                         synchronized (syncObject) {
107                                 while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
108                                         while (messages.peek() != null) {
109                                                 FcpMessage message = messages.poll();
110                                                 fcpConnection.sendMessage(message);
111                                         }
112                                         if (isFinished() || connectionClosed.get()) {
113                                                 continue;
114                                         }
115                                         syncObject.wait();
116                                 }
117                         }
118                         Throwable throwable = connectionFailureReason.get();
119                         if (throwable != null) {
120                                 throw new ExecutionException(throwable);
121                         }
122                         return getResult();
123                 });
124         }
125
126         protected void sendMessage(FcpMessage fcpMessage) {
127                 messages.add(fcpMessage);
128                 notifySyncObject();
129         }
130
131         private void notifySyncObject() {
132                 synchronized (syncObject) {
133                         syncObject.notifyAll();
134                 }
135         }
136
137         protected final R getResult() {
138                 return result.get();
139         }
140
141         @Override
142         public void close() {
143                 fcpConnection.removeFcpListener(this);
144         }
145
146         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
147                 consume(consumer, message, "Identifier");
148         }
149
150         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
151                         String identifier) {
152                 if (Objects.equals(message.getField(identifier), this.identifier.get())) {
153                         consumeAlways(consumer, message);
154                 }
155         }
156
157         private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
158                 consumer.accept(message);
159                 notifySyncObject();
160         }
161
162         private void consumeUnknown(FcpMessage fcpMessage) {
163                 consumeUnknownMessage(fcpMessage);
164                 notifySyncObject();
165         }
166
167         private void consumeClose(Throwable throwable) {
168                 connectionFailureReason.set(throwable);
169                 connectionClosed.set(true);
170                 notifySyncObject();
171         }
172
173         @Override
174         public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
175                 consume(this::consumeNodeHello, nodeHello);
176         }
177
178         protected void consumeNodeHello(NodeHello nodeHello) { }
179
180         @Override
181         public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
182                         CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
183                 connectionFailureReason.set(new IOException("duplicate client name"));
184                 connectionClosed.set(true);
185                 notifySyncObject();
186         }
187
188         @Override
189         public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
190                 consume(this::consumeSSKKeypair, sskKeypair);
191         }
192
193         protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
194
195         @Override
196         public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
197                 consume(this::consumePeer, peer);
198         }
199
200         protected void consumePeer(Peer peer) { }
201
202         @Override
203         public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
204                 consume(this::consumeEndListPeers, endListPeers);
205         }
206
207         protected void consumeEndListPeers(EndListPeers endListPeers) { }
208
209         @Override
210         public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
211                 consume(this::consumePeerNote, peerNote);
212         }
213
214         protected void consumePeerNote(PeerNote peerNote) { }
215
216         @Override
217         public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
218                 consume(this::consumeEndListPeerNotes, endListPeerNotes);
219         }
220
221         protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
222
223         @Override
224         public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
225                 consume(this::consumePeerRemoved, peerRemoved);
226         }
227
228         protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
229
230         @Override
231         public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
232                 consume(this::consumeNodeData, nodeData);
233         }
234
235         protected void consumeNodeData(NodeData nodeData) { }
236
237         @Override
238         public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
239                 consume(this::consumeTestDDAReply, testDDAReply, "Directory");
240         }
241
242         protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
243
244         @Override
245         public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
246                 consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
247         }
248
249         protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
250
251         @Override
252         public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
253                 consume(this::consumePersistentGet, persistentGet);
254         }
255
256         protected void consumePersistentGet(PersistentGet persistentGet) { }
257
258         @Override
259         public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
260                 consume(this::consumePersistentPut, persistentPut);
261         }
262
263         protected void consumePersistentPut(PersistentPut persistentPut) { }
264
265         @Override
266         public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
267                         EndListPersistentRequests endListPersistentRequests) {
268                 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
269         }
270
271         protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
272
273         @Override
274         public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
275                 consume(this::consumeURIGenerated, uriGenerated);
276         }
277
278         protected void consumeURIGenerated(URIGenerated uriGenerated) { }
279
280         @Override
281         public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
282                 consume(this::consumeDataFound, dataFound);
283         }
284
285         protected void consumeDataFound(DataFound dataFound) { }
286
287         @Override
288         public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
289                 consume(this::consumeAllData, allData);
290         }
291
292         protected void consumeAllData(AllData allData) { }
293
294         @Override
295         public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
296                 consume(this::consumeSimpleProgress, simpleProgress);
297         }
298
299         protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
300
301         @Override
302         public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
303                 consume(this::consumeStartedCompression, startedCompression);
304         }
305
306         protected void consumeStartedCompression(StartedCompression startedCompression) { }
307
308         @Override
309         public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
310                 consume(this::consumeFinishedCompression, finishedCompression);
311         }
312
313         protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
314
315         @Override
316         public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
317                 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
318         }
319
320         protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
321
322         @Override
323         public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
324                         UnknownNodeIdentifier unknownNodeIdentifier) {
325                 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
326         }
327
328         protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
329
330         @Override
331         public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
332                 consume(this::consumeConfigData, configData);
333         }
334
335         protected void consumeConfigData(ConfigData configData) { }
336
337         @Override
338         public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
339                 consume(this::consumeGetFailed, getFailed);
340         }
341
342         protected void consumeGetFailed(GetFailed getFailed) { }
343
344         @Override
345         public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
346                 consume(this::consumePutFailed, putFailed);
347         }
348
349         protected void consumePutFailed(PutFailed putFailed) { }
350
351         @Override
352         public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
353                 consume(this::consumeIdentifierCollision, identifierCollision);
354         }
355
356         protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
357
358         @Override
359         public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
360                 consume(this::consumePersistentPutDir, persistentPutDir);
361         }
362
363         protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
364
365         @Override
366         public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
367                         PersistentRequestRemoved persistentRequestRemoved) {
368                 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
369         }
370
371         protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
372
373         @Override
374         public final void receivedSubscribedUSK(FcpConnection fcpConnection, SubscribedUSK subscribedUSK) {
375                 consume(this::consumeSubscribedUSK, subscribedUSK);
376         }
377
378         protected void consumeSubscribedUSK(SubscribedUSK subscribedUSK) { }
379
380         @Override
381         public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
382                 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
383         }
384
385         protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
386
387         @Override
388         public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
389                 consume(this::consumePluginInfo, pluginInfo);
390         }
391
392         protected void consumePluginInfo(PluginInfo pluginInfo) { }
393
394         @Override
395         public final void receivedPluginRemoved(FcpConnection fcpConnection, PluginRemoved pluginRemoved) {
396                 consume(this::consumePluginRemoved, pluginRemoved);
397         }
398
399         protected void consumePluginRemoved(PluginRemoved pluginRemoved) { }
400
401         @Override
402         public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
403                 consume(this::consumeFCPPluginReply, fcpPluginReply);
404         }
405
406         protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
407
408         @Override
409         public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
410                         PersistentRequestModified persistentRequestModified) {
411                 consume(this::consumePersistentRequestModified, persistentRequestModified);
412         }
413
414         protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
415
416         @Override
417         public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
418                 consume(this::consumePutSuccessful, putSuccessful);
419         }
420
421         protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
422
423         @Override
424         public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
425                 consume(this::consumePutFetchable, putFetchable);
426         }
427
428         protected void consumePutFetchable(PutFetchable putFetchable) { }
429
430         @Override
431         public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
432                 consume(this::consumeSentFeed, sentFeed);
433         }
434
435         protected void consumeSentFeed(SentFeed sentFeed) { }
436
437         @Override
438         public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
439                 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
440         }
441
442         protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
443
444         @Override
445         public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
446                 consume(this::consumeProtocolError, protocolError);
447         }
448
449         protected void consumeProtocolError(ProtocolError protocolError) { }
450
451         @Override
452         public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
453                 consumeUnknown(fcpMessage);
454         }
455
456         protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
457
458         @Override
459         public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
460                 consumeClose(throwable);
461         }
462
463 }