Add new “quelaton” FCP client API
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / FcpReplySequenceTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.is;
5 import static org.mockito.Mockito.mock;
6 import static org.mockito.Mockito.verify;
7
8 import java.io.IOException;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.atomic.AtomicBoolean;
14 import java.util.concurrent.atomic.AtomicReference;
15 import java.util.function.Supplier;
16
17 import net.pterodactylus.fcp.AllData;
18 import net.pterodactylus.fcp.BaseMessage;
19 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
20 import net.pterodactylus.fcp.ConfigData;
21 import net.pterodactylus.fcp.DataFound;
22 import net.pterodactylus.fcp.EndListPeerNotes;
23 import net.pterodactylus.fcp.EndListPeers;
24 import net.pterodactylus.fcp.EndListPersistentRequests;
25 import net.pterodactylus.fcp.FCPPluginReply;
26 import net.pterodactylus.fcp.FcpConnection;
27 import net.pterodactylus.fcp.FcpMessage;
28 import net.pterodactylus.fcp.FinishedCompression;
29 import net.pterodactylus.fcp.GetFailed;
30 import net.pterodactylus.fcp.IdentifierCollision;
31 import net.pterodactylus.fcp.NodeData;
32 import net.pterodactylus.fcp.NodeHello;
33 import net.pterodactylus.fcp.Peer;
34 import net.pterodactylus.fcp.PeerNote;
35 import net.pterodactylus.fcp.PeerRemoved;
36 import net.pterodactylus.fcp.PersistentGet;
37 import net.pterodactylus.fcp.PersistentPut;
38 import net.pterodactylus.fcp.PersistentPutDir;
39 import net.pterodactylus.fcp.PersistentRequestModified;
40 import net.pterodactylus.fcp.PersistentRequestRemoved;
41 import net.pterodactylus.fcp.PluginInfo;
42 import net.pterodactylus.fcp.ProtocolError;
43 import net.pterodactylus.fcp.PutFailed;
44 import net.pterodactylus.fcp.PutFetchable;
45 import net.pterodactylus.fcp.PutSuccessful;
46 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
47 import net.pterodactylus.fcp.SSKKeypair;
48 import net.pterodactylus.fcp.SentFeed;
49 import net.pterodactylus.fcp.SimpleProgress;
50 import net.pterodactylus.fcp.StartedCompression;
51 import net.pterodactylus.fcp.SubscribedUSKUpdate;
52 import net.pterodactylus.fcp.TestDDAComplete;
53 import net.pterodactylus.fcp.TestDDAReply;
54 import net.pterodactylus.fcp.URIGenerated;
55 import net.pterodactylus.fcp.UnknownNodeIdentifier;
56 import net.pterodactylus.fcp.UnknownPeerNoteType;
57
58 import org.junit.Test;
59
60 /**
61  * Unit test for {@link FcpReplySequence}.
62  *
63  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
64  */
65 public class FcpReplySequenceTest {
66
67         private final FcpConnection fcpConnection = mock(FcpConnection.class);
68         private final ExecutorService executorService = Executors.newSingleThreadExecutor();
69         private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection);
70         private final FcpMessage fcpMessage = new FcpMessage("Test");
71
72         @Test
73         public void canSendMessage() throws IOException {
74                 replyWaiter.send(fcpMessage);
75                 verify(fcpConnection).sendMessage(fcpMessage);
76         }
77
78         @Test
79         public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
80                 replyWaiter.send(fcpMessage);
81                 verify(fcpConnection).addFcpListener(replyWaiter);
82         }
83
84         @Test
85         public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
86                 replyWaiter.send(fcpMessage);
87                 replyWaiter.close();
88                 verify(fcpConnection).removeFcpListener(replyWaiter);
89         }
90
91         private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver,
92                         Class<M> messageClass, Supplier<M> message) throws IOException, InterruptedException, ExecutionException {
93                 AtomicBoolean gotMessage = setupMessage(messageClass);
94                 Future<?> result = replyWaiter.send(fcpMessage);
95                 sendMessage(messageReceiver, message.get());
96                 result.get();
97                 assertThat(gotMessage.get(), is(true));
98         }
99
100         private <M extends BaseMessage> void sendMessage(MessageReceiver<M> messageReceiver, M message) {
101                 messageReceiver.receive(fcpConnection, message);
102         }
103
104         private interface MessageReceiver<M extends BaseMessage> {
105
106                 void receive(FcpConnection fcpConnection, M message);
107         }
108
109         private <M extends BaseMessage> AtomicBoolean setupMessage(Class<M> messageClass) {
110                 AtomicBoolean gotMessage = new AtomicBoolean();
111                 replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true));
112                 replyWaiter.waitFor(() -> gotMessage.get());
113                 return gotMessage;
114         }
115
116         @Test
117         public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
118                 waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class,
119                                 () -> new NodeHello(new FcpMessage("NodeHello")));
120         }
121
122         @Test
123         public void waitingForConnectionClosedDuplicateClientNameWorks()
124         throws InterruptedException, ExecutionException, IOException {
125                 waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName,
126                                 CloseConnectionDuplicateClientName.class,
127                                 () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
128         }
129
130         @Test
131         public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
132                 waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class,
133                                 () -> new SSKKeypair(new FcpMessage("SSKKeypair")));
134         }
135
136         @Test
137         public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
138                 waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer")));
139         }
140
141         @Test
142         public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
143                 waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class,
144                                 () -> new EndListPeers(new FcpMessage("EndListPeers")));
145         }
146
147         @Test
148         public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
149                 waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class,
150                                 () -> new PeerNote(new FcpMessage("PeerNote")));
151         }
152
153         @Test
154         public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
155                 waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class,
156                                 () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes")));
157         }
158
159         @Test
160         public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
161                 waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class,
162                                 () -> new PeerRemoved(new FcpMessage("PeerRemoved")));
163         }
164
165         @Test
166         public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
167                 waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class,
168                                 () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "")
169                                                 .put(
170                                                                 "ark.number", "0")
171                                                 .put("auth.negTypes", "")
172                                                 .put("version", "0,0,0,0")
173                                                 .put("lastGoodVersion", "0,0,0,0")));
174         }
175
176         @Test
177         public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
178                 waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class,
179                                 () -> new TestDDAReply(new FcpMessage("TestDDAReply")));
180         }
181
182         @Test
183         public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
184                 waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class,
185                                 () -> new TestDDAComplete(new FcpMessage("TestDDAComplete")));
186         }
187
188         @Test
189         public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
190                 waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class,
191                                 () -> new PersistentGet(new FcpMessage("PersistentGet")));
192         }
193
194         @Test
195         public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
196                 waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class,
197                                 () -> new PersistentPut(new FcpMessage("PersistentPut")));
198         }
199
200         @Test
201         public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
202                 waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class,
203                                 () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests")));
204         }
205
206         @Test
207         public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
208                 waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class,
209                                 () -> new URIGenerated(new FcpMessage("URIGenerated")));
210         }
211
212         @Test
213         public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
214                 waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class,
215                                 () -> new DataFound(new FcpMessage("DataFound")));
216         }
217
218         @Test
219         public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
220                 waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class,
221                                 () -> new AllData(new FcpMessage("AllData"), null));
222         }
223
224         @Test
225         public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
226                 waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class,
227                                 () -> new SimpleProgress(new FcpMessage("SimpleProgress")));
228         }
229
230         @Test
231         public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
232                 waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class,
233                                 () -> new StartedCompression(new FcpMessage("StartedCompression")));
234         }
235
236         @Test
237         public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
238                 waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class,
239                                 () -> new FinishedCompression(new FcpMessage("FinishedCompression")));
240         }
241
242         @Test
243         public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
244                 waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class,
245                                 () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType")));
246         }
247
248         @Test
249         public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
250                 waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class,
251                                 () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier")));
252         }
253
254         @Test
255         public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
256                 waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class,
257                                 () -> new ConfigData(new FcpMessage("ConfigData")));
258         }
259
260         @Test
261         public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
262                 waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class,
263                                 () -> new GetFailed(new FcpMessage("GetFailed")));
264         }
265
266         @Test
267         public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
268                 waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class,
269                                 () -> new PutFailed(new FcpMessage("PutFailed")));
270         }
271
272         @Test
273         public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
274                 waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class,
275                                 () -> new IdentifierCollision(new FcpMessage("IdentifierCollision")));
276         }
277
278         @Test
279         public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
280                 waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class,
281                                 () -> new PersistentPutDir(new FcpMessage("PersistentPutDir")));
282         }
283
284         @Test
285         public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
286                 waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class,
287                                 () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved")));
288         }
289
290         @Test
291         public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
292                 waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class,
293                                 () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate")));
294         }
295
296         @Test
297         public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
298                 waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class,
299                                 () -> new PluginInfo(new FcpMessage("PluginInfo")));
300         }
301
302         @Test
303         public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
304                 waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class,
305                                 () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
306         }
307
308         @Test
309         public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
310                 waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class,
311                                 () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified")));
312         }
313
314         @Test
315         public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
316                 waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class,
317                                 () -> new PutSuccessful(new FcpMessage("PutSuccessful")));
318         }
319
320         @Test
321         public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
322                 waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class,
323                                 () -> new PutFetchable(new FcpMessage("PutFetchable")));
324         }
325
326         @Test
327         public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
328                 waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class,
329                                 () -> new SentFeed(new FcpMessage("SentFeed")));
330         }
331
332         @Test
333         public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
334                 waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class,
335                                 () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed")));
336         }
337
338         @Test
339         public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
340                 waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class,
341                                 () -> new ProtocolError(new FcpMessage("ProtocolError")));
342         }
343
344         @Test
345         public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
346                 AtomicReference<FcpMessage> receivedMessage = new AtomicReference<>();
347                 replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message));
348                 replyWaiter.waitFor(() -> receivedMessage.get() != null);
349                 Future<?> result = replyWaiter.send(fcpMessage);
350                 replyWaiter.receivedMessage(fcpConnection, fcpMessage);
351                 result.get();
352                 assertThat(receivedMessage.get(), is(fcpMessage));
353         }
354
355         @Test
356         public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
357                 AtomicBoolean gotPutFailed = new AtomicBoolean();
358                 replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true));
359                 AtomicBoolean gotGetFailed = new AtomicBoolean();
360                 replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true));
361                 replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get());
362                 Future<?> result = replyWaiter.send(fcpMessage);
363                 assertThat(result.isDone(), is(false));
364                 replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
365                 assertThat(result.isDone(), is(false));
366                 replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
367                 result.get();
368         }
369
370         @Test
371         public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
372                 AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
373                 replyWaiter.handleClose().with((e) -> receivedThrowable.set(e));
374                 replyWaiter.waitFor(() -> receivedThrowable.get() != null);
375                 Future<?> result = replyWaiter.send(fcpMessage);
376                 Throwable throwable = new Throwable();
377                 replyWaiter.connectionClosed(fcpConnection, throwable);
378                 result.get();
379                 assertThat(receivedThrowable.get(), is(throwable));
380         }
381
382 }