Follow redirects in ClientGet
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpDialog.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.util.Objects;
5 import java.util.Queue;
6 import java.util.concurrent.ConcurrentLinkedQueue;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Consumer;
12
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.BaseMessage;
15 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
16 import net.pterodactylus.fcp.ConfigData;
17 import net.pterodactylus.fcp.DataFound;
18 import net.pterodactylus.fcp.EndListPeerNotes;
19 import net.pterodactylus.fcp.EndListPeers;
20 import net.pterodactylus.fcp.EndListPersistentRequests;
21 import net.pterodactylus.fcp.FCPPluginReply;
22 import net.pterodactylus.fcp.FcpConnection;
23 import net.pterodactylus.fcp.FcpListener;
24 import net.pterodactylus.fcp.FcpMessage;
25 import net.pterodactylus.fcp.FinishedCompression;
26 import net.pterodactylus.fcp.GetFailed;
27 import net.pterodactylus.fcp.IdentifierCollision;
28 import net.pterodactylus.fcp.NodeData;
29 import net.pterodactylus.fcp.NodeHello;
30 import net.pterodactylus.fcp.Peer;
31 import net.pterodactylus.fcp.PeerNote;
32 import net.pterodactylus.fcp.PeerRemoved;
33 import net.pterodactylus.fcp.PersistentGet;
34 import net.pterodactylus.fcp.PersistentPut;
35 import net.pterodactylus.fcp.PersistentPutDir;
36 import net.pterodactylus.fcp.PersistentRequestModified;
37 import net.pterodactylus.fcp.PersistentRequestRemoved;
38 import net.pterodactylus.fcp.PluginInfo;
39 import net.pterodactylus.fcp.PluginRemoved;
40 import net.pterodactylus.fcp.ProtocolError;
41 import net.pterodactylus.fcp.PutFailed;
42 import net.pterodactylus.fcp.PutFetchable;
43 import net.pterodactylus.fcp.PutSuccessful;
44 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
45 import net.pterodactylus.fcp.SSKKeypair;
46 import net.pterodactylus.fcp.SentFeed;
47 import net.pterodactylus.fcp.SimpleProgress;
48 import net.pterodactylus.fcp.StartedCompression;
49 import net.pterodactylus.fcp.SubscribedUSK;
50 import net.pterodactylus.fcp.SubscribedUSKUpdate;
51 import net.pterodactylus.fcp.TestDDAComplete;
52 import net.pterodactylus.fcp.TestDDAReply;
53 import net.pterodactylus.fcp.URIGenerated;
54 import net.pterodactylus.fcp.UnknownNodeIdentifier;
55 import net.pterodactylus.fcp.UnknownPeerNoteType;
56
57 import com.google.common.util.concurrent.ListenableFuture;
58 import com.google.common.util.concurrent.ListeningExecutorService;
59 import com.google.common.util.concurrent.MoreExecutors;
60
61 /**
62  * An FCP dialog enables you to conveniently wait for a specific set of FCP replies.
63  *
64  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
65  */
66 public abstract class FcpDialog<R> implements AutoCloseable, FcpListener {
67
68         private final Object syncObject = new Object();
69         private final ListeningExecutorService executorService;
70         private final FcpConnection fcpConnection;
71         private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
72         private final AtomicReference<String> identifier = new AtomicReference<>();
73         private final AtomicBoolean connectionClosed = new AtomicBoolean();
74         private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
75         private final AtomicBoolean finished = new AtomicBoolean();
76         private final AtomicReference<R> result = new AtomicReference<>();
77
78         public FcpDialog(ExecutorService executorService, FcpConnection fcpConnection, R initialResult) {
79                 this.executorService = MoreExecutors.listeningDecorator(executorService);
80                 this.fcpConnection = fcpConnection;
81                 result.set(initialResult);
82         }
83
84         protected void setIdentifier(String identifier) {
85                 this.identifier.set(identifier);
86         }
87
88         protected String getIdentifier() {
89                 return identifier.get();
90         }
91
92         public final boolean isFinished() {
93                 return finished.get();
94         }
95
96         protected final void finish() {
97                 finished.set(true);
98         }
99
100         protected final void setResult(R result) {
101                 this.result.set(result);
102                 finish();
103         }
104
105         public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
106                 setIdentifier(fcpMessage.getField("Identifier"));
107                 fcpConnection.addFcpListener(this);
108                 messages.add(fcpMessage);
109                 return executorService.submit(() -> {
110                         synchronized (syncObject) {
111                                 while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
112                                         while (messages.peek() != null) {
113                                                 FcpMessage message = messages.poll();
114                                                 fcpConnection.sendMessage(message);
115                                         }
116                                         if (isFinished() || connectionClosed.get()) {
117                                                 continue;
118                                         }
119                                         syncObject.wait();
120                                 }
121                         }
122                         Throwable throwable = connectionFailureReason.get();
123                         if (throwable != null) {
124                                 throw new ExecutionException(throwable);
125                         }
126                         return getResult();
127                 });
128         }
129
130         protected void sendMessage(FcpMessage fcpMessage) {
131                 messages.add(fcpMessage);
132                 notifySyncObject();
133         }
134
135         private void notifySyncObject() {
136                 synchronized (syncObject) {
137                         syncObject.notifyAll();
138                 }
139         }
140
141         protected final R getResult() {
142                 return result.get();
143         }
144
145         @Override
146         public void close() {
147                 fcpConnection.removeFcpListener(this);
148         }
149
150         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
151                 consume(consumer, message, "Identifier");
152         }
153
154         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
155                         String identifier) {
156                 if (Objects.equals(message.getField(identifier), this.identifier.get())) {
157                         consumeAlways(consumer, message);
158                 }
159         }
160
161         private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
162                 consumer.accept(message);
163                 notifySyncObject();
164         }
165
166         private void consumeUnknown(FcpMessage fcpMessage) {
167                 consumeUnknownMessage(fcpMessage);
168                 notifySyncObject();
169         }
170
171         private void consumeClose(Throwable throwable) {
172                 connectionFailureReason.set(throwable);
173                 connectionClosed.set(true);
174                 notifySyncObject();
175         }
176
177         @Override
178         public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
179                 consume(this::consumeNodeHello, nodeHello);
180         }
181
182         protected void consumeNodeHello(NodeHello nodeHello) { }
183
184         @Override
185         public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
186                         CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
187                 connectionFailureReason.set(new IOException("duplicate client name"));
188                 connectionClosed.set(true);
189                 notifySyncObject();
190         }
191
192         @Override
193         public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
194                 consume(this::consumeSSKKeypair, sskKeypair);
195         }
196
197         protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
198
199         @Override
200         public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
201                 consume(this::consumePeer, peer);
202         }
203
204         protected void consumePeer(Peer peer) { }
205
206         @Override
207         public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
208                 consume(this::consumeEndListPeers, endListPeers);
209         }
210
211         protected void consumeEndListPeers(EndListPeers endListPeers) { }
212
213         @Override
214         public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
215                 consume(this::consumePeerNote, peerNote);
216         }
217
218         protected void consumePeerNote(PeerNote peerNote) { }
219
220         @Override
221         public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
222                 consume(this::consumeEndListPeerNotes, endListPeerNotes);
223         }
224
225         protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
226
227         @Override
228         public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
229                 consume(this::consumePeerRemoved, peerRemoved);
230         }
231
232         protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
233
234         @Override
235         public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
236                 consume(this::consumeNodeData, nodeData);
237         }
238
239         protected void consumeNodeData(NodeData nodeData) { }
240
241         @Override
242         public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
243                 consume(this::consumeTestDDAReply, testDDAReply, "Directory");
244         }
245
246         protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
247
248         @Override
249         public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
250                 consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
251         }
252
253         protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
254
255         @Override
256         public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
257                 consume(this::consumePersistentGet, persistentGet);
258         }
259
260         protected void consumePersistentGet(PersistentGet persistentGet) { }
261
262         @Override
263         public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
264                 consume(this::consumePersistentPut, persistentPut);
265         }
266
267         protected void consumePersistentPut(PersistentPut persistentPut) { }
268
269         @Override
270         public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
271                         EndListPersistentRequests endListPersistentRequests) {
272                 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
273         }
274
275         protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
276
277         @Override
278         public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
279                 consume(this::consumeURIGenerated, uriGenerated);
280         }
281
282         protected void consumeURIGenerated(URIGenerated uriGenerated) { }
283
284         @Override
285         public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
286                 consume(this::consumeDataFound, dataFound);
287         }
288
289         protected void consumeDataFound(DataFound dataFound) { }
290
291         @Override
292         public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
293                 consume(this::consumeAllData, allData);
294         }
295
296         protected void consumeAllData(AllData allData) { }
297
298         @Override
299         public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
300                 consume(this::consumeSimpleProgress, simpleProgress);
301         }
302
303         protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
304
305         @Override
306         public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
307                 consume(this::consumeStartedCompression, startedCompression);
308         }
309
310         protected void consumeStartedCompression(StartedCompression startedCompression) { }
311
312         @Override
313         public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
314                 consume(this::consumeFinishedCompression, finishedCompression);
315         }
316
317         protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
318
319         @Override
320         public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
321                 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
322         }
323
324         protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
325
326         @Override
327         public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
328                         UnknownNodeIdentifier unknownNodeIdentifier) {
329                 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
330         }
331
332         protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
333
334         @Override
335         public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
336                 consume(this::consumeConfigData, configData);
337         }
338
339         protected void consumeConfigData(ConfigData configData) { }
340
341         @Override
342         public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
343                 consume(this::consumeGetFailed, getFailed);
344         }
345
346         protected void consumeGetFailed(GetFailed getFailed) { }
347
348         @Override
349         public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
350                 consume(this::consumePutFailed, putFailed);
351         }
352
353         protected void consumePutFailed(PutFailed putFailed) { }
354
355         @Override
356         public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
357                 consume(this::consumeIdentifierCollision, identifierCollision);
358         }
359
360         protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
361
362         @Override
363         public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
364                 consume(this::consumePersistentPutDir, persistentPutDir);
365         }
366
367         protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
368
369         @Override
370         public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
371                         PersistentRequestRemoved persistentRequestRemoved) {
372                 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
373         }
374
375         protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
376
377         @Override
378         public final void receivedSubscribedUSK(FcpConnection fcpConnection, SubscribedUSK subscribedUSK) {
379                 consume(this::consumeSubscribedUSK, subscribedUSK);
380         }
381
382         protected void consumeSubscribedUSK(SubscribedUSK subscribedUSK) { }
383
384         @Override
385         public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
386                 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
387         }
388
389         protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
390
391         @Override
392         public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
393                 consume(this::consumePluginInfo, pluginInfo);
394         }
395
396         protected void consumePluginInfo(PluginInfo pluginInfo) { }
397
398         @Override
399         public final void receivedPluginRemoved(FcpConnection fcpConnection, PluginRemoved pluginRemoved) {
400                 consume(this::consumePluginRemoved, pluginRemoved);
401         }
402
403         protected void consumePluginRemoved(PluginRemoved pluginRemoved) { }
404
405         @Override
406         public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
407                 consume(this::consumeFCPPluginReply, fcpPluginReply);
408         }
409
410         protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
411
412         @Override
413         public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
414                         PersistentRequestModified persistentRequestModified) {
415                 consume(this::consumePersistentRequestModified, persistentRequestModified);
416         }
417
418         protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
419
420         @Override
421         public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
422                 consume(this::consumePutSuccessful, putSuccessful);
423         }
424
425         protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
426
427         @Override
428         public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
429                 consume(this::consumePutFetchable, putFetchable);
430         }
431
432         protected void consumePutFetchable(PutFetchable putFetchable) { }
433
434         @Override
435         public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
436                 consume(this::consumeSentFeed, sentFeed);
437         }
438
439         protected void consumeSentFeed(SentFeed sentFeed) { }
440
441         @Override
442         public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
443                 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
444         }
445
446         protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
447
448         @Override
449         public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
450                 consume(this::consumeProtocolError, protocolError);
451         }
452
453         protected void consumeProtocolError(ProtocolError protocolError) { }
454
455         @Override
456         public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
457                 consumeUnknown(fcpMessage);
458         }
459
460         protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
461
462         @Override
463         public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
464                 consumeClose(throwable);
465         }
466
467 }