Ignore identifier when consuming unknown messages
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpReplySequence.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.util.Objects;
5 import java.util.Queue;
6 import java.util.concurrent.ConcurrentLinkedQueue;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicReference;
10 import java.util.function.Consumer;
11
12 import net.pterodactylus.fcp.AllData;
13 import net.pterodactylus.fcp.BaseMessage;
14 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
15 import net.pterodactylus.fcp.ConfigData;
16 import net.pterodactylus.fcp.DataFound;
17 import net.pterodactylus.fcp.EndListPeerNotes;
18 import net.pterodactylus.fcp.EndListPeers;
19 import net.pterodactylus.fcp.EndListPersistentRequests;
20 import net.pterodactylus.fcp.FCPPluginReply;
21 import net.pterodactylus.fcp.FcpConnection;
22 import net.pterodactylus.fcp.FcpListener;
23 import net.pterodactylus.fcp.FcpMessage;
24 import net.pterodactylus.fcp.FinishedCompression;
25 import net.pterodactylus.fcp.GetFailed;
26 import net.pterodactylus.fcp.IdentifierCollision;
27 import net.pterodactylus.fcp.NodeData;
28 import net.pterodactylus.fcp.NodeHello;
29 import net.pterodactylus.fcp.Peer;
30 import net.pterodactylus.fcp.PeerNote;
31 import net.pterodactylus.fcp.PeerRemoved;
32 import net.pterodactylus.fcp.PersistentGet;
33 import net.pterodactylus.fcp.PersistentPut;
34 import net.pterodactylus.fcp.PersistentPutDir;
35 import net.pterodactylus.fcp.PersistentRequestModified;
36 import net.pterodactylus.fcp.PersistentRequestRemoved;
37 import net.pterodactylus.fcp.PluginInfo;
38 import net.pterodactylus.fcp.ProtocolError;
39 import net.pterodactylus.fcp.PutFailed;
40 import net.pterodactylus.fcp.PutFetchable;
41 import net.pterodactylus.fcp.PutSuccessful;
42 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
43 import net.pterodactylus.fcp.SSKKeypair;
44 import net.pterodactylus.fcp.SentFeed;
45 import net.pterodactylus.fcp.SimpleProgress;
46 import net.pterodactylus.fcp.StartedCompression;
47 import net.pterodactylus.fcp.SubscribedUSKUpdate;
48 import net.pterodactylus.fcp.TestDDAComplete;
49 import net.pterodactylus.fcp.TestDDAReply;
50 import net.pterodactylus.fcp.URIGenerated;
51 import net.pterodactylus.fcp.UnknownNodeIdentifier;
52 import net.pterodactylus.fcp.UnknownPeerNoteType;
53
54 import com.google.common.util.concurrent.ListenableFuture;
55 import com.google.common.util.concurrent.ListeningExecutorService;
56 import com.google.common.util.concurrent.MoreExecutors;
57
58 /**
59  * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
60  *
61  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
62  */
63 public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
64
65         private final Object syncObject = new Object();
66         private final ListeningExecutorService executorService;
67         private final FcpConnection fcpConnection;
68         private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
69         private final AtomicReference<String> identifier = new AtomicReference<>();
70         private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
71
72         public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
73                 this.executorService = MoreExecutors.listeningDecorator(executorService);
74                 this.fcpConnection = fcpConnection;
75         }
76
77         protected void setIdentifier(String identifier) {
78                 this.identifier.set(identifier);
79         }
80
81         protected abstract boolean isFinished();
82
83         public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
84                 setIdentifier(fcpMessage.getField("Identifier"));
85                 fcpConnection.addFcpListener(this);
86                 messages.add(fcpMessage);
87                 return executorService.submit(() -> {
88                         synchronized (syncObject) {
89                                 while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) {
90                                         while (messages.peek() != null) {
91                                                 FcpMessage message = messages.poll();
92                                                 fcpConnection.sendMessage(message);
93                                         }
94                                         if (isFinished() || (connectionFailureReason.get() != null)) {
95                                                 continue;
96                                         }
97                                         syncObject.wait();
98                                 }
99                         }
100                         Throwable throwable = connectionFailureReason.get();
101                         if (throwable != null) {
102                                 throw new ExecutionException(throwable);
103                         }
104                         return getResult();
105                 });
106         }
107
108         protected void sendMessage(FcpMessage fcpMessage) {
109                 messages.add(fcpMessage);
110                 notifySyncObject();
111         }
112
113         private void notifySyncObject() {
114                 synchronized (syncObject) {
115                         syncObject.notifyAll();
116                 }
117         }
118
119         protected R getResult() {
120                 return null;
121         }
122
123         @Override
124         public void close() {
125                 fcpConnection.removeFcpListener(this);
126         }
127
128         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
129                 consume(consumer, message, "Identifier");
130         }
131
132         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
133                         String identifier) {
134                 if (Objects.equals(message.getField(identifier), this.identifier.get())) {
135                         consumeAlways(consumer, message);
136                 }
137         }
138
139         private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
140                 consumer.accept(message);
141                 notifySyncObject();
142         }
143
144         private void consumeUnknown(FcpMessage fcpMessage) {
145                 consumeUnknownMessage(fcpMessage);
146                 notifySyncObject();
147         }
148
149         private void consumeClose(Throwable throwable) {
150                 connectionFailureReason.set(throwable);
151                 notifySyncObject();
152         }
153
154         @Override
155         public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
156                 consume(this::consumeNodeHello, nodeHello);
157         }
158
159         protected void consumeNodeHello(NodeHello nodeHello) { }
160
161         @Override
162         public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
163                         CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
164                 consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
165         }
166
167         protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
168
169         @Override
170         public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
171                 consume(this::consumeSSKKeypair, sskKeypair);
172         }
173
174         protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
175
176         @Override
177         public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
178                 consume(this::consumePeer, peer);
179         }
180
181         protected void consumePeer(Peer peer) { }
182
183         @Override
184         public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
185                 consume(this::consumeEndListPeers, endListPeers);
186         }
187
188         protected void consumeEndListPeers(EndListPeers endListPeers) { }
189
190         @Override
191         public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
192                 consume(this::consumePeerNote, peerNote);
193         }
194
195         protected void consumePeerNote(PeerNote peerNote) { }
196
197         @Override
198         public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
199                 consume(this::consumeEndListPeerNotes, endListPeerNotes);
200         }
201
202         protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
203
204         @Override
205         public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
206                 consume(this::consumePeerRemoved, peerRemoved);
207         }
208
209         protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
210
211         @Override
212         public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
213                 consume(this::consumeNodeData, nodeData);
214         }
215
216         protected void consumeNodeData(NodeData nodeData) { }
217
218         @Override
219         public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
220                 consume(this::consumeTestDDAReply, testDDAReply, "Directory");
221         }
222
223         protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
224
225         @Override
226         public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
227                 consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
228         }
229
230         protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
231
232         @Override
233         public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
234                 consume(this::consumePersistentGet, persistentGet);
235         }
236
237         protected void consumePersistentGet(PersistentGet persistentGet) { }
238
239         @Override
240         public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
241                 consume(this::consumePersistentPut, persistentPut);
242         }
243
244         protected void consumePersistentPut(PersistentPut persistentPut) { }
245
246         @Override
247         public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
248                         EndListPersistentRequests endListPersistentRequests) {
249                 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
250         }
251
252         protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
253
254         @Override
255         public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
256                 consume(this::consumeURIGenerated, uriGenerated);
257         }
258
259         protected void consumeURIGenerated(URIGenerated uriGenerated) { }
260
261         @Override
262         public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
263                 consume(this::consumeDataFound, dataFound);
264         }
265
266         protected void consumeDataFound(DataFound dataFound) { }
267
268         @Override
269         public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
270                 consume(this::consumeAllData, allData);
271         }
272
273         protected void consumeAllData(AllData allData) { }
274
275         @Override
276         public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
277                 consume(this::consumeSimpleProgress, simpleProgress);
278         }
279
280         protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
281
282         @Override
283         public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
284                 consume(this::consumeStartedCompression, startedCompression);
285         }
286
287         protected void consumeStartedCompression(StartedCompression startedCompression) { }
288
289         @Override
290         public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
291                 consume(this::consumeFinishedCompression, finishedCompression);
292         }
293
294         protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
295
296         @Override
297         public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
298                 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
299         }
300
301         protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
302
303         @Override
304         public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
305                         UnknownNodeIdentifier unknownNodeIdentifier) {
306                 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
307         }
308
309         protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
310
311         @Override
312         public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
313                 consume(this::consumeConfigData, configData);
314         }
315
316         protected void consumeConfigData(ConfigData configData) { }
317
318         @Override
319         public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
320                 consume(this::consumeGetFailed, getFailed);
321         }
322
323         protected void consumeGetFailed(GetFailed getFailed) { }
324
325         @Override
326         public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
327                 consume(this::consumePutFailed, putFailed);
328         }
329
330         protected void consumePutFailed(PutFailed putFailed) { }
331
332         @Override
333         public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
334                 consume(this::consumeIdentifierCollision, identifierCollision);
335         }
336
337         protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
338
339         @Override
340         public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
341                 consume(this::consumePersistentPutDir, persistentPutDir);
342         }
343
344         protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
345
346         @Override
347         public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
348                         PersistentRequestRemoved persistentRequestRemoved) {
349                 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
350         }
351
352         protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
353
354         @Override
355         public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
356                 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
357         }
358
359         protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
360
361         @Override
362         public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
363                 consume(this::consumePluginInfo, pluginInfo);
364         }
365
366         protected void consumePluginInfo(PluginInfo pluginInfo) { }
367
368         @Override
369         public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
370                 consume(this::consumeFCPPluginReply, fcpPluginReply);
371         }
372
373         protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
374
375         @Override
376         public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
377                         PersistentRequestModified persistentRequestModified) {
378                 consume(this::consumePersistentRequestModified, persistentRequestModified);
379         }
380
381         protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
382
383         @Override
384         public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
385                 consume(this::consumePutSuccessful, putSuccessful);
386         }
387
388         protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
389
390         @Override
391         public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
392                 consume(this::consumePutFetchable, putFetchable);
393         }
394
395         protected void consumePutFetchable(PutFetchable putFetchable) { }
396
397         @Override
398         public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
399                 consume(this::consumeSentFeed, sentFeed);
400         }
401
402         protected void consumeSentFeed(SentFeed sentFeed) { }
403
404         @Override
405         public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
406                 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
407         }
408
409         protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
410
411         @Override
412         public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
413                 consume(this::consumeProtocolError, protocolError);
414         }
415
416         protected void consumeProtocolError(ProtocolError protocolError) { }
417
418         @Override
419         public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
420                 consumeUnknown(fcpMessage);
421         }
422
423         protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
424
425         @Override
426         public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
427                 consumeClose(throwable);
428         }
429
430 }