In an FCP reply sequence, only match replies with the correct identifier
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpReplySequence.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.util.Objects;
5 import java.util.Queue;
6 import java.util.concurrent.ConcurrentLinkedQueue;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.atomic.AtomicReference;
9 import java.util.function.Consumer;
10
11 import net.pterodactylus.fcp.AllData;
12 import net.pterodactylus.fcp.BaseMessage;
13 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
14 import net.pterodactylus.fcp.ConfigData;
15 import net.pterodactylus.fcp.DataFound;
16 import net.pterodactylus.fcp.EndListPeerNotes;
17 import net.pterodactylus.fcp.EndListPeers;
18 import net.pterodactylus.fcp.EndListPersistentRequests;
19 import net.pterodactylus.fcp.FCPPluginReply;
20 import net.pterodactylus.fcp.FcpConnection;
21 import net.pterodactylus.fcp.FcpListener;
22 import net.pterodactylus.fcp.FcpMessage;
23 import net.pterodactylus.fcp.FinishedCompression;
24 import net.pterodactylus.fcp.GetFailed;
25 import net.pterodactylus.fcp.IdentifierCollision;
26 import net.pterodactylus.fcp.NodeData;
27 import net.pterodactylus.fcp.NodeHello;
28 import net.pterodactylus.fcp.Peer;
29 import net.pterodactylus.fcp.PeerNote;
30 import net.pterodactylus.fcp.PeerRemoved;
31 import net.pterodactylus.fcp.PersistentGet;
32 import net.pterodactylus.fcp.PersistentPut;
33 import net.pterodactylus.fcp.PersistentPutDir;
34 import net.pterodactylus.fcp.PersistentRequestModified;
35 import net.pterodactylus.fcp.PersistentRequestRemoved;
36 import net.pterodactylus.fcp.PluginInfo;
37 import net.pterodactylus.fcp.ProtocolError;
38 import net.pterodactylus.fcp.PutFailed;
39 import net.pterodactylus.fcp.PutFetchable;
40 import net.pterodactylus.fcp.PutSuccessful;
41 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
42 import net.pterodactylus.fcp.SSKKeypair;
43 import net.pterodactylus.fcp.SentFeed;
44 import net.pterodactylus.fcp.SimpleProgress;
45 import net.pterodactylus.fcp.StartedCompression;
46 import net.pterodactylus.fcp.SubscribedUSKUpdate;
47 import net.pterodactylus.fcp.TestDDAComplete;
48 import net.pterodactylus.fcp.TestDDAReply;
49 import net.pterodactylus.fcp.URIGenerated;
50 import net.pterodactylus.fcp.UnknownNodeIdentifier;
51 import net.pterodactylus.fcp.UnknownPeerNoteType;
52
53 import com.google.common.util.concurrent.ListenableFuture;
54 import com.google.common.util.concurrent.ListeningExecutorService;
55 import com.google.common.util.concurrent.MoreExecutors;
56
57 /**
58  * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
59  *
60  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
61  */
62 public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
63
64         private final Object syncObject = new Object();
65         private final ListeningExecutorService executorService;
66         private final FcpConnection fcpConnection;
67         private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
68         private final AtomicReference<String> identifier = new AtomicReference<>();
69
70         public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
71                 this.executorService = MoreExecutors.listeningDecorator(executorService);
72                 this.fcpConnection = fcpConnection;
73         }
74
75         protected void setIdentifier(String identifier) {
76                 this.identifier.set(identifier);
77         }
78
79         protected abstract boolean isFinished();
80
81         public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
82                 setIdentifier(fcpMessage.getField("Identifier"));
83                 fcpConnection.addFcpListener(this);
84                 messages.add(fcpMessage);
85                 return executorService.submit(() -> {
86                         synchronized (syncObject) {
87                                 while (!isFinished() || !messages.isEmpty()) {
88                                         while (messages.peek() != null) {
89                                                 FcpMessage message = messages.poll();
90                                                 fcpConnection.sendMessage(message);
91                                         }
92                                         if (isFinished()) {
93                                                 continue;
94                                         }
95                                         syncObject.wait();
96                                 }
97                         }
98                         return getResult();
99                 });
100         }
101
102         protected void sendMessage(FcpMessage fcpMessage) {
103                 messages.add(fcpMessage);
104                 notifySyncObject();
105         }
106
107         private void notifySyncObject() {
108                 synchronized (syncObject) {
109                         syncObject.notifyAll();
110                 }
111         }
112
113         protected R getResult() {
114                 return null;
115         }
116
117         @Override
118         public void close() {
119                 fcpConnection.removeFcpListener(this);
120         }
121
122         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
123                 consume(consumer, message, "Identifier");
124         }
125
126         private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
127                         String identifier) {
128                 if (Objects.equals(message.getField(identifier), this.identifier.get())) {
129                         consumer.accept(message);
130                         notifySyncObject();
131                 }
132         }
133
134         private void consumeUnknown(FcpMessage fcpMessage) {
135                 if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) {
136                         consumeUnknownMessage(fcpMessage);
137                         notifySyncObject();
138                 }
139         }
140
141         private void consumeClose(Throwable throwable) {
142                 consumeConnectionClosed(throwable);
143                 notifySyncObject();
144         }
145
146         @Override
147         public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
148                 consume(this::consumeNodeHello, nodeHello);
149         }
150
151         protected void consumeNodeHello(NodeHello nodeHello) { }
152
153         @Override
154         public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
155                         CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
156                 consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
157         }
158
159         protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
160
161         @Override
162         public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
163                 consume(this::consumeSSKKeypair, sskKeypair);
164         }
165
166         protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
167
168         @Override
169         public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
170                 consume(this::consumePeer, peer);
171         }
172
173         protected void consumePeer(Peer peer) { }
174
175         @Override
176         public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
177                 consume(this::consumeEndListPeers, endListPeers);
178         }
179
180         protected void consumeEndListPeers(EndListPeers endListPeers) { }
181
182         @Override
183         public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
184                 consume(this::consumePeerNote, peerNote);
185         }
186
187         protected void consumePeerNote(PeerNote peerNote) { }
188
189         @Override
190         public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
191                 consume(this::consumeEndListPeerNotes, endListPeerNotes);
192         }
193
194         protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
195
196         @Override
197         public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
198                 consume(this::consumePeerRemoved, peerRemoved);
199         }
200
201         protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
202
203         @Override
204         public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
205                 consume(this::consumeNodeData, nodeData);
206         }
207
208         protected void consumeNodeData(NodeData nodeData) { }
209
210         @Override
211         public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
212                 consume(this::consumeTestDDAReply, testDDAReply, "Directory");
213         }
214
215         protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
216
217         @Override
218         public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
219                 consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
220         }
221
222         protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
223
224         @Override
225         public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
226                 consume(this::consumePersistentGet, persistentGet);
227         }
228
229         protected void consumePersistentGet(PersistentGet persistentGet) { }
230
231         @Override
232         public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
233                 consume(this::consumePersistentPut, persistentPut);
234         }
235
236         protected void consumePersistentPut(PersistentPut persistentPut) { }
237
238         @Override
239         public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
240                         EndListPersistentRequests endListPersistentRequests) {
241                 consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
242         }
243
244         protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
245
246         @Override
247         public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
248                 consume(this::consumeURIGenerated, uriGenerated);
249         }
250
251         protected void consumeURIGenerated(URIGenerated uriGenerated) { }
252
253         @Override
254         public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
255                 consume(this::consumeDataFound, dataFound);
256         }
257
258         protected void consumeDataFound(DataFound dataFound) { }
259
260         @Override
261         public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
262                 consume(this::consumeAllData, allData);
263         }
264
265         protected void consumeAllData(AllData allData) { }
266
267         @Override
268         public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
269                 consume(this::consumeSimpleProgress, simpleProgress);
270         }
271
272         protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
273
274         @Override
275         public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
276                 consume(this::consumeStartedCompression, startedCompression);
277         }
278
279         protected void consumeStartedCompression(StartedCompression startedCompression) { }
280
281         @Override
282         public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
283                 consume(this::consumeFinishedCompression, finishedCompression);
284         }
285
286         protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
287
288         @Override
289         public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
290                 consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
291         }
292
293         protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
294
295         @Override
296         public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
297                         UnknownNodeIdentifier unknownNodeIdentifier) {
298                 consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
299         }
300
301         protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
302
303         @Override
304         public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
305                 consume(this::consumeConfigData, configData);
306         }
307
308         protected void consumeConfigData(ConfigData configData) { }
309
310         @Override
311         public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
312                 consume(this::consumeGetFailed, getFailed);
313         }
314
315         protected void consumeGetFailed(GetFailed getFailed) { }
316
317         @Override
318         public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
319                 consume(this::consumePutFailed, putFailed);
320         }
321
322         protected void consumePutFailed(PutFailed putFailed) { }
323
324         @Override
325         public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
326                 consume(this::consumeIdentifierCollision, identifierCollision);
327         }
328
329         protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
330
331         @Override
332         public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
333                 consume(this::consumePersistentPutDir, persistentPutDir);
334         }
335
336         protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
337
338         @Override
339         public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
340                         PersistentRequestRemoved persistentRequestRemoved) {
341                 consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
342         }
343
344         protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
345
346         @Override
347         public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
348                 consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
349         }
350
351         protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
352
353         @Override
354         public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
355                 consume(this::consumePluginInfo, pluginInfo);
356         }
357
358         protected void consumePluginInfo(PluginInfo pluginInfo) { }
359
360         @Override
361         public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
362                 consume(this::consumeFCPPluginReply, fcpPluginReply);
363         }
364
365         protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
366
367         @Override
368         public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
369                         PersistentRequestModified persistentRequestModified) {
370                 consume(this::consumePersistentRequestModified, persistentRequestModified);
371         }
372
373         protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
374
375         @Override
376         public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
377                 consume(this::consumePutSuccessful, putSuccessful);
378         }
379
380         protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
381
382         @Override
383         public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
384                 consume(this::consumePutFetchable, putFetchable);
385         }
386
387         protected void consumePutFetchable(PutFetchable putFetchable) { }
388
389         @Override
390         public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
391                 consume(this::consumeSentFeed, sentFeed);
392         }
393
394         protected void consumeSentFeed(SentFeed sentFeed) { }
395
396         @Override
397         public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
398                 consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
399         }
400
401         protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
402
403         @Override
404         public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
405                 consume(this::consumeProtocolError, protocolError);
406         }
407
408         protected void consumeProtocolError(ProtocolError protocolError) { }
409
410         @Override
411         public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
412                 consumeUnknown(fcpMessage);
413         }
414
415         protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
416
417         @Override
418         public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
419                 consumeClose(throwable);
420         }
421
422         protected void consumeConnectionClosed(Throwable throwable) { }
423
424 }