1 package net.pterodactylus.fcp.quelaton;
3 import static net.pterodactylus.fcp.RequestProgressMatcher.isRequestProgress;
4 import static org.hamcrest.MatcherAssert.assertThat;
5 import static org.hamcrest.Matchers.allOf;
6 import static org.hamcrest.Matchers.contains;
7 import static org.hamcrest.Matchers.containsInAnyOrder;
8 import static org.hamcrest.Matchers.hasItem;
9 import static org.hamcrest.Matchers.hasSize;
10 import static org.hamcrest.Matchers.is;
11 import static org.hamcrest.Matchers.not;
12 import static org.hamcrest.Matchers.notNullValue;
13 import static org.hamcrest.Matchers.startsWith;
15 import java.io.ByteArrayInputStream;
17 import java.io.IOException;
19 import java.nio.charset.StandardCharsets;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.function.Supplier;
35 import java.util.stream.Collectors;
37 import net.pterodactylus.fcp.ARK;
38 import net.pterodactylus.fcp.ConfigData;
39 import net.pterodactylus.fcp.DSAGroup;
40 import net.pterodactylus.fcp.FcpKeyPair;
41 import net.pterodactylus.fcp.Key;
42 import net.pterodactylus.fcp.NodeData;
43 import net.pterodactylus.fcp.NodeRef;
44 import net.pterodactylus.fcp.Peer;
45 import net.pterodactylus.fcp.PeerNote;
46 import net.pterodactylus.fcp.PluginInfo;
47 import net.pterodactylus.fcp.Priority;
48 import net.pterodactylus.fcp.RequestProgress;
49 import net.pterodactylus.fcp.fake.FakeTcpServer;
50 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
52 import com.google.common.io.ByteStreams;
53 import com.google.common.io.Files;
54 import com.nitorcreations.junit.runners.NestedRunner;
55 import org.hamcrest.Description;
56 import org.hamcrest.Matcher;
57 import org.hamcrest.Matchers;
58 import org.hamcrest.TypeSafeDiagnosingMatcher;
59 import org.junit.After;
60 import org.junit.Assert;
61 import org.junit.Test;
62 import org.junit.runner.RunWith;
65 * Unit test for {@link DefaultFcpClient}.
67 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
69 @RunWith(NestedRunner.class)
70 public class DefaultFcpClientTest {
72 private static final String INSERT_URI =
73 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
74 private static final String REQUEST_URI =
75 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
77 private int threadCounter = 0;
78 private final ExecutorService threadPool =
79 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
80 private final FakeTcpServer fcpServer;
81 private final DefaultFcpClient fcpClient;
83 public DefaultFcpClientTest() throws IOException {
84 fcpServer = new FakeTcpServer(threadPool);
85 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
89 public void tearDown() throws IOException {
91 threadPool.shutdown();
94 private void connectNode() throws InterruptedException, ExecutionException, IOException {
95 fcpServer.connect().get();
96 fcpServer.collectUntil(is("EndMessage"));
97 fcpServer.writeLine("NodeHello",
98 "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
99 "Revision=build01466",
101 "Version=Fred,0.7,1.0,1466",
103 "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
107 "NodeLanguage=ENGLISH",
113 private String extractIdentifier(List<String> lines) {
114 return lines.stream()
115 .filter(s -> s.startsWith("Identifier="))
116 .map(s -> s.substring(s.indexOf('=') + 1))
121 private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
122 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
125 private Matcher<Iterable<String>> hasHead(String firstElement) {
126 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
128 protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
129 if (!iterable.iterator().hasNext()) {
130 mismatchDescription.appendText("is empty");
133 String element = iterable.iterator().next();
134 if (!element.equals(firstElement)) {
135 mismatchDescription.appendText("starts with ").appendValue(element);
142 public void describeTo(Description description) {
143 description.appendText("starts with ").appendValue(firstElement);
148 private Matcher<List<String>> matchesFcpMessageWithTerminator(
149 String name, String terminator, String... requiredLines) {
150 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
153 private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
154 return new TypeSafeDiagnosingMatcher<List<String>>() {
156 protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
157 if (item.size() < (ignoreStart + ignoreEnd)) {
158 mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
161 for (String line : lines) {
162 if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
163 mismatchDescription.appendText("does not contains ").appendValue(line);
171 public void describeTo(Description description) {
172 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
173 description.appendText(", ignoring the first ").appendValue(ignoreStart);
174 description.appendText(" and the last ").appendValue(ignoreEnd);
179 private Matcher<List<String>> hasTail(String... lastElements) {
180 return new TypeSafeDiagnosingMatcher<List<String>>() {
182 protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
183 if (list.size() < lastElements.length) {
184 mismatchDescription.appendText("is too small");
187 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
188 if (!tail.equals(Arrays.asList(lastElements))) {
189 mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
196 public void describeTo(Description description) {
197 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
202 private List<String> lines;
203 private String identifier;
205 private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
206 throws InterruptedException, ExecutionException, IOException {
208 readMessage(requestMatcher);
211 private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
212 readMessage("EndMessage", requestMatcher);
215 private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
216 lines = fcpServer.collectUntil(is(terminator));
217 identifier = extractIdentifier(lines);
218 assertThat(lines, requestMatcher.get());
221 private void replyWithProtocolError() throws IOException {
224 "Identifier=" + identifier,
229 public class ConnectionsAndKeyPairs {
231 public class Connections {
233 @Test(expected = ExecutionException.class)
234 public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
235 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
236 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
238 "CloseConnectionDuplicateClientName",
244 @Test(expected = ExecutionException.class)
245 public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
246 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
247 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
253 public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
254 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
255 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
258 keyPair = fcpClient.generateKeypair().execute();
259 readMessage(() -> matchesFcpMessage("GenerateSSK"));
260 identifier = extractIdentifier(lines);
266 public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
267 throws InterruptedException, ExecutionException, IOException {
268 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
269 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
274 } catch (ExecutionException e) {
277 keyPair = fcpClient.generateKeypair().execute();
278 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
285 public class GenerateKeyPair {
288 public void defaultFcpClientCanGenerateKeypair()
289 throws ExecutionException, InterruptedException, IOException {
290 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
291 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
293 FcpKeyPair keyPair = keyPairFuture.get();
294 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
295 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
300 private void replyWithKeyPair() throws IOException {
301 fcpServer.writeLine("SSKKeypair",
302 "InsertURI=" + INSERT_URI + "",
303 "RequestURI=" + REQUEST_URI + "",
304 "Identifier=" + identifier,
312 public class PeerCommands {
314 public class ListPeer {
317 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
318 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
319 connectAndAssert(() -> matchesListPeer("id1"));
320 replyWithPeer("id1");
321 assertThat(peer.get().get().getIdentity(), is("id1"));
325 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
326 Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
327 connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
328 replyWithPeer("id1");
329 assertThat(peer.get().get().getIdentity(), is("id1"));
333 public void byName() throws InterruptedException, ExecutionException, IOException {
334 Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
335 connectAndAssert(() -> matchesListPeer("FriendNode"));
336 replyWithPeer("id1");
337 assertThat(peer.get().get().getIdentity(), is("id1"));
341 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
342 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
343 connectAndAssert(() -> matchesListPeer("id2"));
344 replyWithUnknownNodeIdentifier();
345 assertThat(peer.get().isPresent(), is(false));
348 private Matcher<List<String>> matchesListPeer(String nodeId) {
349 return matchesFcpMessage(
351 "Identifier=" + identifier,
352 "NodeIdentifier=" + nodeId
358 public class ListPeers {
361 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
362 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
363 connectAndAssert(() -> matchesListPeers(false, false));
364 replyWithPeer("id1");
365 replyWithPeer("id2");
367 assertThat(peers.get(), hasSize(2));
368 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
369 containsInAnyOrder("id1", "id2"));
373 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
374 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
375 connectAndAssert(() -> matchesListPeers(false, true));
376 replyWithPeer("id1", "metadata.foo=bar1");
377 replyWithPeer("id2", "metadata.foo=bar2");
379 assertThat(peers.get(), hasSize(2));
380 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
381 containsInAnyOrder("bar1", "bar2"));
385 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
386 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
387 connectAndAssert(() -> matchesListPeers(true, false));
388 replyWithPeer("id1", "volatile.foo=bar1");
389 replyWithPeer("id2", "volatile.foo=bar2");
391 assertThat(peers.get(), hasSize(2));
392 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
393 containsInAnyOrder("bar1", "bar2"));
396 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
397 return matchesFcpMessage(
399 "WithVolatile=" + withVolatile,
400 "WithMetadata=" + withMetadata
404 private void sendEndOfPeerList() throws IOException {
407 "Identifier=" + identifier,
414 public class AddPeer {
417 public void fromFile() throws InterruptedException, ExecutionException, IOException {
418 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
419 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
420 replyWithPeer("id1");
421 assertThat(peer.get().get().getIdentity(), is("id1"));
425 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
426 Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
427 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
428 replyWithPeer("id1");
429 assertThat(peer.get().get().getIdentity(), is("id1"));
433 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
434 NodeRef nodeRef = createNodeRef();
435 Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
436 connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
442 "dsaGroup.q=subprime",
443 "dsaPubKey.y=dsa-public",
444 "physical.udp=1.2.3.4:5678",
448 replyWithPeer("id1");
449 assertThat(peer.get().get().getIdentity(), is("id1"));
453 public void protocolErrorEndsCommand() throws InterruptedException, ExecutionException, IOException {
454 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
455 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
456 replyWithProtocolError();
457 assertThat(peer.get().isPresent(), is(false));
460 private NodeRef createNodeRef() {
461 NodeRef nodeRef = new NodeRef();
462 nodeRef.setIdentity("id1");
463 nodeRef.setName("name");
464 nodeRef.setARK(new ARK("public", "1"));
465 nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
466 nodeRef.setNegotiationTypes(new int[] { 3, 5 });
467 nodeRef.setPhysicalUDP("1.2.3.4:5678");
468 nodeRef.setDSAPublicKey("dsa-public");
469 nodeRef.setSignature("sig");
473 private Matcher<List<String>> matchesAddPeer() {
474 return matchesFcpMessage(
476 "Identifier=" + identifier
482 public class ModifyPeer {
485 public void defaultFcpClientCanEnablePeerByName()
486 throws InterruptedException, ExecutionException, IOException {
487 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
488 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
489 replyWithPeer("id1");
490 assertThat(peer.get().get().getIdentity(), is("id1"));
494 public void defaultFcpClientCanDisablePeerByName()
495 throws InterruptedException, ExecutionException, IOException {
496 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
497 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
498 replyWithPeer("id1");
499 assertThat(peer.get().get().getIdentity(), is("id1"));
503 public void defaultFcpClientCanEnablePeerByIdentity()
504 throws InterruptedException, ExecutionException, IOException {
505 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
506 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
507 replyWithPeer("id1");
508 assertThat(peer.get().get().getIdentity(), is("id1"));
512 public void defaultFcpClientCanEnablePeerByHostAndPort()
513 throws InterruptedException, ExecutionException, IOException {
514 Future<Optional<Peer>> peer =
515 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
516 connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
517 replyWithPeer("id1");
518 assertThat(peer.get().get().getIdentity(), is("id1"));
522 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
523 Future<Optional<Peer>> peer =
524 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
525 connectAndAssert(() -> allOf(
526 matchesModifyPeer("id1", "AllowLocalAddresses", true),
527 not(contains(startsWith("IsDisabled=")))
529 replyWithPeer("id1");
530 assertThat(peer.get().get().getIdentity(), is("id1"));
534 public void disallowLocalAddressesOfPeer()
535 throws InterruptedException, ExecutionException, IOException {
536 Future<Optional<Peer>> peer =
537 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
538 connectAndAssert(() -> allOf(
539 matchesModifyPeer("id1", "AllowLocalAddresses", false),
540 not(contains(startsWith("IsDisabled=")))
542 replyWithPeer("id1");
543 assertThat(peer.get().get().getIdentity(), is("id1"));
547 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
548 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
549 connectAndAssert(() -> allOf(
550 matchesModifyPeer("id1", "IsBurstOnly", true),
551 not(contains(startsWith("AllowLocalAddresses="))),
552 not(contains(startsWith("IsDisabled=")))
554 replyWithPeer("id1");
555 assertThat(peer.get().get().getIdentity(), is("id1"));
559 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
560 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
561 connectAndAssert(() -> allOf(
562 matchesModifyPeer("id1", "IsBurstOnly", false),
563 not(contains(startsWith("AllowLocalAddresses="))),
564 not(contains(startsWith("IsDisabled=")))
566 replyWithPeer("id1");
567 assertThat(peer.get().get().getIdentity(), is("id1"));
571 public void defaultFcpClientCanSetListenOnlyForPeer()
572 throws InterruptedException, ExecutionException, IOException {
573 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
574 connectAndAssert(() -> allOf(
575 matchesModifyPeer("id1", "IsListenOnly", true),
576 not(contains(startsWith("AllowLocalAddresses="))),
577 not(contains(startsWith("IsDisabled="))),
578 not(contains(startsWith("IsBurstOnly=")))
580 replyWithPeer("id1");
581 assertThat(peer.get().get().getIdentity(), is("id1"));
585 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
586 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
587 connectAndAssert(() -> allOf(
588 matchesModifyPeer("id1", "IsListenOnly", false),
589 not(contains(startsWith("AllowLocalAddresses="))),
590 not(contains(startsWith("IsDisabled="))),
591 not(contains(startsWith("IsBurstOnly=")))
593 replyWithPeer("id1");
594 assertThat(peer.get().get().getIdentity(), is("id1"));
598 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
599 Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
600 connectAndAssert(() -> allOf(
601 matchesModifyPeer("id1", "IgnoreSourcePort", true),
602 not(contains(startsWith("AllowLocalAddresses="))),
603 not(contains(startsWith("IsDisabled="))),
604 not(contains(startsWith("IsBurstOnly="))),
605 not(contains(startsWith("IsListenOnly=")))
607 replyWithPeer("id1");
608 assertThat(peer.get().get().getIdentity(), is("id1"));
612 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
613 Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
614 connectAndAssert(() -> allOf(
615 matchesModifyPeer("id1", "IgnoreSourcePort", false),
616 not(contains(startsWith("AllowLocalAddresses="))),
617 not(contains(startsWith("IsDisabled="))),
618 not(contains(startsWith("IsBurstOnly="))),
619 not(contains(startsWith("IsListenOnly=")))
621 replyWithPeer("id1");
622 assertThat(peer.get().get().getIdentity(), is("id1"));
626 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
627 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
628 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
629 replyWithUnknownNodeIdentifier();
630 assertThat(peer.get().isPresent(), is(false));
633 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
634 return matchesFcpMessage(
636 "Identifier=" + identifier,
637 "NodeIdentifier=" + nodeIdentifier,
638 setting + "=" + value
644 public class RemovePeer {
647 public void byName() throws InterruptedException, ExecutionException, IOException {
648 Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
649 connectAndAssert(() -> matchesRemovePeer("Friend1"));
650 replyWithPeerRemoved("Friend1");
651 assertThat(peer.get(), is(true));
655 public void invalidName() throws InterruptedException, ExecutionException, IOException {
656 Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
657 connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
658 replyWithUnknownNodeIdentifier();
659 assertThat(peer.get(), is(false));
663 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
664 Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
665 connectAndAssert(() -> matchesRemovePeer("id1"));
666 replyWithPeerRemoved("id1");
667 assertThat(peer.get(), is(true));
671 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
672 Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
673 connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
674 replyWithPeerRemoved("Friend1");
675 assertThat(peer.get(), is(true));
678 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
679 return matchesFcpMessage(
681 "Identifier=" + identifier,
682 "NodeIdentifier=" + nodeIdentifier
686 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
689 "Identifier=" + identifier,
690 "NodeIdentifier=" + nodeIdentifier,
697 private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
700 "Identifier=" + identifier,
701 "identity=" + peerId,
703 "ark.pubURI=SSK@3YEf.../ark",
706 "version=Fred,0.7,1.0,1466",
707 "lastGoodVersion=Fred,0.7,1.0,1466"
709 fcpServer.writeLine(additionalLines);
710 fcpServer.writeLine("EndMessage");
715 public class PeerNoteCommands {
717 public class ListPeerNotes {
720 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
721 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
722 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
723 replyWithUnknownNodeIdentifier();
724 assertThat(peerNote.get().isPresent(), is(false));
728 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
729 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
730 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
732 replyWithEndListPeerNotes();
733 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
734 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
738 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
739 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
740 connectAndAssert(() -> matchesListPeerNotes("id1"));
742 replyWithEndListPeerNotes();
743 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
744 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
748 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
749 Future<Optional<PeerNote>> peerNote =
750 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
751 connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
753 replyWithEndListPeerNotes();
754 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
755 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
758 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
759 return matchesFcpMessage(
761 "NodeIdentifier=" + nodeIdentifier
765 private void replyWithEndListPeerNotes() throws IOException {
768 "Identifier=" + identifier,
773 private void replyWithPeerNote() throws IOException {
776 "Identifier=" + identifier,
777 "NodeIdentifier=Friend1",
778 "NoteText=RXhhbXBsZSBUZXh0Lg==",
786 public class ModifyPeerNotes {
789 public void byName() throws InterruptedException, ExecutionException, IOException {
790 Future<Boolean> noteUpdated =
791 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
792 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
794 assertThat(noteUpdated.get(), is(true));
798 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
799 Future<Boolean> noteUpdated =
800 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
801 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
802 replyWithUnknownNodeIdentifier();
803 assertThat(noteUpdated.get(), is(false));
807 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
808 throws InterruptedException, ExecutionException, IOException {
809 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
810 assertThat(noteUpdated.get(), is(false));
814 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
815 Future<Boolean> noteUpdated =
816 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
817 connectAndAssert(() -> matchesModifyPeerNote("id1"));
819 assertThat(noteUpdated.get(), is(true));
823 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
824 Future<Boolean> noteUpdated =
825 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
826 connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
828 assertThat(noteUpdated.get(), is(true));
831 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
832 return matchesFcpMessage(
834 "Identifier=" + identifier,
835 "NodeIdentifier=" + nodeIdentifier,
841 private void replyWithPeerNote() throws IOException {
844 "Identifier=" + identifier,
845 "NodeIdentifier=Friend1",
856 private void replyWithUnknownNodeIdentifier() throws IOException {
858 "UnknownNodeIdentifier",
859 "Identifier=" + identifier,
860 "NodeIdentifier=id2",
867 public class PluginCommands {
869 private static final String CLASS_NAME = "foo.plugin.Plugin";
871 private void replyWithPluginInfo() throws IOException {
874 "Identifier=" + identifier,
875 "PluginName=superPlugin",
879 "OriginUri=superPlugin",
885 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
886 throws InterruptedException, ExecutionException {
887 assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
888 assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
889 assertThat(pluginInfo.get().get().isTalkable(), is(true));
890 assertThat(pluginInfo.get().get().getVersion(), is("42"));
891 assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
892 assertThat(pluginInfo.get().get().isStarted(), is(true));
895 public class LoadPlugin {
897 public class OfficialPlugins {
900 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
901 Future<Optional<PluginInfo>> pluginInfo =
902 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
903 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
904 assertThat(lines, not(contains(startsWith("Store="))));
905 replyWithPluginInfo();
906 verifyPluginInfo(pluginInfo);
910 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
911 Future<Optional<PluginInfo>> pluginInfo =
912 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
913 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
914 assertThat(lines, hasItem("Store=true"));
915 replyWithPluginInfo();
916 verifyPluginInfo(pluginInfo);
920 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
921 Future<Optional<PluginInfo>> pluginInfo =
922 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
923 connectAndAssert(() -> createMatcherForOfficialSource("https"));
924 replyWithPluginInfo();
925 verifyPluginInfo(pluginInfo);
928 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
929 return matchesFcpMessage(
931 "Identifier=" + identifier,
932 "PluginURL=superPlugin",
934 "OfficialSource=" + officialSource
940 public class FromOtherSources {
942 private static final String FILE_PATH = "/path/to/plugin.jar";
943 private static final String URL = "http://server.com/plugin.jar";
944 private static final String KEY = "KSK@plugin.jar";
947 public void fromFile() throws ExecutionException, InterruptedException, IOException {
948 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
949 connectAndAssert(() -> createMatcher("file", FILE_PATH));
950 replyWithPluginInfo();
951 verifyPluginInfo(pluginInfo);
955 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
956 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
957 connectAndAssert(() -> createMatcher("url", URL));
958 replyWithPluginInfo();
959 verifyPluginInfo(pluginInfo);
963 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
964 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
965 connectAndAssert(() -> createMatcher("freenet", KEY));
966 replyWithPluginInfo();
967 verifyPluginInfo(pluginInfo);
970 private Matcher<List<String>> createMatcher(String urlType, String url) {
971 return matchesFcpMessage(
973 "Identifier=" + identifier,
981 public class Failed {
984 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
985 Future<Optional<PluginInfo>> pluginInfo =
986 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
987 connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
988 replyWithProtocolError();
989 assertThat(pluginInfo.get().isPresent(), is(false));
996 public class ReloadPlugin {
999 public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1000 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1001 connectAndAssert(this::matchReloadPluginMessage);
1002 replyWithPluginInfo();
1003 verifyPluginInfo(pluginInfo);
1007 public void reloadingPluginWithMaxWaitTimeWorks()
1008 throws InterruptedException, ExecutionException, IOException {
1009 Future<Optional<PluginInfo>> pluginInfo =
1010 fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1011 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1012 replyWithPluginInfo();
1013 verifyPluginInfo(pluginInfo);
1017 public void reloadingPluginWithPurgeWorks()
1018 throws InterruptedException, ExecutionException, IOException {
1019 Future<Optional<PluginInfo>> pluginInfo =
1020 fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1021 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1022 replyWithPluginInfo();
1023 verifyPluginInfo(pluginInfo);
1027 public void reloadingPluginWithStoreWorks()
1028 throws InterruptedException, ExecutionException, IOException {
1029 Future<Optional<PluginInfo>> pluginInfo =
1030 fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1031 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1032 replyWithPluginInfo();
1033 verifyPluginInfo(pluginInfo);
1037 public void protocolErrorIsRecognizedAsFailure()
1038 throws InterruptedException, ExecutionException, IOException {
1039 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1040 connectAndAssert(() -> matchReloadPluginMessage());
1041 replyWithProtocolError();
1042 assertThat(pluginInfo.get().isPresent(), is(false));
1045 private Matcher<List<String>> matchReloadPluginMessage() {
1046 return matchesFcpMessage(
1048 "Identifier=" + identifier,
1049 "PluginName=" + CLASS_NAME
1055 public class RemovePlugin {
1058 public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1059 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1060 connectAndAssert(this::matchPluginRemovedMessage);
1061 replyWithPluginRemoved();
1062 assertThat(pluginRemoved.get(), is(true));
1066 public void removingPluginWithMaxWaitTimeWorks()
1067 throws InterruptedException, ExecutionException, IOException {
1068 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1069 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1070 replyWithPluginRemoved();
1071 assertThat(pluginRemoved.get(), is(true));
1075 public void removingPluginWithPurgeWorks()
1076 throws InterruptedException, ExecutionException, IOException {
1077 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1078 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1079 replyWithPluginRemoved();
1080 assertThat(pluginRemoved.get(), is(true));
1083 private void replyWithPluginRemoved() throws IOException {
1084 fcpServer.writeLine(
1086 "Identifier=" + identifier,
1087 "PluginName=" + CLASS_NAME,
1092 private Matcher<List<String>> matchPluginRemovedMessage() {
1093 return matchesFcpMessage(
1095 "Identifier=" + identifier,
1096 "PluginName=" + CLASS_NAME
1102 public class GetPluginInfo {
1105 public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1106 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1107 connectAndAssert(this::matchGetPluginInfoMessage);
1108 replyWithPluginInfo();
1109 verifyPluginInfo(pluginInfo);
1113 public void gettingPluginInfoWithDetailsWorks()
1114 throws InterruptedException, ExecutionException, IOException {
1115 Future<Optional<PluginInfo>> pluginInfo =
1116 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1117 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1118 replyWithPluginInfo();
1119 verifyPluginInfo(pluginInfo);
1123 public void protocolErrorIsRecognizedAsFailure()
1124 throws InterruptedException, ExecutionException, IOException {
1125 Future<Optional<PluginInfo>> pluginInfo =
1126 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1127 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1128 replyWithProtocolError();
1129 assertThat(pluginInfo.get(), is(Optional.empty()));
1132 private Matcher<List<String>> matchGetPluginInfoMessage() {
1133 return matchesFcpMessage(
1135 "Identifier=" + identifier,
1136 "PluginName=" + CLASS_NAME
1144 public class UskSubscriptionCommands {
1146 private static final String URI = "USK@some,uri/file.txt";
1149 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1150 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1151 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1152 replyWithSubscribed();
1153 assertThat(uskSubscription.get().get().getUri(), is(URI));
1154 AtomicInteger edition = new AtomicInteger();
1155 CountDownLatch updated = new CountDownLatch(2);
1156 uskSubscription.get().get().onUpdate(e -> {
1158 updated.countDown();
1160 sendUpdateNotification(23);
1161 sendUpdateNotification(24);
1162 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1163 assertThat(edition.get(), is(24));
1167 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1168 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1169 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1170 replyWithSubscribed();
1171 assertThat(uskSubscription.get().get().getUri(), is(URI));
1172 AtomicInteger edition = new AtomicInteger();
1173 CountDownLatch updated = new CountDownLatch(2);
1174 uskSubscription.get().get().onUpdate(e -> {
1176 updated.countDown();
1178 uskSubscription.get().get().onUpdate(e -> updated.countDown());
1179 sendUpdateNotification(23);
1180 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1181 assertThat(edition.get(), is(23));
1185 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1186 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1187 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1188 replyWithSubscribed();
1189 assertThat(uskSubscription.get().get().getUri(), is(URI));
1190 AtomicBoolean updated = new AtomicBoolean();
1191 uskSubscription.get().get().onUpdate(e -> updated.set(true));
1192 uskSubscription.get().get().cancel();
1193 readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1194 sendUpdateNotification(23);
1195 assertThat(updated.get(), is(false));
1198 private void replyWithSubscribed() throws IOException {
1199 fcpServer.writeLine(
1201 "Identifier=" + identifier,
1208 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1209 fcpServer.writeLine(
1210 "SubscribedUSKUpdate",
1211 "Identifier=" + identifier,
1213 "Edition=" + edition
1215 fcpServer.writeLine(additionalLines);
1216 fcpServer.writeLine("EndMessage");
1221 public class ClientGet {
1224 public void works() throws InterruptedException, ExecutionException, IOException {
1225 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1226 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1227 replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1228 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1229 Optional<Data> data = dataFuture.get();
1234 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1235 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1236 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1237 replyWithGetFailed("not-test");
1238 replyWithGetFailed(identifier);
1239 Optional<Data> data = dataFuture.get();
1240 assertThat(data.isPresent(), is(false));
1244 public void getFailedForDifferentIdentifierIsIgnored()
1245 throws InterruptedException, ExecutionException, IOException {
1246 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1247 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1248 replyWithGetFailed("not-test");
1249 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1250 Optional<Data> data = dataFuture.get();
1254 @Test(expected = ExecutionException.class)
1255 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1256 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1257 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1263 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1264 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1265 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1269 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1270 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1271 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1275 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1276 throws InterruptedException, ExecutionException, IOException {
1277 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1278 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1282 public void clientGetWithPrioritySettingSendsCorrectCommands()
1283 throws InterruptedException, ExecutionException, IOException {
1284 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1285 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1289 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1290 throws InterruptedException, ExecutionException, IOException {
1291 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1292 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1296 public void clientGetWithGlobalSettingSendsCorrectCommands()
1297 throws InterruptedException, ExecutionException, IOException {
1298 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1299 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1302 private void replyWithGetFailed(String identifier) throws IOException {
1303 fcpServer.writeLine(
1305 "Identifier=" + identifier,
1311 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1312 fcpServer.writeLine(
1314 "Identifier=" + identifier,
1315 "DataLength=" + (text.length() + 1),
1316 "StartupTime=1435610539000",
1317 "CompletionTime=1435610540000",
1318 "Metadata.ContentType=" + contentType,
1324 private void verifyData(Optional<Data> data) throws IOException {
1325 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1326 assertThat(data.get().size(), is(6L));
1327 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1328 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1333 public class PutCommands {
1335 public class ClientPut {
1338 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1339 fcpClient.clientPut()
1340 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1345 readMessage("Hello", this::matchesDirectClientPut);
1349 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1350 Future<Optional<Key>> key = fcpClient.clientPut()
1351 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1356 readMessage("Hello", this::matchesDirectClientPut);
1357 replyWithPutFailed("not-the-right-one");
1358 replyWithPutSuccessful(identifier);
1359 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1363 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1364 Future<Optional<Key>> key = fcpClient.clientPut()
1365 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1370 readMessage("Hello", this::matchesDirectClientPut);
1371 replyWithPutSuccessful("not-the-right-one");
1372 replyWithPutFailed(identifier);
1373 assertThat(key.get().isPresent(), is(false));
1377 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1378 fcpClient.clientPut()
1379 .named("otherName.txt")
1380 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1385 readMessage("Hello", () -> allOf(
1386 hasHead("ClientPut"),
1387 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1389 hasTail("EndMessage", "Hello")
1394 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1395 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1396 connectAndAssert(() ->
1397 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
1401 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1402 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1403 connectAndAssert(() ->
1404 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
1409 private final File ddaFile;
1410 private final File fileToUpload;
1412 public DDA() throws IOException {
1413 ddaFile = createDdaFile();
1414 fileToUpload = new File(ddaFile.getParent(), "test.dat");
1417 private Matcher<List<String>> matchesFileClientPut(File file) {
1418 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
1422 public void completeDda() throws IOException, ExecutionException, InterruptedException {
1423 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1424 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1425 sendDdaRequired(identifier);
1426 readMessage(() -> matchesTestDDARequest(ddaFile));
1427 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1428 readMessage(() -> matchesTestDDAResponse(ddaFile));
1429 writeTestDDAComplete(ddaFile);
1430 readMessage(() -> matchesFileClientPut(fileToUpload));
1434 public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1435 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1436 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1437 sendDdaRequired(identifier);
1438 readMessage(() -> matchesTestDDARequest(ddaFile));
1439 sendTestDDAReply("/some-other-directory", ddaFile);
1440 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1441 readMessage(() -> matchesTestDDAResponse(ddaFile));
1445 public void sendResponseIfFileUnreadable()
1446 throws IOException, ExecutionException, InterruptedException {
1447 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1448 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1449 sendDdaRequired(identifier);
1450 readMessage(() -> matchesTestDDARequest(ddaFile));
1451 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1452 readMessage(this::matchesFailedToReadResponse);
1456 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1457 throws IOException, ExecutionException, InterruptedException {
1458 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1460 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1461 String identifier = extractIdentifier(lines);
1462 fcpServer.writeLine(
1464 "Directory=/some-other-directory",
1467 sendDdaRequired(identifier);
1468 lines = fcpServer.collectUntil(is("EndMessage"));
1469 assertThat(lines, matchesFcpMessage(
1471 "Directory=" + ddaFile.getParent(),
1472 "WantReadDirectory=true",
1473 "WantWriteDirectory=false"
1477 private Matcher<List<String>> matchesFailedToReadResponse() {
1478 return matchesFcpMessage(
1480 "Directory=" + ddaFile.getParent(),
1481 "ReadContent=failed-to-read"
1485 private void writeTestDDAComplete(File tempFile) throws IOException {
1486 fcpServer.writeLine(
1488 "Directory=" + tempFile.getParent(),
1489 "ReadDirectoryAllowed=true",
1494 private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1495 return matchesFcpMessage(
1497 "Directory=" + tempFile.getParent(),
1498 "ReadContent=test-content"
1502 private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1503 fcpServer.writeLine(
1505 "Directory=" + directory,
1506 "ReadFilename=" + tempFile,
1511 private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1512 return matchesFcpMessage(
1514 "Directory=" + tempFile.getParent(),
1515 "WantReadDirectory=true",
1516 "WantWriteDirectory=false"
1520 private void sendDdaRequired(String identifier) throws IOException {
1521 fcpServer.writeLine(
1523 "Identifier=" + identifier,
1531 private void replyWithPutSuccessful(String identifier) throws IOException {
1532 fcpServer.writeLine(
1535 "Identifier=" + identifier,
1540 private void replyWithPutFailed(String identifier) throws IOException {
1541 fcpServer.writeLine(
1543 "Identifier=" + identifier,
1548 private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
1549 List<String> lines =
1550 new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
1551 Arrays.asList(additionalLines).forEach(lines::add);
1553 hasHead("ClientPut"),
1554 hasParameters(1, 2, lines.toArray(new String[lines.size()])),
1555 hasTail("EndMessage", "Hello")
1559 private File createDdaFile() throws IOException {
1560 File tempFile = File.createTempFile("test-dda-", ".dat");
1561 tempFile.deleteOnExit();
1562 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
1567 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1568 throws InterruptedException, ExecutionException, IOException {
1569 Future<Optional<Key>> key =
1570 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1572 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1573 String identifier = extractIdentifier(lines);
1574 fcpServer.writeLine(
1576 "Identifier=not-the-right-one",
1580 fcpServer.writeLine(
1582 "Identifier=" + identifier,
1586 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1590 public void clientPutAbortsOnProtocolErrorOtherThan25()
1591 throws InterruptedException, ExecutionException, IOException {
1592 Future<Optional<Key>> key =
1593 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1595 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1596 String identifier = extractIdentifier(lines);
1597 fcpServer.writeLine(
1599 "Identifier=" + identifier,
1603 assertThat(key.get().isPresent(), is(false));
1607 public void clientPutSendsNotificationsForGeneratedKeys()
1608 throws InterruptedException, ExecutionException, IOException {
1609 List<String> generatedKeys = new CopyOnWriteArrayList<>();
1610 Future<Optional<Key>> key = fcpClient.clientPut()
1611 .onKeyGenerated(generatedKeys::add)
1612 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1617 List<String> lines = fcpServer.collectUntil(is("Hello"));
1618 String identifier = extractIdentifier(lines);
1619 fcpServer.writeLine(
1621 "Identifier=" + identifier,
1625 replyWithPutSuccessful(identifier);
1626 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1627 assertThat(generatedKeys, contains("KSK@foo.txt"));
1631 public void clientPutSendsNotificationOnProgress()
1632 throws InterruptedException, ExecutionException, IOException {
1633 List<RequestProgress> requestProgress = new ArrayList<>();
1634 Future<Optional<Key>> key = fcpClient.clientPut()
1635 .onProgress(requestProgress::add)
1636 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1641 readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
1642 replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
1643 replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
1644 replyWithPutSuccessful(identifier);
1645 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1646 assertThat(requestProgress, contains(
1647 isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
1648 isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
1654 private void replyWithSimpleProgress(
1655 int total, int required, int failed, int fatallyFailed, int succeeded, int lastProgress,
1656 boolean finalizedTotal, int minSuccessFetchBlocks) throws IOException {
1657 fcpServer.writeLine(
1659 "Identifier=" + identifier,
1661 "Required=" + required,
1663 "FatallyFailed=" + fatallyFailed,
1664 "Succeeded=" + succeeded,
1665 "LastProgress=" + lastProgress,
1666 "FinalizedTotal=" + finalizedTotal,
1667 "MinSuccessFetchBlocks=" + minSuccessFetchBlocks,
1672 public class ClientPutDiskDir {
1675 public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1676 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
1677 connectAndAssert(this::matchesClientPutDiskDir);
1678 fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
1679 assertThat(key.get().get().getKey(), is("CHK@abc"));
1683 public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
1684 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
1685 connectAndAssert(this::matchesClientPutDiskDir);
1686 replyWithProtocolError();
1687 assertThat(key.get().isPresent(), is(false));
1691 public void progressIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException {
1692 List<RequestProgress> requestProgress = new ArrayList<>();
1693 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().onProgress(requestProgress::add)
1694 .fromDirectory(new File("")).uri("CHK@").execute();
1695 connectAndAssert(() -> matchesClientPutDiskDir("Verbosity=1"));
1696 replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
1697 replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
1698 fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
1699 assertThat(key.get().get().getKey(), is("CHK@abc"));
1700 assertThat(requestProgress, contains(
1701 isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
1702 isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
1706 private Matcher<List<String>> matchesClientPutDiskDir(String... additionalLines) {
1707 List<String> lines = new ArrayList<>(Arrays.asList("Identifier=" + identifier, "URI=CHK@", "Filename=" + new File("").getPath()));
1708 Arrays.asList(additionalLines).forEach(lines::add);
1709 return matchesFcpMessage("ClientPutDiskDir", lines.toArray(new String[lines.size()]));
1716 public class ConfigCommand {
1718 public class GetConfig {
1721 public void defaultFcpClientCanGetConfigWithoutDetails()
1722 throws InterruptedException, ExecutionException, IOException {
1723 Future<ConfigData> configData = fcpClient.getConfig().execute();
1724 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1725 replyWithConfigData();
1726 assertThat(configData.get(), notNullValue());
1730 public void defaultFcpClientCanGetConfigWithCurrent()
1731 throws InterruptedException, ExecutionException, IOException {
1732 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1733 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1734 replyWithConfigData("current.foo=bar");
1735 assertThat(configData.get().getCurrent("foo"), is("bar"));
1739 public void defaultFcpClientCanGetConfigWithDefaults()
1740 throws InterruptedException, ExecutionException, IOException {
1741 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1742 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1743 replyWithConfigData("default.foo=bar");
1744 assertThat(configData.get().getDefault("foo"), is("bar"));
1748 public void defaultFcpClientCanGetConfigWithSortOrder()
1749 throws InterruptedException, ExecutionException, IOException {
1750 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1751 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1752 replyWithConfigData("sortOrder.foo=17");
1753 assertThat(configData.get().getSortOrder("foo"), is(17));
1757 public void defaultFcpClientCanGetConfigWithExpertFlag()
1758 throws InterruptedException, ExecutionException, IOException {
1759 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1760 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1761 replyWithConfigData("expertFlag.foo=true");
1762 assertThat(configData.get().getExpertFlag("foo"), is(true));
1766 public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1767 throws InterruptedException, ExecutionException, IOException {
1768 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1769 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1770 replyWithConfigData("forceWriteFlag.foo=true");
1771 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
1775 public void defaultFcpClientCanGetConfigWithShortDescription()
1776 throws InterruptedException, ExecutionException, IOException {
1777 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1778 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1779 replyWithConfigData("shortDescription.foo=bar");
1780 assertThat(configData.get().getShortDescription("foo"), is("bar"));
1784 public void defaultFcpClientCanGetConfigWithLongDescription()
1785 throws InterruptedException, ExecutionException, IOException {
1786 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1787 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1788 replyWithConfigData("longDescription.foo=bar");
1789 assertThat(configData.get().getLongDescription("foo"), is("bar"));
1793 public void defaultFcpClientCanGetConfigWithDataTypes()
1794 throws InterruptedException, ExecutionException, IOException {
1795 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1796 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1797 replyWithConfigData("dataType.foo=number");
1798 assertThat(configData.get().getDataType("foo"), is("number"));
1801 private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1802 return matchesFcpMessage(
1804 "Identifier=" + identifier,
1805 additionalParameter + "=true"
1811 public class ModifyConfig {
1814 public void defaultFcpClientCanModifyConfigData()
1815 throws InterruptedException, ExecutionException, IOException {
1816 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1817 connectAndAssert(() -> matchesFcpMessage(
1819 "Identifier=" + identifier,
1822 replyWithConfigData("current.foo.bar=baz");
1823 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
1828 private void replyWithConfigData(String... additionalLines) throws IOException {
1829 fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1830 fcpServer.writeLine(additionalLines);
1831 fcpServer.writeLine("EndMessage");
1836 public class NodeInformation {
1839 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
1840 Future<NodeData> nodeData = fcpClient.getNode().execute();
1841 connectAndAssert(() -> matchesGetNode(false, false, false));
1842 replyWithNodeData();
1843 assertThat(nodeData.get(), notNullValue());
1844 assertThat(nodeData.get().getNodeRef().isOpennet(), is(false));
1848 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
1849 throws InterruptedException, ExecutionException, IOException {
1850 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
1851 connectAndAssert(() -> matchesGetNode(true, false, false));
1852 replyWithNodeData("opennet=true");
1853 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
1854 assertThat(nodeData.get().getNodeRef().isOpennet(), is(true));
1858 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
1859 throws InterruptedException, ExecutionException, IOException {
1860 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
1861 connectAndAssert(() -> matchesGetNode(false, true, false));
1862 replyWithNodeData("ark.privURI=SSK@XdHMiRl");
1863 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
1867 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
1868 throws InterruptedException, ExecutionException, IOException {
1869 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
1870 connectAndAssert(() -> matchesGetNode(false, false, true));
1871 replyWithNodeData("volatile.freeJavaMemory=205706528");
1872 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
1875 private Matcher<List<String>> matchesGetNode(boolean withOpennetRef, boolean withPrivate, boolean withVolatile) {
1876 return matchesFcpMessage(
1878 "Identifier=" + identifier,
1879 "GiveOpennetRef=" + withOpennetRef,
1880 "WithPrivate=" + withPrivate,
1881 "WithVolatile=" + withVolatile
1885 private void replyWithNodeData(String... additionalLines) throws IOException {
1886 fcpServer.writeLine(
1888 "Identifier=" + identifier,
1889 "ark.pubURI=SSK@3YEf.../ark",
1892 "version=Fred,0.7,1.0,1466",
1893 "lastGoodVersion=Fred,0.7,1.0,1466"
1895 fcpServer.writeLine(additionalLines);
1896 fcpServer.writeLine("EndMessage");