1 package net.pterodactylus.fcp.quelaton;
3 import static net.pterodactylus.fcp.RequestProgressMatcher.isRequestProgress;
4 import static org.hamcrest.MatcherAssert.assertThat;
5 import static org.hamcrest.Matchers.allOf;
6 import static org.hamcrest.Matchers.contains;
7 import static org.hamcrest.Matchers.containsInAnyOrder;
8 import static org.hamcrest.Matchers.hasItem;
9 import static org.hamcrest.Matchers.hasSize;
10 import static org.hamcrest.Matchers.is;
11 import static org.hamcrest.Matchers.not;
12 import static org.hamcrest.Matchers.notNullValue;
13 import static org.hamcrest.Matchers.startsWith;
15 import java.io.ByteArrayInputStream;
17 import java.io.IOException;
19 import java.nio.charset.StandardCharsets;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.function.Supplier;
35 import java.util.stream.Collectors;
37 import net.pterodactylus.fcp.ARK;
38 import net.pterodactylus.fcp.ConfigData;
39 import net.pterodactylus.fcp.DSAGroup;
40 import net.pterodactylus.fcp.FcpKeyPair;
41 import net.pterodactylus.fcp.Key;
42 import net.pterodactylus.fcp.NodeData;
43 import net.pterodactylus.fcp.NodeRef;
44 import net.pterodactylus.fcp.Peer;
45 import net.pterodactylus.fcp.PeerNote;
46 import net.pterodactylus.fcp.PluginInfo;
47 import net.pterodactylus.fcp.Priority;
48 import net.pterodactylus.fcp.RequestProgress;
49 import net.pterodactylus.fcp.fake.FakeTcpServer;
50 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
52 import com.google.common.io.ByteStreams;
53 import com.google.common.io.Files;
54 import com.nitorcreations.junit.runners.NestedRunner;
55 import org.hamcrest.Description;
56 import org.hamcrest.Matcher;
57 import org.hamcrest.Matchers;
58 import org.hamcrest.TypeSafeDiagnosingMatcher;
59 import org.junit.After;
60 import org.junit.Assert;
61 import org.junit.Before;
62 import org.junit.Ignore;
63 import org.junit.Test;
64 import org.junit.rules.TemporaryFolder;
65 import org.junit.runner.RunWith;
68 * Unit test for {@link DefaultFcpClient}.
70 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
72 @RunWith(NestedRunner.class)
73 public class DefaultFcpClientTest {
75 private static final String INSERT_URI =
76 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
77 private static final String REQUEST_URI =
78 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
80 private int threadCounter = 0;
81 private final ExecutorService threadPool =
82 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
83 private final FakeTcpServer fcpServer;
84 private final DefaultFcpClient fcpClient;
86 public DefaultFcpClientTest() throws IOException {
87 fcpServer = new FakeTcpServer(threadPool);
88 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
92 public void tearDown() throws IOException {
94 threadPool.shutdown();
97 private void connectNode() throws InterruptedException, ExecutionException, IOException {
98 fcpServer.connect().get();
99 fcpServer.collectUntil(is("EndMessage"));
100 fcpServer.writeLine("NodeHello",
101 "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
102 "Revision=build01466",
104 "Version=Fred,0.7,1.0,1466",
106 "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
110 "NodeLanguage=ENGLISH",
116 private String extractIdentifier(List<String> lines) {
117 return lines.stream()
118 .filter(s -> s.startsWith("Identifier="))
119 .map(s -> s.substring(s.indexOf('=') + 1))
124 private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
125 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
128 private Matcher<Iterable<String>> hasHead(String firstElement) {
129 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
131 protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
132 if (!iterable.iterator().hasNext()) {
133 mismatchDescription.appendText("is empty");
136 String element = iterable.iterator().next();
137 if (!element.equals(firstElement)) {
138 mismatchDescription.appendText("starts with ").appendValue(element);
145 public void describeTo(Description description) {
146 description.appendText("starts with ").appendValue(firstElement);
151 private Matcher<List<String>> matchesFcpMessageWithTerminator(
152 String name, String terminator, String... requiredLines) {
153 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
156 private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
157 return new TypeSafeDiagnosingMatcher<List<String>>() {
159 protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
160 if (item.size() < (ignoreStart + ignoreEnd)) {
161 mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
164 for (String line : lines) {
165 if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
166 mismatchDescription.appendText("does not contains ").appendValue(line);
174 public void describeTo(Description description) {
175 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
176 description.appendText(", ignoring the first ").appendValue(ignoreStart);
177 description.appendText(" and the last ").appendValue(ignoreEnd);
182 private Matcher<List<String>> hasTail(String... lastElements) {
183 return new TypeSafeDiagnosingMatcher<List<String>>() {
185 protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
186 if (list.size() < lastElements.length) {
187 mismatchDescription.appendText("is too small");
190 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
191 if (!tail.equals(Arrays.asList(lastElements))) {
192 mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
199 public void describeTo(Description description) {
200 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
205 private List<String> lines;
206 private String identifier;
208 private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
209 throws InterruptedException, ExecutionException, IOException {
211 readMessage(requestMatcher);
214 private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
215 readMessage("EndMessage", requestMatcher);
218 private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
219 lines = fcpServer.collectUntil(is(terminator));
220 identifier = extractIdentifier(lines);
221 assertThat(lines, requestMatcher.get());
224 private void replyWithProtocolError() throws IOException {
227 "Identifier=" + identifier,
232 public class ConnectionsAndKeyPairs {
234 public class Connections {
236 @Test(expected = ExecutionException.class)
237 public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
238 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
239 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
241 "CloseConnectionDuplicateClientName",
247 @Test(expected = ExecutionException.class)
248 public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
249 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
250 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
256 public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
257 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
258 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
261 keyPair = fcpClient.generateKeypair().execute();
262 readMessage(() -> matchesFcpMessage("GenerateSSK"));
263 identifier = extractIdentifier(lines);
269 public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
270 throws InterruptedException, ExecutionException, IOException {
271 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
272 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
277 } catch (ExecutionException e) {
280 keyPair = fcpClient.generateKeypair().execute();
281 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
288 public class GenerateKeyPair {
291 public void defaultFcpClientCanGenerateKeypair()
292 throws ExecutionException, InterruptedException, IOException {
293 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
294 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
296 FcpKeyPair keyPair = keyPairFuture.get();
297 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
298 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
303 private void replyWithKeyPair() throws IOException {
304 fcpServer.writeLine("SSKKeypair",
305 "InsertURI=" + INSERT_URI + "",
306 "RequestURI=" + REQUEST_URI + "",
307 "Identifier=" + identifier,
315 public class PeerCommands {
317 public class ListPeer {
320 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
321 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
322 connectAndAssert(() -> matchesListPeer("id1"));
323 replyWithPeer("id1");
324 assertThat(peer.get().get().getIdentity(), is("id1"));
328 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
329 Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
330 connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
331 replyWithPeer("id1");
332 assertThat(peer.get().get().getIdentity(), is("id1"));
336 public void byName() throws InterruptedException, ExecutionException, IOException {
337 Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
338 connectAndAssert(() -> matchesListPeer("FriendNode"));
339 replyWithPeer("id1");
340 assertThat(peer.get().get().getIdentity(), is("id1"));
344 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
345 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
346 connectAndAssert(() -> matchesListPeer("id2"));
347 replyWithUnknownNodeIdentifier();
348 assertThat(peer.get().isPresent(), is(false));
351 private Matcher<List<String>> matchesListPeer(String nodeId) {
352 return matchesFcpMessage(
354 "Identifier=" + identifier,
355 "NodeIdentifier=" + nodeId
361 public class ListPeers {
364 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
365 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
366 connectAndAssert(() -> matchesListPeers(false, false));
367 replyWithPeer("id1");
368 replyWithPeer("id2");
370 assertThat(peers.get(), hasSize(2));
371 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
372 containsInAnyOrder("id1", "id2"));
376 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
377 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
378 connectAndAssert(() -> matchesListPeers(false, true));
379 replyWithPeer("id1", "metadata.foo=bar1");
380 replyWithPeer("id2", "metadata.foo=bar2");
382 assertThat(peers.get(), hasSize(2));
383 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
384 containsInAnyOrder("bar1", "bar2"));
388 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
389 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
390 connectAndAssert(() -> matchesListPeers(true, false));
391 replyWithPeer("id1", "volatile.foo=bar1");
392 replyWithPeer("id2", "volatile.foo=bar2");
394 assertThat(peers.get(), hasSize(2));
395 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
396 containsInAnyOrder("bar1", "bar2"));
399 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
400 return matchesFcpMessage(
402 "WithVolatile=" + withVolatile,
403 "WithMetadata=" + withMetadata
407 private void sendEndOfPeerList() throws IOException {
410 "Identifier=" + identifier,
417 public class AddPeer {
420 public void fromFile() throws InterruptedException, ExecutionException, IOException {
421 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
422 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
423 replyWithPeer("id1");
424 assertThat(peer.get().get().getIdentity(), is("id1"));
428 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
429 Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
430 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
431 replyWithPeer("id1");
432 assertThat(peer.get().get().getIdentity(), is("id1"));
436 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
437 NodeRef nodeRef = createNodeRef();
438 Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
439 connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
445 "dsaGroup.q=subprime",
446 "dsaPubKey.y=dsa-public",
447 "physical.udp=1.2.3.4:5678",
451 replyWithPeer("id1");
452 assertThat(peer.get().get().getIdentity(), is("id1"));
456 public void protocolErrorEndsCommand() throws InterruptedException, ExecutionException, IOException {
457 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
458 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
459 replyWithProtocolError();
460 assertThat(peer.get().isPresent(), is(false));
463 private NodeRef createNodeRef() {
464 NodeRef nodeRef = new NodeRef();
465 nodeRef.setIdentity("id1");
466 nodeRef.setName("name");
467 nodeRef.setARK(new ARK("public", "1"));
468 nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
469 nodeRef.setNegotiationTypes(new int[] { 3, 5 });
470 nodeRef.setPhysicalUDP("1.2.3.4:5678");
471 nodeRef.setDSAPublicKey("dsa-public");
472 nodeRef.setSignature("sig");
476 private Matcher<List<String>> matchesAddPeer() {
477 return matchesFcpMessage(
479 "Identifier=" + identifier
485 public class ModifyPeer {
488 public void defaultFcpClientCanEnablePeerByName()
489 throws InterruptedException, ExecutionException, IOException {
490 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
491 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
492 replyWithPeer("id1");
493 assertThat(peer.get().get().getIdentity(), is("id1"));
497 public void defaultFcpClientCanDisablePeerByName()
498 throws InterruptedException, ExecutionException, IOException {
499 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
500 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
501 replyWithPeer("id1");
502 assertThat(peer.get().get().getIdentity(), is("id1"));
506 public void defaultFcpClientCanEnablePeerByIdentity()
507 throws InterruptedException, ExecutionException, IOException {
508 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
509 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
510 replyWithPeer("id1");
511 assertThat(peer.get().get().getIdentity(), is("id1"));
515 public void defaultFcpClientCanEnablePeerByHostAndPort()
516 throws InterruptedException, ExecutionException, IOException {
517 Future<Optional<Peer>> peer =
518 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
519 connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
520 replyWithPeer("id1");
521 assertThat(peer.get().get().getIdentity(), is("id1"));
525 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
526 Future<Optional<Peer>> peer =
527 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
528 connectAndAssert(() -> allOf(
529 matchesModifyPeer("id1", "AllowLocalAddresses", true),
530 not(contains(startsWith("IsDisabled=")))
532 replyWithPeer("id1");
533 assertThat(peer.get().get().getIdentity(), is("id1"));
537 public void disallowLocalAddressesOfPeer()
538 throws InterruptedException, ExecutionException, IOException {
539 Future<Optional<Peer>> peer =
540 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
541 connectAndAssert(() -> allOf(
542 matchesModifyPeer("id1", "AllowLocalAddresses", false),
543 not(contains(startsWith("IsDisabled=")))
545 replyWithPeer("id1");
546 assertThat(peer.get().get().getIdentity(), is("id1"));
550 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
551 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
552 connectAndAssert(() -> allOf(
553 matchesModifyPeer("id1", "IsBurstOnly", true),
554 not(contains(startsWith("AllowLocalAddresses="))),
555 not(contains(startsWith("IsDisabled=")))
557 replyWithPeer("id1");
558 assertThat(peer.get().get().getIdentity(), is("id1"));
562 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
563 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
564 connectAndAssert(() -> allOf(
565 matchesModifyPeer("id1", "IsBurstOnly", false),
566 not(contains(startsWith("AllowLocalAddresses="))),
567 not(contains(startsWith("IsDisabled=")))
569 replyWithPeer("id1");
570 assertThat(peer.get().get().getIdentity(), is("id1"));
574 public void defaultFcpClientCanSetListenOnlyForPeer()
575 throws InterruptedException, ExecutionException, IOException {
576 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
577 connectAndAssert(() -> allOf(
578 matchesModifyPeer("id1", "IsListenOnly", true),
579 not(contains(startsWith("AllowLocalAddresses="))),
580 not(contains(startsWith("IsDisabled="))),
581 not(contains(startsWith("IsBurstOnly=")))
583 replyWithPeer("id1");
584 assertThat(peer.get().get().getIdentity(), is("id1"));
588 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
589 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
590 connectAndAssert(() -> allOf(
591 matchesModifyPeer("id1", "IsListenOnly", false),
592 not(contains(startsWith("AllowLocalAddresses="))),
593 not(contains(startsWith("IsDisabled="))),
594 not(contains(startsWith("IsBurstOnly=")))
596 replyWithPeer("id1");
597 assertThat(peer.get().get().getIdentity(), is("id1"));
601 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
602 Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
603 connectAndAssert(() -> allOf(
604 matchesModifyPeer("id1", "IgnoreSourcePort", true),
605 not(contains(startsWith("AllowLocalAddresses="))),
606 not(contains(startsWith("IsDisabled="))),
607 not(contains(startsWith("IsBurstOnly="))),
608 not(contains(startsWith("IsListenOnly=")))
610 replyWithPeer("id1");
611 assertThat(peer.get().get().getIdentity(), is("id1"));
615 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
616 Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
617 connectAndAssert(() -> allOf(
618 matchesModifyPeer("id1", "IgnoreSourcePort", false),
619 not(contains(startsWith("AllowLocalAddresses="))),
620 not(contains(startsWith("IsDisabled="))),
621 not(contains(startsWith("IsBurstOnly="))),
622 not(contains(startsWith("IsListenOnly=")))
624 replyWithPeer("id1");
625 assertThat(peer.get().get().getIdentity(), is("id1"));
629 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
630 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
631 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
632 replyWithUnknownNodeIdentifier();
633 assertThat(peer.get().isPresent(), is(false));
636 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
637 return matchesFcpMessage(
639 "Identifier=" + identifier,
640 "NodeIdentifier=" + nodeIdentifier,
641 setting + "=" + value
647 public class RemovePeer {
650 public void byName() throws InterruptedException, ExecutionException, IOException {
651 Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
652 connectAndAssert(() -> matchesRemovePeer("Friend1"));
653 replyWithPeerRemoved("Friend1");
654 assertThat(peer.get(), is(true));
658 public void invalidName() throws InterruptedException, ExecutionException, IOException {
659 Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
660 connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
661 replyWithUnknownNodeIdentifier();
662 assertThat(peer.get(), is(false));
666 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
667 Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
668 connectAndAssert(() -> matchesRemovePeer("id1"));
669 replyWithPeerRemoved("id1");
670 assertThat(peer.get(), is(true));
674 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
675 Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
676 connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
677 replyWithPeerRemoved("Friend1");
678 assertThat(peer.get(), is(true));
681 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
682 return matchesFcpMessage(
684 "Identifier=" + identifier,
685 "NodeIdentifier=" + nodeIdentifier
689 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
692 "Identifier=" + identifier,
693 "NodeIdentifier=" + nodeIdentifier,
700 private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
703 "Identifier=" + identifier,
704 "identity=" + peerId,
706 "ark.pubURI=SSK@3YEf.../ark",
709 "version=Fred,0.7,1.0,1466",
710 "lastGoodVersion=Fred,0.7,1.0,1466"
712 fcpServer.writeLine(additionalLines);
713 fcpServer.writeLine("EndMessage");
718 public class PeerNoteCommands {
720 public class ListPeerNotes {
723 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
724 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
725 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
726 replyWithUnknownNodeIdentifier();
727 assertThat(peerNote.get().isPresent(), is(false));
731 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
732 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
733 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
735 replyWithEndListPeerNotes();
736 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
737 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
741 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
742 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
743 connectAndAssert(() -> matchesListPeerNotes("id1"));
745 replyWithEndListPeerNotes();
746 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
747 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
751 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
752 Future<Optional<PeerNote>> peerNote =
753 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
754 connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
756 replyWithEndListPeerNotes();
757 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
758 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
761 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
762 return matchesFcpMessage(
764 "NodeIdentifier=" + nodeIdentifier
768 private void replyWithEndListPeerNotes() throws IOException {
771 "Identifier=" + identifier,
776 private void replyWithPeerNote() throws IOException {
779 "Identifier=" + identifier,
780 "NodeIdentifier=Friend1",
781 "NoteText=RXhhbXBsZSBUZXh0Lg==",
789 public class ModifyPeerNotes {
792 public void byName() throws InterruptedException, ExecutionException, IOException {
793 Future<Boolean> noteUpdated =
794 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
795 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
797 assertThat(noteUpdated.get(), is(true));
801 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
802 Future<Boolean> noteUpdated =
803 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
804 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
805 replyWithUnknownNodeIdentifier();
806 assertThat(noteUpdated.get(), is(false));
810 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
811 throws InterruptedException, ExecutionException, IOException {
812 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
813 assertThat(noteUpdated.get(), is(false));
817 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
818 Future<Boolean> noteUpdated =
819 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
820 connectAndAssert(() -> matchesModifyPeerNote("id1"));
822 assertThat(noteUpdated.get(), is(true));
826 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
827 Future<Boolean> noteUpdated =
828 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
829 connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
831 assertThat(noteUpdated.get(), is(true));
834 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
835 return matchesFcpMessage(
837 "Identifier=" + identifier,
838 "NodeIdentifier=" + nodeIdentifier,
844 private void replyWithPeerNote() throws IOException {
847 "Identifier=" + identifier,
848 "NodeIdentifier=Friend1",
859 private void replyWithUnknownNodeIdentifier() throws IOException {
861 "UnknownNodeIdentifier",
862 "Identifier=" + identifier,
863 "NodeIdentifier=id2",
870 public class PluginCommands {
872 private static final String CLASS_NAME = "foo.plugin.Plugin";
874 private void replyWithPluginInfo() throws IOException {
877 "Identifier=" + identifier,
878 "PluginName=superPlugin",
882 "OriginUri=superPlugin",
888 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
889 throws InterruptedException, ExecutionException {
890 assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
891 assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
892 assertThat(pluginInfo.get().get().isTalkable(), is(true));
893 assertThat(pluginInfo.get().get().getVersion(), is("42"));
894 assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
895 assertThat(pluginInfo.get().get().isStarted(), is(true));
898 public class LoadPlugin {
900 public class OfficialPlugins {
903 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
904 Future<Optional<PluginInfo>> pluginInfo =
905 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
906 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
907 assertThat(lines, not(contains(startsWith("Store="))));
908 replyWithPluginInfo();
909 verifyPluginInfo(pluginInfo);
913 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
914 Future<Optional<PluginInfo>> pluginInfo =
915 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
916 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
917 assertThat(lines, hasItem("Store=true"));
918 replyWithPluginInfo();
919 verifyPluginInfo(pluginInfo);
923 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
924 Future<Optional<PluginInfo>> pluginInfo =
925 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
926 connectAndAssert(() -> createMatcherForOfficialSource("https"));
927 replyWithPluginInfo();
928 verifyPluginInfo(pluginInfo);
931 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
932 return matchesFcpMessage(
934 "Identifier=" + identifier,
935 "PluginURL=superPlugin",
937 "OfficialSource=" + officialSource
943 public class FromOtherSources {
945 private static final String FILE_PATH = "/path/to/plugin.jar";
946 private static final String URL = "http://server.com/plugin.jar";
947 private static final String KEY = "KSK@plugin.jar";
950 public void fromFile() throws ExecutionException, InterruptedException, IOException {
951 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
952 connectAndAssert(() -> createMatcher("file", FILE_PATH));
953 replyWithPluginInfo();
954 verifyPluginInfo(pluginInfo);
958 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
959 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
960 connectAndAssert(() -> createMatcher("url", URL));
961 replyWithPluginInfo();
962 verifyPluginInfo(pluginInfo);
966 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
967 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
968 connectAndAssert(() -> createMatcher("freenet", KEY));
969 replyWithPluginInfo();
970 verifyPluginInfo(pluginInfo);
973 private Matcher<List<String>> createMatcher(String urlType, String url) {
974 return matchesFcpMessage(
976 "Identifier=" + identifier,
984 public class Failed {
987 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
988 Future<Optional<PluginInfo>> pluginInfo =
989 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
990 connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
991 replyWithProtocolError();
992 assertThat(pluginInfo.get().isPresent(), is(false));
999 public class ReloadPlugin {
1002 public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1003 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1004 connectAndAssert(this::matchReloadPluginMessage);
1005 replyWithPluginInfo();
1006 verifyPluginInfo(pluginInfo);
1010 public void reloadingPluginWithMaxWaitTimeWorks()
1011 throws InterruptedException, ExecutionException, IOException {
1012 Future<Optional<PluginInfo>> pluginInfo =
1013 fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1014 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1015 replyWithPluginInfo();
1016 verifyPluginInfo(pluginInfo);
1020 public void reloadingPluginWithPurgeWorks()
1021 throws InterruptedException, ExecutionException, IOException {
1022 Future<Optional<PluginInfo>> pluginInfo =
1023 fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1024 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1025 replyWithPluginInfo();
1026 verifyPluginInfo(pluginInfo);
1030 public void reloadingPluginWithStoreWorks()
1031 throws InterruptedException, ExecutionException, IOException {
1032 Future<Optional<PluginInfo>> pluginInfo =
1033 fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1034 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1035 replyWithPluginInfo();
1036 verifyPluginInfo(pluginInfo);
1040 public void protocolErrorIsRecognizedAsFailure()
1041 throws InterruptedException, ExecutionException, IOException {
1042 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1043 connectAndAssert(() -> matchReloadPluginMessage());
1044 replyWithProtocolError();
1045 assertThat(pluginInfo.get().isPresent(), is(false));
1048 private Matcher<List<String>> matchReloadPluginMessage() {
1049 return matchesFcpMessage(
1051 "Identifier=" + identifier,
1052 "PluginName=" + CLASS_NAME
1058 public class RemovePlugin {
1061 public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1062 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1063 connectAndAssert(this::matchPluginRemovedMessage);
1064 replyWithPluginRemoved();
1065 assertThat(pluginRemoved.get(), is(true));
1069 public void removingPluginWithMaxWaitTimeWorks()
1070 throws InterruptedException, ExecutionException, IOException {
1071 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1072 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1073 replyWithPluginRemoved();
1074 assertThat(pluginRemoved.get(), is(true));
1078 public void removingPluginWithPurgeWorks()
1079 throws InterruptedException, ExecutionException, IOException {
1080 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1081 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1082 replyWithPluginRemoved();
1083 assertThat(pluginRemoved.get(), is(true));
1086 private void replyWithPluginRemoved() throws IOException {
1087 fcpServer.writeLine(
1089 "Identifier=" + identifier,
1090 "PluginName=" + CLASS_NAME,
1095 private Matcher<List<String>> matchPluginRemovedMessage() {
1096 return matchesFcpMessage(
1098 "Identifier=" + identifier,
1099 "PluginName=" + CLASS_NAME
1105 public class GetPluginInfo {
1108 public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1109 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1110 connectAndAssert(this::matchGetPluginInfoMessage);
1111 replyWithPluginInfo();
1112 verifyPluginInfo(pluginInfo);
1116 public void gettingPluginInfoWithDetailsWorks()
1117 throws InterruptedException, ExecutionException, IOException {
1118 Future<Optional<PluginInfo>> pluginInfo =
1119 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1120 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1121 replyWithPluginInfo();
1122 verifyPluginInfo(pluginInfo);
1126 public void protocolErrorIsRecognizedAsFailure()
1127 throws InterruptedException, ExecutionException, IOException {
1128 Future<Optional<PluginInfo>> pluginInfo =
1129 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1130 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1131 replyWithProtocolError();
1132 assertThat(pluginInfo.get(), is(Optional.empty()));
1135 private Matcher<List<String>> matchGetPluginInfoMessage() {
1136 return matchesFcpMessage(
1138 "Identifier=" + identifier,
1139 "PluginName=" + CLASS_NAME
1147 public class UskSubscriptionCommands {
1149 private static final String URI = "USK@some,uri/file.txt";
1152 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1153 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1154 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1155 replyWithSubscribed();
1156 assertThat(uskSubscription.get().get().getUri(), is(URI));
1157 AtomicInteger edition = new AtomicInteger();
1158 CountDownLatch updated = new CountDownLatch(2);
1159 uskSubscription.get().get().onUpdate(e -> {
1161 updated.countDown();
1163 sendUpdateNotification(23);
1164 sendUpdateNotification(24);
1165 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1166 assertThat(edition.get(), is(24));
1170 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1171 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1172 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1173 replyWithSubscribed();
1174 assertThat(uskSubscription.get().get().getUri(), is(URI));
1175 AtomicInteger edition = new AtomicInteger();
1176 CountDownLatch updated = new CountDownLatch(2);
1177 uskSubscription.get().get().onUpdate(e -> {
1179 updated.countDown();
1181 uskSubscription.get().get().onUpdate(e -> updated.countDown());
1182 sendUpdateNotification(23);
1183 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1184 assertThat(edition.get(), is(23));
1188 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1189 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1190 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1191 replyWithSubscribed();
1192 assertThat(uskSubscription.get().get().getUri(), is(URI));
1193 AtomicBoolean updated = new AtomicBoolean();
1194 uskSubscription.get().get().onUpdate(e -> updated.set(true));
1195 uskSubscription.get().get().cancel();
1196 readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1197 sendUpdateNotification(23);
1198 assertThat(updated.get(), is(false));
1201 private void replyWithSubscribed() throws IOException {
1202 fcpServer.writeLine(
1204 "Identifier=" + identifier,
1211 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1212 fcpServer.writeLine(
1213 "SubscribedUSKUpdate",
1214 "Identifier=" + identifier,
1216 "Edition=" + edition
1218 fcpServer.writeLine(additionalLines);
1219 fcpServer.writeLine("EndMessage");
1224 public class ClientGet {
1227 public void works() throws InterruptedException, ExecutionException, IOException {
1228 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1229 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1230 replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1231 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1232 Optional<Data> data = dataFuture.get();
1237 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1238 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1239 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1240 replyWithGetFailed("not-test");
1241 replyWithGetFailed(identifier);
1242 Optional<Data> data = dataFuture.get();
1243 assertThat(data.isPresent(), is(false));
1247 public void getFailedForDifferentIdentifierIsIgnored()
1248 throws InterruptedException, ExecutionException, IOException {
1249 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1250 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1251 replyWithGetFailed("not-test");
1252 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1253 Optional<Data> data = dataFuture.get();
1257 @Test(expected = ExecutionException.class)
1258 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1259 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1260 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1266 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1267 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1268 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1272 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1273 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1274 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1278 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1279 throws InterruptedException, ExecutionException, IOException {
1280 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1281 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1285 public void clientGetWithPrioritySettingSendsCorrectCommands()
1286 throws InterruptedException, ExecutionException, IOException {
1287 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1288 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1292 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1293 throws InterruptedException, ExecutionException, IOException {
1294 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1295 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1299 public void clientGetWithGlobalSettingSendsCorrectCommands()
1300 throws InterruptedException, ExecutionException, IOException {
1301 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1302 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1305 private void replyWithGetFailed(String identifier) throws IOException {
1306 fcpServer.writeLine(
1308 "Identifier=" + identifier,
1314 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1315 fcpServer.writeLine(
1317 "Identifier=" + identifier,
1318 "DataLength=" + (text.length() + 1),
1319 "StartupTime=1435610539000",
1320 "CompletionTime=1435610540000",
1321 "Metadata.ContentType=" + contentType,
1327 private void verifyData(Optional<Data> data) throws IOException {
1328 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1329 assertThat(data.get().size(), is(6L));
1330 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1331 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1336 public class ClientPut {
1339 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1340 fcpClient.clientPut()
1341 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1346 readMessage("Hello", this::matchesDirectClientPut);
1350 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1351 Future<Optional<Key>> key = fcpClient.clientPut()
1352 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1357 readMessage("Hello", this::matchesDirectClientPut);
1358 replyWithPutFailed("not-the-right-one");
1359 replyWithPutSuccessful(identifier);
1360 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1364 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1365 Future<Optional<Key>> key = fcpClient.clientPut()
1366 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1371 readMessage("Hello", this::matchesDirectClientPut);
1372 replyWithPutSuccessful("not-the-right-one");
1373 replyWithPutFailed(identifier);
1374 assertThat(key.get().isPresent(), is(false));
1378 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1379 fcpClient.clientPut()
1380 .named("otherName.txt")
1381 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1386 readMessage("Hello", () -> allOf(
1387 hasHead("ClientPut"),
1388 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1390 hasTail("EndMessage", "Hello")
1395 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1396 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1397 connectAndAssert(() ->
1398 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
1402 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1403 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1404 connectAndAssert(() ->
1405 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
1410 private final File ddaFile;
1411 private final File fileToUpload;
1413 public DDA() throws IOException {
1414 ddaFile = createDdaFile();
1415 fileToUpload = new File(ddaFile.getParent(), "test.dat");
1418 private Matcher<List<String>> matchesFileClientPut(File file) {
1419 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
1423 public void completeDda() throws IOException, ExecutionException, InterruptedException {
1424 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1425 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1426 sendDdaRequired(identifier);
1427 readMessage(() -> matchesTestDDARequest(ddaFile));
1428 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1429 readMessage(() -> matchesTestDDAResponse(ddaFile));
1430 writeTestDDAComplete(ddaFile);
1431 readMessage(() -> matchesFileClientPut(fileToUpload));
1435 public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1436 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1437 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1438 sendDdaRequired(identifier);
1439 readMessage(() -> matchesTestDDARequest(ddaFile));
1440 sendTestDDAReply("/some-other-directory", ddaFile);
1441 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1442 readMessage(() -> matchesTestDDAResponse(ddaFile));
1446 public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
1447 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1448 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1449 sendDdaRequired(identifier);
1450 readMessage(() -> matchesTestDDARequest(ddaFile));
1451 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1452 readMessage(this::matchesFailedToReadResponse);
1456 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1457 throws IOException, ExecutionException, InterruptedException {
1458 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1460 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1461 String identifier = extractIdentifier(lines);
1462 fcpServer.writeLine(
1464 "Directory=/some-other-directory",
1467 sendDdaRequired(identifier);
1468 lines = fcpServer.collectUntil(is("EndMessage"));
1469 assertThat(lines, matchesFcpMessage(
1471 "Directory=" + ddaFile.getParent(),
1472 "WantReadDirectory=true",
1473 "WantWriteDirectory=false"
1477 private Matcher<List<String>> matchesFailedToReadResponse() {
1478 return matchesFcpMessage(
1480 "Directory=" + ddaFile.getParent(),
1481 "ReadContent=failed-to-read"
1485 private void writeTestDDAComplete(File tempFile) throws IOException {
1486 fcpServer.writeLine(
1488 "Directory=" + tempFile.getParent(),
1489 "ReadDirectoryAllowed=true",
1494 private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1495 return matchesFcpMessage(
1497 "Directory=" + tempFile.getParent(),
1498 "ReadContent=test-content"
1502 private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1503 fcpServer.writeLine(
1505 "Directory=" + directory,
1506 "ReadFilename=" + tempFile,
1511 private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1512 return matchesFcpMessage(
1514 "Directory=" + tempFile.getParent(),
1515 "WantReadDirectory=true",
1516 "WantWriteDirectory=false"
1520 private void sendDdaRequired(String identifier) throws IOException {
1521 fcpServer.writeLine(
1523 "Identifier=" + identifier,
1531 private void replyWithPutSuccessful(String identifier) throws IOException {
1532 fcpServer.writeLine(
1535 "Identifier=" + identifier,
1540 private void replyWithPutFailed(String identifier) throws IOException {
1541 fcpServer.writeLine(
1543 "Identifier=" + identifier,
1548 private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
1549 List<String> lines = new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
1550 Arrays.asList(additionalLines).forEach(lines::add);
1552 hasHead("ClientPut"),
1553 hasParameters(1, 2, lines.toArray(new String[lines.size()])),
1554 hasTail("EndMessage", "Hello")
1558 private File createDdaFile() throws IOException {
1559 File tempFile = File.createTempFile("test-dda-", ".dat");
1560 tempFile.deleteOnExit();
1561 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
1566 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1567 throws InterruptedException, ExecutionException, IOException {
1568 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1570 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1571 String identifier = extractIdentifier(lines);
1572 fcpServer.writeLine(
1574 "Identifier=not-the-right-one",
1578 fcpServer.writeLine(
1580 "Identifier=" + identifier,
1584 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1588 public void clientPutAbortsOnProtocolErrorOtherThan25()
1589 throws InterruptedException, ExecutionException, IOException {
1590 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1592 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1593 String identifier = extractIdentifier(lines);
1594 fcpServer.writeLine(
1596 "Identifier=" + identifier,
1600 assertThat(key.get().isPresent(), is(false));
1604 public void clientPutSendsNotificationsForGeneratedKeys()
1605 throws InterruptedException, ExecutionException, IOException {
1606 List<String> generatedKeys = new CopyOnWriteArrayList<>();
1607 Future<Optional<Key>> key = fcpClient.clientPut()
1608 .onKeyGenerated(generatedKeys::add)
1609 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1614 List<String> lines = fcpServer.collectUntil(is("Hello"));
1615 String identifier = extractIdentifier(lines);
1616 fcpServer.writeLine(
1618 "Identifier=" + identifier,
1622 replyWithPutSuccessful(identifier);
1623 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1624 assertThat(generatedKeys, contains("KSK@foo.txt"));
1628 public void clientPutSendsNotificationOnProgress() throws InterruptedException, ExecutionException, IOException {
1629 List<RequestProgress> requestProgress = new ArrayList<>();
1630 Future<Optional<Key>> key = fcpClient.clientPut()
1631 .onProgress(requestProgress::add)
1632 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1637 readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
1638 replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
1639 replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
1640 replyWithPutSuccessful(identifier);
1641 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1642 assertThat(requestProgress, contains(
1643 isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
1644 isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
1648 private void replyWithSimpleProgress(
1649 int total, int required, int failed, int fatallyFailed, int succeeded, int lastProgress,
1650 boolean finalizedTotal, int minSuccessFetchBlocks) throws IOException {
1651 fcpServer.writeLine(
1653 "Identifier=" + identifier,
1655 "Required=" + required,
1657 "FatallyFailed=" + fatallyFailed,
1658 "Succeeded=" + succeeded,
1659 "LastProgress=" + lastProgress,
1660 "FinalizedTotal=" + finalizedTotal,
1661 "MinSuccessFetchBlocks=" + minSuccessFetchBlocks,
1668 public class ClientPutDiskDir {
1670 private final TemporaryFolder folder = new TemporaryFolder();
1673 public void setup() throws IOException {
1675 Files.write("file1\n", folder.newFile("file1.txt"), StandardCharsets.UTF_8);
1676 Files.write("file2\n", folder.newFile("file2.txt"), StandardCharsets.UTF_8);
1677 File directory = folder.newFolder("dir");
1678 Files.write("file3\n", new File(directory, "file3.txt"), StandardCharsets.UTF_8);
1682 public void removeFolder() {
1687 public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1688 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(folder.getRoot()).uri("CHK@").execute();
1689 connectAndAssert(this::matchesClientPutDiskDir);
1690 fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
1691 assertThat(key.get().get().getKey(), is("CHK@abc"));
1695 public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
1696 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(folder.getRoot()).uri("CHK@").execute();
1697 connectAndAssert(this::matchesClientPutDiskDir);
1698 replyWithProtocolError();
1699 assertThat(key.get().isPresent(), is(false));
1702 private Matcher<List<String>> matchesClientPutDiskDir() {
1703 return matchesFcpMessage(
1705 "Identifier=" + identifier,
1707 "Filename=" + folder.getRoot().getPath()
1713 public class ConfigCommand {
1715 public class GetConfig {
1718 public void defaultFcpClientCanGetConfigWithoutDetails()
1719 throws InterruptedException, ExecutionException, IOException {
1720 Future<ConfigData> configData = fcpClient.getConfig().execute();
1721 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1722 replyWithConfigData();
1723 assertThat(configData.get(), notNullValue());
1727 public void defaultFcpClientCanGetConfigWithCurrent()
1728 throws InterruptedException, ExecutionException, IOException {
1729 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1730 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1731 replyWithConfigData("current.foo=bar");
1732 assertThat(configData.get().getCurrent("foo"), is("bar"));
1736 public void defaultFcpClientCanGetConfigWithDefaults()
1737 throws InterruptedException, ExecutionException, IOException {
1738 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1739 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1740 replyWithConfigData("default.foo=bar");
1741 assertThat(configData.get().getDefault("foo"), is("bar"));
1745 public void defaultFcpClientCanGetConfigWithSortOrder()
1746 throws InterruptedException, ExecutionException, IOException {
1747 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1748 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1749 replyWithConfigData("sortOrder.foo=17");
1750 assertThat(configData.get().getSortOrder("foo"), is(17));
1754 public void defaultFcpClientCanGetConfigWithExpertFlag()
1755 throws InterruptedException, ExecutionException, IOException {
1756 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1757 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1758 replyWithConfigData("expertFlag.foo=true");
1759 assertThat(configData.get().getExpertFlag("foo"), is(true));
1763 public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1764 throws InterruptedException, ExecutionException, IOException {
1765 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1766 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1767 replyWithConfigData("forceWriteFlag.foo=true");
1768 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
1772 public void defaultFcpClientCanGetConfigWithShortDescription()
1773 throws InterruptedException, ExecutionException, IOException {
1774 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1775 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1776 replyWithConfigData("shortDescription.foo=bar");
1777 assertThat(configData.get().getShortDescription("foo"), is("bar"));
1781 public void defaultFcpClientCanGetConfigWithLongDescription()
1782 throws InterruptedException, ExecutionException, IOException {
1783 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1784 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1785 replyWithConfigData("longDescription.foo=bar");
1786 assertThat(configData.get().getLongDescription("foo"), is("bar"));
1790 public void defaultFcpClientCanGetConfigWithDataTypes()
1791 throws InterruptedException, ExecutionException, IOException {
1792 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1793 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1794 replyWithConfigData("dataType.foo=number");
1795 assertThat(configData.get().getDataType("foo"), is("number"));
1798 private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1799 return matchesFcpMessage(
1801 "Identifier=" + identifier,
1802 additionalParameter + "=true"
1808 public class ModifyConfig {
1811 public void defaultFcpClientCanModifyConfigData()
1812 throws InterruptedException, ExecutionException, IOException {
1813 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1814 connectAndAssert(() -> matchesFcpMessage(
1816 "Identifier=" + identifier,
1819 replyWithConfigData("current.foo.bar=baz");
1820 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
1825 private void replyWithConfigData(String... additionalLines) throws IOException {
1826 fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1827 fcpServer.writeLine(additionalLines);
1828 fcpServer.writeLine("EndMessage");
1833 public class NodeInformation {
1836 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
1837 Future<NodeData> nodeData = fcpClient.getNode().execute();
1838 connectAndAssert(() -> matchesGetNode(false, false, false));
1839 replyWithNodeData();
1840 assertThat(nodeData.get(), notNullValue());
1841 assertThat(nodeData.get().getNodeRef().isOpennet(), is(false));
1845 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
1846 throws InterruptedException, ExecutionException, IOException {
1847 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
1848 connectAndAssert(() -> matchesGetNode(true, false, false));
1849 replyWithNodeData("opennet=true");
1850 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
1851 assertThat(nodeData.get().getNodeRef().isOpennet(), is(true));
1855 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
1856 throws InterruptedException, ExecutionException, IOException {
1857 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
1858 connectAndAssert(() -> matchesGetNode(false, true, false));
1859 replyWithNodeData("ark.privURI=SSK@XdHMiRl");
1860 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
1864 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
1865 throws InterruptedException, ExecutionException, IOException {
1866 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
1867 connectAndAssert(() -> matchesGetNode(false, false, true));
1868 replyWithNodeData("volatile.freeJavaMemory=205706528");
1869 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
1872 private Matcher<List<String>> matchesGetNode(boolean withOpennetRef, boolean withPrivate, boolean withVolatile) {
1873 return matchesFcpMessage(
1875 "Identifier=" + identifier,
1876 "GiveOpennetRef=" + withOpennetRef,
1877 "WithPrivate=" + withPrivate,
1878 "WithVolatile=" + withVolatile
1882 private void replyWithNodeData(String... additionalLines) throws IOException {
1883 fcpServer.writeLine(
1885 "Identifier=" + identifier,
1886 "ark.pubURI=SSK@3YEf.../ark",
1889 "version=Fred,0.7,1.0,1466",
1890 "lastGoodVersion=Fred,0.7,1.0,1466"
1892 fcpServer.writeLine(additionalLines);
1893 fcpServer.writeLine("EndMessage");