1 package net.pterodactylus.fcp.quelaton;
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.allOf;
5 import static org.hamcrest.Matchers.contains;
6 import static org.hamcrest.Matchers.containsInAnyOrder;
7 import static org.hamcrest.Matchers.hasItem;
8 import static org.hamcrest.Matchers.hasSize;
9 import static org.hamcrest.Matchers.is;
10 import static org.hamcrest.Matchers.not;
11 import static org.hamcrest.Matchers.notNullValue;
12 import static org.hamcrest.Matchers.startsWith;
14 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
18 import java.nio.charset.StandardCharsets;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Optional;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.function.Supplier;
32 import java.util.stream.Collectors;
34 import net.pterodactylus.fcp.ARK;
35 import net.pterodactylus.fcp.ConfigData;
36 import net.pterodactylus.fcp.DSAGroup;
37 import net.pterodactylus.fcp.FcpKeyPair;
38 import net.pterodactylus.fcp.Key;
39 import net.pterodactylus.fcp.NodeData;
40 import net.pterodactylus.fcp.NodeRef;
41 import net.pterodactylus.fcp.Peer;
42 import net.pterodactylus.fcp.PeerNote;
43 import net.pterodactylus.fcp.PluginInfo;
44 import net.pterodactylus.fcp.Priority;
45 import net.pterodactylus.fcp.fake.FakeTcpServer;
46 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
48 import com.google.common.io.ByteStreams;
49 import com.google.common.io.Files;
50 import com.nitorcreations.junit.runners.NestedRunner;
51 import org.hamcrest.Description;
52 import org.hamcrest.Matcher;
53 import org.hamcrest.TypeSafeDiagnosingMatcher;
54 import org.junit.After;
55 import org.junit.Assert;
56 import org.junit.Test;
57 import org.junit.runner.RunWith;
60 * Unit test for {@link DefaultFcpClient}.
62 * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
64 @RunWith(NestedRunner.class)
65 public class DefaultFcpClientTest {
// Insert/request URIs that the tests hand out in canned SSKKeypair replies
// and compare against the key pair returned by the client.
67 private static final String INSERT_URI =
68 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
69 private static final String REQUEST_URI =
70 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
// Running number appended to the name of every thread created for the pool.
// NOTE(review): this is a plain int incremented with ++ inside the thread
// factory lambda; that is not atomic should the factory ever be invoked from
// several threads at once - confirm whether that can happen here.
72 private int threadCounter = 0;
// Cached thread pool whose threads are named "Test-Thread-" plus the counter value.
73 private final ExecutorService threadPool =
74 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
// Fake FCP server and the client under test; both are assigned in the constructor.
75 private final FakeTcpServer fcpServer;
76 private final DefaultFcpClient fcpClient;
/**
 * Creates the fake server on the shared thread pool and a client that talks
 * to "localhost" on the fake server's port. The last constructor argument is
 * a supplier that always returns "Test" (presumably the client name).
 *
 * @throws IOException declared by the signature; presumably raised when the
 *         fake server cannot be created
 */
78 public DefaultFcpClientTest() throws IOException {
79 fcpServer = new FakeTcpServer(threadPool);
80 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
/**
 * Cleans up after a test; the visible part shuts down the shared thread pool.
 *
 * @throws IOException declared by the signature
 */
84 public void tearDown() throws IOException {
86 threadPool.shutdown();
/**
 * Expects an ExecutionException for a key pair request when the exchange with
 * the fake server involves a "CloseConnectionDuplicateClientName" line
 * (the call that carries this line is not visible in this excerpt).
 */
89 @Test(expected = ExecutionException.class)
90 public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
91 throws IOException, ExecutionException, InterruptedException {
92 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
// Wait for the fake server's connect() future, then collect lines up to "EndMessage".
93 fcpServer.connect().get();
94 fcpServer.collectUntil(is("EndMessage"));
96 "CloseConnectionDuplicateClientName",
/**
 * Expects an ExecutionException for a key pair request; judging by the test
 * name the connection is closed after the client's first message has been
 * collected (the closing step is not visible in this excerpt).
 */
102 @Test(expected = ExecutionException.class)
103 public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
104 throws IOException, ExecutionException, InterruptedException {
105 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
// Wait for the fake server's connect() future, then collect lines up to "EndMessage".
106 fcpServer.connect().get();
107 fcpServer.collectUntil(is("EndMessage"));
/**
 * Requests a key pair, answers with an SSKKeypair reply that carries the
 * identifier taken from the client's request, and checks that the resulting
 * key pair reports REQUEST_URI as public key and INSERT_URI as private key.
 */
113 public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
114 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
116 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
// The reply must echo the identifier the client put into its request.
117 String identifier = extractIdentifier(lines);
118 fcpServer.writeLine("SSKKeypair",
119 "InsertURI=" + INSERT_URI + "",
120 "RequestURI=" + REQUEST_URI + "",
121 "Identifier=" + identifier,
123 FcpKeyPair keyPair = keyPairFuture.get();
124 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
125 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
/**
 * Shared connection handshake: waits for the fake server's connect() future,
 * collects the client's first message up to "EndMessage" and writes a canned
 * NodeHello reply.
 *
 * @throws InterruptedException declared by the signature
 * @throws ExecutionException declared by the signature
 * @throws IOException declared by the signature
 */
128 private void connectNode() throws InterruptedException, ExecutionException, IOException {
129 fcpServer.connect().get();
130 fcpServer.collectUntil(is("EndMessage"));
// Fixed NodeHello fields; only some of the reply's lines are visible in this excerpt.
131 fcpServer.writeLine("NodeHello",
132 "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
133 "Revision=build01466",
135 "Version=Fred,0.7,1.0,1466",
137 "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
141 "NodeLanguage=ENGLISH",
/**
 * Issues a ClientGet for KSK@foo.txt, checks that the request contains
 * ReturnType=direct and the URI, and verifies that the returned data has the
 * MIME type text/plain;charset=utf-8, a size of 6 and the bytes of "Hello\n".
 */
148 public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
149 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
151 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
152 assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
153 String identifier = extractIdentifier(lines);
// Fields of the server's reply for the request's identifier (reply name not visible here).
156 "Identifier=" + identifier,
158 "StartupTime=1435610539000",
159 "CompletionTime=1435610540000",
160 "Metadata.ContentType=text/plain;charset=utf-8",
164 Optional<Data> data = dataFuture.get();
165 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
166 assertThat(data.get().size(), is(6L));
167 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
168 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
/**
 * Pulls the identifier out of a collected message: keeps the lines that start
 * with "Identifier=" and maps each to the text after its first '='.
 *
 * @param lines the lines to search
 * @return the identifier text; how the stream is reduced to a single value
 *         (and what happens when no line matches) is not visible in this excerpt
 */
171 private String extractIdentifier(List<String> lines) {
172 return lines.stream()
173 .filter(s -> s.startsWith("Identifier="))
174 .map(s -> s.substring(s.indexOf('=') + 1))
/**
 * Sends two replies to a ClientGet: one for the foreign identifier "not-test"
 * (content type charset latin-9) and one for the request's own identifier
 * (charset utf-8). The returned data must match the second reply.
 */
180 public void clientGetDownloadsDataForCorrectIdentifier()
181 throws InterruptedException, ExecutionException, IOException {
182 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
184 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
185 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
186 String identifier = extractIdentifier(lines);
// First reply: wrong identifier, must not complete the request.
189 "Identifier=not-test",
191 "StartupTime=1435610539000",
192 "CompletionTime=1435610540000",
193 "Metadata.ContentType=text/plain;charset=latin-9",
// Second reply: the identifier of the actual request.
199 "Identifier=" + identifier,
201 "StartupTime=1435610539000",
202 "CompletionTime=1435610540000",
203 "Metadata.ContentType=text/plain;charset=utf-8",
207 Optional<Data> data = dataFuture.get();
208 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
209 assertThat(data.get().size(), is(6L));
210 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
211 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
/**
 * Checks that a ClientGet completes with an empty Optional after the server
 * answers for the request's identifier (a GetFailed reply, judging by the
 * test name; the reply name is not visible in this excerpt).
 */
215 public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
216 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
218 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
219 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
220 String identifier = extractIdentifier(lines);
223 "Identifier=" + identifier,
227 Optional<Data> data = dataFuture.get();
228 assertThat(data.isPresent(), is(false));
/**
 * Like the plain failure test, but a reply for the foreign identifier
 * "not-test" is sent before the reply for the request's own identifier; the
 * future must still complete with an empty Optional.
 */
232 public void clientGetRecognizesGetFailedForCorrectIdentifier()
233 throws InterruptedException, ExecutionException, IOException {
234 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
236 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
237 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
238 String identifier = extractIdentifier(lines);
// Reply for a different request first, then for this one.
241 "Identifier=not-test",
247 "Identifier=" + identifier,
251 Optional<Data> data = dataFuture.get();
252 assertThat(data.isPresent(), is(false));
/**
 * Expects an ExecutionException for a ClientGet; judging by the test name the
 * connection is closed after the request has been collected (the closing
 * step is not visible in this excerpt).
 */
255 @Test(expected = ExecutionException.class)
256 public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
257 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
259 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
260 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
/**
 * Runs two key pair requests one after the other. Each request is collected
 * from the fake server and answered with the URIs and that request's
 * identifier; the second request is collected without a visible reconnect.
 */
266 public void defaultFcpClientReusesConnection() throws InterruptedException, ExecutionException, IOException {
267 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
269 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
270 String identifier = extractIdentifier(lines);
// Reply fields for the first request.
273 "InsertURI=" + INSERT_URI + "",
274 "RequestURI=" + REQUEST_URI + "",
275 "Identifier=" + identifier,
// Second request; its identifier is extracted anew.
279 keyPair = fcpClient.generateKeypair().execute();
280 lines = fcpServer.collectUntil(is("EndMessage"));
281 identifier = extractIdentifier(lines);
284 "InsertURI=" + INSERT_URI + "",
285 "RequestURI=" + REQUEST_URI + "",
286 "Identifier=" + identifier,
/**
 * Starts a key pair request whose failure is caught as an ExecutionException,
 * then issues a second key pair request that is collected and answered with
 * the URIs and its own identifier.
 */
293 public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
294 throws InterruptedException, ExecutionException, IOException {
295 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
297 fcpServer.collectUntil(is("EndMessage"));
// The guarded statement is not visible in this excerpt; judging by the test
// name the first request fails because the connection has been closed.
302 } catch (ExecutionException e) {
304 keyPair = fcpClient.generateKeypair().execute();
306 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
307 String identifier = extractIdentifier(lines);
310 "InsertURI=" + INSERT_URI + "",
311 "RequestURI=" + REQUEST_URI + "",
312 "Identifier=" + identifier,
/**
 * Checks that ignoreDataStore() makes the ClientGet request contain the line
 * "IgnoreDS=true".
 */
319 public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands()
320 throws InterruptedException, ExecutionException, IOException {
321 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
323 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
324 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
/**
 * Checks that dataStoreOnly() makes the ClientGet request contain the line
 * "DSonly=true".
 */
328 public void clientGetWithDataStoreOnlySettingSendsCorrectCommands()
329 throws InterruptedException, ExecutionException, IOException {
330 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
332 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
333 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
/**
 * Checks that maxSize(1048576) makes the ClientGet request contain the line
 * "MaxSize=1048576".
 */
337 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
338 throws InterruptedException, ExecutionException, IOException {
339 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
341 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
342 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
/**
 * Checks that priority(Priority.interactive) makes the ClientGet request
 * contain the line "PriorityClass=1".
 */
346 public void clientGetWithPrioritySettingSendsCorrectCommands()
347 throws InterruptedException, ExecutionException, IOException {
348 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
350 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
351 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
/**
 * Checks that realTime() makes the ClientGet request contain the line
 * "RealTimeFlag=true".
 */
355 public void clientGetWithRealTimeSettingSendsCorrectCommands()
356 throws InterruptedException, ExecutionException, IOException {
357 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
359 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
360 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
/**
 * Checks that global() makes the ClientGet request contain the line
 * "Global=true".
 */
364 public void clientGetWithGlobalSettingSendsCorrectCommands()
365 throws InterruptedException, ExecutionException, IOException {
366 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
368 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
369 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
/**
 * Builds a Hamcrest matcher for a collected FCP message given as a list of
 * lines: the first line has to equal the message name and every required
 * line has to occur somewhere after that first line.
 *
 * @param name the expected first line of the message
 * @param requiredLines lines that have to be present at index 1 or later
 * @return a matcher that also describes which check failed
 */
372 private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
373 return new TypeSafeDiagnosingMatcher<List<String>>() {
375 protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
// NOTE(review): item.get(0) is called without a visible emptiness check - an
// empty list would throw instead of producing a mismatch; confirm.
376 if (!item.get(0).equals(name)) {
377 mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
380 for (String requiredLine : requiredLines) {
// indexOf(...) < 1 covers both "absent" (-1) and "only found as the name line" (0).
381 if (item.indexOf(requiredLine) < 1) {
382 mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
390 public void describeTo(Description description) {
391 description.appendText("FCP message named ").appendValue(name);
392 description.appendValueList(", containing the lines ", ", ", "", requiredLines);
/**
 * Puts the bytes of "Hello\n" from an in-memory stream and checks that the
 * collected ClientPut message contains UploadFrom=direct, DataLength=6 and
 * URI=KSK@foo.txt.
 */
398 public void clientPutWithDirectDataSendsCorrectCommand()
399 throws IOException, ExecutionException, InterruptedException {
400 fcpClient.clientPut()
401 .from(new ByteArrayInputStream("Hello\n".getBytes()))
// Collect up to the line "Hello" (presumably the data sent after the message header).
406 List<String> lines = fcpServer.collectUntil(is("Hello"));
407 assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
/**
 * Puts direct data, sends a reply for the foreign identifier
 * "not-the-right-one" followed by a reply for the request's own identifier,
 * and checks that the resulting key is "KSK@foo.txt".
 */
411 public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
412 throws InterruptedException, ExecutionException, IOException {
413 Future<Optional<Key>> key = fcpClient.clientPut()
414 .from(new ByteArrayInputStream("Hello\n".getBytes()))
419 List<String> lines = fcpServer.collectUntil(is("Hello"));
420 String identifier = extractIdentifier(lines);
// Reply for a different request first, then for this one.
423 "Identifier=not-the-right-one",
429 "Identifier=" + identifier,
432 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
/**
 * Puts direct data, sends a reply for the foreign identifier
 * "not-the-right-one" followed by a reply for the request's own identifier,
 * and checks that the future completes with an empty Optional.
 */
436 public void clientPutWithDirectDataFailsOnCorrectIdentifier()
437 throws InterruptedException, ExecutionException, IOException {
438 Future<Optional<Key>> key = fcpClient.clientPut()
439 .from(new ByteArrayInputStream("Hello\n".getBytes()))
444 List<String> lines = fcpServer.collectUntil(is("Hello"));
445 String identifier = extractIdentifier(lines);
// Reply for a different request first, then for this one.
448 "Identifier=not-the-right-one",
454 "Identifier=" + identifier,
457 assertThat(key.get().isPresent(), is(false));
/**
 * Checks that named("otherName.txt") adds "TargetFilename=otherName.txt" to
 * a direct-data ClientPut, next to UploadFrom=direct, DataLength=6 and the URI.
 */
461 public void clientPutWithRenamedDirectDataSendsCorrectCommand()
462 throws InterruptedException, ExecutionException, IOException {
463 fcpClient.clientPut()
464 .named("otherName.txt")
465 .from(new ByteArrayInputStream("Hello\n".getBytes()))
470 List<String> lines = fcpServer.collectUntil(is("Hello"));
471 assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct",
472 "DataLength=6", "URI=KSK@foo.txt"));
/**
 * Checks that redirectTo("KSK@bar.txt") produces a ClientPut with
 * UploadFrom=redirect, URI=KSK@foo.txt and TargetURI=KSK@bar.txt.
 */
476 public void clientPutWithRedirectSendsCorrectCommand()
477 throws IOException, ExecutionException, InterruptedException {
478 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
480 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
482 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
/**
 * Checks that putting from the file /tmp/data.txt produces a ClientPut with
 * UploadFrom=disk, URI=KSK@foo.txt and Filename=/tmp/data.txt.
 */
486 public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
487 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
489 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
491 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
/**
 * Walks through a complete directory-access handshake for a file put (the
 * message names are not visible in this excerpt): the client asks for read
 * access to the temp file's directory, reports the content "test-content"
 * read from the file the server named, and finally sends the ClientPut with
 * UploadFrom=disk for test.dat in that directory.
 */
495 public void clientPutWithFileCanCompleteTestDdaSequence()
496 throws IOException, ExecutionException, InterruptedException {
497 File tempFile = createTempFile();
498 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
500 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
501 String identifier = extractIdentifier(lines);
// Server reply for the original request's identifier.
504 "Identifier=" + identifier,
// Client is expected to request read (not write) access to the directory.
508 lines = fcpServer.collectUntil(is("EndMessage"));
509 assertThat(lines, matchesFcpMessage(
511 "Directory=" + tempFile.getParent(),
512 "WantReadDirectory=true",
513 "WantWriteDirectory=false",
// Server names the temp file as the file to read.
518 "Directory=" + tempFile.getParent(),
519 "ReadFilename=" + tempFile,
// Client is expected to report the content written by createTempFile().
522 lines = fcpServer.collectUntil(is("EndMessage"));
523 assertThat(lines, matchesFcpMessage(
525 "Directory=" + tempFile.getParent(),
526 "ReadContent=test-content",
// Server grants read access to the directory.
531 "Directory=" + tempFile.getParent(),
532 "ReadDirectoryAllowed=true",
// Client is expected to send the ClientPut for the file on disk.
535 lines = fcpServer.collectUntil(is("EndMessage"));
537 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
538 "Filename=" + new File(tempFile.getParent(), "test.dat")));
/**
 * Creates a temporary file named "test-dda-*.dat", marks it for deletion on
 * JVM exit and writes the text "test-content" to it in UTF-8.
 *
 * @return presumably the created file (the return statement is not visible
 *         in this excerpt)
 * @throws IOException if the file cannot be created or written
 */
541 private File createTempFile() throws IOException {
542 File tempFile = File.createTempFile("test-dda-", ".dat");
543 tempFile.deleteOnExit();
544 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
/**
 * Puts a file, sends a reply for the foreign identifier "not-the-right-one"
 * (a protocol error, judging by the test name) followed by a reply for the
 * request's own identifier, and checks that the resulting key is "KSK@foo.txt".
 */
549 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
550 throws InterruptedException, ExecutionException, IOException {
551 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
553 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
554 String identifier = extractIdentifier(lines);
// Reply for a different request first, then for this one.
557 "Identifier=not-the-right-one",
563 "Identifier=" + identifier,
567 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
/**
 * Puts a file, sends a reply for the request's identifier (a protocol error
 * with a code other than 25, judging by the test name) and checks that the
 * future completes with an empty Optional.
 */
571 public void clientPutAbortsOnProtocolErrorOtherThan25()
572 throws InterruptedException, ExecutionException, IOException {
573 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
575 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
576 String identifier = extractIdentifier(lines);
579 "Identifier=" + identifier,
583 assertThat(key.get().isPresent(), is(false));
/**
 * Directory-access handshake in which the server first answers for
 * "/some-other-directory" and only then for the temp file's directory. The
 * next message collected from the client must refer to the temp file's
 * directory and carry "ReadContent=test-content".
 */
587 public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
588 InterruptedException {
589 File tempFile = createTempFile();
590 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
592 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
593 String identifier = extractIdentifier(lines);
596 "Identifier=" + identifier,
// Client is expected to request read (not write) access to the directory.
600 lines = fcpServer.collectUntil(is("EndMessage"));
601 assertThat(lines, matchesFcpMessage(
603 "Directory=" + tempFile.getParent(),
604 "WantReadDirectory=true",
605 "WantWriteDirectory=false",
// Server reply for an unrelated directory.
610 "Directory=/some-other-directory",
611 "ReadFilename=" + tempFile,
// Server reply for the directory the client asked about.
616 "Directory=" + tempFile.getParent(),
617 "ReadFilename=" + tempFile,
620 lines = fcpServer.collectUntil(is("EndMessage"));
621 assertThat(lines, matchesFcpMessage(
623 "Directory=" + tempFile.getParent(),
624 "ReadContent=test-content",
/**
 * Directory-access handshake in which the server names a file that is the
 * temp file's path with ".foo" appended. The client is still expected to
 * answer, with "ReadContent=failed-to-read".
 */
630 public void clientPutSendsResponseEvenIfFileCanNotBeRead()
631 throws IOException, ExecutionException, InterruptedException {
632 File tempFile = createTempFile();
633 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
635 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
636 String identifier = extractIdentifier(lines);
639 "Identifier=" + identifier,
// Client is expected to request read (not write) access to the directory.
643 lines = fcpServer.collectUntil(is("EndMessage"));
644 assertThat(lines, matchesFcpMessage(
646 "Directory=" + tempFile.getParent(),
647 "WantReadDirectory=true",
648 "WantWriteDirectory=false",
// File name differs from the file created by createTempFile().
653 "Directory=" + tempFile.getParent(),
654 "ReadFilename=" + tempFile + ".foo",
657 lines = fcpServer.collectUntil(is("EndMessage"));
658 assertThat(lines, matchesFcpMessage(
660 "Directory=" + tempFile.getParent(),
661 "ReadContent=failed-to-read",
/**
 * Sends a message for "/some-other-directory" and then a reply for the
 * request's identifier. The next message collected from the client must be
 * the read-access request for the temp file's directory, i.e. not a
 * re-sent ClientPut (per the test name).
 */
667 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
668 throws IOException, ExecutionException, InterruptedException {
669 File tempFile = createTempFile();
670 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
672 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
673 String identifier = extractIdentifier(lines);
// Message for an unrelated directory.
676 "Directory=/some-other-directory",
681 "Identifier=" + identifier,
685 lines = fcpServer.collectUntil(is("EndMessage"));
686 assertThat(lines, matchesFcpMessage(
688 "Directory=" + tempFile.getParent(),
689 "WantReadDirectory=true",
690 "WantWriteDirectory=false",
/**
 * Registers a key-generated callback that appends to a list, sends two
 * replies for the request's identifier and checks that both the final key
 * and the single recorded callback value are "KSK@foo.txt".
 */
696 public void clientPutSendsNotificationsForGeneratedKeys()
697 throws InterruptedException, ExecutionException, IOException {
// Thread-safe list: the callback may run on a different thread than the test.
698 List<String> generatedKeys = new CopyOnWriteArrayList<>();
699 Future<Optional<Key>> key = fcpClient.clientPut()
700 .onKeyGenerated(generatedKeys::add)
701 .from(new ByteArrayInputStream("Hello\n".getBytes()))
706 List<String> lines = fcpServer.collectUntil(is("Hello"));
707 String identifier = extractIdentifier(lines);
710 "Identifier=" + identifier,
717 "Identifier=" + identifier,
720 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
721 assertThat(generatedKeys, contains("KSK@foo.txt"));
/**
 * Lists peers with default options, checks that the request carries
 * WithVolatile=false and WithMetadata=false, sends three replies for the
 * request's identifier and expects two peers with identities "id1" and "id2".
 */
725 public void clientCanListPeers() throws IOException, ExecutionException, InterruptedException {
726 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
728 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
729 assertThat(lines, matchesFcpMessage(
731 "WithVolatile=false",
732 "WithMetadata=false",
735 String identifier = extractIdentifier(lines);
// Three replies; presumably one per peer plus a list terminator - confirm.
738 "Identifier=" + identifier,
744 "Identifier=" + identifier,
750 "Identifier=" + identifier,
753 assertThat(peers.get(), hasSize(2));
754 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
755 containsInAnyOrder("id1", "id2"));
/**
 * Lists peers with includeMetadata(), checks the request, sends three
 * replies for the request's identifier and expects two peers whose metadata
 * entry "foo" is "bar1" and "bar2".
 */
759 public void clientCanListPeersWithMetadata() throws IOException, ExecutionException, InterruptedException {
760 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
762 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
763 assertThat(lines, matchesFcpMessage(
765 "WithVolatile=false",
769 String identifier = extractIdentifier(lines);
772 "Identifier=" + identifier,
779 "Identifier=" + identifier,
786 "Identifier=" + identifier,
789 assertThat(peers.get(), hasSize(2));
790 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
791 containsInAnyOrder("bar1", "bar2"));
/**
 * Lists peers with includeVolatile(), checks the request, sends three
 * replies for the request's identifier and expects two peers whose volatile
 * entry "foo" is "bar1" and "bar2".
 */
795 public void clientCanListPeersWithVolatiles() throws IOException, ExecutionException, InterruptedException {
796 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
798 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
799 assertThat(lines, matchesFcpMessage(
802 "WithMetadata=false",
805 String identifier = extractIdentifier(lines);
808 "Identifier=" + identifier,
815 "Identifier=" + identifier,
822 "Identifier=" + identifier,
825 assertThat(peers.get(), hasSize(2));
826 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
827 containsInAnyOrder("bar1", "bar2"));
/**
 * Requests node information with default options, checks that the request
 * carries GiveOpennetRef=false and WithVolatile=false, sends a reply for the
 * request's identifier and expects a non-null result.
 */
831 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
832 Future<NodeData> nodeData = fcpClient.getNode().execute();
834 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
835 String identifier = extractIdentifier(lines);
836 assertThat(lines, matchesFcpMessage(
838 "Identifier=" + identifier,
839 "GiveOpennetRef=false",
841 "WithVolatile=false",
// Fields of the canned reply.
846 "Identifier=" + identifier,
847 "ark.pubURI=SSK@3YEf.../ark",
850 "version=Fred,0.7,1.0,1466",
851 "lastGoodVersion=Fred,0.7,1.0,1466",
854 assertThat(nodeData.get(), notNullValue());
/**
 * Requests node information with opennetRef(), checks that the request
 * carries GiveOpennetRef=true, sends a reply for the request's identifier and
 * expects the result's version to read "Fred,0.7,1.0,1466".
 */
858 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
859 throws InterruptedException, ExecutionException, IOException {
860 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
862 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
863 String identifier = extractIdentifier(lines);
864 assertThat(lines, matchesFcpMessage(
866 "Identifier=" + identifier,
867 "GiveOpennetRef=true",
869 "WithVolatile=false",
// Fields of the canned reply.
874 "Identifier=" + identifier,
876 "ark.pubURI=SSK@3YEf.../ark",
879 "version=Fred,0.7,1.0,1466",
880 "lastGoodVersion=Fred,0.7,1.0,1466",
883 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
/**
 * Requests node information with includePrivate(), sends a reply that
 * contains "ark.privURI=SSK@XdHMiRl" and expects the result's ARK to report
 * that private URI.
 */
887 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
888 throws InterruptedException, ExecutionException, IOException {
889 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
891 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
892 String identifier = extractIdentifier(lines);
893 assertThat(lines, matchesFcpMessage(
895 "Identifier=" + identifier,
896 "GiveOpennetRef=false",
898 "WithVolatile=false",
// Fields of the canned reply, including the private ARK URI.
903 "Identifier=" + identifier,
905 "ark.pubURI=SSK@3YEf.../ark",
908 "version=Fred,0.7,1.0,1466",
909 "lastGoodVersion=Fred,0.7,1.0,1466",
910 "ark.privURI=SSK@XdHMiRl",
913 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
/**
 * Requests node information with includeVolatile(), sends a reply that
 * contains "volatile.freeJavaMemory=205706528" and expects the result to
 * report that value for the volatile entry "freeJavaMemory".
 */
917 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
918 throws InterruptedException, ExecutionException, IOException {
919 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
921 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
922 String identifier = extractIdentifier(lines);
923 assertThat(lines, matchesFcpMessage(
925 "Identifier=" + identifier,
926 "GiveOpennetRef=false",
// Fields of the canned reply, including one volatile value.
933 "Identifier=" + identifier,
935 "ark.pubURI=SSK@3YEf.../ark",
938 "version=Fred,0.7,1.0,1466",
939 "lastGoodVersion=Fred,0.7,1.0,1466",
940 "volatile.freeJavaMemory=205706528",
943 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
/**
 * Looks up a single peer by identity "id1", checks that the request carries
 * "NodeIdentifier=id1", sends a reply for the request's identifier and
 * expects the returned peer's identity to be "id1".
 */
947 public void defaultFcpClientCanListSinglePeerByIdentity()
948 throws InterruptedException, ExecutionException, IOException {
949 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
951 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
952 String identifier = extractIdentifier(lines);
953 assertThat(lines, matchesFcpMessage(
955 "Identifier=" + identifier,
956 "NodeIdentifier=id1",
// Fields of the canned reply.
961 "Identifier=" + identifier,
964 "ark.pubURI=SSK@3YEf.../ark",
967 "version=Fred,0.7,1.0,1466",
968 "lastGoodVersion=Fred,0.7,1.0,1466",
971 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Looks up a single peer by host "host.free.net" and port 12345, checks that
 * the request carries "NodeIdentifier=host.free.net:12345", sends a reply for
 * the request's identifier and expects the returned peer's identity to be "id1".
 */
975 public void defaultFcpClientCanListSinglePeerByHostAndPort()
976 throws InterruptedException, ExecutionException, IOException {
977 Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
979 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
980 String identifier = extractIdentifier(lines);
981 assertThat(lines, matchesFcpMessage(
983 "Identifier=" + identifier,
984 "NodeIdentifier=host.free.net:12345",
// Fields of the canned reply.
989 "Identifier=" + identifier,
992 "ark.pubURI=SSK@3YEf.../ark",
995 "version=Fred,0.7,1.0,1466",
996 "lastGoodVersion=Fred,0.7,1.0,1466",
999 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Looks up a single peer by name "FriendNode", checks that the request
 * carries "NodeIdentifier=FriendNode", sends a reply for the request's
 * identifier and expects the returned peer's identity to be "id1".
 */
1003 public void defaultFcpClientCanListSinglePeerByName()
1004 throws InterruptedException, ExecutionException, IOException {
1005 Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
1007 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1008 String identifier = extractIdentifier(lines);
1009 assertThat(lines, matchesFcpMessage(
1011 "Identifier=" + identifier,
1012 "NodeIdentifier=FriendNode",
// Canned reply for the request.
1015 fcpServer.writeLine(
1017 "Identifier=" + identifier,
1020 "ark.pubURI=SSK@3YEf.../ark",
1023 "version=Fred,0.7,1.0,1466",
1024 "lastGoodVersion=Fred,0.7,1.0,1466",
1027 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Looks up a peer by identity "id2", answers with an UnknownNodeIdentifier
 * message for the request's identifier and expects an empty Optional.
 */
1031 public void defaultFcpClientRecognizesUnknownNodeIdentifiers()
1032 throws InterruptedException, ExecutionException, IOException {
1033 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
1035 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1036 String identifier = extractIdentifier(lines);
1037 assertThat(lines, matchesFcpMessage(
1039 "Identifier=" + identifier,
1040 "NodeIdentifier=id2",
// Server reports that it does not know the requested node.
1043 fcpServer.writeLine(
1044 "UnknownNodeIdentifier",
1045 "Identifier=" + identifier,
1046 "NodeIdentifier=id2",
1049 assertThat(peer.get().isPresent(), is(false));
/**
 * Adds a peer from the file /tmp/ref.txt, checks that the request carries
 * "File=/tmp/ref.txt", sends a reply for the request's identifier and expects
 * the returned peer's identity to be "id1".
 */
1053 public void defaultFcpClientCanAddPeerFromFile() throws InterruptedException, ExecutionException, IOException {
1054 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
1056 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1057 String identifier = extractIdentifier(lines);
1058 assertThat(lines, matchesFcpMessage(
1060 "Identifier=" + identifier,
1061 "File=/tmp/ref.txt",
// Canned reply for the request.
1064 fcpServer.writeLine(
1066 "Identifier=" + identifier,
1069 "ark.pubURI=SSK@3YEf.../ark",
1072 "version=Fred,0.7,1.0,1466",
1073 "lastGoodVersion=Fred,0.7,1.0,1466",
1076 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Adds a peer from the URL http://node.ref/, checks that the request carries
 * "URL=http://node.ref/", sends a reply for the request's identifier and
 * expects the returned peer's identity to be "id1".
 */
1080 public void defaultFcpClientCanAddPeerFromURL() throws InterruptedException, ExecutionException, IOException {
1081 Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
1083 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1084 String identifier = extractIdentifier(lines);
1085 assertThat(lines, matchesFcpMessage(
1087 "Identifier=" + identifier,
1088 "URL=http://node.ref/",
// Canned reply for the request.
1091 fcpServer.writeLine(
1093 "Identifier=" + identifier,
1096 "ark.pubURI=SSK@3YEf.../ark",
1099 "version=Fred,0.7,1.0,1466",
1100 "lastGoodVersion=Fred,0.7,1.0,1466",
1103 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Adds a peer from a NodeRef built in the test, checks that the request
 * carries the node reference's fields (ARK public URI, DSA group and key,
 * UDP address, negotiation types joined with ';'), sends a reply for the
 * request's identifier and expects the returned peer's identity to be "id1".
 */
1107 public void defaultFcpClientCanAddPeerFromNodeRef() throws InterruptedException, ExecutionException, IOException {
// Node reference with fixed dummy values.
1108 NodeRef nodeRef = new NodeRef();
1109 nodeRef.setIdentity("id1");
1110 nodeRef.setName("name");
1111 nodeRef.setARK(new ARK("public", "1"));
1112 nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
1113 nodeRef.setNegotiationTypes(new int[] { 3, 5 });
1114 nodeRef.setPhysicalUDP("1.2.3.4:5678");
1115 nodeRef.setDSAPublicKey("dsa-public");
1116 nodeRef.setSignature("sig");
1117 Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
1119 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1120 String identifier = extractIdentifier(lines);
1121 assertThat(lines, matchesFcpMessage(
1123 "Identifier=" + identifier,
1126 "ark.pubURI=public",
1130 "dsaGroup.q=subprime",
1131 "dsaPubKey.y=dsa-public",
1132 "physical.udp=1.2.3.4:5678",
1133 "auth.negTypes=3;5",
// Canned reply for the request.
1137 fcpServer.writeLine(
1139 "Identifier=" + identifier,
1142 "ark.pubURI=SSK@3YEf.../ark",
1145 "version=Fred,0.7,1.0,1466",
1146 "lastGoodVersion=Fred,0.7,1.0,1466",
1149 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Requests the peer notes of the node named "Friend1", sends a reply with a
 * note text followed by a second message for the same identifier, and
 * expects the returned note to carry that text and note type 1.
 */
1153 public void listPeerNotesCanGetPeerNotesByNodeName() throws InterruptedException, ExecutionException, IOException {
1154 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
1156 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1157 String identifier = extractIdentifier(lines);
1158 assertThat(lines, matchesFcpMessage(
1160 "NodeIdentifier=Friend1",
// Reply carrying the note.
1163 fcpServer.writeLine(
1165 "Identifier=" + identifier,
1166 "NodeIdentifier=Friend1",
1167 "NoteText=RXhhbXBsZSBUZXh0Lg==",
// Second message for the same request; presumably the end-of-list marker - confirm.
1171 fcpServer.writeLine(
1173 "Identifier=" + identifier,
1176 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1177 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
/**
 * Requests the peer notes of the node named "Friend1", answers with an
 * UnknownNodeIdentifier message for the request's identifier and expects an
 * empty Optional.
 */
1181 public void listPeerNotesReturnsEmptyOptionalWhenNodeIdenfierUnknown()
1182 throws InterruptedException, ExecutionException,
1184 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
1186 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1187 String identifier = extractIdentifier(lines);
1188 assertThat(lines, matchesFcpMessage(
1190 "NodeIdentifier=Friend1",
// Server reports that it does not know the requested node.
1193 fcpServer.writeLine(
1194 "UnknownNodeIdentifier",
1195 "Identifier=" + identifier,
1196 "NodeIdentifier=Friend1",
1199 assertThat(peerNote.get().isPresent(), is(false));
/**
 * Requests the peer notes of the node with identity "id1", sends a reply
 * with a note text followed by a second message for the same identifier, and
 * expects the returned note to carry that text and note type 1.
 */
1203 public void listPeerNotesCanGetPeerNotesByNodeIdentifier()
1204 throws InterruptedException, ExecutionException, IOException {
1205 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
1207 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1208 String identifier = extractIdentifier(lines);
1209 assertThat(lines, matchesFcpMessage(
1211 "NodeIdentifier=id1",
// Reply carrying the note.
1214 fcpServer.writeLine(
1216 "Identifier=" + identifier,
1217 "NodeIdentifier=id1",
1218 "NoteText=RXhhbXBsZSBUZXh0Lg==",
// Second message for the same request; presumably the end-of-list marker - confirm.
1222 fcpServer.writeLine(
1224 "Identifier=" + identifier,
1227 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1228 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
/**
 * Requests the peer notes of a peer selected by host and port and checks that
 * the request encodes them as "NodeIdentifier=1.2.3.4:5678".
 */
1232 public void listPeerNotesCanGetPeerNotesByHostNameAndPortNumber()
1233 throws InterruptedException, ExecutionException, IOException {
1234 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
// Read the client's request and pick up the identifier to echo back.
1236 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1237 String identifier = extractIdentifier(lines);
1238 assertThat(lines, matchesFcpMessage(
1240 "NodeIdentifier=1.2.3.4:5678",
// The reply names the peer as "id1", not by the host:port used in the request.
1243 fcpServer.writeLine(
1245 "Identifier=" + identifier,
1246 "NodeIdentifier=id1",
1247 "NoteText=RXhhbXBsZSBUZXh0Lg==",
// Second reply: only its identifier line is visible in this excerpt.
1251 fcpServer.writeLine(
1253 "Identifier=" + identifier,
1256 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1257 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
/**
 * Enables a peer selected by name ("Friend1"), checks the outgoing request and
 * expects the returned peer to have identity "id1".
 */
1261 public void defaultFcpClientCanEnablePeerByName() throws InterruptedException, ExecutionException, IOException {
1262 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("Friend1").execute();
// Read the client's request; its own identifier is part of the expected message.
1264 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1265 String identifier = extractIdentifier(lines);
1266 assertThat(lines, matchesFcpMessage(
1268 "Identifier=" + identifier,
1269 "NodeIdentifier=Friend1",
// Reply from the fake node for the same identifier.
1273 fcpServer.writeLine(
1275 "Identifier=" + identifier,
1276 "NodeIdentifier=Friend1",
1280 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Disables a peer selected by name ("Friend1"), checks the outgoing request and
 * expects the returned peer to have identity "id1".
 */
1284 public void defaultFcpClientCanDisablePeerByName() throws InterruptedException, ExecutionException, IOException {
1285 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("Friend1").execute();
// Read the client's request; its own identifier is part of the expected message.
1287 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1288 String identifier = extractIdentifier(lines);
1289 assertThat(lines, matchesFcpMessage(
1291 "Identifier=" + identifier,
1292 "NodeIdentifier=Friend1",
// Reply from the fake node for the same identifier.
1296 fcpServer.writeLine(
1298 "Identifier=" + identifier,
1299 "NodeIdentifier=Friend1",
1303 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Enables a peer selected by identity ("id1") and checks that the request
 * carries "NodeIdentifier=id1"; the returned peer must have identity "id1".
 */
1307 public void defaultFcpClientCanEnablePeerByIdentity() throws InterruptedException, ExecutionException, IOException {
1308 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1310 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1311 String identifier = extractIdentifier(lines);
1312 assertThat(lines, matchesFcpMessage(
1314 "Identifier=" + identifier,
1315 "NodeIdentifier=id1",
// The reply names the peer "Friend1" although the request used the identity.
1319 fcpServer.writeLine(
1321 "Identifier=" + identifier,
1322 "NodeIdentifier=Friend1",
1326 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Enables a peer selected by host and port and checks that the request encodes
 * them as "NodeIdentifier=1.2.3.4:5678"; the returned peer must have identity "id1".
 */
1330 public void defaultFcpClientCanEnablePeerByHostAndPort()
1331 throws InterruptedException, ExecutionException, IOException {
1332 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
// Read the client's request; its own identifier is part of the expected message.
1334 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1335 String identifier = extractIdentifier(lines);
1336 assertThat(lines, matchesFcpMessage(
1338 "Identifier=" + identifier,
1339 "NodeIdentifier=1.2.3.4:5678",
// Reply from the fake node for the same identifier.
1343 fcpServer.writeLine(
1345 "Identifier=" + identifier,
1346 "NodeIdentifier=Friend1",
1350 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Tries to enable a peer by identity and answers with "UnknownNodeIdentifier";
 * the resulting Optional is expected to be empty.
 */
1354 public void defaultFcpClientCanNotModifyPeerOfUnknownNode()
1355 throws InterruptedException, ExecutionException, IOException {
1356 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1358 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1359 String identifier = extractIdentifier(lines);
1360 assertThat(lines, matchesFcpMessage(
1362 "Identifier=" + identifier,
1363 "NodeIdentifier=id1",
// Error reply for the same identifier and peer.
1367 fcpServer.writeLine(
1368 "UnknownNodeIdentifier",
1369 "Identifier=" + identifier,
1370 "NodeIdentifier=id1",
1373 assertThat(peer.get().isPresent(), is(false));
/**
 * Allows local addresses for a peer selected by identity ("id1"), checks that the
 * request carries "AllowLocalAddresses=true" and no "IsDisabled=" field, and
 * expects the returned peer to have identity "id1".
 */
1377 public void defaultFcpClientCanAllowLocalAddressesOfPeer()
1378 throws InterruptedException, ExecutionException, IOException {
1379 Future<Optional<Peer>> peer = fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1381 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1382 String identifier = extractIdentifier(lines);
1383 assertThat(lines, matchesFcpMessage(
1385 "Identifier=" + identifier,
1386 "NodeIdentifier=id1",
1387 "AllowLocalAddresses=true",
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
1390 assertThat(lines, not(hasItem(startsWith("IsDisabled="))));
// Reply from the fake node for the same identifier.
1391 fcpServer.writeLine(
1393 "Identifier=" + identifier,
1394 "NodeIdentifier=Friend1",
1398 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Disallows local addresses for a peer selected by identity ("id1"), checks that
 * the request carries "AllowLocalAddresses=false" and no "IsDisabled=" field, and
 * expects the returned peer to have identity "id1".
 */
1402 public void defaultFcpClientCanDisallowLocalAddressesOfPeer()
1403 throws InterruptedException, ExecutionException, IOException {
1404 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1406 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1407 String identifier = extractIdentifier(lines);
1408 assertThat(lines, matchesFcpMessage(
1410 "Identifier=" + identifier,
1411 "NodeIdentifier=id1",
1412 "AllowLocalAddresses=false",
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
1415 assertThat(lines, not(hasItem(startsWith("IsDisabled="))));
// Reply from the fake node for the same identifier.
1416 fcpServer.writeLine(
1418 "Identifier=" + identifier,
1419 "NodeIdentifier=Friend1",
1423 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Sets the burst-only flag for a peer selected by identity ("id1"), checks that
 * the request contains neither "AllowLocalAddresses=" nor "IsDisabled=" fields,
 * and expects the returned peer to have identity "id1".
 */
1427 public void defaultFcpClientCanSetBurstOnlyForPeer()
1428 throws InterruptedException, ExecutionException, IOException {
1429 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1431 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1432 String identifier = extractIdentifier(lines);
1433 assertThat(lines, matchesFcpMessage(
1435 "Identifier=" + identifier,
1436 "NodeIdentifier=id1",
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
1440 assertThat(lines, not(hasItem(startsWith("AllowLocalAddresses="))));
1441 assertThat(lines, not(hasItem(startsWith("IsDisabled="))));
// Reply from the fake node for the same identifier.
1442 fcpServer.writeLine(
1444 "Identifier=" + identifier,
1445 "NodeIdentifier=Friend1",
1449 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Clears the burst-only flag for a peer selected by identity ("id1"), checks that
 * the request carries "IsBurstOnly=false" and neither "AllowLocalAddresses=" nor
 * "IsDisabled=" fields, and expects the returned peer to have identity "id1".
 */
1453 public void defaultFcpClientCanClearBurstOnlyForPeer()
1454 throws InterruptedException, ExecutionException, IOException {
1455 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1457 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1458 String identifier = extractIdentifier(lines);
1459 assertThat(lines, matchesFcpMessage(
1461 "Identifier=" + identifier,
1462 "NodeIdentifier=id1",
1463 "IsBurstOnly=false",
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
1466 assertThat(lines, not(hasItem(startsWith("AllowLocalAddresses="))));
1467 assertThat(lines, not(hasItem(startsWith("IsDisabled="))));
// Reply from the fake node for the same identifier.
1468 fcpServer.writeLine(
1470 "Identifier=" + identifier,
1471 "NodeIdentifier=Friend1",
1475 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Sets the listen-only flag for a peer selected by identity ("id1"), checks that
 * the request carries "IsListenOnly=true" and none of the "AllowLocalAddresses=",
 * "IsDisabled=" or "IsBurstOnly=" fields, and expects the returned peer to have
 * identity "id1".
 */
1479 public void defaultFcpClientCanSetListenOnlyForPeer()
1480 throws InterruptedException, ExecutionException, IOException {
1481 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1483 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1484 String identifier = extractIdentifier(lines);
1485 assertThat(lines, matchesFcpMessage(
1487 "Identifier=" + identifier,
1488 "NodeIdentifier=id1",
1489 "IsListenOnly=true",
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
1492 assertThat(lines, not(hasItem(startsWith("AllowLocalAddresses="))));
1493 assertThat(lines, not(hasItem(startsWith("IsDisabled="))));
1494 assertThat(lines, not(hasItem(startsWith("IsBurstOnly="))));
// Reply from the fake node for the same identifier.
1495 fcpServer.writeLine(
1497 "Identifier=" + identifier,
1498 "NodeIdentifier=Friend1",
1502 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Clears the listen-only flag for a peer selected by identity ("id1"), checks that
 * the request carries "IsListenOnly=false" and none of the "AllowLocalAddresses=",
 * "IsDisabled=" or "IsBurstOnly=" fields, and expects the returned peer to have
 * identity "id1".
 */
1506 public void defaultFcpClientCanClearListenOnlyForPeer()
1507 throws InterruptedException, ExecutionException, IOException {
1508 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1510 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1511 String identifier = extractIdentifier(lines);
1512 assertThat(lines, matchesFcpMessage(
1514 "Identifier=" + identifier,
1515 "NodeIdentifier=id1",
1516 "IsListenOnly=false",
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
1519 assertThat(lines, not(hasItem(startsWith("AllowLocalAddresses="))));
1520 assertThat(lines, not(hasItem(startsWith("IsDisabled="))));
1521 assertThat(lines, not(hasItem(startsWith("IsBurstOnly="))));
// Reply from the fake node for the same identifier.
1522 fcpServer.writeLine(
1524 "Identifier=" + identifier,
1525 "NodeIdentifier=Friend1",
1529 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Tells the node to ignore the source port for a peer selected by identity
 * ("id1"), checks that the request carries "IgnoreSourcePort=true" and none of the
 * "AllowLocalAddresses=", "IsDisabled=", "IsBurstOnly=" or "IsListenOnly=" fields,
 * and expects the returned peer to have identity "id1".
 */
1533 public void defaultFcpClientCanIgnoreSourceForPeer()
1534 throws InterruptedException, ExecutionException, IOException {
1535 Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1537 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1538 String identifier = extractIdentifier(lines);
1539 assertThat(lines, matchesFcpMessage(
1541 "Identifier=" + identifier,
1542 "NodeIdentifier=id1",
1543 "IgnoreSourcePort=true",
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
1546 assertThat(lines, not(hasItem(startsWith("AllowLocalAddresses="))));
1547 assertThat(lines, not(hasItem(startsWith("IsDisabled="))));
1548 assertThat(lines, not(hasItem(startsWith("IsBurstOnly="))));
1549 assertThat(lines, not(hasItem(startsWith("IsListenOnly="))));
// Reply from the fake node for the same identifier.
1550 fcpServer.writeLine(
1552 "Identifier=" + identifier,
1553 "NodeIdentifier=Friend1",
1557 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Tells the node to use the source port for a peer selected by identity ("id1"),
 * checks that the request carries "IgnoreSourcePort=false" and none of the
 * "AllowLocalAddresses=", "IsDisabled=", "IsBurstOnly=" or "IsListenOnly=" fields,
 * and expects the returned peer to have identity "id1".
 */
1561 public void defaultFcpClientCanUseSourceForPeer()
1562 throws InterruptedException, ExecutionException, IOException {
1563 Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1565 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1566 String identifier = extractIdentifier(lines);
1567 assertThat(lines, matchesFcpMessage(
1569 "Identifier=" + identifier,
1570 "NodeIdentifier=id1",
1571 "IgnoreSourcePort=false",
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
1574 assertThat(lines, not(hasItem(startsWith("AllowLocalAddresses="))));
1575 assertThat(lines, not(hasItem(startsWith("IsDisabled="))));
1576 assertThat(lines, not(hasItem(startsWith("IsBurstOnly="))));
1577 assertThat(lines, not(hasItem(startsWith("IsListenOnly="))));
// Reply from the fake node for the same identifier.
1578 fcpServer.writeLine(
1580 "Identifier=" + identifier,
1581 "NodeIdentifier=Friend1",
1585 assertThat(peer.get().get().getIdentity(), is("id1"));
/**
 * Removes a peer selected by name ("Friend1"), checks the outgoing request and
 * expects the future to complete with {@code true} after the node's reply.
 */
1589 public void defaultFcpClientCanRemovePeerByName() throws InterruptedException, ExecutionException, IOException {
1590 Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
// Read the client's request; its own identifier is part of the expected message.
1592 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1593 String identifier = extractIdentifier(lines);
1594 assertThat(lines, matchesFcpMessage(
1596 "Identifier=" + identifier,
1597 "NodeIdentifier=Friend1",
// Reply from the fake node for the same identifier.
1600 fcpServer.writeLine(
1602 "Identifier=" + identifier,
1603 "NodeIdentifier=Friend1",
1606 assertThat(peer.get(), is(true));
/**
 * Tries to remove a peer by a name ("NotFriend1") and answers with
 * "UnknownNodeIdentifier"; the future is expected to complete with {@code false}.
 */
1610 public void defaultFcpClientCanNotRemovePeerByInvalidName()
1611 throws InterruptedException, ExecutionException, IOException {
1612 Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
// Read the client's request; its own identifier is part of the expected message.
1614 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1615 String identifier = extractIdentifier(lines);
1616 assertThat(lines, matchesFcpMessage(
1618 "Identifier=" + identifier,
1619 "NodeIdentifier=NotFriend1",
// Error reply for the same identifier.
1622 fcpServer.writeLine(
1623 "UnknownNodeIdentifier",
1624 "Identifier=" + identifier,
1627 assertThat(peer.get(), is(false));
/**
 * Removes a peer selected by identity ("id1"), checks that the request carries
 * "NodeIdentifier=id1" and expects the future to complete with {@code true}.
 */
1631 public void defaultFcpClientCanRemovePeerByIdentity() throws InterruptedException, ExecutionException, IOException {
1632 Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1634 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1635 String identifier = extractIdentifier(lines);
1636 assertThat(lines, matchesFcpMessage(
1638 "Identifier=" + identifier,
1639 "NodeIdentifier=id1",
// Reply from the fake node for the same identifier.
1642 fcpServer.writeLine(
1644 "Identifier=" + identifier,
1645 "NodeIdentifier=Friend1",
1648 assertThat(peer.get(), is(true));
/**
 * Removes a peer selected by host and port, checks that the request encodes them
 * as "NodeIdentifier=1.2.3.4:5678" and expects the future to complete with
 * {@code true}.
 */
1652 public void defaultFcpClientCanRemovePeerByHostAndPort()
1653 throws InterruptedException, ExecutionException, IOException {
1654 Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
// Read the client's request; its own identifier is part of the expected message.
1656 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1657 String identifier = extractIdentifier(lines);
1658 assertThat(lines, matchesFcpMessage(
1660 "Identifier=" + identifier,
1661 "NodeIdentifier=1.2.3.4:5678",
// Reply from the fake node for the same identifier.
1664 fcpServer.writeLine(
1666 "Identifier=" + identifier,
1667 "NodeIdentifier=Friend1",
1670 assertThat(peer.get(), is(true));
/**
 * Sets the darknet comment "foo" on a peer selected by name ("Friend1"), checks
 * the outgoing request and expects the future to complete with {@code true}.
 */
1674 public void defaultFcpClientCanModifyPeerNoteByName()
1675 throws InterruptedException, ExecutionException, IOException {
1676 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
// Read the client's request; its own identifier is part of the expected message.
1678 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1679 String identifier = extractIdentifier(lines);
1680 assertThat(lines, matchesFcpMessage(
1682 "Identifier=" + identifier,
1683 "NodeIdentifier=Friend1",
// Reply from the fake node for the same identifier and peer.
1688 fcpServer.writeLine(
1690 "Identifier=" + identifier,
1691 "NodeIdentifier=Friend1",
1696 assertThat(noteUpdated.get(), is(true));
/**
 * Sets a darknet comment on a peer selected by name and answers with
 * "UnknownNodeIdentifier"; the future is expected to complete with {@code false}.
 */
1700 public void defaultFcpClientKnowsPeerNoteWasNotModifiedOnUnknownNodeIdentifier()
1701 throws InterruptedException, ExecutionException, IOException {
1702 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
// Read the client's request; its own identifier is part of the expected message.
1704 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1705 String identifier = extractIdentifier(lines);
1706 assertThat(lines, matchesFcpMessage(
1708 "Identifier=" + identifier,
1709 "NodeIdentifier=Friend1",
// Error reply for the same identifier and peer.
1714 fcpServer.writeLine(
1715 "UnknownNodeIdentifier",
1716 "Identifier=" + identifier,
1717 "NodeIdentifier=Friend1",
1720 assertThat(noteUpdated.get(), is(false));
/**
 * Executes a modify-peer-note command without setting any note and expects the
 * future to complete with {@code false}; no exchange with the fake node is
 * scripted in this test.
 */
1724 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
1725 throws InterruptedException, ExecutionException, IOException {
1726 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
1727 assertThat(noteUpdated.get(), is(false));
/**
 * Sets the darknet comment "foo" on a peer selected via byIdentifier("id1"),
 * checks that the request carries "NodeIdentifier=id1" and expects the future to
 * complete with {@code true}.
 */
1731 public void defaultFcpClientCanModifyPeerNoteByIdentifier()
1732 throws InterruptedException, ExecutionException, IOException {
1733 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
// Read the client's request; its own identifier is part of the expected message.
1735 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1736 String identifier = extractIdentifier(lines);
1737 assertThat(lines, matchesFcpMessage(
1739 "Identifier=" + identifier,
1740 "NodeIdentifier=id1",
// Reply from the fake node for the same identifier and peer.
1745 fcpServer.writeLine(
1747 "Identifier=" + identifier,
1748 "NodeIdentifier=id1",
1753 assertThat(noteUpdated.get(), is(true));
/**
 * Sets the darknet comment "foo" on a peer selected by host and port, checks that
 * the request encodes them as "NodeIdentifier=1.2.3.4:5678" and expects the
 * future to complete with {@code true}.
 */
1757 public void defaultFcpClientCanModifyPeerNoteByHostAndPort()
1758 throws InterruptedException, ExecutionException, IOException {
1759 Future<Boolean> noteUpdated =
1760 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
// Read the client's request; its own identifier is part of the expected message.
1762 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1763 String identifier = extractIdentifier(lines);
1764 assertThat(lines, matchesFcpMessage(
1766 "Identifier=" + identifier,
1767 "NodeIdentifier=1.2.3.4:5678",
// Reply from the fake node for the same identifier and peer.
1772 fcpServer.writeLine(
1774 "Identifier=" + identifier,
1775 "NodeIdentifier=1.2.3.4:5678",
1780 assertThat(noteUpdated.get(), is(true));
/**
 * Requests the node configuration without any detail option and expects a
 * non-null ConfigData once the fake node has replied.
 */
1784 public void defaultFcpClientCanGetConfigWithoutDetails()
1785 throws InterruptedException, ExecutionException, IOException {
1786 Future<ConfigData> configData = fcpClient.getConfig().execute();
// Read the client's request; its own identifier is part of the expected message.
1788 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1789 String identifier = extractIdentifier(lines);
1790 assertThat(lines, matchesFcpMessage(
1792 "Identifier=" + identifier,
// Reply from the fake node for the same identifier.
1795 fcpServer.writeLine(
1797 "Identifier=" + identifier,
1800 assertThat(configData.get(), notNullValue());
/**
 * Requests the node configuration with the withCurrent() option and expects
 * getCurrent("foo") on the result to be "bar".
 */
1804 public void defaultFcpClientCanGetConfigWithCurrent()
1805 throws InterruptedException, ExecutionException, IOException {
1806 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
// Read the client's request; its own identifier is part of the expected message.
1808 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1809 String identifier = extractIdentifier(lines);
1810 assertThat(lines, matchesFcpMessage(
1812 "Identifier=" + identifier,
// Reply from the fake node; the value line for "foo" is not visible in this excerpt.
1816 fcpServer.writeLine(
1818 "Identifier=" + identifier,
1822 assertThat(configData.get().getCurrent("foo"), is("bar"));
/**
 * Requests the node configuration with the withDefaults() option, checks that the
 * request carries "WithDefaults=true" and expects getDefault("foo") to be "bar".
 */
1826 public void defaultFcpClientCanGetConfigWithDefaults()
1827 throws InterruptedException, ExecutionException, IOException {
1828 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
// Read the client's request; its own identifier is part of the expected message.
1830 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1831 String identifier = extractIdentifier(lines);
1832 assertThat(lines, matchesFcpMessage(
1834 "Identifier=" + identifier,
1835 "WithDefaults=true",
// Reply from the fake node; the value line for "foo" is not visible in this excerpt.
1838 fcpServer.writeLine(
1840 "Identifier=" + identifier,
1844 assertThat(configData.get().getDefault("foo"), is("bar"));
/**
 * Requests the node configuration with the withSortOrder() option, checks that the
 * request carries "WithSortOrder=true" and expects getSortOrder("foo") to be 17.
 */
1848 public void defaultFcpClientCanGetConfigWithSortOrder()
1849 throws InterruptedException, ExecutionException, IOException {
1850 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
// Read the client's request; its own identifier is part of the expected message.
1852 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1853 String identifier = extractIdentifier(lines);
1854 assertThat(lines, matchesFcpMessage(
1856 "Identifier=" + identifier,
1857 "WithSortOrder=true",
// Reply from the fake node; the value line for "foo" is not visible in this excerpt.
1860 fcpServer.writeLine(
1862 "Identifier=" + identifier,
1866 assertThat(configData.get().getSortOrder("foo"), is(17));
/**
 * Requests the node configuration with the withExpertFlag() option, checks that
 * the request carries "WithExpertFlag=true" and expects the "expertFlag.foo=true"
 * reply line to surface as getExpertFlag("foo") == true.
 */
1870 public void defaultFcpClientCanGetConfigWithExpertFlag()
1871 throws InterruptedException, ExecutionException, IOException {
1872 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
// Read the client's request; its own identifier is part of the expected message.
1874 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1875 String identifier = extractIdentifier(lines);
1876 assertThat(lines, matchesFcpMessage(
1878 "Identifier=" + identifier,
1879 "WithExpertFlag=true",
// Reply from the fake node for the same identifier.
1882 fcpServer.writeLine(
1884 "Identifier=" + identifier,
1885 "expertFlag.foo=true",
1888 assertThat(configData.get().getExpertFlag("foo"), is(true));
/**
 * Requests the node configuration with the withForceWriteFlag() option, checks
 * that the request carries "WithForceWriteFlag=true" and expects the
 * "forceWriteFlag.foo=true" reply line to surface as getForceWriteFlag("foo") == true.
 */
1892 public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1893 throws InterruptedException, ExecutionException, IOException {
1894 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
// Read the client's request; its own identifier is part of the expected message.
1896 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1897 String identifier = extractIdentifier(lines);
1898 assertThat(lines, matchesFcpMessage(
1900 "Identifier=" + identifier,
1901 "WithForceWriteFlag=true",
// Reply from the fake node for the same identifier.
1904 fcpServer.writeLine(
1906 "Identifier=" + identifier,
1907 "forceWriteFlag.foo=true",
1910 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
/**
 * Requests the node configuration with the withShortDescription() option, checks
 * that the request carries "WithShortDescription=true" and expects the
 * "shortDescription.foo=bar" reply line to surface as getShortDescription("foo").
 */
1914 public void defaultFcpClientCanGetConfigWithShortDescription()
1915 throws InterruptedException, ExecutionException, IOException {
1916 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
// Read the client's request; its own identifier is part of the expected message.
1918 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1919 String identifier = extractIdentifier(lines);
1920 assertThat(lines, matchesFcpMessage(
1922 "Identifier=" + identifier,
1923 "WithShortDescription=true",
// Reply from the fake node for the same identifier.
1926 fcpServer.writeLine(
1928 "Identifier=" + identifier,
1929 "shortDescription.foo=bar",
1932 assertThat(configData.get().getShortDescription("foo"), is("bar"));
/**
 * Requests the node configuration with the withLongDescription() option, checks
 * that the request carries "WithLongDescription=true" and expects the
 * "longDescription.foo=bar" reply line to surface as getLongDescription("foo").
 */
1936 public void defaultFcpClientCanGetConfigWithLongDescription()
1937 throws InterruptedException, ExecutionException, IOException {
1938 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
// Read the client's request; its own identifier is part of the expected message.
1940 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1941 String identifier = extractIdentifier(lines);
1942 assertThat(lines, matchesFcpMessage(
1944 "Identifier=" + identifier,
1945 "WithLongDescription=true",
// Reply from the fake node for the same identifier.
1948 fcpServer.writeLine(
1950 "Identifier=" + identifier,
1951 "longDescription.foo=bar",
1954 assertThat(configData.get().getLongDescription("foo"), is("bar"));
/**
 * Requests the node configuration with the withDataTypes() option, checks that
 * the request carries "WithDataTypes=true" and expects the "dataType.foo=number"
 * reply line to surface as getDataType("foo").
 */
1958 public void defaultFcpClientCanGetConfigWithDataTypes()
1959 throws InterruptedException, ExecutionException, IOException {
1960 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
// Read the client's request; its own identifier is part of the expected message.
1962 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1963 String identifier = extractIdentifier(lines);
1964 assertThat(lines, matchesFcpMessage(
1966 "Identifier=" + identifier,
1967 "WithDataTypes=true",
// Reply from the fake node for the same identifier.
1970 fcpServer.writeLine(
1972 "Identifier=" + identifier,
1973 "dataType.foo=number",
1976 assertThat(configData.get().getDataType("foo"), is("number"));
/**
 * Sets the configuration option "foo.bar" to "baz" and expects the ConfigData
 * built from the reply ("current.foo.bar=baz") to report that value via
 * getCurrent("foo.bar").
 */
1980 public void defaultFcpClientCanModifyConfigData() throws InterruptedException, ExecutionException, IOException {
1981 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
// Read the client's request; its own identifier is part of the expected message.
1983 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1984 String identifier = extractIdentifier(lines);
1985 assertThat(lines, matchesFcpMessage(
1987 "Identifier=" + identifier,
// Reply from the fake node for the same identifier.
1991 fcpServer.writeLine(
1993 "Identifier=" + identifier,
1994 "current.foo.bar=baz",
1997 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
// Lines of the most recent request collected by readMessage().
2000 private List<String> lines;
// Identifier extracted from those lines by readMessage(); reply helpers echo it back.
2001 private String identifier;
/**
 * Reads the next client request and verifies it with the supplied matcher by
 * delegating to {@link #readMessage(Supplier)}.
 * NOTE(review): the name suggests a connect step precedes the read; that
 * statement is not visible in this excerpt.
 *
 * @param requestMatcher supplies the matcher the collected request lines must satisfy
 */
2003 private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
2004 throws InterruptedException, ExecutionException, IOException {
2006 readMessage(requestMatcher);
/**
 * Collects request lines from the fake server up to "EndMessage", stores them and
 * the extracted identifier in the {@code lines} and {@code identifier} fields, and
 * asserts the lines against the supplied matcher.
 *
 * @param requestMatcher supplies the matcher; it is evaluated only after
 *        {@code identifier} has been assigned, so it may refer to that field
 * @throws IOException declared for the read from the fake server
 */
2009 private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
2010 lines = fcpServer.collectUntil(is("EndMessage"));
2011 identifier = extractIdentifier(lines);
2012 assertThat(lines, requestMatcher.get());
/**
 * Tests for the plugin-related commands (load, reload, remove, get info). The
 * nested tests share the reply and verification helpers defined directly in this
 * class and rely on the {@code lines} / {@code identifier} fields.
 */
2015 public class PluginCommands {
// Plugin class name used by the reload, remove and get-info tests.
2017 private static final String CLASS_NAME = "foo.plugin.Plugin";
// Writes a plugin-info reply for the current identifier to the client.
2019 private void replyWithPluginInfo() throws IOException {
2020 fcpServer.writeLine(
2022 "Identifier=" + identifier,
2023 "PluginName=superPlugin",
2025 "LongVersion=1.2.3",
2027 "OriginUri=superPlugin",
// Asserts that the parsed PluginInfo carries the values expected from replyWithPluginInfo().
2033 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
2034 throws InterruptedException, ExecutionException {
2035 assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
2036 assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
2037 assertThat(pluginInfo.get().get().isTalkable(), is(true));
2038 assertThat(pluginInfo.get().get().getVersion(), is("42"));
2039 assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
2040 assertThat(pluginInfo.get().get().isStarted(), is(true));
// Tests for loading plugins from the various sources.
2043 public class LoadPlugin {
// Loading official plugins, either from Freenet or via HTTPS.
2045 public class OfficialPlugins {
// Loads an official plugin from Freenet without persisting it in the configuration.
2048 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
2049 Future<Optional<PluginInfo>> pluginInfo =
2050 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
2051 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
// hasItem, not contains: contains(m) only matches a one-element list, so
// not(contains(m)) would pass for any multi-line request and verify nothing.
2052 assertThat(lines, not(hasItem(startsWith("Store="))));
2053 replyWithPluginInfo();
2054 verifyPluginInfo(pluginInfo);
// Same as above but with addToConfig(); the request must then carry "Store=true".
2058 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
2059 Future<Optional<PluginInfo>> pluginInfo =
2060 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
2061 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
2062 assertThat(lines, hasItem("Store=true"));
2063 replyWithPluginInfo();
2064 verifyPluginInfo(pluginInfo);
// Loads an official plugin via HTTPS.
2068 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
2069 Future<Optional<PluginInfo>> pluginInfo =
2070 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
2071 connectAndAssert(() -> createMatcherForOfficialSource("https"));
2072 replyWithPluginInfo();
2073 verifyPluginInfo(pluginInfo);
// Builds the expected request for an official plugin with the given OfficialSource value.
2076 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
2077 return matchesFcpMessage(
2079 "Identifier=" + identifier,
2080 "PluginURL=superPlugin",
2082 "OfficialSource=" + officialSource,
// Loading plugins from a file, a URL or a Freenet key.
2089 public class FromOtherSources {
2091 private static final String FILE_PATH = "/path/to/plugin.jar";
2092 private static final String URL = "http://server.com/plugin.jar";
2093 private static final String KEY = "KSK@plugin.jar";
// Loads a plugin from a local file path; the request must use URLType "file".
2096 public void fromFile() throws ExecutionException, InterruptedException, IOException {
2097 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
2098 connectAndAssert(() -> createMatcher("file", FILE_PATH));
2099 replyWithPluginInfo();
2100 verifyPluginInfo(pluginInfo);
// Loads a plugin from a URL; the request must use URLType "url".
2104 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
2105 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
2106 connectAndAssert(() -> createMatcher("url", URL));
2107 replyWithPluginInfo();
2108 verifyPluginInfo(pluginInfo);
// Loads a plugin from a Freenet key; the request must use URLType "freenet".
2112 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
2113 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
2114 connectAndAssert(() -> createMatcher("freenet", KEY));
2115 replyWithPluginInfo();
2116 verifyPluginInfo(pluginInfo);
// Builds the expected request for the given URL type; the line that uses the url
// parameter is not visible in this excerpt.
2119 private Matcher<List<String>> createMatcher(String urlType, String url) {
2120 return matchesFcpMessage(
2122 "Identifier=" + identifier,
2124 "URLType=" + urlType,
// Failure handling when loading a plugin.
2131 public class Failed {
// A protocol-error reply must result in an empty Optional.
2134 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
2135 Future<Optional<PluginInfo>> pluginInfo =
2136 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
2137 connectAndAssert(() -> matchesFcpMessage("LoadPlugin", "EndMessage"));
2138 replyWithProtocolError();
2139 assertThat(pluginInfo.get().isPresent(), is(false));
// Writes an error reply for the current identifier; per its name a protocol error
// (the message-name line is not visible in this excerpt).
2146 private void replyWithProtocolError() throws IOException {
2147 fcpServer.writeLine(
2149 "Identifier=" + identifier,
// Tests for reloading a plugin, with and without optional parameters.
2154 public class ReloadPlugin {
// Plain reload of CLASS_NAME.
2157 public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
2158 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
2159 connectAndAssert(() -> matchReloadPluginMessage());
2160 replyWithPluginInfo();
2161 verifyPluginInfo(pluginInfo);
// waitFor(1234) must add "MaxWaitTime=1234" to the request.
2165 public void reloadingPluginWithMaxWaitTimeWorks()
2166 throws InterruptedException, ExecutionException, IOException {
2167 Future<Optional<PluginInfo>> pluginInfo =
2168 fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
2169 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
2170 replyWithPluginInfo();
2171 verifyPluginInfo(pluginInfo);
// purge() must add "Purge=true" to the request.
2175 public void reloadingPluginWithPurgeWorks()
2176 throws InterruptedException, ExecutionException, IOException {
2177 Future<Optional<PluginInfo>> pluginInfo =
2178 fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
2179 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
2180 replyWithPluginInfo();
2181 verifyPluginInfo(pluginInfo);
// addToConfig() must add "Store=true" to the request.
2185 public void reloadingPluginWithStoreWorks()
2186 throws InterruptedException, ExecutionException, IOException {
2187 Future<Optional<PluginInfo>> pluginInfo =
2188 fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
2189 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
2190 replyWithPluginInfo();
2191 verifyPluginInfo(pluginInfo);
// Expected base request for reloading CLASS_NAME.
2194 private Matcher<List<String>> matchReloadPluginMessage() {
2195 return matchesFcpMessage(
2197 "Identifier=" + identifier,
2198 "PluginName=" + CLASS_NAME,
// Tests for removing a plugin, with and without optional parameters.
2205 public class RemovePlugin {
// Plain removal of CLASS_NAME; the future must complete with true.
2208 public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
2209 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
2210 connectAndAssert(() -> matchPluginRemovedMessage());
2211 replyWithPluginRemoved();
2212 assertThat(pluginRemoved.get(), is(true));
// waitFor(1234) must add "MaxWaitTime=1234" to the request.
2216 public void removingPluginWithMaxWaitTimeWorks()
2217 throws InterruptedException, ExecutionException, IOException {
2218 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
2219 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
2220 replyWithPluginRemoved();
2221 assertThat(pluginRemoved.get(), is(true));
// purge() must add "Purge=true" to the request.
2225 public void removingPluginWithPurgeWorks()
2226 throws InterruptedException, ExecutionException, IOException {
2227 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
2228 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
2229 replyWithPluginRemoved();
2230 assertThat(pluginRemoved.get(), is(true));
// Writes the removal confirmation for CLASS_NAME and the current identifier.
2233 private void replyWithPluginRemoved() throws IOException {
2234 fcpServer.writeLine(
2236 "Identifier=" + identifier,
2237 "PluginName=" + CLASS_NAME,
// Expected base request for removing CLASS_NAME.
2242 private Matcher<List<String>> matchPluginRemovedMessage() {
2243 return matchesFcpMessage(
2245 "Identifier=" + identifier,
2246 "PluginName=" + CLASS_NAME,
// Tests for querying plugin information.
2253 public class GetPluginInfo {
// Plain query for CLASS_NAME.
2256 public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
2257 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
2258 connectAndAssert(() -> matchGetPluginInfoMessage());
2259 replyWithPluginInfo();
2260 verifyPluginInfo(pluginInfo);
// detailed() must add "Detailed=true" to the request.
2264 public void gettingPluginInfoWithDetailsWorks()
2265 throws InterruptedException, ExecutionException, IOException {
2266 Future<Optional<PluginInfo>> pluginInfo =
2267 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
2268 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
2269 replyWithPluginInfo();
2270 verifyPluginInfo(pluginInfo);
// A protocol-error reply must result in an empty Optional.
2274 public void protocolErrorIsRecognizedAsFailure()
2275 throws InterruptedException, ExecutionException, IOException {
2276 Future<Optional<PluginInfo>> pluginInfo =
2277 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
2278 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
2279 replyWithProtocolError();
2280 assertThat(pluginInfo.get(), is(Optional.empty()));
// Expected base request for querying CLASS_NAME.
2283 private Matcher<List<String>> matchGetPluginInfoMessage() {
2284 return matchesFcpMessage(
2286 "Identifier=" + identifier,
2287 "PluginName=" + CLASS_NAME,
2296 public class UskSubscriptionCommands {
2298 private static final String URI = "USK@some,uri/file.txt";
/**
 * Subscribes to a USK, confirms the subscription and checks that two update
 * notifications (editions 23 and 24) reach the registered callback, leaving the
 * last edition (24) recorded.
 */
2301 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
2302 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
2303 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
2304 replyWithSubscribed();
2305 assertThat(uskSubscription.get().get().getUri(), is(URI));
2306 AtomicInteger edition = new AtomicInteger();
// One count per expected notification.
2307 CountDownLatch updated = new CountDownLatch(2);
2308 uskSubscription.get().get().onUpdate(e -> {
2310 updated.countDown();
2312 sendUpdateNotification(23);
2313 sendUpdateNotification(24);
// Wait up to five seconds for both callbacks before checking the recorded edition.
2314 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
2315 assertThat(edition.get(), is(24));
/**
 * Registers two update callbacks on the same subscription and checks that a single
 * notification (edition 23) triggers both of them.
 */
2319 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
2320 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
2321 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
2322 replyWithSubscribed();
2323 assertThat(uskSubscription.get().get().getUri(), is(URI));
2324 AtomicInteger edition = new AtomicInteger();
// One count per registered callback; only one notification is sent below.
2325 CountDownLatch updated = new CountDownLatch(2);
2326 uskSubscription.get().get().onUpdate(e -> {
2328 updated.countDown();
2330 uskSubscription.get().get().onUpdate(e -> updated.countDown());
2331 sendUpdateNotification(23);
// Wait up to five seconds for both callbacks before checking the recorded edition.
2332 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
2333 assertThat(edition.get(), is(23));
/**
 * Cancels a USK subscription, checks that an "UnsubscribeUSK" request with the
 * subscription's identifier is sent, and expects a later update notification not
 * to reach the previously registered callback.
 * NOTE(review): the final assertion follows the notification with no wait visible
 * here; confirm it cannot pass merely because the update has not been processed yet.
 */
2337 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
2338 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
2339 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
2340 replyWithSubscribed();
2341 assertThat(uskSubscription.get().get().getUri(), is(URI));
2342 AtomicBoolean updated = new AtomicBoolean();
2343 uskSubscription.get().get().onUpdate(e -> updated.set(true));
2344 uskSubscription.get().get().cancel();
// The matcher is built after readMessage() has refreshed the identifier field.
2345 readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier, "EndMessage"));
2346 sendUpdateNotification(23);
2347 assertThat(updated.get(), is(false));
/**
 * Writes the subscription confirmation for the current identifier to the client;
 * the remaining reply lines are not visible in this excerpt.
 */
2350 private void replyWithSubscribed() throws IOException {
2351 fcpServer.writeLine(
2353 "Identifier=" + identifier,
/**
 * Sends a "SubscribedUSKUpdate" message for the current identifier, followed by
 * any additional lines and the "EndMessage" terminator.
 *
 * @param edition value written as the "Edition=" field
 * @param additionalLines extra lines written between the fixed fields and "EndMessage"
 * @throws IOException declared for the writes to the fake server
 */
2360 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
2361 fcpServer.writeLine(
2362 "SubscribedUSKUpdate",
2363 "Identifier=" + identifier,
2365 "Edition=" + edition
2367 fcpServer.writeLine(additionalLines);
2368 fcpServer.writeLine("EndMessage");