1 package net.pterodactylus.fcp.quelaton;
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.allOf;
5 import static org.hamcrest.Matchers.contains;
6 import static org.hamcrest.Matchers.containsInAnyOrder;
7 import static org.hamcrest.Matchers.hasItem;
8 import static org.hamcrest.Matchers.hasSize;
9 import static org.hamcrest.Matchers.is;
10 import static org.hamcrest.Matchers.not;
11 import static org.hamcrest.Matchers.notNullValue;
12 import static org.hamcrest.Matchers.startsWith;
14 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
18 import java.nio.charset.StandardCharsets;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.function.Supplier;
33 import java.util.stream.Collectors;
35 import net.pterodactylus.fcp.ARK;
36 import net.pterodactylus.fcp.ConfigData;
37 import net.pterodactylus.fcp.DSAGroup;
38 import net.pterodactylus.fcp.FcpKeyPair;
39 import net.pterodactylus.fcp.Key;
40 import net.pterodactylus.fcp.NodeData;
41 import net.pterodactylus.fcp.NodeRef;
42 import net.pterodactylus.fcp.Peer;
43 import net.pterodactylus.fcp.PeerNote;
44 import net.pterodactylus.fcp.PluginInfo;
45 import net.pterodactylus.fcp.Priority;
46 import net.pterodactylus.fcp.fake.FakeTcpServer;
47 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
49 import com.google.common.io.ByteStreams;
50 import com.google.common.io.Files;
51 import com.nitorcreations.junit.runners.NestedRunner;
52 import org.hamcrest.Description;
53 import org.hamcrest.Matcher;
54 import org.hamcrest.Matchers;
55 import org.hamcrest.TypeSafeDiagnosingMatcher;
56 import org.junit.After;
57 import org.junit.Assert;
58 import org.junit.Test;
59 import org.junit.runner.RunWith;
62 * Unit test for {@link DefaultFcpClient}.
64 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
66 @RunWith(NestedRunner.class)
67 public class DefaultFcpClientTest {
69 private static final String INSERT_URI =
70 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
71 private static final String REQUEST_URI =
72 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
74 private int threadCounter = 0;
75 private final ExecutorService threadPool =
76 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
77 private final FakeTcpServer fcpServer;
78 private final DefaultFcpClient fcpClient;
80 public DefaultFcpClientTest() throws IOException {
81 fcpServer = new FakeTcpServer(threadPool);
82 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
86 public void tearDown() throws IOException {
88 threadPool.shutdown();
91 private void connectNode() throws InterruptedException, ExecutionException, IOException {
92 fcpServer.connect().get();
93 fcpServer.collectUntil(is("EndMessage"));
94 fcpServer.writeLine("NodeHello",
95 "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
96 "Revision=build01466",
98 "Version=Fred,0.7,1.0,1466",
100 "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
104 "NodeLanguage=ENGLISH",
110 private String extractIdentifier(List<String> lines) {
111 return lines.stream()
112 .filter(s -> s.startsWith("Identifier="))
113 .map(s -> s.substring(s.indexOf('=') + 1))
118 private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
119 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
122 private Matcher<Iterable<String>> hasHead(String firstElement) {
123 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
125 protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
126 if (!iterable.iterator().hasNext()) {
127 mismatchDescription.appendText("is empty");
130 String element = iterable.iterator().next();
131 if (!element.equals(firstElement)) {
132 mismatchDescription.appendText("starts with ").appendValue(element);
139 public void describeTo(Description description) {
140 description.appendText("starts with ").appendValue(firstElement);
145 private Matcher<List<String>> matchesFcpMessageWithTerminator(
146 String name, String terminator, String... requiredLines) {
147 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
150 private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
151 return new TypeSafeDiagnosingMatcher<List<String>>() {
153 protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
154 if (item.size() < (ignoreStart + ignoreEnd)) {
155 mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
158 for (String line : lines) {
159 if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
160 mismatchDescription.appendText("does not contains ").appendValue(line);
168 public void describeTo(Description description) {
169 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
170 description.appendText(", ignoring the first ").appendValue(ignoreStart);
171 description.appendText(" and the last ").appendValue(ignoreEnd);
176 private Matcher<List<String>> hasTail(String... lastElements) {
177 return new TypeSafeDiagnosingMatcher<List<String>>() {
179 protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
180 if (list.size() < lastElements.length) {
181 mismatchDescription.appendText("is too small");
184 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
185 if (!tail.equals(Arrays.asList(lastElements))) {
186 mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
193 public void describeTo(Description description) {
194 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
199 private List<String> lines;
200 private String identifier;
202 private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
203 throws InterruptedException, ExecutionException, IOException {
205 readMessage(requestMatcher);
208 private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
209 readMessage("EndMessage", requestMatcher);
212 private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
213 lines = fcpServer.collectUntil(is(terminator));
214 identifier = extractIdentifier(lines);
215 assertThat(lines, requestMatcher.get());
218 private void replyWithProtocolError() throws IOException {
221 "Identifier=" + identifier,
226 public class ConnectionsAndKeyPairs {
228 public class Connections {
230 @Test(expected = ExecutionException.class)
231 public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
232 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
233 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
235 "CloseConnectionDuplicateClientName",
241 @Test(expected = ExecutionException.class)
242 public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
243 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
244 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
250 public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
251 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
252 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
255 keyPair = fcpClient.generateKeypair().execute();
256 readMessage(() -> matchesFcpMessage("GenerateSSK"));
257 identifier = extractIdentifier(lines);
263 public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
264 throws InterruptedException, ExecutionException, IOException {
265 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
266 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
271 } catch (ExecutionException e) {
274 keyPair = fcpClient.generateKeypair().execute();
275 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
282 public class GenerateKeyPair {
285 public void defaultFcpClientCanGenerateKeypair()
286 throws ExecutionException, InterruptedException, IOException {
287 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
288 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
290 FcpKeyPair keyPair = keyPairFuture.get();
291 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
292 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
297 private void replyWithKeyPair() throws IOException {
298 fcpServer.writeLine("SSKKeypair",
299 "InsertURI=" + INSERT_URI + "",
300 "RequestURI=" + REQUEST_URI + "",
301 "Identifier=" + identifier,
309 public class PeerCommands {
311 public class ListPeer {
314 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
315 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
316 connectAndAssert(() -> matchesListPeer("id1"));
317 replyWithPeer("id1");
318 assertThat(peer.get().get().getIdentity(), is("id1"));
322 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
323 Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
324 connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
325 replyWithPeer("id1");
326 assertThat(peer.get().get().getIdentity(), is("id1"));
330 public void byName() throws InterruptedException, ExecutionException, IOException {
331 Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
332 connectAndAssert(() -> matchesListPeer("FriendNode"));
333 replyWithPeer("id1");
334 assertThat(peer.get().get().getIdentity(), is("id1"));
338 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
339 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
340 connectAndAssert(() -> matchesListPeer("id2"));
341 replyWithUnknownNodeIdentifier();
342 assertThat(peer.get().isPresent(), is(false));
345 private Matcher<List<String>> matchesListPeer(String nodeId) {
346 return matchesFcpMessage(
348 "Identifier=" + identifier,
349 "NodeIdentifier=" + nodeId
355 public class ListPeers {
358 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
359 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
360 connectAndAssert(() -> matchesListPeers(false, false));
361 replyWithPeer("id1");
362 replyWithPeer("id2");
364 assertThat(peers.get(), hasSize(2));
365 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
366 containsInAnyOrder("id1", "id2"));
370 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
371 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
372 connectAndAssert(() -> matchesListPeers(false, true));
373 replyWithPeer("id1", "metadata.foo=bar1");
374 replyWithPeer("id2", "metadata.foo=bar2");
376 assertThat(peers.get(), hasSize(2));
377 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
378 containsInAnyOrder("bar1", "bar2"));
382 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
383 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
384 connectAndAssert(() -> matchesListPeers(true, false));
385 replyWithPeer("id1", "volatile.foo=bar1");
386 replyWithPeer("id2", "volatile.foo=bar2");
388 assertThat(peers.get(), hasSize(2));
389 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
390 containsInAnyOrder("bar1", "bar2"));
393 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
394 return matchesFcpMessage(
396 "WithVolatile=" + withVolatile,
397 "WithMetadata=" + withMetadata
401 private void sendEndOfPeerList() throws IOException {
404 "Identifier=" + identifier,
411 public class AddPeer {
414 public void fromFile() throws InterruptedException, ExecutionException, IOException {
415 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
416 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
417 replyWithPeer("id1");
418 assertThat(peer.get().get().getIdentity(), is("id1"));
422 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
423 Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
424 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
425 replyWithPeer("id1");
426 assertThat(peer.get().get().getIdentity(), is("id1"));
430 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
431 NodeRef nodeRef = createNodeRef();
432 Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
433 connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
439 "dsaGroup.q=subprime",
440 "dsaPubKey.y=dsa-public",
441 "physical.udp=1.2.3.4:5678",
445 replyWithPeer("id1");
446 assertThat(peer.get().get().getIdentity(), is("id1"));
450 public void protocolErrorEndsCommand() throws InterruptedException, ExecutionException, IOException {
451 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
452 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
453 replyWithProtocolError();
454 assertThat(peer.get().isPresent(), is(false));
457 private NodeRef createNodeRef() {
458 NodeRef nodeRef = new NodeRef();
459 nodeRef.setIdentity("id1");
460 nodeRef.setName("name");
461 nodeRef.setARK(new ARK("public", "1"));
462 nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
463 nodeRef.setNegotiationTypes(new int[] { 3, 5 });
464 nodeRef.setPhysicalUDP("1.2.3.4:5678");
465 nodeRef.setDSAPublicKey("dsa-public");
466 nodeRef.setSignature("sig");
470 private Matcher<List<String>> matchesAddPeer() {
471 return matchesFcpMessage(
473 "Identifier=" + identifier
479 public class ModifyPeer {
482 public void defaultFcpClientCanEnablePeerByName()
483 throws InterruptedException, ExecutionException, IOException {
484 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
485 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
486 replyWithPeer("id1");
487 assertThat(peer.get().get().getIdentity(), is("id1"));
491 public void defaultFcpClientCanDisablePeerByName()
492 throws InterruptedException, ExecutionException, IOException {
493 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
494 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
495 replyWithPeer("id1");
496 assertThat(peer.get().get().getIdentity(), is("id1"));
500 public void defaultFcpClientCanEnablePeerByIdentity()
501 throws InterruptedException, ExecutionException, IOException {
502 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
503 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
504 replyWithPeer("id1");
505 assertThat(peer.get().get().getIdentity(), is("id1"));
509 public void defaultFcpClientCanEnablePeerByHostAndPort()
510 throws InterruptedException, ExecutionException, IOException {
511 Future<Optional<Peer>> peer =
512 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
513 connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
514 replyWithPeer("id1");
515 assertThat(peer.get().get().getIdentity(), is("id1"));
519 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
520 Future<Optional<Peer>> peer =
521 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
522 connectAndAssert(() -> allOf(
523 matchesModifyPeer("id1", "AllowLocalAddresses", true),
524 not(contains(startsWith("IsDisabled=")))
526 replyWithPeer("id1");
527 assertThat(peer.get().get().getIdentity(), is("id1"));
531 public void disallowLocalAddressesOfPeer()
532 throws InterruptedException, ExecutionException, IOException {
533 Future<Optional<Peer>> peer =
534 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
535 connectAndAssert(() -> allOf(
536 matchesModifyPeer("id1", "AllowLocalAddresses", false),
537 not(contains(startsWith("IsDisabled=")))
539 replyWithPeer("id1");
540 assertThat(peer.get().get().getIdentity(), is("id1"));
544 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
545 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
546 connectAndAssert(() -> allOf(
547 matchesModifyPeer("id1", "IsBurstOnly", true),
548 not(contains(startsWith("AllowLocalAddresses="))),
549 not(contains(startsWith("IsDisabled=")))
551 replyWithPeer("id1");
552 assertThat(peer.get().get().getIdentity(), is("id1"));
556 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
557 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
558 connectAndAssert(() -> allOf(
559 matchesModifyPeer("id1", "IsBurstOnly", false),
560 not(contains(startsWith("AllowLocalAddresses="))),
561 not(contains(startsWith("IsDisabled=")))
563 replyWithPeer("id1");
564 assertThat(peer.get().get().getIdentity(), is("id1"));
568 public void defaultFcpClientCanSetListenOnlyForPeer()
569 throws InterruptedException, ExecutionException, IOException {
570 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
571 connectAndAssert(() -> allOf(
572 matchesModifyPeer("id1", "IsListenOnly", true),
573 not(contains(startsWith("AllowLocalAddresses="))),
574 not(contains(startsWith("IsDisabled="))),
575 not(contains(startsWith("IsBurstOnly=")))
577 replyWithPeer("id1");
578 assertThat(peer.get().get().getIdentity(), is("id1"));
582 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
583 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
584 connectAndAssert(() -> allOf(
585 matchesModifyPeer("id1", "IsListenOnly", false),
586 not(contains(startsWith("AllowLocalAddresses="))),
587 not(contains(startsWith("IsDisabled="))),
588 not(contains(startsWith("IsBurstOnly=")))
590 replyWithPeer("id1");
591 assertThat(peer.get().get().getIdentity(), is("id1"));
595 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
596 Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
597 connectAndAssert(() -> allOf(
598 matchesModifyPeer("id1", "IgnoreSourcePort", true),
599 not(contains(startsWith("AllowLocalAddresses="))),
600 not(contains(startsWith("IsDisabled="))),
601 not(contains(startsWith("IsBurstOnly="))),
602 not(contains(startsWith("IsListenOnly=")))
604 replyWithPeer("id1");
605 assertThat(peer.get().get().getIdentity(), is("id1"));
609 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
610 Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
611 connectAndAssert(() -> allOf(
612 matchesModifyPeer("id1", "IgnoreSourcePort", false),
613 not(contains(startsWith("AllowLocalAddresses="))),
614 not(contains(startsWith("IsDisabled="))),
615 not(contains(startsWith("IsBurstOnly="))),
616 not(contains(startsWith("IsListenOnly=")))
618 replyWithPeer("id1");
619 assertThat(peer.get().get().getIdentity(), is("id1"));
623 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
624 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
625 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
626 replyWithUnknownNodeIdentifier();
627 assertThat(peer.get().isPresent(), is(false));
630 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
631 return matchesFcpMessage(
633 "Identifier=" + identifier,
634 "NodeIdentifier=" + nodeIdentifier,
635 setting + "=" + value
641 public class RemovePeer {
644 public void byName() throws InterruptedException, ExecutionException, IOException {
645 Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
646 connectAndAssert(() -> matchesRemovePeer("Friend1"));
647 replyWithPeerRemoved("Friend1");
648 assertThat(peer.get(), is(true));
652 public void invalidName() throws InterruptedException, ExecutionException, IOException {
653 Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
654 connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
655 replyWithUnknownNodeIdentifier();
656 assertThat(peer.get(), is(false));
660 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
661 Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
662 connectAndAssert(() -> matchesRemovePeer("id1"));
663 replyWithPeerRemoved("id1");
664 assertThat(peer.get(), is(true));
668 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
669 Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
670 connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
671 replyWithPeerRemoved("Friend1");
672 assertThat(peer.get(), is(true));
675 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
676 return matchesFcpMessage(
678 "Identifier=" + identifier,
679 "NodeIdentifier=" + nodeIdentifier
683 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
686 "Identifier=" + identifier,
687 "NodeIdentifier=" + nodeIdentifier,
694 private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
697 "Identifier=" + identifier,
698 "identity=" + peerId,
700 "ark.pubURI=SSK@3YEf.../ark",
703 "version=Fred,0.7,1.0,1466",
704 "lastGoodVersion=Fred,0.7,1.0,1466"
706 fcpServer.writeLine(additionalLines);
707 fcpServer.writeLine("EndMessage");
712 public class PeerNoteCommands {
714 public class ListPeerNotes {
717 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
718 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
719 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
720 replyWithUnknownNodeIdentifier();
721 assertThat(peerNote.get().isPresent(), is(false));
725 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
726 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
727 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
729 replyWithEndListPeerNotes();
730 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
731 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
735 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
736 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
737 connectAndAssert(() -> matchesListPeerNotes("id1"));
739 replyWithEndListPeerNotes();
740 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
741 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
745 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
746 Future<Optional<PeerNote>> peerNote =
747 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
748 connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
750 replyWithEndListPeerNotes();
751 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
752 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
755 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
756 return matchesFcpMessage(
758 "NodeIdentifier=" + nodeIdentifier
762 private void replyWithEndListPeerNotes() throws IOException {
765 "Identifier=" + identifier,
770 private void replyWithPeerNote() throws IOException {
773 "Identifier=" + identifier,
774 "NodeIdentifier=Friend1",
775 "NoteText=RXhhbXBsZSBUZXh0Lg==",
783 public class ModifyPeerNotes {
786 public void byName() throws InterruptedException, ExecutionException, IOException {
787 Future<Boolean> noteUpdated =
788 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
789 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
791 assertThat(noteUpdated.get(), is(true));
795 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
796 Future<Boolean> noteUpdated =
797 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
798 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
799 replyWithUnknownNodeIdentifier();
800 assertThat(noteUpdated.get(), is(false));
804 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
805 throws InterruptedException, ExecutionException, IOException {
806 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
807 assertThat(noteUpdated.get(), is(false));
811 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
812 Future<Boolean> noteUpdated =
813 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
814 connectAndAssert(() -> matchesModifyPeerNote("id1"));
816 assertThat(noteUpdated.get(), is(true));
820 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
821 Future<Boolean> noteUpdated =
822 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
823 connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
825 assertThat(noteUpdated.get(), is(true));
828 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
829 return matchesFcpMessage(
831 "Identifier=" + identifier,
832 "NodeIdentifier=" + nodeIdentifier,
838 private void replyWithPeerNote() throws IOException {
841 "Identifier=" + identifier,
842 "NodeIdentifier=Friend1",
853 private void replyWithUnknownNodeIdentifier() throws IOException {
855 "UnknownNodeIdentifier",
856 "Identifier=" + identifier,
857 "NodeIdentifier=id2",
864 public class PluginCommands {
866 private static final String CLASS_NAME = "foo.plugin.Plugin";
868 private void replyWithPluginInfo() throws IOException {
871 "Identifier=" + identifier,
872 "PluginName=superPlugin",
876 "OriginUri=superPlugin",
882 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
883 throws InterruptedException, ExecutionException {
884 assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
885 assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
886 assertThat(pluginInfo.get().get().isTalkable(), is(true));
887 assertThat(pluginInfo.get().get().getVersion(), is("42"));
888 assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
889 assertThat(pluginInfo.get().get().isStarted(), is(true));
892 public class LoadPlugin {
894 public class OfficialPlugins {
897 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
898 Future<Optional<PluginInfo>> pluginInfo =
899 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
900 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
901 assertThat(lines, not(contains(startsWith("Store="))));
902 replyWithPluginInfo();
903 verifyPluginInfo(pluginInfo);
907 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
908 Future<Optional<PluginInfo>> pluginInfo =
909 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
910 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
911 assertThat(lines, hasItem("Store=true"));
912 replyWithPluginInfo();
913 verifyPluginInfo(pluginInfo);
917 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
918 Future<Optional<PluginInfo>> pluginInfo =
919 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
920 connectAndAssert(() -> createMatcherForOfficialSource("https"));
921 replyWithPluginInfo();
922 verifyPluginInfo(pluginInfo);
925 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
926 return matchesFcpMessage(
928 "Identifier=" + identifier,
929 "PluginURL=superPlugin",
931 "OfficialSource=" + officialSource
937 public class FromOtherSources {
939 private static final String FILE_PATH = "/path/to/plugin.jar";
940 private static final String URL = "http://server.com/plugin.jar";
941 private static final String KEY = "KSK@plugin.jar";
944 public void fromFile() throws ExecutionException, InterruptedException, IOException {
945 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
946 connectAndAssert(() -> createMatcher("file", FILE_PATH));
947 replyWithPluginInfo();
948 verifyPluginInfo(pluginInfo);
952 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
953 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
954 connectAndAssert(() -> createMatcher("url", URL));
955 replyWithPluginInfo();
956 verifyPluginInfo(pluginInfo);
960 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
961 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
962 connectAndAssert(() -> createMatcher("freenet", KEY));
963 replyWithPluginInfo();
964 verifyPluginInfo(pluginInfo);
967 private Matcher<List<String>> createMatcher(String urlType, String url) {
968 return matchesFcpMessage(
970 "Identifier=" + identifier,
978 public class Failed {
981 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
982 Future<Optional<PluginInfo>> pluginInfo =
983 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
984 connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
985 replyWithProtocolError();
986 assertThat(pluginInfo.get().isPresent(), is(false));
993 public class ReloadPlugin {
996 public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
997 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
998 connectAndAssert(this::matchReloadPluginMessage);
999 replyWithPluginInfo();
1000 verifyPluginInfo(pluginInfo);
1004 public void reloadingPluginWithMaxWaitTimeWorks()
1005 throws InterruptedException, ExecutionException, IOException {
1006 Future<Optional<PluginInfo>> pluginInfo =
1007 fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1008 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1009 replyWithPluginInfo();
1010 verifyPluginInfo(pluginInfo);
1014 public void reloadingPluginWithPurgeWorks()
1015 throws InterruptedException, ExecutionException, IOException {
1016 Future<Optional<PluginInfo>> pluginInfo =
1017 fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1018 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1019 replyWithPluginInfo();
1020 verifyPluginInfo(pluginInfo);
1024 public void reloadingPluginWithStoreWorks()
1025 throws InterruptedException, ExecutionException, IOException {
1026 Future<Optional<PluginInfo>> pluginInfo =
1027 fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1028 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1029 replyWithPluginInfo();
1030 verifyPluginInfo(pluginInfo);
1034 public void protocolErrorIsRecognizedAsFailure()
1035 throws InterruptedException, ExecutionException, IOException {
1036 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1037 connectAndAssert(() -> matchReloadPluginMessage());
1038 replyWithProtocolError();
1039 assertThat(pluginInfo.get().isPresent(), is(false));
1042 private Matcher<List<String>> matchReloadPluginMessage() {
1043 return matchesFcpMessage(
1045 "Identifier=" + identifier,
1046 "PluginName=" + CLASS_NAME
1052 public class RemovePlugin {
1055 public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1056 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1057 connectAndAssert(this::matchPluginRemovedMessage);
1058 replyWithPluginRemoved();
1059 assertThat(pluginRemoved.get(), is(true));
1063 public void removingPluginWithMaxWaitTimeWorks()
1064 throws InterruptedException, ExecutionException, IOException {
1065 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1066 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1067 replyWithPluginRemoved();
1068 assertThat(pluginRemoved.get(), is(true));
1072 public void removingPluginWithPurgeWorks()
1073 throws InterruptedException, ExecutionException, IOException {
1074 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1075 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1076 replyWithPluginRemoved();
1077 assertThat(pluginRemoved.get(), is(true));
1080 private void replyWithPluginRemoved() throws IOException {
1081 fcpServer.writeLine(
1083 "Identifier=" + identifier,
1084 "PluginName=" + CLASS_NAME,
1089 private Matcher<List<String>> matchPluginRemovedMessage() {
1090 return matchesFcpMessage(
1092 "Identifier=" + identifier,
1093 "PluginName=" + CLASS_NAME
1099 public class GetPluginInfo {
1102 public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1103 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1104 connectAndAssert(this::matchGetPluginInfoMessage);
1105 replyWithPluginInfo();
1106 verifyPluginInfo(pluginInfo);
1110 public void gettingPluginInfoWithDetailsWorks()
1111 throws InterruptedException, ExecutionException, IOException {
1112 Future<Optional<PluginInfo>> pluginInfo =
1113 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1114 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1115 replyWithPluginInfo();
1116 verifyPluginInfo(pluginInfo);
1120 public void protocolErrorIsRecognizedAsFailure()
1121 throws InterruptedException, ExecutionException, IOException {
1122 Future<Optional<PluginInfo>> pluginInfo =
1123 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1124 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1125 replyWithProtocolError();
1126 assertThat(pluginInfo.get(), is(Optional.empty()));
1129 private Matcher<List<String>> matchGetPluginInfoMessage() {
1130 return matchesFcpMessage(
1132 "Identifier=" + identifier,
1133 "PluginName=" + CLASS_NAME
1141 public class UskSubscriptionCommands {
1143 private static final String URI = "USK@some,uri/file.txt";
1146 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1147 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1148 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1149 replyWithSubscribed();
1150 assertThat(uskSubscription.get().get().getUri(), is(URI));
1151 AtomicInteger edition = new AtomicInteger();
1152 CountDownLatch updated = new CountDownLatch(2);
1153 uskSubscription.get().get().onUpdate(e -> {
1155 updated.countDown();
1157 sendUpdateNotification(23);
1158 sendUpdateNotification(24);
1159 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1160 assertThat(edition.get(), is(24));
1164 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1165 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1166 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1167 replyWithSubscribed();
1168 assertThat(uskSubscription.get().get().getUri(), is(URI));
1169 AtomicInteger edition = new AtomicInteger();
1170 CountDownLatch updated = new CountDownLatch(2);
1171 uskSubscription.get().get().onUpdate(e -> {
1173 updated.countDown();
1175 uskSubscription.get().get().onUpdate(e -> updated.countDown());
1176 sendUpdateNotification(23);
1177 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1178 assertThat(edition.get(), is(23));
1182 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1183 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1184 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1185 replyWithSubscribed();
1186 assertThat(uskSubscription.get().get().getUri(), is(URI));
1187 AtomicBoolean updated = new AtomicBoolean();
1188 uskSubscription.get().get().onUpdate(e -> updated.set(true));
1189 uskSubscription.get().get().cancel();
1190 readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1191 sendUpdateNotification(23);
1192 assertThat(updated.get(), is(false));
1195 private void replyWithSubscribed() throws IOException {
1196 fcpServer.writeLine(
1198 "Identifier=" + identifier,
1205 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1206 fcpServer.writeLine(
1207 "SubscribedUSKUpdate",
1208 "Identifier=" + identifier,
1210 "Edition=" + edition
1212 fcpServer.writeLine(additionalLines);
1213 fcpServer.writeLine("EndMessage");
1218 public class ClientGet {
1221 public void works() throws InterruptedException, ExecutionException, IOException {
1222 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1223 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1224 replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1225 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1226 Optional<Data> data = dataFuture.get();
1231 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1232 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1233 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1234 replyWithGetFailed("not-test");
1235 replyWithGetFailed(identifier);
1236 Optional<Data> data = dataFuture.get();
1237 assertThat(data.isPresent(), is(false));
1241 public void getFailedForDifferentIdentifierIsIgnored()
1242 throws InterruptedException, ExecutionException, IOException {
1243 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1244 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1245 replyWithGetFailed("not-test");
1246 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1247 Optional<Data> data = dataFuture.get();
1251 @Test(expected = ExecutionException.class)
1252 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1253 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1254 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1260 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1261 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1262 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1266 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1267 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1268 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1272 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1273 throws InterruptedException, ExecutionException, IOException {
1274 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1275 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1279 public void clientGetWithPrioritySettingSendsCorrectCommands()
1280 throws InterruptedException, ExecutionException, IOException {
1281 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1282 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1286 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1287 throws InterruptedException, ExecutionException, IOException {
1288 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1289 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1293 public void clientGetWithGlobalSettingSendsCorrectCommands()
1294 throws InterruptedException, ExecutionException, IOException {
1295 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1296 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1299 private void replyWithGetFailed(String identifier) throws IOException {
1300 fcpServer.writeLine(
1302 "Identifier=" + identifier,
1308 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1309 fcpServer.writeLine(
1311 "Identifier=" + identifier,
1312 "DataLength=" + (text.length() + 1),
1313 "StartupTime=1435610539000",
1314 "CompletionTime=1435610540000",
1315 "Metadata.ContentType=" + contentType,
1321 private void verifyData(Optional<Data> data) throws IOException {
1322 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1323 assertThat(data.get().size(), is(6L));
1324 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1325 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1330 public class ClientPut {
1333 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1334 fcpClient.clientPut()
1335 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1340 readMessage("Hello", this::matchesDirectClientPut);
1344 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1345 Future<Optional<Key>> key = fcpClient.clientPut()
1346 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1351 readMessage("Hello", this::matchesDirectClientPut);
1352 replyWithPutFailed("not-the-right-one");
1353 replyWithPutSuccessful(identifier);
1354 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1358 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1359 Future<Optional<Key>> key = fcpClient.clientPut()
1360 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1365 readMessage("Hello", this::matchesDirectClientPut);
1366 replyWithPutSuccessful("not-the-right-one");
1367 replyWithPutFailed(identifier);
1368 assertThat(key.get().isPresent(), is(false));
1372 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1373 fcpClient.clientPut()
1374 .named("otherName.txt")
1375 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1380 readMessage("Hello", () -> allOf(
1381 hasHead("ClientPut"),
1382 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1384 hasTail("EndMessage", "Hello")
1389 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1390 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1391 connectAndAssert(() ->
1392 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
1396 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1397 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1398 connectAndAssert(() ->
1399 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
1404 private final File ddaFile;
1405 private final File fileToUpload;
1407 public DDA() throws IOException {
1408 ddaFile = createDdaFile();
1409 fileToUpload = new File(ddaFile.getParent(), "test.dat");
1412 private Matcher<List<String>> matchesFileClientPut(File file) {
1413 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
1417 public void completeDda() throws IOException, ExecutionException, InterruptedException {
1418 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1419 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1420 sendDdaRequired(identifier);
1421 readMessage(() -> matchesTestDDARequest(ddaFile));
1422 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1423 readMessage(() -> matchesTestDDAResponse(ddaFile));
1424 writeTestDDAComplete(ddaFile);
1425 readMessage(() -> matchesFileClientPut(fileToUpload));
1429 public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1430 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1431 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1432 sendDdaRequired(identifier);
1433 readMessage(() -> matchesTestDDARequest(ddaFile));
1434 sendTestDDAReply("/some-other-directory", ddaFile);
1435 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1436 readMessage(() -> matchesTestDDAResponse(ddaFile));
1440 public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
1441 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1442 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1443 sendDdaRequired(identifier);
1444 readMessage(() -> matchesTestDDARequest(ddaFile));
1445 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1446 readMessage(this::matchesFailedToReadResponse);
1450 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1451 throws IOException, ExecutionException, InterruptedException {
1452 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1454 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1455 String identifier = extractIdentifier(lines);
1456 fcpServer.writeLine(
1458 "Directory=/some-other-directory",
1461 sendDdaRequired(identifier);
1462 lines = fcpServer.collectUntil(is("EndMessage"));
1463 assertThat(lines, matchesFcpMessage(
1465 "Directory=" + ddaFile.getParent(),
1466 "WantReadDirectory=true",
1467 "WantWriteDirectory=false"
1471 private Matcher<List<String>> matchesFailedToReadResponse() {
1472 return matchesFcpMessage(
1474 "Directory=" + ddaFile.getParent(),
1475 "ReadContent=failed-to-read"
1479 private void writeTestDDAComplete(File tempFile) throws IOException {
1480 fcpServer.writeLine(
1482 "Directory=" + tempFile.getParent(),
1483 "ReadDirectoryAllowed=true",
1488 private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1489 return matchesFcpMessage(
1491 "Directory=" + tempFile.getParent(),
1492 "ReadContent=test-content"
1496 private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1497 fcpServer.writeLine(
1499 "Directory=" + directory,
1500 "ReadFilename=" + tempFile,
1505 private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1506 return matchesFcpMessage(
1508 "Directory=" + tempFile.getParent(),
1509 "WantReadDirectory=true",
1510 "WantWriteDirectory=false"
1514 private void sendDdaRequired(String identifier) throws IOException {
1515 fcpServer.writeLine(
1517 "Identifier=" + identifier,
1525 private void replyWithPutSuccessful(String identifier) throws IOException {
1526 fcpServer.writeLine(
1529 "Identifier=" + identifier,
1534 private void replyWithPutFailed(String identifier) throws IOException {
1535 fcpServer.writeLine(
1537 "Identifier=" + identifier,
1542 private Matcher<List<String>> matchesDirectClientPut() {
1544 hasHead("ClientPut"),
1545 hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
1546 hasTail("EndMessage", "Hello")
1550 private File createDdaFile() throws IOException {
1551 File tempFile = File.createTempFile("test-dda-", ".dat");
1552 tempFile.deleteOnExit();
1553 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
1558 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1559 throws InterruptedException, ExecutionException, IOException {
1560 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1562 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1563 String identifier = extractIdentifier(lines);
1564 fcpServer.writeLine(
1566 "Identifier=not-the-right-one",
1570 fcpServer.writeLine(
1572 "Identifier=" + identifier,
1576 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1580 public void clientPutAbortsOnProtocolErrorOtherThan25()
1581 throws InterruptedException, ExecutionException, IOException {
1582 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1584 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1585 String identifier = extractIdentifier(lines);
1586 fcpServer.writeLine(
1588 "Identifier=" + identifier,
1592 assertThat(key.get().isPresent(), is(false));
1596 public void clientPutSendsNotificationsForGeneratedKeys()
1597 throws InterruptedException, ExecutionException, IOException {
1598 List<String> generatedKeys = new CopyOnWriteArrayList<>();
1599 Future<Optional<Key>> key = fcpClient.clientPut()
1600 .onKeyGenerated(generatedKeys::add)
1601 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1606 List<String> lines = fcpServer.collectUntil(is("Hello"));
1607 String identifier = extractIdentifier(lines);
1608 fcpServer.writeLine(
1610 "Identifier=" + identifier,
1614 replyWithPutSuccessful(identifier);
1615 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1616 assertThat(generatedKeys, contains("KSK@foo.txt"));
1621 public class ConfigCommand {
1623 public class GetConfig {
1626 public void defaultFcpClientCanGetConfigWithoutDetails()
1627 throws InterruptedException, ExecutionException, IOException {
1628 Future<ConfigData> configData = fcpClient.getConfig().execute();
1629 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1630 replyWithConfigData();
1631 assertThat(configData.get(), notNullValue());
1635 public void defaultFcpClientCanGetConfigWithCurrent()
1636 throws InterruptedException, ExecutionException, IOException {
1637 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1638 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1639 replyWithConfigData("current.foo=bar");
1640 assertThat(configData.get().getCurrent("foo"), is("bar"));
1644 public void defaultFcpClientCanGetConfigWithDefaults()
1645 throws InterruptedException, ExecutionException, IOException {
1646 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1647 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1648 replyWithConfigData("default.foo=bar");
1649 assertThat(configData.get().getDefault("foo"), is("bar"));
1653 public void defaultFcpClientCanGetConfigWithSortOrder()
1654 throws InterruptedException, ExecutionException, IOException {
1655 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1656 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1657 replyWithConfigData("sortOrder.foo=17");
1658 assertThat(configData.get().getSortOrder("foo"), is(17));
1662 public void defaultFcpClientCanGetConfigWithExpertFlag()
1663 throws InterruptedException, ExecutionException, IOException {
1664 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1665 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1666 replyWithConfigData("expertFlag.foo=true");
1667 assertThat(configData.get().getExpertFlag("foo"), is(true));
1671 public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1672 throws InterruptedException, ExecutionException, IOException {
1673 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1674 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1675 replyWithConfigData("forceWriteFlag.foo=true");
1676 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
1680 public void defaultFcpClientCanGetConfigWithShortDescription()
1681 throws InterruptedException, ExecutionException, IOException {
1682 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1683 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1684 replyWithConfigData("shortDescription.foo=bar");
1685 assertThat(configData.get().getShortDescription("foo"), is("bar"));
1689 public void defaultFcpClientCanGetConfigWithLongDescription()
1690 throws InterruptedException, ExecutionException, IOException {
1691 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1692 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1693 replyWithConfigData("longDescription.foo=bar");
1694 assertThat(configData.get().getLongDescription("foo"), is("bar"));
1698 public void defaultFcpClientCanGetConfigWithDataTypes()
1699 throws InterruptedException, ExecutionException, IOException {
1700 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1701 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1702 replyWithConfigData("dataType.foo=number");
1703 assertThat(configData.get().getDataType("foo"), is("number"));
1706 private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1707 return matchesFcpMessage(
1709 "Identifier=" + identifier,
1710 additionalParameter + "=true"
1716 public class ModifyConfig {
1719 public void defaultFcpClientCanModifyConfigData()
1720 throws InterruptedException, ExecutionException, IOException {
1721 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1722 connectAndAssert(() -> matchesFcpMessage(
1724 "Identifier=" + identifier,
1727 replyWithConfigData("current.foo.bar=baz");
1728 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
1733 private void replyWithConfigData(String... additionalLines) throws IOException {
1734 fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1735 fcpServer.writeLine(additionalLines);
1736 fcpServer.writeLine("EndMessage");
1741 public class NodeInformation {
1744 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
1745 Future<NodeData> nodeData = fcpClient.getNode().execute();
1746 connectAndAssert(() -> matchesGetNode(false, false, false));
1747 replyWithNodeData();
1748 assertThat(nodeData.get(), notNullValue());
1749 assertThat(nodeData.get().getNodeRef().isOpennet(), is(false));
1753 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
1754 throws InterruptedException, ExecutionException, IOException {
1755 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
1756 connectAndAssert(() -> matchesGetNode(true, false, false));
1757 replyWithNodeData("opennet=true");
1758 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
1759 assertThat(nodeData.get().getNodeRef().isOpennet(), is(true));
1763 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
1764 throws InterruptedException, ExecutionException, IOException {
1765 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
1766 connectAndAssert(() -> matchesGetNode(false, true, false));
1767 replyWithNodeData("ark.privURI=SSK@XdHMiRl");
1768 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
1772 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
1773 throws InterruptedException, ExecutionException, IOException {
1774 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
1775 connectAndAssert(() -> matchesGetNode(false, false, true));
1776 replyWithNodeData("volatile.freeJavaMemory=205706528");
1777 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
1780 private Matcher<List<String>> matchesGetNode(boolean withOpennetRef, boolean withPrivate, boolean withVolatile) {
1781 return matchesFcpMessage(
1783 "Identifier=" + identifier,
1784 "GiveOpennetRef=" + withOpennetRef,
1785 "WithPrivate=" + withPrivate,
1786 "WithVolatile=" + withVolatile
1790 private void replyWithNodeData(String... additionalLines) throws IOException {
1791 fcpServer.writeLine(
1793 "Identifier=" + identifier,
1794 "ark.pubURI=SSK@3YEf.../ark",
1797 "version=Fred,0.7,1.0,1466",
1798 "lastGoodVersion=Fred,0.7,1.0,1466"
1800 fcpServer.writeLine(additionalLines);
1801 fcpServer.writeLine("EndMessage");