1 package net.pterodactylus.fcp.quelaton;
3 import static net.pterodactylus.fcp.RequestProgressMatcher.isRequestProgress;
4 import static org.hamcrest.MatcherAssert.assertThat;
5 import static org.hamcrest.Matchers.allOf;
6 import static org.hamcrest.Matchers.contains;
7 import static org.hamcrest.Matchers.containsInAnyOrder;
8 import static org.hamcrest.Matchers.hasItem;
9 import static org.hamcrest.Matchers.hasSize;
10 import static org.hamcrest.Matchers.is;
11 import static org.hamcrest.Matchers.not;
12 import static org.hamcrest.Matchers.notNullValue;
13 import static org.hamcrest.Matchers.startsWith;
15 import java.io.ByteArrayInputStream;
17 import java.io.IOException;
19 import java.nio.charset.StandardCharsets;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.function.Supplier;
35 import java.util.stream.Collectors;
37 import net.pterodactylus.fcp.ARK;
38 import net.pterodactylus.fcp.ConfigData;
39 import net.pterodactylus.fcp.DSAGroup;
40 import net.pterodactylus.fcp.FcpKeyPair;
41 import net.pterodactylus.fcp.Key;
42 import net.pterodactylus.fcp.NodeData;
43 import net.pterodactylus.fcp.NodeRef;
44 import net.pterodactylus.fcp.Peer;
45 import net.pterodactylus.fcp.PeerNote;
46 import net.pterodactylus.fcp.PluginInfo;
47 import net.pterodactylus.fcp.Priority;
48 import net.pterodactylus.fcp.RequestProgress;
49 import net.pterodactylus.fcp.fake.FakeTcpServer;
50 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
52 import com.google.common.io.ByteStreams;
53 import com.google.common.io.Files;
54 import com.nitorcreations.junit.runners.NestedRunner;
55 import org.hamcrest.Description;
56 import org.hamcrest.Matcher;
57 import org.hamcrest.Matchers;
58 import org.hamcrest.TypeSafeDiagnosingMatcher;
59 import org.junit.After;
60 import org.junit.Assert;
61 import org.junit.Test;
62 import org.junit.runner.RunWith;
65 * Unit test for {@link DefaultFcpClient}.
 * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
