Allow the reply sequence to initiate its own messages
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / FcpReplySequenceTest.java
1 package net.pterodactylus.fcp.quelaton;
2
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import net.pterodactylus.fcp.AllData;
import net.pterodactylus.fcp.BaseMessage;
import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
import net.pterodactylus.fcp.ConfigData;
import net.pterodactylus.fcp.DataFound;
import net.pterodactylus.fcp.EndListPeerNotes;
import net.pterodactylus.fcp.EndListPeers;
import net.pterodactylus.fcp.EndListPersistentRequests;
import net.pterodactylus.fcp.FCPPluginReply;
import net.pterodactylus.fcp.FcpConnection;
import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.FinishedCompression;
import net.pterodactylus.fcp.GetFailed;
import net.pterodactylus.fcp.IdentifierCollision;
import net.pterodactylus.fcp.NodeData;
import net.pterodactylus.fcp.NodeHello;
import net.pterodactylus.fcp.Peer;
import net.pterodactylus.fcp.PeerNote;
import net.pterodactylus.fcp.PeerRemoved;
import net.pterodactylus.fcp.PersistentGet;
import net.pterodactylus.fcp.PersistentPut;
import net.pterodactylus.fcp.PersistentPutDir;
import net.pterodactylus.fcp.PersistentRequestModified;
import net.pterodactylus.fcp.PersistentRequestRemoved;
import net.pterodactylus.fcp.PluginInfo;
import net.pterodactylus.fcp.ProtocolError;
import net.pterodactylus.fcp.PutFailed;
import net.pterodactylus.fcp.PutFetchable;
import net.pterodactylus.fcp.PutSuccessful;
import net.pterodactylus.fcp.ReceivedBookmarkFeed;
import net.pterodactylus.fcp.SSKKeypair;
import net.pterodactylus.fcp.SentFeed;
import net.pterodactylus.fcp.SimpleProgress;
import net.pterodactylus.fcp.StartedCompression;
import net.pterodactylus.fcp.SubscribedUSKUpdate;
import net.pterodactylus.fcp.TestDDAComplete;
import net.pterodactylus.fcp.TestDDAReply;
import net.pterodactylus.fcp.URIGenerated;
import net.pterodactylus.fcp.UnknownNodeIdentifier;
import net.pterodactylus.fcp.UnknownPeerNoteType;

