1 package net.pterodactylus.fcp.quelaton;
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.is;
5 import static org.mockito.Mockito.mock;
6 import static org.mockito.Mockito.verify;
8 import java.io.IOException;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.atomic.AtomicBoolean;
14 import java.util.concurrent.atomic.AtomicReference;
15 import java.util.function.Supplier;
17 import net.pterodactylus.fcp.AllData;
18 import net.pterodactylus.fcp.BaseMessage;
19 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
20 import net.pterodactylus.fcp.ConfigData;
21 import net.pterodactylus.fcp.DataFound;
22 import net.pterodactylus.fcp.EndListPeerNotes;
23 import net.pterodactylus.fcp.EndListPeers;
24 import net.pterodactylus.fcp.EndListPersistentRequests;
25 import net.pterodactylus.fcp.FCPPluginReply;
26 import net.pterodactylus.fcp.FcpConnection;
27 import net.pterodactylus.fcp.FcpMessage;
28 import net.pterodactylus.fcp.FinishedCompression;
29 import net.pterodactylus.fcp.GetFailed;
30 import net.pterodactylus.fcp.IdentifierCollision;
31 import net.pterodactylus.fcp.NodeData;
32 import net.pterodactylus.fcp.NodeHello;
33 import net.pterodactylus.fcp.Peer;
34 import net.pterodactylus.fcp.PeerNote;
35 import net.pterodactylus.fcp.PeerRemoved;
36 import net.pterodactylus.fcp.PersistentGet;
37 import net.pterodactylus.fcp.PersistentPut;
38 import net.pterodactylus.fcp.PersistentPutDir;
39 import net.pterodactylus.fcp.PersistentRequestModified;
40 import net.pterodactylus.fcp.PersistentRequestRemoved;
41 import net.pterodactylus.fcp.PluginInfo;
42 import net.pterodactylus.fcp.ProtocolError;
43 import net.pterodactylus.fcp.PutFailed;
44 import net.pterodactylus.fcp.PutFetchable;
45 import net.pterodactylus.fcp.PutSuccessful;
46 import net.pterodactylus.fcp.ReceivedBookmarkFeed;
47 import net.pterodactylus.fcp.SSKKeypair;
48 import net.pterodactylus.fcp.SentFeed;
49 import net.pterodactylus.fcp.SimpleProgress;
50 import net.pterodactylus.fcp.StartedCompression;
51 import net.pterodactylus.fcp.SubscribedUSKUpdate;
52 import net.pterodactylus.fcp.TestDDAComplete;
53 import net.pterodactylus.fcp.TestDDAReply;
54 import net.pterodactylus.fcp.URIGenerated;
55 import net.pterodactylus.fcp.UnknownNodeIdentifier;
56 import net.pterodactylus.fcp.UnknownPeerNoteType;
58 import org.junit.Test;
/**
 * Unit test for {@link FcpReplySequence}.
 *
 * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
 */