69 @RunWith(NestedRunner.class)
70 public class DefaultFcpClientTest {
72 private static final String INSERT_URI =
73 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
74 private static final String REQUEST_URI =
75 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
77 private int threadCounter = 0;
78 private final ExecutorService threadPool =
79 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
80 private final FakeTcpServer fcpServer;
81 private final DefaultFcpClient fcpClient;
83 public DefaultFcpClientTest() throws IOException {
84 fcpServer = new FakeTcpServer(threadPool);
85 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
89 public void tearDown() throws IOException {
91 threadPool.shutdown();
94 private void connectNode() throws InterruptedException, ExecutionException, IOException {
95 fcpServer.connect().get();
96 fcpServer.collectUntil(is("EndMessage"));
97 fcpServer.writeLine("NodeHello",
98 "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
99 "Revision=build01466",
101 "Version=Fred,0.7,1.0,1466",
103 "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
107 "NodeLanguage=ENGLISH",
113 private String extractIdentifier(List<String> lines) {
114 return lines.stream()
115 .filter(s -> s.startsWith("Identifier="))
116 .map(s -> s.substring(s.indexOf('=') + 1))
121 private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
122 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
125 private Matcher<Iterable<String>> hasHead(String firstElement) {
126 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
128 protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
129 if (!iterable.iterator().hasNext()) {
130 mismatchDescription.appendText("is empty");
133 String element = iterable.iterator().next();
134 if (!element.equals(firstElement)) {
135 mismatchDescription.appendText("starts with ").appendValue(element);
142 public void describeTo(Description description) {
143 description.appendText("starts with ").appendValue(firstElement);
148 private Matcher<List<String>> matchesFcpMessageWithTerminator(
149 String name, String terminator, String... requiredLines) {
150 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
153 private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
154 return new TypeSafeDiagnosingMatcher<List<String>>() {
156 protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
157 if (item.size() < (ignoreStart + ignoreEnd)) {
158 mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
for (String line : lines) {
	// Look the line up once; a required line only counts when it sits
	// between the ignored leading and trailing elements.
	int index = item.indexOf(line);
	if ((index < ignoreStart) || (index >= (item.size() - ignoreEnd))) {
		mismatchDescription.appendText("does not contain ").appendValue(line);
171 public void describeTo(Description description) {
172 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
173 description.appendText(", ignoring the first ").appendValue(ignoreStart);
174 description.appendText(" and the last ").appendValue(ignoreEnd);
179 private Matcher<List<String>> hasTail(String... lastElements) {
180 return new TypeSafeDiagnosingMatcher<List<String>>() {
182 protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
183 if (list.size() < lastElements.length) {
184 mismatchDescription.appendText("is too small");
187 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
188 if (!tail.equals(Arrays.asList(lastElements))) {
189 mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
196 public void describeTo(Description description) {
197 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
202 private List<String> lines;
203 private String identifier;
205 private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
206 throws InterruptedException, ExecutionException, IOException {
208 readMessage(requestMatcher);
211 private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
212 readMessage("EndMessage", requestMatcher);
215 private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
216 lines = fcpServer.collectUntil(is(terminator));
217 identifier = extractIdentifier(lines);
218 assertThat(lines, requestMatcher.get());
221 private void replyWithProtocolError() throws IOException {
224 "Identifier=" + identifier,
229 public class ConnectionsAndKeyPairs {
231 public class Connections {
233 @Test(expected = ExecutionException.class)
234 public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
235 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
236 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
238 "CloseConnectionDuplicateClientName",
244 @Test(expected = ExecutionException.class)
245 public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
246 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
247 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
253 public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
254 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
255 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
keyPair = fcpClient.generateKeypair().execute();
// readMessage() stores the collected lines and the identifier extracted from
// them, so no separate extractIdentifier() call is needed afterwards.
readMessage(() -> matchesFcpMessage("GenerateSSK"));
266 public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
267 throws InterruptedException, ExecutionException, IOException {
268 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
269 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
274 } catch (ExecutionException e) {
277 keyPair = fcpClient.generateKeypair().execute();
278 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
285 public class GenerateKeyPair {
288 public void defaultFcpClientCanGenerateKeypair()
289 throws ExecutionException, InterruptedException, IOException {
290 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
291 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
293 FcpKeyPair keyPair = keyPairFuture.get();
294 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
295 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
300 private void replyWithKeyPair() throws IOException {
301 fcpServer.writeLine("SSKKeypair",
302 "InsertURI=" + INSERT_URI + "",
303 "RequestURI=" + REQUEST_URI + "",
304 "Identifier=" + identifier,
312 public class PeerCommands {
314 public class ListPeer {
317 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
318 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
319 connectAndAssert(() -> matchesListPeer("id1"));
320 replyWithPeer("id1");
321 assertThat(peer.get().get().getIdentity(), is("id1"));
325 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
326 Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
327 connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
328 replyWithPeer("id1");
329 assertThat(peer.get().get().getIdentity(), is("id1"));
333 public void byName() throws InterruptedException, ExecutionException, IOException {
334 Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
335 connectAndAssert(() -> matchesListPeer("FriendNode"));
336 replyWithPeer("id1");
337 assertThat(peer.get().get().getIdentity(), is("id1"));
341 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
342 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
343 connectAndAssert(() -> matchesListPeer("id2"));
344 replyWithUnknownNodeIdentifier();
345 assertThat(peer.get().isPresent(), is(false));
348 private Matcher<List<String>> matchesListPeer(String nodeId) {
349 return matchesFcpMessage(
351 "Identifier=" + identifier,
352 "NodeIdentifier=" + nodeId
358 public class ListPeers {
361 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
362 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
363 connectAndAssert(() -> matchesListPeers(false, false));
364 replyWithPeer("id1");
365 replyWithPeer("id2");
367 assertThat(peers.get(), hasSize(2));
368 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
369 containsInAnyOrder("id1", "id2"));
373 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
374 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
375 connectAndAssert(() -> matchesListPeers(false, true));
376 replyWithPeer("id1", "metadata.foo=bar1");
377 replyWithPeer("id2", "metadata.foo=bar2");
379 assertThat(peers.get(), hasSize(2));
380 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
381 containsInAnyOrder("bar1", "bar2"));
385 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
386 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
387 connectAndAssert(() -> matchesListPeers(true, false));
388 replyWithPeer("id1", "volatile.foo=bar1");
389 replyWithPeer("id2", "volatile.foo=bar2");
391 assertThat(peers.get(), hasSize(2));
392 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
393 containsInAnyOrder("bar1", "bar2"));
396 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
397 return matchesFcpMessage(
399 "WithVolatile=" + withVolatile,
400 "WithMetadata=" + withMetadata
404 private void sendEndOfPeerList() throws IOException {
407 "Identifier=" + identifier,
414 public class AddPeer {
417 public void fromFile() throws InterruptedException, ExecutionException, IOException {
418 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
419 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
420 replyWithPeer("id1");
421 assertThat(peer.get().get().getIdentity(), is("id1"));
425 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
426 Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
427 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
428 replyWithPeer("id1");
429 assertThat(peer.get().get().getIdentity(), is("id1"));
433 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
434 NodeRef nodeRef = createNodeRef();
435 Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
436 connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
442 "dsaGroup.q=subprime",
443 "dsaPubKey.y=dsa-public",
444 "physical.udp=1.2.3.4:5678",
448 replyWithPeer("id1");
449 assertThat(peer.get().get().getIdentity(), is("id1"));
453 public void protocolErrorEndsCommand() throws InterruptedException, ExecutionException, IOException {
454 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
455 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
456 replyWithProtocolError();
457 assertThat(peer.get().isPresent(), is(false));
460 private NodeRef createNodeRef() {
461 NodeRef nodeRef = new NodeRef();
462 nodeRef.setIdentity("id1");
463 nodeRef.setName("name");
464 nodeRef.setARK(new ARK("public", "1"));
465 nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
466 nodeRef.setNegotiationTypes(new int[] { 3, 5 });
467 nodeRef.setPhysicalUDP("1.2.3.4:5678");
468 nodeRef.setDSAPublicKey("dsa-public");
469 nodeRef.setSignature("sig");
473 private Matcher<List<String>> matchesAddPeer() {
474 return matchesFcpMessage(
476 "Identifier=" + identifier
482 public class ModifyPeer {
485 public void defaultFcpClientCanEnablePeerByName()
486 throws InterruptedException, ExecutionException, IOException {
487 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
488 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
489 replyWithPeer("id1");
490 assertThat(peer.get().get().getIdentity(), is("id1"));
494 public void defaultFcpClientCanDisablePeerByName()
495 throws InterruptedException, ExecutionException, IOException {
496 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
497 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
498 replyWithPeer("id1");
499 assertThat(peer.get().get().getIdentity(), is("id1"));
503 public void defaultFcpClientCanEnablePeerByIdentity()
504 throws InterruptedException, ExecutionException, IOException {
505 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
506 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
507 replyWithPeer("id1");
508 assertThat(peer.get().get().getIdentity(), is("id1"));
512 public void defaultFcpClientCanEnablePeerByHostAndPort()
513 throws InterruptedException, ExecutionException, IOException {
514 Future<Optional<Peer>> peer =
515 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
516 connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
517 replyWithPeer("id1");
518 assertThat(peer.get().get().getIdentity(), is("id1"));
522 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
523 Future<Optional<Peer>> peer =
524 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
525 connectAndAssert(() -> allOf(
526 matchesModifyPeer("id1", "AllowLocalAddresses", true),
527 not(contains(startsWith("IsDisabled=")))
529 replyWithPeer("id1");
530 assertThat(peer.get().get().getIdentity(), is("id1"));
534 public void disallowLocalAddressesOfPeer()
535 throws InterruptedException, ExecutionException, IOException {
536 Future<Optional<Peer>> peer =
537 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
538 connectAndAssert(() -> allOf(
539 matchesModifyPeer("id1", "AllowLocalAddresses", false),
540 not(contains(startsWith("IsDisabled=")))
542 replyWithPeer("id1");
543 assertThat(peer.get().get().getIdentity(), is("id1"));
547 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
548 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
549 connectAndAssert(() -> allOf(
550 matchesModifyPeer("id1", "IsBurstOnly", true),
551 not(contains(startsWith("AllowLocalAddresses="))),
552 not(contains(startsWith("IsDisabled=")))
554 replyWithPeer("id1");
555 assertThat(peer.get().get().getIdentity(), is("id1"));
559 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
560 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
561 connectAndAssert(() -> allOf(
562 matchesModifyPeer("id1", "IsBurstOnly", false),
563 not(contains(startsWith("AllowLocalAddresses="))),
564 not(contains(startsWith("IsDisabled=")))
566 replyWithPeer("id1");
567 assertThat(peer.get().get().getIdentity(), is("id1"));
571 public void defaultFcpClientCanSetListenOnlyForPeer()
572 throws InterruptedException, ExecutionException, IOException {
573 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
574 connectAndAssert(() -> allOf(
575 matchesModifyPeer("id1", "IsListenOnly", true),
576 not(contains(startsWith("AllowLocalAddresses="))),
577 not(contains(startsWith("IsDisabled="))),
578 not(contains(startsWith("IsBurstOnly=")))
580 replyWithPeer("id1");
581 assertThat(peer.get().get().getIdentity(), is("id1"));
585 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
586 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
587 connectAndAssert(() -> allOf(
588 matchesModifyPeer("id1", "IsListenOnly", false),
589 not(contains(startsWith("AllowLocalAddresses="))),
590 not(contains(startsWith("IsDisabled="))),
591 not(contains(startsWith("IsBurstOnly=")))
593 replyWithPeer("id1");
594 assertThat(peer.get().get().getIdentity(), is("id1"));
598 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
599 Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
600 connectAndAssert(() -> allOf(
601 matchesModifyPeer("id1", "IgnoreSourcePort", true),
602 not(contains(startsWith("AllowLocalAddresses="))),
603 not(contains(startsWith("IsDisabled="))),
604 not(contains(startsWith("IsBurstOnly="))),
605 not(contains(startsWith("IsListenOnly=")))
607 replyWithPeer("id1");
608 assertThat(peer.get().get().getIdentity(), is("id1"));
612 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
613 Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
614 connectAndAssert(() -> allOf(
615 matchesModifyPeer("id1", "IgnoreSourcePort", false),
616 not(contains(startsWith("AllowLocalAddresses="))),
617 not(contains(startsWith("IsDisabled="))),
618 not(contains(startsWith("IsBurstOnly="))),
619 not(contains(startsWith("IsListenOnly=")))
621 replyWithPeer("id1");
622 assertThat(peer.get().get().getIdentity(), is("id1"));
626 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
627 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
628 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
629 replyWithUnknownNodeIdentifier();
630 assertThat(peer.get().isPresent(), is(false));
633 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
634 return matchesFcpMessage(
636 "Identifier=" + identifier,
637 "NodeIdentifier=" + nodeIdentifier,
638 setting + "=" + value
644 public class RemovePeer {
647 public void byName() throws InterruptedException, ExecutionException, IOException {
648 Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
649 connectAndAssert(() -> matchesRemovePeer("Friend1"));
650 replyWithPeerRemoved("Friend1");
651 assertThat(peer.get(), is(true));
655 public void invalidName() throws InterruptedException, ExecutionException, IOException {
656 Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
657 connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
658 replyWithUnknownNodeIdentifier();
659 assertThat(peer.get(), is(false));
663 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
664 Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
665 connectAndAssert(() -> matchesRemovePeer("id1"));
666 replyWithPeerRemoved("id1");
667 assertThat(peer.get(), is(true));
671 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
672 Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
673 connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
674 replyWithPeerRemoved("Friend1");
675 assertThat(peer.get(), is(true));
678 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
679 return matchesFcpMessage(
681 "Identifier=" + identifier,
682 "NodeIdentifier=" + nodeIdentifier
686 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
689 "Identifier=" + identifier,
690 "NodeIdentifier=" + nodeIdentifier,
697 private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
700 "Identifier=" + identifier,
701 "identity=" + peerId,
703 "ark.pubURI=SSK@3YEf.../ark",
706 "version=Fred,0.7,1.0,1466",
707 "lastGoodVersion=Fred,0.7,1.0,1466"
709 fcpServer.writeLine(additionalLines);
710 fcpServer.writeLine("EndMessage");
715 public class PeerNoteCommands {
717 public class ListPeerNotes {
720 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
721 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
722 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
723 replyWithUnknownNodeIdentifier();
724 assertThat(peerNote.get().isPresent(), is(false));
728 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
729 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
730 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
732 replyWithEndListPeerNotes();
733 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
734 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
738 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
739 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
740 connectAndAssert(() -> matchesListPeerNotes("id1"));
742 replyWithEndListPeerNotes();
743 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
744 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
748 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
749 Future<Optional<PeerNote>> peerNote =
750 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
751 connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
753 replyWithEndListPeerNotes();
754 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
755 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
758 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
759 return matchesFcpMessage(
761 "NodeIdentifier=" + nodeIdentifier
765 private void replyWithEndListPeerNotes() throws IOException {
768 "Identifier=" + identifier,
773 private void replyWithPeerNote() throws IOException {
776 "Identifier=" + identifier,
777 "NodeIdentifier=Friend1",
778 "NoteText=RXhhbXBsZSBUZXh0Lg==",
786 public class ModifyPeerNotes {
789 public void byName() throws InterruptedException, ExecutionException, IOException {
790 Future<Boolean> noteUpdated =
791 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
792 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
794 assertThat(noteUpdated.get(), is(true));
798 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
799 Future<Boolean> noteUpdated =
800 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
801 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
802 replyWithUnknownNodeIdentifier();
803 assertThat(noteUpdated.get(), is(false));
807 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
808 throws InterruptedException, ExecutionException, IOException {
809 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
810 assertThat(noteUpdated.get(), is(false));
814 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
815 Future<Boolean> noteUpdated =
816 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
817 connectAndAssert(() -> matchesModifyPeerNote("id1"));
819 assertThat(noteUpdated.get(), is(true));
823 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
824 Future<Boolean> noteUpdated =
825 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
826 connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
828 assertThat(noteUpdated.get(), is(true));
831 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
832 return matchesFcpMessage(
834 "Identifier=" + identifier,
835 "NodeIdentifier=" + nodeIdentifier,
841 private void replyWithPeerNote() throws IOException {
844 "Identifier=" + identifier,
845 "NodeIdentifier=Friend1",
856 private void replyWithUnknownNodeIdentifier() throws IOException {
858 "UnknownNodeIdentifier",
859 "Identifier=" + identifier,
860 "NodeIdentifier=id2",
867 public class PluginCommands {
869 private static final String CLASS_NAME = "foo.plugin.Plugin";
871 private void replyWithPluginInfo() throws IOException {
874 "Identifier=" + identifier,
875 "PluginName=superPlugin",
879 "OriginUri=superPlugin",
885 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
886 throws InterruptedException, ExecutionException {
887 assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
888 assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
889 assertThat(pluginInfo.get().get().isTalkable(), is(true));
890 assertThat(pluginInfo.get().get().getVersion(), is("42"));
891 assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
892 assertThat(pluginInfo.get().get().isStarted(), is(true));
895 public class LoadPlugin {
897 public class OfficialPlugins {
900 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
901 Future<Optional<PluginInfo>> pluginInfo =
902 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
903 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
904 assertThat(lines, not(contains(startsWith("Store="))));
905 replyWithPluginInfo();
906 verifyPluginInfo(pluginInfo);
910 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
911 Future<Optional<PluginInfo>> pluginInfo =
912 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
913 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
914 assertThat(lines, hasItem("Store=true"));
915 replyWithPluginInfo();
916 verifyPluginInfo(pluginInfo);
920 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
921 Future<Optional<PluginInfo>> pluginInfo =
922 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
923 connectAndAssert(() -> createMatcherForOfficialSource("https"));
924 replyWithPluginInfo();
925 verifyPluginInfo(pluginInfo);
928 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
929 return matchesFcpMessage(
931 "Identifier=" + identifier,
932 "PluginURL=superPlugin",
934 "OfficialSource=" + officialSource
940 public class FromOtherSources {
942 private static final String FILE_PATH = "/path/to/plugin.jar";
943 private static final String URL = "http://server.com/plugin.jar";
944 private static final String KEY = "KSK@plugin.jar";
947 public void fromFile() throws ExecutionException, InterruptedException, IOException {
948 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
949 connectAndAssert(() -> createMatcher("file", FILE_PATH));
950 replyWithPluginInfo();
951 verifyPluginInfo(pluginInfo);
955 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
956 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
957 connectAndAssert(() -> createMatcher("url", URL));
958 replyWithPluginInfo();
959 verifyPluginInfo(pluginInfo);
963 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
964 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
965 connectAndAssert(() -> createMatcher("freenet", KEY));
966 replyWithPluginInfo();
967 verifyPluginInfo(pluginInfo);
970 private Matcher<List<String>> createMatcher(String urlType, String url) {
971 return matchesFcpMessage(
973 "Identifier=" + identifier,
981 public class Failed {
984 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
985 Future<Optional<PluginInfo>> pluginInfo =
986 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
987 connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
988 replyWithProtocolError();
989 assertThat(pluginInfo.get().isPresent(), is(false));
/**
 * Tests for {@code fcpClient.reloadPlugin()}. Each test issues the command, matches the
 * message received by the fake server, sends a reply and checks the resulting future.
 */
996 public class ReloadPlugin {
/** Plain reload of {@code CLASS_NAME}; the request must match the base reload message. */
999 public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1000 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1001 connectAndAssert(this::matchReloadPluginMessage);
1002 replyWithPluginInfo();
1003 verifyPluginInfo(pluginInfo);
/** {@code waitFor(1234)} must add the line "MaxWaitTime=1234" to the base reload message. */
1007 public void reloadingPluginWithMaxWaitTimeWorks()
1008 throws InterruptedException, ExecutionException, IOException {
1009 Future<Optional<PluginInfo>> pluginInfo =
1010 fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1011 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1012 replyWithPluginInfo();
1013 verifyPluginInfo(pluginInfo);
/** {@code purge()} must add the line "Purge=true" to the base reload message. */
1017 public void reloadingPluginWithPurgeWorks()
1018 throws InterruptedException, ExecutionException, IOException {
1019 Future<Optional<PluginInfo>> pluginInfo =
1020 fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1021 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1022 replyWithPluginInfo();
1023 verifyPluginInfo(pluginInfo);
/** {@code addToConfig()} must add the line "Store=true" to the base reload message. */
1027 public void reloadingPluginWithStoreWorks()
1028 throws InterruptedException, ExecutionException, IOException {
1029 Future<Optional<PluginInfo>> pluginInfo =
1030 fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1031 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1032 replyWithPluginInfo();
1033 verifyPluginInfo(pluginInfo);
/** A reply sent through {@code replyWithProtocolError()} must yield an empty {@code Optional}. */
1037 public void protocolErrorIsRecognizedAsFailure()
1038 throws InterruptedException, ExecutionException, IOException {
1039 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
// Same matcher as in reloadingPluginWorks(), written as a lambda instead of a method reference.
1040 connectAndAssert(() -> matchReloadPluginMessage());
1041 replyWithProtocolError();
1042 assertThat(pluginInfo.get().isPresent(), is(false));
/**
 * Base matcher for the reload request: it requires the current identifier and
 * "PluginName=" followed by {@code CLASS_NAME}.
 */
// NOTE(review): the message-name argument is not visible in this excerpt.
1045 private Matcher<List<String>> matchReloadPluginMessage() {
1046 return matchesFcpMessage(
1048 "Identifier=" + identifier,
1049 "PluginName=" + CLASS_NAME
/**
 * Tests for {@code fcpClient.removePlugin()}; the command's future yields a {@code Boolean}
 * that every test here expects to be {@code true} after the server's reply.
 */
1055 public class RemovePlugin {
/** Plain removal of {@code CLASS_NAME}. */
1058 public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1059 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1060 connectAndAssert(this::matchPluginRemovedMessage);
1061 replyWithPluginRemoved();
1062 assertThat(pluginRemoved.get(), is(true));
/** {@code waitFor(1234)} must add the line "MaxWaitTime=1234" to the request. */
1066 public void removingPluginWithMaxWaitTimeWorks()
1067 throws InterruptedException, ExecutionException, IOException {
1068 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1069 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1070 replyWithPluginRemoved();
1071 assertThat(pluginRemoved.get(), is(true));
/** {@code purge()} must add the line "Purge=true" to the request. */
1075 public void removingPluginWithPurgeWorks()
1076 throws InterruptedException, ExecutionException, IOException {
1077 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1078 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1079 replyWithPluginRemoved();
1080 assertThat(pluginRemoved.get(), is(true));
/** Writes the server's reply for the current identifier and {@code CLASS_NAME}. */
// NOTE(review): the reply's message name and trailing lines are not visible in this excerpt.
1083 private void replyWithPluginRemoved() throws IOException {
1084 fcpServer.writeLine(
1086 "Identifier=" + identifier,
1087 "PluginName=" + CLASS_NAME,
/**
 * Base matcher for the removal request: current identifier plus "PluginName=" followed by
 * {@code CLASS_NAME}.
 */
// NOTE(review): the message-name argument is not visible in this excerpt.
1092 private Matcher<List<String>> matchPluginRemovedMessage() {
1093 return matchesFcpMessage(
1095 "Identifier=" + identifier,
1096 "PluginName=" + CLASS_NAME
/** Tests for {@code fcpClient.getPluginInfo()}. */
1102 public class GetPluginInfo {
/** Plain request for {@code CLASS_NAME}; the reply is checked by {@code verifyPluginInfo}. */
1105 public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1106 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1107 connectAndAssert(this::matchGetPluginInfoMessage);
1108 replyWithPluginInfo();
1109 verifyPluginInfo(pluginInfo);
/** {@code detailed()} must add the line "Detailed=true" to the request. */
1113 public void gettingPluginInfoWithDetailsWorks()
1114 throws InterruptedException, ExecutionException, IOException {
1115 Future<Optional<PluginInfo>> pluginInfo =
1116 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1117 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1118 replyWithPluginInfo();
1119 verifyPluginInfo(pluginInfo);
/** A reply sent through {@code replyWithProtocolError()} must yield {@code Optional.empty()}. */
1123 public void protocolErrorIsRecognizedAsFailure()
1124 throws InterruptedException, ExecutionException, IOException {
1125 Future<Optional<PluginInfo>> pluginInfo =
1126 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1127 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1128 replyWithProtocolError();
1129 assertThat(pluginInfo.get(), is(Optional.empty()));
/**
 * Base matcher for the request: current identifier plus "PluginName=" followed by
 * {@code CLASS_NAME}.
 */
// NOTE(review): the message-name argument is not visible in this excerpt.
1132 private Matcher<List<String>> matchGetPluginInfoMessage() {
1133 return matchesFcpMessage(
1135 "Identifier=" + identifier,
1136 "PluginName=" + CLASS_NAME
/**
 * Tests for {@code fcpClient.subscribeUsk()}: subscribing, receiving update callbacks and
 * cancelling the subscription.
 */
1144 public class UskSubscriptionCommands {
// URI used for every subscription in this class.
1146 private static final String URI = "USK@some,uri/file.txt";
/**
 * Subscribes, registers one update callback and sends two notifications (editions 23 and
 * 24). The latch of 2 must reach zero within 5 seconds and the recorded edition must be 24.
 */
1149 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1150 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1151 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1152 replyWithSubscribed();
1153 assertThat(uskSubscription.get().get().getUri(), is(URI));
1154 AtomicInteger edition = new AtomicInteger();
1155 CountDownLatch updated = new CountDownLatch(2);
// NOTE(review): the statement that stores the edition is not visible in this excerpt;
// presumably the callback writes its argument into `edition` before counting down.
1156 uskSubscription.get().get().onUpdate(e -> {
1158 updated.countDown();
1160 sendUpdateNotification(23);
1161 sendUpdateNotification(24);
1162 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1163 assertThat(edition.get(), is(24));
/**
 * Registers two callbacks on the same subscription and sends a single notification; the
 * latch of 2 only reaches zero if both callbacks are invoked for that one update.
 */
1167 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1168 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1169 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1170 replyWithSubscribed();
1171 assertThat(uskSubscription.get().get().getUri(), is(URI));
1172 AtomicInteger edition = new AtomicInteger();
1173 CountDownLatch updated = new CountDownLatch(2);
1174 uskSubscription.get().get().onUpdate(e -> {
1176 updated.countDown();
1178 uskSubscription.get().get().onUpdate(e -> updated.countDown());
1179 sendUpdateNotification(23);
1180 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1181 assertThat(edition.get(), is(23));
/**
 * Cancels the subscription, expects an "UnsubscribeUSK" message carrying the identifier,
 * then sends a notification and asserts that the callback flag is still {@code false}.
 */
1185 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1186 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1187 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1188 replyWithSubscribed();
1189 assertThat(uskSubscription.get().get().getUri(), is(URI));
1190 AtomicBoolean updated = new AtomicBoolean();
1191 uskSubscription.get().get().onUpdate(e -> updated.set(true));
1192 uskSubscription.get().get().cancel();
1193 readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1194 sendUpdateNotification(23);
// NOTE(review): the flag is read immediately after the notification is written, without any
// wait; if the client handles messages asynchronously this could pass even when the callback
// would still fire later — confirm.
1195 assertThat(updated.get(), is(false));
/** Writes the server's confirmation of the subscription for the current identifier. */
// NOTE(review): the message name and remaining lines are not visible in this excerpt.
1198 private void replyWithSubscribed() throws IOException {
1199 fcpServer.writeLine(
1201 "Identifier=" + identifier,
/**
 * Writes a "SubscribedUSKUpdate" message for the current identifier with the given edition,
 * followed by any additional lines and a terminating "EndMessage".
 *
 * @param edition the edition number announced to the client
 * @param additionalLines extra message lines written before "EndMessage"
 */
1208 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1209 fcpServer.writeLine(
1210 "SubscribedUSKUpdate",
1211 "Identifier=" + identifier,
1213 "Edition=" + edition
1215 fcpServer.writeLine(additionalLines);
1216 fcpServer.writeLine("EndMessage");
/**
 * Tests for {@code fcpClient.clientGet()}: result handling for matching and non-matching
 * identifiers, plus one test per request option checking the line it adds to the message.
 */
1221 public class ClientGet {
/**
 * Sends data for a foreign identifier ("not-test") first and for the real identifier second;
 * the future's result is then fetched.
 */
// NOTE(review): the verification statement following dataFuture.get() is not visible here.
1224 public void works() throws InterruptedException, ExecutionException, IOException {
1225 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1226 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1227 replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1228 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1229 Optional<Data> data = dataFuture.get();
/** A failure reply for the real identifier must produce an empty {@code Optional}. */
1234 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1235 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1236 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1237 replyWithGetFailed("not-test");
1238 replyWithGetFailed(identifier);
1239 Optional<Data> data = dataFuture.get();
1240 assertThat(data.isPresent(), is(false));
/** A failure reply for a foreign identifier is followed by data for the real identifier. */
// NOTE(review): the verification statement following dataFuture.get() is not visible here.
1244 public void getFailedForDifferentIdentifierIsIgnored()
1245 throws InterruptedException, ExecutionException, IOException {
1246 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1247 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1248 replyWithGetFailed("not-test");
1249 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1250 Optional<Data> data = dataFuture.get();
/** The test passes only if an {@code ExecutionException} is thrown. */
// NOTE(review): the statements that close the connection and read the future are not visible.
1254 @Test(expected = ExecutionException.class)
1255 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1256 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1257 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
/** {@code ignoreDataStore()} must add "IgnoreDS=true" to the request. */
1263 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1264 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1265 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
/** {@code dataStoreOnly()} must add "DSonly=true" to the request. */
1269 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1270 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1271 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
/** {@code maxSize(1048576)} must add "MaxSize=1048576" to the request. */
1275 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1276 throws InterruptedException, ExecutionException, IOException {
1277 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1278 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
/** {@code priority(Priority.interactive)} must add "PriorityClass=1" to the request. */
1282 public void clientGetWithPrioritySettingSendsCorrectCommands()
1283 throws InterruptedException, ExecutionException, IOException {
1284 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1285 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
/** {@code realTime()} must add "RealTimeFlag=true" to the request. */
1289 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1290 throws InterruptedException, ExecutionException, IOException {
1291 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1292 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
/** {@code global()} must add "Global=true" to the request. */
1296 public void clientGetWithGlobalSettingSendsCorrectCommands()
1297 throws InterruptedException, ExecutionException, IOException {
1298 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1299 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
/**
 * Writes a failure reply for the given identifier.
 *
 * @param identifier the identifier to put into the reply (shadows the outer field)
 */
// NOTE(review): the message name and remaining lines are not visible in this excerpt.
1302 private void replyWithGetFailed(String identifier) throws IOException {
1303 fcpServer.writeLine(
1305 "Identifier=" + identifier,
/**
 * Writes a data reply for the given identifier with fixed startup/completion times and the
 * given content type.
 *
 * @param identifier the identifier to put into the reply (shadows the outer field)
 * @param text the payload text
 * @param contentType value of the "Metadata.ContentType" line
 */
// NOTE(review): DataLength is text.length() + 1; presumably the extra byte is a trailing
// newline written with the payload (verifyData expects "Hello\n", size 6). The count is in
// chars, so it equals the byte length only for single-byte text — confirm.
1311 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1312 fcpServer.writeLine(
1314 "Identifier=" + identifier,
1315 "DataLength=" + (text.length() + 1),
1316 "StartupTime=1435610539000",
1317 "CompletionTime=1435610540000",
1318 "Metadata.ContentType=" + contentType,
/**
 * Asserts that the data has MIME type "text/plain;charset=utf-8", size 6 and the UTF-8
 * bytes of "Hello\n". Calls {@code data.get()} unconditionally.
 */
1324 private void verifyData(Optional<Data> data) throws IOException {
1325 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1326 assertThat(data.get().size(), is(6L));
1327 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1328 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1333 public class PutCommands {
/**
 * Tests for {@code fcpClient.clientPut()}: direct uploads, redirects, uploads from disk
 * including the TestDDA exchange, protocol errors, and key/progress notifications.
 */
// NOTE(review): several tests call "Hello\n".getBytes() without a charset, which uses the
// platform default; StandardCharsets.UTF_8 (used elsewhere in this file) would be deterministic.
1335 public class ClientPut {
/** A direct upload of "Hello\n" must produce a message matching matchesDirectClientPut. */
// NOTE(review): the tail of the builder chain is not visible in this excerpt.
1338 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1339 fcpClient.clientPut()
1340 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1345 readMessage("Hello", this::matchesDirectClientPut);
/**
 * A failure for a foreign identifier is ignored; the success reply for the real identifier
 * completes the future with key "KSK@foo.txt".
 */
1349 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1350 Future<Optional<Key>> key = fcpClient.clientPut()
1351 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1356 readMessage("Hello", this::matchesDirectClientPut);
1357 replyWithPutFailed("not-the-right-one");
1358 replyWithPutSuccessful(identifier);
1359 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
/**
 * A success for a foreign identifier is ignored; the failure reply for the real identifier
 * completes the future with an empty {@code Optional}.
 */
1363 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1364 Future<Optional<Key>> key = fcpClient.clientPut()
1365 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1370 readMessage("Hello", this::matchesDirectClientPut);
1371 replyWithPutSuccessful("not-the-right-one");
1372 replyWithPutFailed(identifier);
1373 assertThat(key.get().isPresent(), is(false));
/** {@code named("otherName.txt")} must add "TargetFilename=otherName.txt" to the request. */
1377 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1378 fcpClient.clientPut()
1379 .named("otherName.txt")
1380 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1385 readMessage("Hello", () -> allOf(
1386 hasHead("ClientPut"),
1387 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1389 hasTail("EndMessage", "Hello")
/** {@code redirectTo(...)} must send "UploadFrom=redirect" with the target URI. */
// NOTE(review): the method name contains a typo ("Correcly"); left unchanged here.
1394 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1395 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1396 connectAndAssert(() ->
1397 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
/** Uploading from a {@code File} must send "UploadFrom=disk" with the file's path. */
1401 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1402 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1403 connectAndAssert(() ->
1404 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
// Members of the DDA test group (its constructor is DDA() below).
// ddaFile is the temp file from createDdaFile(); fileToUpload is "test.dat" in the same directory.
1409 private final File ddaFile;
1410 private final File fileToUpload;
/** Creates the temp file and derives the upload file in the same directory. */
1412 public DDA() throws IOException {
1413 ddaFile = createDdaFile();
1414 fileToUpload = new File(ddaFile.getParent(), "test.dat");
/** Matcher for a from-disk ClientPut of the given file to "KSK@foo.txt". */
1417 private Matcher<List<String>> matchesFileClientPut(File file) {
1418 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
/**
 * Full exchange: ClientPut, server signals via sendDdaRequired, client sends the TestDDA
 * request, server replies, client sends the read content, server completes, and the client
 * sends the original ClientPut again.
 */
1422 public void completeDda() throws IOException, ExecutionException, InterruptedException {
1423 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1424 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1425 sendDdaRequired(identifier);
1426 readMessage(() -> matchesTestDDARequest(ddaFile));
1427 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1428 readMessage(() -> matchesTestDDAResponse(ddaFile));
1429 writeTestDDAComplete(ddaFile);
1430 readMessage(() -> matchesFileClientPut(fileToUpload));
/**
 * A TestDDA reply for "/some-other-directory" is sent before the reply for the real
 * directory; the client must still answer with the response for the real directory.
 */
1434 public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1435 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1436 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1437 sendDdaRequired(identifier);
1438 readMessage(() -> matchesTestDDARequest(ddaFile));
1439 sendTestDDAReply("/some-other-directory", ddaFile);
1440 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1441 readMessage(() -> matchesTestDDAResponse(ddaFile));
/**
 * The reply names a file with an extra ".foo" suffix; the client must answer with
 * "ReadContent=failed-to-read".
 */
1445 public void sendResponseIfFileUnreadable()
1446 throws IOException, ExecutionException, InterruptedException {
1447 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1448 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1449 sendDdaRequired(identifier);
1450 readMessage(() -> matchesTestDDARequest(ddaFile));
1451 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1452 readMessage(this::matchesFailedToReadResponse);
/**
 * Writes a message for "/some-other-directory" first, then signals via sendDdaRequired and
 * asserts that the next message from the client is the request for the real directory
 * (read wanted, write not wanted).
 */
// The local `identifier` is extracted from the received lines and shadows the outer field.
1456 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1457 throws IOException, ExecutionException, InterruptedException {
1458 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1460 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1461 String identifier = extractIdentifier(lines);
1462 fcpServer.writeLine(
1464 "Directory=/some-other-directory",
1467 sendDdaRequired(identifier);
1468 lines = fcpServer.collectUntil(is("EndMessage"));
1469 assertThat(lines, matchesFcpMessage(
1471 "Directory=" + ddaFile.getParent(),
1472 "WantReadDirectory=true",
1473 "WantWriteDirectory=false"
/** Matcher for the client's response when the test file could not be read. */
1477 private Matcher<List<String>> matchesFailedToReadResponse() {
1478 return matchesFcpMessage(
1480 "Directory=" + ddaFile.getParent(),
1481 "ReadContent=failed-to-read"
/** Writes the server's completion message for the file's directory with read access allowed. */
1485 private void writeTestDDAComplete(File tempFile) throws IOException {
1486 fcpServer.writeLine(
1488 "Directory=" + tempFile.getParent(),
1489 "ReadDirectoryAllowed=true",
/** Matcher for the client's response carrying the expected content "test-content". */
1494 private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1495 return matchesFcpMessage(
1497 "Directory=" + tempFile.getParent(),
1498 "ReadContent=test-content"
/**
 * Writes the server's reply naming the directory and the file the client should read.
 *
 * @param directory value of the "Directory" line
 * @param tempFile file announced in the "ReadFilename" line
 */
1502 private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1503 fcpServer.writeLine(
1505 "Directory=" + directory,
1506 "ReadFilename=" + tempFile,
/** Matcher for the client's request: the file's directory, read wanted, write not wanted. */
1511 private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1512 return matchesFcpMessage(
1514 "Directory=" + tempFile.getParent(),
1515 "WantReadDirectory=true",
1516 "WantWriteDirectory=false"
/** Writes the server message that triggers the DDA exchange for the given identifier. */
// NOTE(review): the message name and remaining lines are not visible in this excerpt.
1520 private void sendDdaRequired(String identifier) throws IOException {
1521 fcpServer.writeLine(
1523 "Identifier=" + identifier,
/** Writes a failure reply for the given identifier. */
1531 private void replyWithPutFailed(String identifier) throws IOException {
1532 fcpServer.writeLine(
1534 "Identifier=" + identifier,
/**
 * Builds the matcher for a direct ClientPut: head "ClientPut", the three base parameters
 * plus any additional lines, and the tail "EndMessage", "Hello".
 *
 * @param additionalLines extra parameter lines expected in the message
 */
// NOTE(review): forEach(lines::add) is equivalent to lines.addAll(Arrays.asList(additionalLines)).
1539 private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
1540 List<String> lines =
1541 new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
1542 Arrays.asList(additionalLines).forEach(lines::add);
1544 hasHead("ClientPut"),
1545 hasParameters(1, 2, lines.toArray(new String[lines.size()])),
1546 hasTail("EndMessage", "Hello")
/**
 * Creates a temp file "test-dda-*.dat" containing "test-content" (UTF-8) and marks it for
 * deletion on JVM exit.
 */
1550 private File createDdaFile() throws IOException {
1551 File tempFile = File.createTempFile("test-dda-", ".dat");
1552 tempFile.deleteOnExit();
1553 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
/**
 * Writes one message for "not-the-right-one" and one for the real identifier; the future
 * must then complete with key "KSK@foo.txt".
 */
// NOTE(review): the names of both written messages are not visible in this excerpt.
1558 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1559 throws InterruptedException, ExecutionException, IOException {
1560 Future<Optional<Key>> key =
1561 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1563 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1564 String identifier = extractIdentifier(lines);
1565 fcpServer.writeLine(
1567 "Identifier=not-the-right-one",
1571 fcpServer.writeLine(
1573 "Identifier=" + identifier,
1577 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
/** Writes one message for the real identifier; the future must complete empty. */
// NOTE(review): the written message's name and code are not visible in this excerpt.
1581 public void clientPutAbortsOnProtocolErrorOtherThan25()
1582 throws InterruptedException, ExecutionException, IOException {
1583 Future<Optional<Key>> key =
1584 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1586 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1587 String identifier = extractIdentifier(lines);
1588 fcpServer.writeLine(
1590 "Identifier=" + identifier,
1594 assertThat(key.get().isPresent(), is(false));
/**
 * The {@code onKeyGenerated} consumer collects keys into a list; after the generated-URI
 * and success replies the list must contain exactly "KSK@foo.txt".
 */
1598 public void clientPutSendsNotificationsForGeneratedKeys()
1599 throws InterruptedException, ExecutionException, IOException {
1600 List<String> generatedKeys = new CopyOnWriteArrayList<>();
1601 Future<Optional<Key>> key = fcpClient.clientPut()
1602 .onKeyGenerated(generatedKeys::add)
1603 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1608 readMessage("Hello", this::matchesDirectClientPut);
1609 replyWithGeneratedUri();
1610 replyWithPutSuccessful(identifier);
1611 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1612 assertThat(generatedKeys, contains("KSK@foo.txt"));
/**
 * The {@code onProgress} consumer collects progress objects; the request must carry
 * "Verbosity=1" and both progress replies must arrive in order.
 */
// NOTE(review): this uses a plain ArrayList while the generated-keys test above uses
// CopyOnWriteArrayList for the same callback pattern — consider aligning them.
1616 public void clientPutSendsNotificationOnProgress()
1617 throws InterruptedException, ExecutionException, IOException {
1618 List<RequestProgress> requestProgress = new ArrayList<>();
1619 Future<Optional<Key>> key = fcpClient.clientPut()
1620 .onProgress(requestProgress::add)
1621 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1626 readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
1627 replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
1628 replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
1629 replyWithPutSuccessful(identifier);
1630 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1631 assertThat(requestProgress, contains(
1632 isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
1633 isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
/**
 * Writes the server's success reply for the given identifier.
 *
 * @param identifier the identifier to put into the reply (shadows the outer field)
 */
// NOTE(review): the message name and URI line are not visible in this excerpt; callers
// expect the resulting key to be "KSK@foo.txt".
1639 private void replyWithPutSuccessful(String identifier) throws IOException {
1640 fcpServer.writeLine(
1643 "Identifier=" + identifier,
/** Writes the server's generated-URI notification for the current identifier. */
// NOTE(review): the message name and URI line are not visible in this excerpt; callers
// expect the consumer to receive "KSK@foo.txt".
1648 private void replyWithGeneratedUri() throws IOException {
1649 fcpServer.writeLine(
1651 "Identifier=" + identifier,
/**
 * Writes a progress message for the current identifier, one "Name=value" line per argument.
 *
 * @param total value for the total count (its line is not visible in this excerpt)
 * @param required value of the "Required" line
 * @param failed value for the failed count (its line is not visible in this excerpt)
 * @param fatallyFailed value of the "FatallyFailed" line
 * @param succeeded value of the "Succeeded" line
 * @param lastProgress value of the "LastProgress" line
 * @param finalizedTotal value of the "FinalizedTotal" line
 * @param minSuccessFetchBlocks value of the "MinSuccessFetchBlocks" line
 */
1657 private void replyWithSimpleProgress(
1658 int total, int required, int failed, int fatallyFailed, int succeeded, int lastProgress,
1659 boolean finalizedTotal, int minSuccessFetchBlocks) throws IOException {
1660 fcpServer.writeLine(
1662 "Identifier=" + identifier,
1664 "Required=" + required,
1666 "FatallyFailed=" + fatallyFailed,
1667 "Succeeded=" + succeeded,
1668 "LastProgress=" + lastProgress,
1669 "FinalizedTotal=" + finalizedTotal,
1670 "MinSuccessFetchBlocks=" + minSuccessFetchBlocks,
/**
 * Tests for {@code fcpClient.clientPutDiskDir()}; every test uploads from
 * {@code new File("")} to the URI "CHK@".
 */
1675 public class ClientPutDiskDir {
/** A "PutSuccessful" reply with "URI=CHK@abc" must complete the future with that key. */
1678 public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1679 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
1680 connectAndAssert(this::matchesClientPutDiskDir);
1681 fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
1682 assertThat(key.get().get().getKey(), is("CHK@abc"));
/** A reply sent through {@code replyWithProtocolError()} must yield an empty {@code Optional}. */
1686 public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
1687 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
1688 connectAndAssert(this::matchesClientPutDiskDir);
1689 replyWithProtocolError();
1690 assertThat(key.get().isPresent(), is(false));
/**
 * With an {@code onProgress} consumer the request must carry "Verbosity=1"; both progress
 * replies must reach the consumer in order before the success reply completes the future.
 */
// NOTE(review): the consumer fills a plain ArrayList; the ClientPut generated-keys test uses
// CopyOnWriteArrayList for the same callback pattern — consider aligning them.
1694 public void progressIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException {
1695 List<RequestProgress> requestProgress = new ArrayList<>();
1696 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().onProgress(requestProgress::add)
1697 .fromDirectory(new File("")).uri("CHK@").execute();
1698 connectAndAssert(() -> matchesClientPutDiskDir("Verbosity=1"));
1699 replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
1700 replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
1701 replyWithPutSuccessful(identifier);
1702 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1703 assertThat(requestProgress, contains(
1704 isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
1705 isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
/**
 * With an {@code onKeyGenerated} consumer the generated-URI reply must deliver
 * "KSK@foo.txt" to the consumer, and the success reply must complete the future.
 */
1710 public void generatedUriIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException {
1711 List<String> generatedKeys = new ArrayList<>();
1712 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().onKeyGenerated(generatedKeys::add)
1713 .fromDirectory(new File("")).uri("CHK@").execute();
1714 connectAndAssert(this::matchesClientPutDiskDir);
1715 replyWithGeneratedUri();
1716 replyWithPutSuccessful(identifier);
1717 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1718 assertThat(generatedKeys, contains("KSK@foo.txt"));
/**
 * Builds the matcher for the expected "ClientPutDiskDir" message: identifier, "URI=CHK@",
 * the path of {@code new File("")} as filename, plus any additional lines.
 *
 * @param additionalLines extra lines expected in the message
 */
// NOTE(review): forEach(lines::add) is equivalent to lines.addAll(Arrays.asList(additionalLines)).
1721 private Matcher<List<String>> matchesClientPutDiskDir(String... additionalLines) {
1722 List<String> lines = new ArrayList<>(Arrays.asList("Identifier=" + identifier, "URI=CHK@", "Filename=" + new File("").getPath()));
1723 Arrays.asList(additionalLines).forEach(lines::add);
1724 return matchesFcpMessage("ClientPutDiskDir", lines.toArray(new String[lines.size()]));
/**
 * Tests for the configuration commands {@code getConfig()} and {@code modifyConfig()}.
 * Replies are written by {@code replyWithConfigData}.
 */
1731 public class ConfigCommand {
/**
 * One test per {@code getConfig()} option: each checks the "With...=true" line added to
 * the request and reads the matching "prefix.foo" value back from the reply.
 */
1733 public class GetConfig {
/** Without options the request holds only the identifier; the result must be non-null. */
1736 public void defaultFcpClientCanGetConfigWithoutDetails()
1737 throws InterruptedException, ExecutionException, IOException {
1738 Future<ConfigData> configData = fcpClient.getConfig().execute();
1739 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1740 replyWithConfigData();
1741 assertThat(configData.get(), notNullValue());
/** "WithCurrent": reply line "current.foo=bar" is read through {@code getCurrent("foo")}. */
1745 public void defaultFcpClientCanGetConfigWithCurrent()
1746 throws InterruptedException, ExecutionException, IOException {
1747 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1748 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1749 replyWithConfigData("current.foo=bar");
1750 assertThat(configData.get().getCurrent("foo"), is("bar"));
/** "WithDefaults": reply line "default.foo=bar" is read through {@code getDefault("foo")}. */
1754 public void defaultFcpClientCanGetConfigWithDefaults()
1755 throws InterruptedException, ExecutionException, IOException {
1756 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1757 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1758 replyWithConfigData("default.foo=bar");
1759 assertThat(configData.get().getDefault("foo"), is("bar"));
/** "WithSortOrder": reply line "sortOrder.foo=17" is read as the integer 17. */
1763 public void defaultFcpClientCanGetConfigWithSortOrder()
1764 throws InterruptedException, ExecutionException, IOException {
1765 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1766 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1767 replyWithConfigData("sortOrder.foo=17");
1768 assertThat(configData.get().getSortOrder("foo"), is(17));
/** "WithExpertFlag": reply line "expertFlag.foo=true" is read as {@code true}. */
1772 public void defaultFcpClientCanGetConfigWithExpertFlag()
1773 throws InterruptedException, ExecutionException, IOException {
1774 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1775 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1776 replyWithConfigData("expertFlag.foo=true");
1777 assertThat(configData.get().getExpertFlag("foo"), is(true));
/** "WithForceWriteFlag": reply line "forceWriteFlag.foo=true" is read as {@code true}. */
1781 public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1782 throws InterruptedException, ExecutionException, IOException {
1783 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1784 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1785 replyWithConfigData("forceWriteFlag.foo=true");
1786 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
/** "WithShortDescription": reply line "shortDescription.foo=bar" is read back as "bar". */
1790 public void defaultFcpClientCanGetConfigWithShortDescription()
1791 throws InterruptedException, ExecutionException, IOException {
1792 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1793 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1794 replyWithConfigData("shortDescription.foo=bar");
1795 assertThat(configData.get().getShortDescription("foo"), is("bar"));
/** "WithLongDescription": reply line "longDescription.foo=bar" is read back as "bar". */
1799 public void defaultFcpClientCanGetConfigWithLongDescription()
1800 throws InterruptedException, ExecutionException, IOException {
1801 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1802 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1803 replyWithConfigData("longDescription.foo=bar");
1804 assertThat(configData.get().getLongDescription("foo"), is("bar"));
/** "WithDataTypes": reply line "dataType.foo=number" is read back as "number". */
1808 public void defaultFcpClientCanGetConfigWithDataTypes()
1809 throws InterruptedException, ExecutionException, IOException {
1810 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1811 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1812 replyWithConfigData("dataType.foo=number");
1813 assertThat(configData.get().getDataType("foo"), is("number"));
/**
 * Matcher for a request carrying the identifier and the given parameter set to "true".
 *
 * @param additionalParameter parameter name, e.g. "WithCurrent"
 */
// NOTE(review): the message-name argument is not visible in this excerpt.
1816 private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1817 return matchesFcpMessage(
1819 "Identifier=" + identifier,
1820 additionalParameter + "=true"
/** Tests for {@code fcpClient.modifyConfig()}. */
1826 public class ModifyConfig {
/**
 * Sets "foo.bar" to "baz"; after a reply containing "current.foo.bar=baz" the returned
 * config must report "baz" for {@code getCurrent("foo.bar")}.
 */
// NOTE(review): the message name and the line carrying the new value are not visible here.
1829 public void defaultFcpClientCanModifyConfigData()
1830 throws InterruptedException, ExecutionException, IOException {
1831 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1832 connectAndAssert(() -> matchesFcpMessage(
1834 "Identifier=" + identifier,
1837 replyWithConfigData("current.foo.bar=baz");
1838 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
/**
 * Writes a "ConfigData" message for the current identifier, followed by the additional
 * lines and a terminating "EndMessage".
 *
 * @param additionalLines extra lines written between the header and "EndMessage"
 */
1843 private void replyWithConfigData(String... additionalLines) throws IOException {
1844 fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1845 fcpServer.writeLine(additionalLines);
1846 fcpServer.writeLine("EndMessage");
1851 public class NodeInformation {
/**
 * Plain {@code getNode()}: all three request flags must be false; the reply must produce a
 * non-null result whose node reference is not opennet.
 */
1854 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
1855 Future<NodeData> nodeData = fcpClient.getNode().execute();
1856 connectAndAssert(() -> matchesGetNode(false, false, false));
1857 replyWithNodeData();
1858 assertThat(nodeData.get(), notNullValue());
1859 assertThat(nodeData.get().getNodeRef().isOpennet(), is(false));
/**
 * {@code opennetRef()} must set the first request flag; a reply with "opennet=true" must
 * yield an opennet node reference and the version "Fred,0.7,1.0,1466".
 */
1863 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
1864 throws InterruptedException, ExecutionException, IOException {
1865 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
1866 connectAndAssert(() -> matchesGetNode(true, false, false));
1867 replyWithNodeData("opennet=true");
1868 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
1869 assertThat(nodeData.get().getNodeRef().isOpennet(), is(true));
/**
 * {@code includePrivate()} must set the second request flag; the reply's
 * "ark.privURI" value must be exposed through {@code getARK().getPrivateURI()}.
 */
1873 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
1874 throws InterruptedException, ExecutionException, IOException {
1875 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
1876 connectAndAssert(() -> matchesGetNode(false, true, false));
1877 replyWithNodeData("ark.privURI=SSK@XdHMiRl");
1878 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
/**
 * {@code includeVolatile()} must set the third request flag; the reply's
 * "volatile.freeJavaMemory" value must be readable via {@code getVolatile("freeJavaMemory")}.
 */
1882 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
1883 throws InterruptedException, ExecutionException, IOException {
1884 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
1885 connectAndAssert(() -> matchesGetNode(false, false, true));
1886 replyWithNodeData("volatile.freeJavaMemory=205706528");
1887 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
/**
 * Matcher for the node-information request with the three flags rendered as
 * "GiveOpennetRef", "WithPrivate" and "WithVolatile" lines.
 *
 * @param withOpennetRef expected value of "GiveOpennetRef"
 * @param withPrivate expected value of "WithPrivate"
 * @param withVolatile expected value of "WithVolatile"
 */
// NOTE(review): the message-name argument is not visible in this excerpt.
1890 private Matcher<List<String>> matchesGetNode(boolean withOpennetRef, boolean withPrivate, boolean withVolatile) {
1891 return matchesFcpMessage(
1893 "Identifier=" + identifier,
1894 "GiveOpennetRef=" + withOpennetRef,
1895 "WithPrivate=" + withPrivate,
1896 "WithVolatile=" + withVolatile
/**
 * Writes the server's node-data reply for the current identifier with a fixed public ARK
 * URI and version strings, followed by the additional lines and "EndMessage".
 *
 * @param additionalLines extra lines written before "EndMessage"
 */
// NOTE(review): the message name and two further fixed lines are not visible in this excerpt.
1900 private void replyWithNodeData(String... additionalLines) throws IOException {
1901 fcpServer.writeLine(
1903 "Identifier=" + identifier,
1904 "ark.pubURI=SSK@3YEf.../ark",
1907 "version=Fred,0.7,1.0,1466",
1908 "lastGoodVersion=Fred,0.7,1.0,1466"
1910 fcpServer.writeLine(additionalLines);
1911 fcpServer.writeLine("EndMessage");