import org.junit.After;
import org.junit.Test;
58
59 /**
60  * Unit test for {@link FcpReplySequence}.
61  *
62  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
63  */
64 public class FcpReplySequenceTest {
65
66         private final FcpConnection fcpConnection = mock(FcpConnection.class);
67         private final ExecutorService executorService = Executors.newSingleThreadExecutor();
68         private final TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection);
69         private final FcpMessage fcpMessage = new FcpMessage("Test");
70
71         @Test
72         public void canSendMessage() throws IOException, ExecutionException, InterruptedException {
73                 FcpReplySequence replySequence = createBasicReplySequence();
74                 replySequence.send(fcpMessage).get();
75                 verify(fcpConnection).sendMessage(fcpMessage);
76         }
77
78         private FcpReplySequence createBasicReplySequence() {
79                 return new FcpReplySequence(executorService, fcpConnection) {
80                                 @Override
81                                 protected boolean isFinished() {
82                                         return true;
83                                 }
84                         };
85         }
86
87         @Test
88         public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
89                 FcpReplySequence replySequence = createBasicReplySequence();
90                 replySequence.send(fcpMessage);
91                 verify(fcpConnection).addFcpListener(replySequence);
92         }
93
94         @Test
95         public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
96                 FcpReplySequence replySequence = createBasicReplySequence();
97                 replySequence.send(fcpMessage);
98                 replySequence.close();
99                 verify(fcpConnection).removeFcpListener(replySequence);
100         }
101
102         private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, Class<M> messageClass, MessageCreator<M> messageCreator) throws IOException, InterruptedException, ExecutionException {
103                 waitForASpecificMessage(messageReceiver, messageCreator.create(new FcpMessage(messageClass.getSimpleName())));
104         }
105
106         private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver, M message) throws IOException, InterruptedException, ExecutionException {
107                 replySequence.setExpectedMessage(message.getName());
108                 Future<Boolean> result = replySequence.send(fcpMessage);
109                 messageReceiver.receiveMessage(fcpConnection, message);
110                 assertThat(result.get(), is(true));
111         }
112
113         private <M extends BaseMessage> M createMessage(Class<M> messageClass, MessageCreator<M> messageCreator) {
114                 return messageCreator.create(new FcpMessage(messageClass.getSimpleName()));
115         }
116
117         private interface MessageCreator<M extends BaseMessage> {
118
119                 M create(FcpMessage fcpMessage);
120
121         }
122
123         @Test
124         public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
125                 waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new);
126         }
127
128         @Test
129         public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
130                 waitForASpecificMessage( replySequence::receivedCloseConnectionDuplicateClientName, CloseConnectionDuplicateClientName.class, CloseConnectionDuplicateClientName::new);
131         }
132
133         @Test
134         public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
135                 waitForASpecificMessage(replySequence::receivedSSKKeypair, SSKKeypair.class, SSKKeypair::new);
136         }
137
138         @Test
139         public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
140                 waitForASpecificMessage(replySequence::receivedPeer, Peer.class, Peer::new);
141         }
142
143         @Test
144         public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
145                 waitForASpecificMessage(replySequence::receivedEndListPeers, EndListPeers.class, EndListPeers::new);
146         }
147
148         @Test
149         public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
150                 waitForASpecificMessage(replySequence::receivedPeerNote, PeerNote.class, PeerNote::new);
151         }
152
153         @Test
154         public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
155                 waitForASpecificMessage(replySequence::receivedEndListPeerNotes, EndListPeerNotes.class, EndListPeerNotes::new);
156         }
157
158         @Test
159         public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
160                 waitForASpecificMessage(replySequence::receivedPeerRemoved, PeerRemoved.class, PeerRemoved::new);
161         }
162
163         @Test
164         public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
165                 waitForASpecificMessage(replySequence::receivedNodeData, new NodeData(
166                         new FcpMessage("NodeData").put("ark.pubURI", "")
167                                         .put("ark.number", "0")
168                                         .put("auth.negTypes", "")
169                                         .put("version", "0,0,0,0")
170                                         .put("lastGoodVersion", "0,0,0,0")));
171         }
172
173         @Test
174         public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
175                 waitForASpecificMessage(replySequence::receivedTestDDAReply, TestDDAReply.class, TestDDAReply::new);
176         }
177
178         @Test
179         public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
180                 waitForASpecificMessage(replySequence::receivedTestDDAComplete, TestDDAComplete.class, TestDDAComplete::new);
181         }
182
183         @Test
184         public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
185                 waitForASpecificMessage(replySequence::receivedPersistentGet, PersistentGet.class, PersistentGet::new);
186         }
187
188         @Test
189         public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
190                 waitForASpecificMessage(replySequence::receivedPersistentPut, PersistentPut.class, PersistentPut::new);
191         }
192
193         @Test
194         public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
195                 waitForASpecificMessage(replySequence::receivedEndListPersistentRequests, EndListPersistentRequests.class, EndListPersistentRequests::new);
196         }
197
198         @Test
199         public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
200                 waitForASpecificMessage(replySequence::receivedURIGenerated, URIGenerated.class, URIGenerated::new);
201         }
202
203         @Test
204         public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
205                 waitForASpecificMessage(replySequence::receivedDataFound, DataFound.class, DataFound::new);
206         }
207
208         @Test
209         public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
210                 waitForASpecificMessage(replySequence::receivedAllData, new AllData(new FcpMessage("AllData"), null));
211         }
212
213         @Test
214         public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
215                 waitForASpecificMessage(replySequence::receivedSimpleProgress, SimpleProgress.class, SimpleProgress::new);
216         }
217
218         @Test
219         public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
220                 waitForASpecificMessage(replySequence::receivedStartedCompression, StartedCompression.class, StartedCompression::new);
221         }
222
223         @Test
224         public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
225                 waitForASpecificMessage(replySequence::receivedFinishedCompression, FinishedCompression.class, FinishedCompression::new);
226         }
227
228         @Test
229         public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
230                 waitForASpecificMessage(replySequence::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, UnknownPeerNoteType::new);
231         }
232
233         @Test
234         public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
235                 waitForASpecificMessage(replySequence::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, UnknownNodeIdentifier::new);
236         }
237
238         @Test
239         public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
240                 waitForASpecificMessage(replySequence::receivedConfigData, ConfigData.class, ConfigData::new);
241         }
242
243         @Test
244         public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
245                 waitForASpecificMessage(replySequence::receivedGetFailed, GetFailed.class, GetFailed::new);
246         }
247
248         @Test
249         public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
250                 waitForASpecificMessage(replySequence::receivedPutFailed, PutFailed.class, PutFailed::new);
251         }
252
253         @Test
254         public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
255                 waitForASpecificMessage(replySequence::receivedIdentifierCollision, IdentifierCollision.class, IdentifierCollision::new);
256         }
257
258         @Test
259         public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
260                 waitForASpecificMessage(replySequence::receivedPersistentPutDir, PersistentPutDir.class, PersistentPutDir::new);
261         }
262
263         @Test
264         public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
265                 waitForASpecificMessage(replySequence::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, PersistentRequestRemoved::new);
266         }
267
268         @Test
269         public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
270                 waitForASpecificMessage(replySequence::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, SubscribedUSKUpdate::new);
271         }
272
273         @Test
274         public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
275                 waitForASpecificMessage(replySequence::receivedPluginInfo, PluginInfo.class, PluginInfo::new);
276         }
277
278         @Test
279         public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
280                 waitForASpecificMessage(replySequence::receivedFCPPluginReply, new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
281         }
282
283         @Test
284         public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
285                 waitForASpecificMessage(replySequence::receivedPersistentRequestModified, PersistentRequestModified.class, PersistentRequestModified::new);
286         }
287
288         @Test
289         public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
290                 waitForASpecificMessage(replySequence::receivedPutSuccessful, PutSuccessful.class, PutSuccessful::new);
291         }
292
293         @Test
294         public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
295                 waitForASpecificMessage(replySequence::receivedPutFetchable, PutFetchable.class, PutFetchable::new);
296         }
297
298         @Test
299         public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
300                 waitForASpecificMessage(replySequence::receivedSentFeed, SentFeed.class, SentFeed::new);
301         }
302
303         @Test
304         public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
305                 waitForASpecificMessage(replySequence::receivedBookmarkFeed, ReceivedBookmarkFeed.class, ReceivedBookmarkFeed::new);
306         }
307
308         @Test
309         public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
310                 waitForASpecificMessage(replySequence::receivedProtocolError, ProtocolError.class, ProtocolError::new);
311         }
312
313         @Test
314         public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
315                 replySequence.setExpectedMessage("SomeFcpMessage");
316                 Future<Boolean> result = replySequence.send(fcpMessage);
317                 replySequence.receivedMessage(fcpConnection, new FcpMessage("SomeFcpMessage"));
318                 assertThat(result.get(), is(true));
319         }
320
321         @Test
322         public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
323                 TestFcpReplySequence replySequence = new TestFcpReplySequence(executorService, fcpConnection) {
324                         private final AtomicBoolean gotPutFailed = new AtomicBoolean();
325                         private final AtomicBoolean gotGetFailed = new AtomicBoolean();
326
327                         @Override
328                         protected boolean isFinished() {
329                                 return gotPutFailed.get() && gotGetFailed.get();
330                         }
331
332                         @Override
333                         protected Boolean getResult() {
334                                 return isFinished();
335                         }
336
337                         @Override
338                         protected void consumePutFailed(PutFailed putFailed) {
339                                 gotPutFailed.set(true);
340                         }
341
342                         @Override
343                         protected void consumeGetFailed(GetFailed getFailed) {
344                                 gotGetFailed.set(true);
345                         }
346                 };
347                 Future<?> result = replySequence.send(fcpMessage);
348                 assertThat(result.isDone(), is(false));
349                 replySequence.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
350                 assertThat(result.isDone(), is(false));
351                 replySequence.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
352                 assertThat(result.get(), is(true));
353         }
354
355         @Test
356         public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
357                 replySequence.setExpectedMessage("ConnectionClosed");
358                 Future<Boolean> result = replySequence.send(fcpMessage);
359                 Throwable throwable = new Throwable();
360                 replySequence.connectionClosed(fcpConnection, throwable);
361                 assertThat(result.get(), is(true));
362                 assertThat(replySequence.receivedThrowable.get(), is(throwable));
363         }
364
365         @FunctionalInterface
366         private interface MessageReceiver<M> {
367
368                 void receiveMessage(FcpConnection fcpConnection, M message);
369
370         }
371
372         private static class TestFcpReplySequence extends FcpReplySequence<Boolean> {
373
374                 private final AtomicReference<String> gotMessage = new AtomicReference<>();
375                 private final AtomicReference<String> expectedMessage = new AtomicReference<>();
376                 private final AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
377
378                 public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
379                         super(executorService, fcpConnection);
380                 }
381
382                 public void setExpectedMessage(String expectedMessage) {
383                         this.expectedMessage.set(expectedMessage);
384                 }
385
386                 @Override
387                 protected boolean isFinished() {
388                         return getResult();
389                 }
390
391                 @Override
392                 protected Boolean getResult() {
393                         return expectedMessage.get().equals(gotMessage.get());
394                 }
395
396                 @Override
397                 protected void consumeNodeHello(NodeHello nodeHello) {
398                         gotMessage.set(nodeHello.getName());
399                 }
400
401                 @Override
402                 protected void consumeCloseConnectionDuplicateClientName(
403                         CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
404                         gotMessage.set(closeConnectionDuplicateClientName.getName());
405                 }
406
407                 @Override
408                 protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
409                         gotMessage.set(sskKeypair.getName());
410                 }
411
412                 @Override
413                 protected void consumePeer(Peer peer) {
414                         gotMessage.set(peer.getName());
415                 }
416
417                 @Override
418                 protected void consumeEndListPeers(EndListPeers endListPeers) {
419                         gotMessage.set(endListPeers.getName());
420                 }
421
422                 @Override
423                 protected void consumePeerNote(PeerNote peerNote) {
424                         gotMessage.set(peerNote.getName());
425                 }
426
427                 @Override
428                 protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
429                         gotMessage.set(endListPeerNotes.getName());
430                 }
431
432                 @Override
433                 protected void consumePeerRemoved(PeerRemoved peerRemoved) {
434                         gotMessage.set(peerRemoved.getName());
435                 }
436
437                 @Override
438                 protected void consumeNodeData(NodeData nodeData) {
439                         gotMessage.set(nodeData.getName());
440                 }
441
442                 @Override
443                 protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
444                         gotMessage.set(testDDAReply.getName());
445                 }
446
447                 @Override
448                 protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
449                         gotMessage.set(testDDAComplete.getName());
450                 }
451
452                 @Override
453                 protected void consumePersistentGet(PersistentGet persistentGet) {
454                         gotMessage.set(persistentGet.getName());
455                 }
456
457                 @Override
458                 protected void consumePersistentPut(PersistentPut persistentPut) {
459                         gotMessage.set(persistentPut.getName());
460                 }
461
462                 @Override
463                 protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
464                         gotMessage.set(endListPersistentRequests.getName());
465                 }
466
467                 @Override
468                 protected void consumeURIGenerated(URIGenerated uriGenerated) {
469                         gotMessage.set(uriGenerated.getName());
470                 }
471
472                 @Override
473                 protected void consumeDataFound(DataFound dataFound) {
474                         gotMessage.set(dataFound.getName());
475                 }
476
477                 @Override
478                 protected void consumeAllData(AllData allData) {
479                         gotMessage.set(allData.getName());
480                 }
481
482                 @Override
483                 protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
484                         gotMessage.set(simpleProgress.getName());
485                 }
486
487                 @Override
488                 protected void consumeStartedCompression(StartedCompression startedCompression) {
489                         gotMessage.set(startedCompression.getName());
490                 }
491
492                 @Override
493                 protected void consumeFinishedCompression(FinishedCompression finishedCompression) {
494                         gotMessage.set(finishedCompression.getName());
495                 }
496
497                 @Override
498                 protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
499                         gotMessage.set(unknownPeerNoteType.getName());
500                 }
501
502                 @Override
503                 protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
504                         gotMessage.set(unknownNodeIdentifier.getName());
505                 }
506
507                 @Override
508                 protected void consumeConfigData(ConfigData configData) {
509                         gotMessage.set(configData.getName());
510                 }
511
512                 @Override
513                 protected void consumeGetFailed(GetFailed getFailed) {
514                         gotMessage.set(getFailed.getName());
515                 }
516
517                 @Override
518                 protected void consumePutFailed(PutFailed putFailed) {
519                         gotMessage.set(putFailed.getName());
520                 }
521
522                 @Override
523                 protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
524                         gotMessage.set(identifierCollision.getName());
525                 }
526
527                 @Override
528                 protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) {
529                         gotMessage.set(persistentPutDir.getName());
530                 }
531
532                 @Override
533                 protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
534                         gotMessage.set(persistentRequestRemoved.getName());
535                 }
536
537                 @Override
538                 protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
539                         gotMessage.set(subscribedUSKUpdate.getName());
540                 }
541
542                 @Override
543                 protected void consumePluginInfo(PluginInfo pluginInfo) {
544                         gotMessage.set(pluginInfo.getName());
545                 }
546
547                 @Override
548                 protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) {
549                         gotMessage.set(fcpPluginReply.getName());
550                 }
551
552                 @Override
553                 protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) {
554                         gotMessage.set(persistentRequestModified.getName());
555                 }
556
557                 @Override
558                 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
559                         gotMessage.set(putSuccessful.getName());
560                 }
561
562                 @Override
563                 protected void consumePutFetchable(PutFetchable putFetchable) {
564                         gotMessage.set(putFetchable.getName());
565                 }
566
567                 @Override
568                 protected void consumeSentFeed(SentFeed sentFeed) {
569                         gotMessage.set(sentFeed.getName());
570                 }
571
572                 @Override
573                 protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) {
574                         gotMessage.set(receivedBookmarkFeed.getName());
575                 }
576
577                 @Override
578                 protected void consumeProtocolError(ProtocolError protocolError) {
579                         gotMessage.set(protocolError.getName());
580                 }
581
582                 @Override
583                 protected void consumeUnknownMessage(FcpMessage fcpMessage) {
584                         gotMessage.set(fcpMessage.getName());
585                 }
586
587                 @Override
588                 protected void consumeConnectionClosed(Throwable throwable) {
589                         receivedThrowable.set(throwable);
590                         gotMessage.set("ConnectionClosed");
591                 }
592
593         }
594
595 }