65 public class FcpReplySequenceTest {
67 private final FcpConnection fcpConnection = mock(FcpConnection.class);
68 private final ExecutorService executorService = Executors.newSingleThreadExecutor();
69 private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection);
70 private final FcpMessage fcpMessage = new FcpMessage("Test");
73 public void canSendMessage() throws IOException {
74 replyWaiter.send(fcpMessage);
75 verify(fcpConnection).sendMessage(fcpMessage);
79 public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
80 replyWaiter.send(fcpMessage);
81 verify(fcpConnection).addFcpListener(replyWaiter);
85 public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
86 replyWaiter.send(fcpMessage);
88 verify(fcpConnection).removeFcpListener(replyWaiter);
91 private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver,
92 Class<M> messageClass, Supplier<M> message) throws IOException, InterruptedException, ExecutionException {
93 AtomicBoolean gotMessage = setupMessage(messageClass);
94 Future<?> result = replyWaiter.send(fcpMessage);
95 sendMessage(messageReceiver, message.get());
97 assertThat(gotMessage.get(), is(true));
100 private <M extends BaseMessage> void sendMessage(MessageReceiver<M> messageReceiver, M message) {
101 messageReceiver.receive(fcpConnection, message);
104 private interface MessageReceiver<M extends BaseMessage> {
106 void receive(FcpConnection fcpConnection, M message);
109 private <M extends BaseMessage> AtomicBoolean setupMessage(Class<M> messageClass) {
110 AtomicBoolean gotMessage = new AtomicBoolean();
111 replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true));
112 replyWaiter.waitFor(() -> gotMessage.get());
117 public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
118 waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class,
119 () -> new NodeHello(new FcpMessage("NodeHello")));
123 public void waitingForConnectionClosedDuplicateClientNameWorks()
124 throws InterruptedException, ExecutionException, IOException {
125 waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName,
126 CloseConnectionDuplicateClientName.class,
127 () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
131 public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
132 waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class,
133 () -> new SSKKeypair(new FcpMessage("SSKKeypair")));
137 public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
138 waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer")));
142 public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
143 waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class,
144 () -> new EndListPeers(new FcpMessage("EndListPeers")));
148 public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
149 waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class,
150 () -> new PeerNote(new FcpMessage("PeerNote")));
154 public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
155 waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class,
156 () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes")));
160 public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
161 waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class,
162 () -> new PeerRemoved(new FcpMessage("PeerRemoved")));
166 public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
167 waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class,
168 () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "")
171 .put("auth.negTypes", "")
172 .put("version", "0,0,0,0")
173 .put("lastGoodVersion", "0,0,0,0")));
177 public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
178 waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class,
179 () -> new TestDDAReply(new FcpMessage("TestDDAReply")));
183 public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
184 waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class,
185 () -> new TestDDAComplete(new FcpMessage("TestDDAComplete")));
189 public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
190 waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class,
191 () -> new PersistentGet(new FcpMessage("PersistentGet")));
195 public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
196 waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class,
197 () -> new PersistentPut(new FcpMessage("PersistentPut")));
201 public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
202 waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class,
203 () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests")));
207 public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
208 waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class,
209 () -> new URIGenerated(new FcpMessage("URIGenerated")));
213 public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
214 waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class,
215 () -> new DataFound(new FcpMessage("DataFound")));
219 public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
220 waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class,
221 () -> new AllData(new FcpMessage("AllData"), null));
225 public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
226 waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class,
227 () -> new SimpleProgress(new FcpMessage("SimpleProgress")));
231 public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
232 waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class,
233 () -> new StartedCompression(new FcpMessage("StartedCompression")));
237 public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
238 waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class,
239 () -> new FinishedCompression(new FcpMessage("FinishedCompression")));
243 public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
244 waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class,
245 () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType")));
249 public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
250 waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class,
251 () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier")));
255 public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
256 waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class,
257 () -> new ConfigData(new FcpMessage("ConfigData")));
261 public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
262 waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class,
263 () -> new GetFailed(new FcpMessage("GetFailed")));
267 public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
268 waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class,
269 () -> new PutFailed(new FcpMessage("PutFailed")));
273 public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
274 waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class,
275 () -> new IdentifierCollision(new FcpMessage("IdentifierCollision")));
279 public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
280 waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class,
281 () -> new PersistentPutDir(new FcpMessage("PersistentPutDir")));
285 public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
286 waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class,
287 () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved")));
291 public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
292 waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class,
293 () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate")));
297 public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
298 waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class,
299 () -> new PluginInfo(new FcpMessage("PluginInfo")));
303 public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
304 waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class,
305 () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
309 public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
310 waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class,
311 () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified")));
315 public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
316 waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class,
317 () -> new PutSuccessful(new FcpMessage("PutSuccessful")));
321 public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
322 waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class,
323 () -> new PutFetchable(new FcpMessage("PutFetchable")));
327 public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
328 waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class,
329 () -> new SentFeed(new FcpMessage("SentFeed")));
333 public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
334 waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class,
335 () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed")));
339 public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
340 waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class,
341 () -> new ProtocolError(new FcpMessage("ProtocolError")));
345 public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
346 AtomicReference<FcpMessage> receivedMessage = new AtomicReference<>();
347 replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message));
348 replyWaiter.waitFor(() -> receivedMessage.get() != null);
349 Future<?> result = replyWaiter.send(fcpMessage);
350 replyWaiter.receivedMessage(fcpConnection, fcpMessage);
352 assertThat(receivedMessage.get(), is(fcpMessage));
356 public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
357 AtomicBoolean gotPutFailed = new AtomicBoolean();
358 replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true));
359 AtomicBoolean gotGetFailed = new AtomicBoolean();
360 replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true));
361 replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get());
362 Future<?> result = replyWaiter.send(fcpMessage);
363 assertThat(result.isDone(), is(false));
364 replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
365 assertThat(result.isDone(), is(false));
366 replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
371 public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
372 AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
373 replyWaiter.handleClose().with((e) -> receivedThrowable.set(e));
374 replyWaiter.waitFor(() -> receivedThrowable.get() != null);
375 Future<?> result = replyWaiter.send(fcpMessage);
376 Throwable throwable = new Throwable();
377 replyWaiter.connectionClosed(fcpConnection, throwable);
379 assertThat(receivedThrowable.get(), is(throwable));