Rename all remaining occurrences of “sequence” to “dialog”
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpDialog.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.util.Objects;
5 import java.util.Queue;
6 import java.util.concurrent.ConcurrentLinkedQueue;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Consumer;
12
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.BaseMessage;
15 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
16 import net.pterodactylus.fcp.ConfigData;
17 import net.pterodactylus.fcp.DataFound;
18 import net.pterodactylus.fcp.EndListPeerNotes;
19 import net.pterodactylus.fcp.EndListPeers;
20 import net.pterodactylus.fcp.EndListPersistentRequests;
21 import net.pterodactylus.fcp.FCPPluginReply;
22 import net.pterodactylus.fcp.FcpConnection;
23 import net.pterodactylus.fcp.FcpListener;
24 import net.pterodactylus.fcp.FcpMessage;
25 import net.pterodactylus.fcp.FinishedCompression;
26 import net.pterodactylus.fcp.GetFailed;
27 import net.pterodactylus.fcp.IdentifierCollision;
28 import net.pterodactylus.fcp.NodeData;
29 import net.pterodactylus.fcp.NodeHello;
30 import net.pterodactylus.fcp.Peer;
31 import net.pterodactylus.fcp.PeerNote;
32 import net.pterodactylus.fcp.PeerRemoved;
33 import net.pterodactylus.fcp.PersistentGet;
34 import net.pterodactylus.fcp.PersistentPut;
35 import net.pterodactylus.fcp.PersistentPutDir;
36 import net.pterodactylus.fcp.PersistentRequestModified;
37 import net.pterodactylus.fcp.PersistentRequestRemoved;
38 import net.pterodactylus.fcp.PluginInfo;
39 import net.pterodactylus.fcp.ProtocolError;
40 import net.pterodactylus.fcp.PutFailed;
41 import net.pterodactylus.fcp.PutFetchable;
42 import net.pterodactylus.fcp.PutSuccessful;
43 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
44 import net.pterodactylus.fcp.SSKKeypair;
45 import net.pterodactylus.fcp.SentFeed;
46 import net.pterodactylus.fcp.SimpleProgress;
47 import net.pterodactylus.fcp.StartedCompression;
48 import net.pterodactylus.fcp.SubscribedUSKUpdate;
49 import net.pterodactylus.fcp.TestDDAComplete;
50 import net.pterodactylus.fcp.TestDDAReply;
51 import net.pterodactylus.fcp.URIGenerated;
52 import net.pterodactylus.fcp.UnknownNodeIdentifier;
53 import net.pterodactylus.fcp.UnknownPeerNoteType;
54
55 import com.google.common.util.concurrent.ListenableFuture;
56 import com.google.common.util.concurrent.ListeningExecutorService;
57 import com.google.common.util.concurrent.MoreExecutors;
58
59 /**
60  * An FCP dialog enables you to conveniently wait for a specific set of FCP replies.
61  *
62  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
63  */
64 public abstract class FcpDialog<R> implements AutoCloseable, FcpListener {
65
66         private final Object syncObject = new Object();
67         private final ListeningExecutorService executorService;
68         private final FcpConnection fcpConnection;
69         private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
70         private final AtomicReference<String> identifier = new AtomicReference<>();
71         private final AtomicBoolean connectionClosed = new AtomicBoolean();
72         private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
73
74         public FcpDialog(ExecutorService executorService, FcpConnection fcpConnection) {
75                 this.executorService = MoreExecutors.listeningDecorator(executorService);
76                 this.fcpConnection = fcpConnection;
77         }
78
79         protected void setIdentifier(String identifier) {
80                 this.identifier.set(identifier);
81         }
82
83         protected abstract boolean isFinished();
84
85         public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
86                 setIdentifier(fcpMessage.getField("Identifier"));
87                 fcpConnection.addFcpListener(this);
88                 messages.add(fcpMessage);
89                 return executorService.submit(() -> {
90                         synchronized (syncObject) {
91                                 while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
92                                         while (messages.peek() != null) {
93                                                 FcpMessage message = messages.poll();
94                                                 fcpConnection.sendMessage(message);
95                                         }
96                                         if (isFinished() || connectionClosed.get()) {
97                                                 continue;
98                                         }
99                                         syncObject.wait();
100                                 }
101                         }
102                         Throwable throwable = connectionFailureReason.get();
103                         if (throwable != null) {
104                                 throw new ExecutionException(throwable);
105                         }
106                         return getResult();
107                 });
108         }
109
110         protected void sendMessage(FcpMessage fcpMessage) {
111                 messages.add(fcpMessage);
112                 notifySyncObject();
113         }
114
115         private void notifySyncObject() {
116                 synchronized (syncObject) {
117                         syncObject.notifyAll();
118                 }
119         }
120
121         protected R getResult() {
122                 return null;
123         }
124
125         @Override
126         public void close() {
127                 fcpConnection.removeFcpListener(this);
128         }
129
130         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
131                 consume(consumer, message, "Identifier");
132         }
133
134         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
135                         String identifier) {
136                 if (Objects.equals(message.getField(identifier), this.identifier.get())) {
137                         consumeAlways(consumer, message);
138                 }
139         }
140
141         private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
142                 consumer.accept(message);
143                 notifySyncObject();
144         }
145
146         private void consumeUnknown(FcpMessage fcpMessage) {
147                 consumeUnknownMessage(fcpMessage);
148                 notifySyncObject();
149         }
150
151         private void consumeClose(Throwable throwable) {
152                 connectionFailureReason.set(throwable);
153                 connectionClosed.set(true);
154                 notifySyncObject();
155         }
156
157         @Override
158         public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
159                 consume(this::consumeNodeHello, nodeHello);
160         }
161
162         protected void consumeNodeHello(NodeHello nodeHello) { }
163
164         @Override
165         public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
166                         CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
167                 connectionFailureReason.set(new IOException("duplicate client name"));
168                 connectionClosed.set(true);
169                 notifySyncObject();
170         }
171
172         @Override
173         public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
174                 consume(this::consumeSSKKeypair, sskKeypair);
175         }
176
177         protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
178
179         @Override
180         public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
181                 consume(this::consumePeer, peer);
182         }
183
184         protected void consumePeer(Peer peer) { }
185
186         @Override
187         public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
188                 consume(this::consumeEndListPeers, endListPeers);
189         }
190
191         protected void consumeEndListPeers(EndListPeers endListPeers) { }
192
193         @Override
194         public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
195                 consume(this::consumePeerNote, peerNote);
196         }
197
198         protected void consumePeerNote(PeerNote peerNote) { }
199
200         @Override
201         public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
202                 consume(this::consumeEndListPeerNotes, endListPeerNotes);
203         }
204
205         protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
206
207         @Override
208         public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
209                 consume(this::consumePeerRemoved, peerRemoved);
210         }
211
212         protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
213
214         @Override
215         public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
216                 consume(this::consumeNodeData, nodeData);
217         }
218
219         protected void consumeNodeData(NodeData nodeData) { }
220
221         @Override
222         public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
223                 consume(this::consumeTestDDAReply, testDDAReply, "Directory");
224         }
225
226         protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
227
228         @Override
229         public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
230                 consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
231         }
232
233         protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
234
235         @Override
236         public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
237                 consume(this::consumePersistentGet, persistentGet);
238         }
239
240         protected void consumePersistentGet(PersistentGet persistentGet) { }
241
242         @Override
243         public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
244                 consume(this::consumePersistentPut, persistentPut);
245         }
246
247         protected void consumePersistentPut(PersistentPut persistentPut) { }
248
249         @Override
250         public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
251                         EndListPersistentRequests endListPersistentRequests) {
252                 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
253         }
254
255         protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
256
257         @Override
258         public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
259                 consume(this::consumeURIGenerated, uriGenerated);
260         }
261
262         protected void consumeURIGenerated(URIGenerated uriGenerated) { }
263
264         @Override
265         public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
266                 consume(this::consumeDataFound, dataFound);
267         }
268
269         protected void consumeDataFound(DataFound dataFound) { }
270
271         @Override
272         public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
273                 consume(this::consumeAllData, allData);
274         }
275
276         protected void consumeAllData(AllData allData) { }
277
278         @Override
279         public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
280                 consume(this::consumeSimpleProgress, simpleProgress);
281         }
282
283         protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
284
285         @Override
286         public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
287                 consume(this::consumeStartedCompression, startedCompression);
288         }
289
290         protected void consumeStartedCompression(StartedCompression startedCompression) { }
291
292         @Override
293         public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
294                 consume(this::consumeFinishedCompression, finishedCompression);
295         }
296
297         protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
298
299         @Override
300         public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
301                 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
302         }
303
304         protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
305
306         @Override
307         public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
308                         UnknownNodeIdentifier unknownNodeIdentifier) {
309                 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
310         }
311
312         protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
313
314         @Override
315         public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
316                 consume(this::consumeConfigData, configData);
317         }
318
319         protected void consumeConfigData(ConfigData configData) { }
320
321         @Override
322         public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
323                 consume(this::consumeGetFailed, getFailed);
324         }
325
326         protected void consumeGetFailed(GetFailed getFailed) { }
327
328         @Override
329         public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
330                 consume(this::consumePutFailed, putFailed);
331         }
332
333         protected void consumePutFailed(PutFailed putFailed) { }
334
335         @Override
336         public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
337                 consume(this::consumeIdentifierCollision, identifierCollision);
338         }
339
340         protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
341
342         @Override
343         public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
344                 consume(this::consumePersistentPutDir, persistentPutDir);
345         }
346
347         protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
348
349         @Override
350         public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
351                         PersistentRequestRemoved persistentRequestRemoved) {
352                 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
353         }
354
355         protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
356
357         @Override
358         public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
359                 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
360         }
361
362         protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
363
364         @Override
365         public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
366                 consume(this::consumePluginInfo, pluginInfo);
367         }
368
369         protected void consumePluginInfo(PluginInfo pluginInfo) { }
370
371         @Override
372         public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
373                 consume(this::consumeFCPPluginReply, fcpPluginReply);
374         }
375
376         protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
377
378         @Override
379         public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
380                         PersistentRequestModified persistentRequestModified) {
381                 consume(this::consumePersistentRequestModified, persistentRequestModified);
382         }
383
384         protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
385
386         @Override
387         public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
388                 consume(this::consumePutSuccessful, putSuccessful);
389         }
390
391         protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
392
393         @Override
394         public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
395                 consume(this::consumePutFetchable, putFetchable);
396         }
397
398         protected void consumePutFetchable(PutFetchable putFetchable) { }
399
400         @Override
401         public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
402                 consume(this::consumeSentFeed, sentFeed);
403         }
404
405         protected void consumeSentFeed(SentFeed sentFeed) { }
406
407         @Override
408         public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
409                 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
410         }
411
412         protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
413
414         @Override
415         public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
416                 consume(this::consumeProtocolError, protocolError);
417         }
418
419         protected void consumeProtocolError(ProtocolError protocolError) { }
420
421         @Override
422         public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
423                 consumeUnknown(fcpMessage);
424         }
425
426         protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
427
428         @Override
429         public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
430                 consumeClose(throwable);
431         }
432
433 }