1 package net.pterodactylus.fcp.quelaton;
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.allOf;
5 import static org.hamcrest.Matchers.contains;
6 import static org.hamcrest.Matchers.containsInAnyOrder;
7 import static org.hamcrest.Matchers.hasItem;
8 import static org.hamcrest.Matchers.hasSize;
9 import static org.hamcrest.Matchers.is;
10 import static org.hamcrest.Matchers.not;
11 import static org.hamcrest.Matchers.notNullValue;
12 import static org.hamcrest.Matchers.startsWith;
14 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
18 import java.nio.charset.StandardCharsets;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.function.Supplier;
33 import java.util.stream.Collectors;
35 import net.pterodactylus.fcp.ARK;
36 import net.pterodactylus.fcp.ConfigData;
37 import net.pterodactylus.fcp.DSAGroup;
38 import net.pterodactylus.fcp.FcpKeyPair;
39 import net.pterodactylus.fcp.Key;
40 import net.pterodactylus.fcp.NodeData;
41 import net.pterodactylus.fcp.NodeRef;
42 import net.pterodactylus.fcp.Peer;
43 import net.pterodactylus.fcp.PeerNote;
44 import net.pterodactylus.fcp.PluginInfo;
45 import net.pterodactylus.fcp.Priority;
46 import net.pterodactylus.fcp.fake.FakeTcpServer;
47 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
49 import com.google.common.io.ByteStreams;
50 import com.google.common.io.Files;
51 import com.nitorcreations.junit.runners.NestedRunner;
52 import org.hamcrest.Description;
53 import org.hamcrest.Matcher;
54 import org.hamcrest.Matchers;
55 import org.hamcrest.TypeSafeDiagnosingMatcher;
56 import org.junit.After;
57 import org.junit.Assert;
58 import org.junit.Test;
59 import org.junit.runner.RunWith;
62 * Unit test for {@link DefaultFcpClient}.
64 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
66 @RunWith(NestedRunner.class)
67 public class DefaultFcpClientTest {
69 private static final String INSERT_URI =
70 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
71 private static final String REQUEST_URI =
72 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
74 private int threadCounter = 0;
75 private final ExecutorService threadPool =
76 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
77 private final FakeTcpServer fcpServer;
78 private final DefaultFcpClient fcpClient;
80 public DefaultFcpClientTest() throws IOException {
81 fcpServer = new FakeTcpServer(threadPool);
82 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
86 public void tearDown() throws IOException {
88 threadPool.shutdown();
91 private void connectNode() throws InterruptedException, ExecutionException, IOException {
92 fcpServer.connect().get();
93 fcpServer.collectUntil(is("EndMessage"));
94 fcpServer.writeLine("NodeHello",
95 "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
96 "Revision=build01466",
98 "Version=Fred,0.7,1.0,1466",
100 "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
104 "NodeLanguage=ENGLISH",
110 private String extractIdentifier(List<String> lines) {
111 return lines.stream()
112 .filter(s -> s.startsWith("Identifier="))
113 .map(s -> s.substring(s.indexOf('=') + 1))
118 private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
119 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
122 private Matcher<Iterable<String>> hasHead(String firstElement) {
123 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
125 protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
126 if (!iterable.iterator().hasNext()) {
127 mismatchDescription.appendText("is empty");
130 String element = iterable.iterator().next();
131 if (!element.equals(firstElement)) {
132 mismatchDescription.appendText("starts with ").appendValue(element);
139 public void describeTo(Description description) {
140 description.appendText("starts with ").appendValue(firstElement);
145 private Matcher<List<String>> matchesFcpMessageWithTerminator(
146 String name, String terminator, String... requiredLines) {
147 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
150 private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
151 return new TypeSafeDiagnosingMatcher<List<String>>() {
153 protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
154 if (item.size() < (ignoreStart + ignoreEnd)) {
155 mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
158 for (String line : lines) {
159 if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
160 mismatchDescription.appendText("does not contains ").appendValue(line);
168 public void describeTo(Description description) {
169 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
170 description.appendText(", ignoring the first ").appendValue(ignoreStart);
171 description.appendText(" and the last ").appendValue(ignoreEnd);
176 private Matcher<List<String>> hasTail(String... lastElements) {
177 return new TypeSafeDiagnosingMatcher<List<String>>() {
179 protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
180 if (list.size() < lastElements.length) {
181 mismatchDescription.appendText("is too small");
184 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
185 if (!tail.equals(Arrays.asList(lastElements))) {
186 mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
193 public void describeTo(Description description) {
194 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
200 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
201 Future<NodeData> nodeData = fcpClient.getNode().execute();
203 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
204 String identifier = extractIdentifier(lines);
205 assertThat(lines, matchesFcpMessage(
207 "Identifier=" + identifier,
208 "GiveOpennetRef=false",
214 "Identifier=" + identifier,
215 "ark.pubURI=SSK@3YEf.../ark",
218 "version=Fred,0.7,1.0,1466",
219 "lastGoodVersion=Fred,0.7,1.0,1466",
222 assertThat(nodeData.get(), notNullValue());
226 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
227 throws InterruptedException, ExecutionException, IOException {
228 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
230 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
231 String identifier = extractIdentifier(lines);
232 assertThat(lines, matchesFcpMessage(
234 "Identifier=" + identifier,
235 "GiveOpennetRef=true",
241 "Identifier=" + identifier,
243 "ark.pubURI=SSK@3YEf.../ark",
246 "version=Fred,0.7,1.0,1466",
247 "lastGoodVersion=Fred,0.7,1.0,1466",
250 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
254 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
255 throws InterruptedException, ExecutionException, IOException {
256 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
258 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
259 String identifier = extractIdentifier(lines);
260 assertThat(lines, matchesFcpMessage(
262 "Identifier=" + identifier,
263 "GiveOpennetRef=false",
269 "Identifier=" + identifier,
271 "ark.pubURI=SSK@3YEf.../ark",
274 "version=Fred,0.7,1.0,1466",
275 "lastGoodVersion=Fred,0.7,1.0,1466",
276 "ark.privURI=SSK@XdHMiRl",
279 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
283 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
284 throws InterruptedException, ExecutionException, IOException {
285 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
287 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
288 String identifier = extractIdentifier(lines);
289 assertThat(lines, matchesFcpMessage(
291 "Identifier=" + identifier,
292 "GiveOpennetRef=false",
298 "Identifier=" + identifier,
300 "ark.pubURI=SSK@3YEf.../ark",
303 "version=Fred,0.7,1.0,1466",
304 "lastGoodVersion=Fred,0.7,1.0,1466",
305 "volatile.freeJavaMemory=205706528",
308 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
311 private List<String> lines;
312 private String identifier;
314 private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
315 throws InterruptedException, ExecutionException, IOException {
317 readMessage(requestMatcher);
320 private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
321 readMessage("EndMessage", requestMatcher);
324 private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
325 lines = fcpServer.collectUntil(is(terminator));
326 identifier = extractIdentifier(lines);
327 assertThat(lines, requestMatcher.get());
330 public class ConnectionsAndKeyPairs {
332 public class Connections {
334 @Test(expected = ExecutionException.class)
335 public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
336 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
337 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
339 "CloseConnectionDuplicateClientName",
345 @Test(expected = ExecutionException.class)
346 public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
347 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
348 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
354 public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
355 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
356 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
359 keyPair = fcpClient.generateKeypair().execute();
360 readMessage(() -> matchesFcpMessage("GenerateSSK"));
361 identifier = extractIdentifier(lines);
367 public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
368 throws InterruptedException, ExecutionException, IOException {
369 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
370 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
375 } catch (ExecutionException e) {
378 keyPair = fcpClient.generateKeypair().execute();
379 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
386 public class GenerateKeyPair {
389 public void defaultFcpClientCanGenerateKeypair()
390 throws ExecutionException, InterruptedException, IOException {
391 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
392 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
394 FcpKeyPair keyPair = keyPairFuture.get();
395 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
396 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
401 private void replyWithKeyPair() throws IOException {
402 fcpServer.writeLine("SSKKeypair",
403 "InsertURI=" + INSERT_URI + "",
404 "RequestURI=" + REQUEST_URI + "",
405 "Identifier=" + identifier,
413 public class PeerCommands {
415 public class ListPeer {
418 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
419 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
420 connectAndAssert(() -> matchesListPeer("id1"));
421 replyWithPeer("id1");
422 assertThat(peer.get().get().getIdentity(), is("id1"));
426 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
427 Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
428 connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
429 replyWithPeer("id1");
430 assertThat(peer.get().get().getIdentity(), is("id1"));
434 public void byName() throws InterruptedException, ExecutionException, IOException {
435 Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
436 connectAndAssert(() -> matchesListPeer("FriendNode"));
437 replyWithPeer("id1");
438 assertThat(peer.get().get().getIdentity(), is("id1"));
442 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
443 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
444 connectAndAssert(() -> matchesListPeer("id2"));
445 replyWithUnknownNodeIdentifier();
446 assertThat(peer.get().isPresent(), is(false));
449 private Matcher<List<String>> matchesListPeer(String nodeId) {
450 return matchesFcpMessage(
452 "Identifier=" + identifier,
453 "NodeIdentifier=" + nodeId
459 public class ListPeers {
462 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
463 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
464 connectAndAssert(() -> matchesListPeers(false, false));
465 replyWithPeer("id1");
466 replyWithPeer("id2");
468 assertThat(peers.get(), hasSize(2));
469 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
470 containsInAnyOrder("id1", "id2"));
474 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
475 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
476 connectAndAssert(() -> matchesListPeers(false, true));
477 replyWithPeer("id1", "metadata.foo=bar1");
478 replyWithPeer("id2", "metadata.foo=bar2");
480 assertThat(peers.get(), hasSize(2));
481 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
482 containsInAnyOrder("bar1", "bar2"));
486 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
487 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
488 connectAndAssert(() -> matchesListPeers(true, false));
489 replyWithPeer("id1", "volatile.foo=bar1");
490 replyWithPeer("id2", "volatile.foo=bar2");
492 assertThat(peers.get(), hasSize(2));
493 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
494 containsInAnyOrder("bar1", "bar2"));
497 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
498 return matchesFcpMessage(
500 "WithVolatile=" + withVolatile,
501 "WithMetadata=" + withMetadata
505 private void sendEndOfPeerList() throws IOException {
508 "Identifier=" + identifier,
515 public class AddPeer {
518 public void fromFile() throws InterruptedException, ExecutionException, IOException {
519 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
520 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
521 replyWithPeer("id1");
522 assertThat(peer.get().get().getIdentity(), is("id1"));
526 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
527 Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
528 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
529 replyWithPeer("id1");
530 assertThat(peer.get().get().getIdentity(), is("id1"));
534 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
535 NodeRef nodeRef = createNodeRef();
536 Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
537 connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
543 "dsaGroup.q=subprime",
544 "dsaPubKey.y=dsa-public",
545 "physical.udp=1.2.3.4:5678",
549 replyWithPeer("id1");
550 assertThat(peer.get().get().getIdentity(), is("id1"));
553 private NodeRef createNodeRef() {
554 NodeRef nodeRef = new NodeRef();
555 nodeRef.setIdentity("id1");
556 nodeRef.setName("name");
557 nodeRef.setARK(new ARK("public", "1"));
558 nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
559 nodeRef.setNegotiationTypes(new int[] { 3, 5 });
560 nodeRef.setPhysicalUDP("1.2.3.4:5678");
561 nodeRef.setDSAPublicKey("dsa-public");
562 nodeRef.setSignature("sig");
566 private Matcher<List<String>> matchesAddPeer() {
567 return matchesFcpMessage(
569 "Identifier=" + identifier
575 public class ModifyPeer {
578 public void defaultFcpClientCanEnablePeerByName()
579 throws InterruptedException, ExecutionException, IOException {
580 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
581 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
582 replyWithPeer("id1");
583 assertThat(peer.get().get().getIdentity(), is("id1"));
587 public void defaultFcpClientCanDisablePeerByName()
588 throws InterruptedException, ExecutionException, IOException {
589 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
590 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
591 replyWithPeer("id1");
592 assertThat(peer.get().get().getIdentity(), is("id1"));
596 public void defaultFcpClientCanEnablePeerByIdentity()
597 throws InterruptedException, ExecutionException, IOException {
598 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
599 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
600 replyWithPeer("id1");
601 assertThat(peer.get().get().getIdentity(), is("id1"));
605 public void defaultFcpClientCanEnablePeerByHostAndPort()
606 throws InterruptedException, ExecutionException, IOException {
607 Future<Optional<Peer>> peer =
608 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
609 connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
610 replyWithPeer("id1");
611 assertThat(peer.get().get().getIdentity(), is("id1"));
615 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
616 Future<Optional<Peer>> peer =
617 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
618 connectAndAssert(() -> allOf(
619 matchesModifyPeer("id1", "AllowLocalAddresses", true),
620 not(contains(startsWith("IsDisabled=")))
622 replyWithPeer("id1");
623 assertThat(peer.get().get().getIdentity(), is("id1"));
627 public void disallowLocalAddressesOfPeer()
628 throws InterruptedException, ExecutionException, IOException {
629 Future<Optional<Peer>> peer =
630 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
631 connectAndAssert(() -> allOf(
632 matchesModifyPeer("id1", "AllowLocalAddresses", false),
633 not(contains(startsWith("IsDisabled=")))
635 replyWithPeer("id1");
636 assertThat(peer.get().get().getIdentity(), is("id1"));
640 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
641 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
642 connectAndAssert(() -> allOf(
643 matchesModifyPeer("id1", "IsBurstOnly", true),
644 not(contains(startsWith("AllowLocalAddresses="))),
645 not(contains(startsWith("IsDisabled=")))
647 replyWithPeer("id1");
648 assertThat(peer.get().get().getIdentity(), is("id1"));
652 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
653 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
654 connectAndAssert(() -> allOf(
655 matchesModifyPeer("id1", "IsBurstOnly", false),
656 not(contains(startsWith("AllowLocalAddresses="))),
657 not(contains(startsWith("IsDisabled=")))
659 replyWithPeer("id1");
660 assertThat(peer.get().get().getIdentity(), is("id1"));
664 public void defaultFcpClientCanSetListenOnlyForPeer()
665 throws InterruptedException, ExecutionException, IOException {
666 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
667 connectAndAssert(() -> allOf(
668 matchesModifyPeer("id1", "IsListenOnly", true),
669 not(contains(startsWith("AllowLocalAddresses="))),
670 not(contains(startsWith("IsDisabled="))),
671 not(contains(startsWith("IsBurstOnly=")))
673 replyWithPeer("id1");
674 assertThat(peer.get().get().getIdentity(), is("id1"));
678 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
679 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
680 connectAndAssert(() -> allOf(
681 matchesModifyPeer("id1", "IsListenOnly", false),
682 not(contains(startsWith("AllowLocalAddresses="))),
683 not(contains(startsWith("IsDisabled="))),
684 not(contains(startsWith("IsBurstOnly=")))
686 replyWithPeer("id1");
687 assertThat(peer.get().get().getIdentity(), is("id1"));
691 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
692 Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
693 connectAndAssert(() -> allOf(
694 matchesModifyPeer("id1", "IgnoreSourcePort", true),
695 not(contains(startsWith("AllowLocalAddresses="))),
696 not(contains(startsWith("IsDisabled="))),
697 not(contains(startsWith("IsBurstOnly="))),
698 not(contains(startsWith("IsListenOnly=")))
700 replyWithPeer("id1");
701 assertThat(peer.get().get().getIdentity(), is("id1"));
705 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
706 Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
707 connectAndAssert(() -> allOf(
708 matchesModifyPeer("id1", "IgnoreSourcePort", false),
709 not(contains(startsWith("AllowLocalAddresses="))),
710 not(contains(startsWith("IsDisabled="))),
711 not(contains(startsWith("IsBurstOnly="))),
712 not(contains(startsWith("IsListenOnly=")))
714 replyWithPeer("id1");
715 assertThat(peer.get().get().getIdentity(), is("id1"));
719 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
720 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
721 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
722 replyWithUnknownNodeIdentifier();
723 assertThat(peer.get().isPresent(), is(false));
726 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
727 return matchesFcpMessage(
729 "Identifier=" + identifier,
730 "NodeIdentifier=" + nodeIdentifier,
731 setting + "=" + value
737 public class RemovePeer {
740 public void byName() throws InterruptedException, ExecutionException, IOException {
741 Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
742 connectAndAssert(() -> matchesRemovePeer("Friend1"));
743 replyWithPeerRemoved("Friend1");
744 assertThat(peer.get(), is(true));
748 public void invalidName() throws InterruptedException, ExecutionException, IOException {
749 Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
750 connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
751 replyWithUnknownNodeIdentifier();
752 assertThat(peer.get(), is(false));
756 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
757 Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
758 connectAndAssert(() -> matchesRemovePeer("id1"));
759 replyWithPeerRemoved("id1");
760 assertThat(peer.get(), is(true));
764 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
765 Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
766 connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
767 replyWithPeerRemoved("Friend1");
768 assertThat(peer.get(), is(true));
771 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
772 return matchesFcpMessage(
774 "Identifier=" + identifier,
775 "NodeIdentifier=" + nodeIdentifier
779 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
782 "Identifier=" + identifier,
783 "NodeIdentifier=" + nodeIdentifier,
790 private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
793 "Identifier=" + identifier,
794 "identity=" + peerId,
796 "ark.pubURI=SSK@3YEf.../ark",
799 "version=Fred,0.7,1.0,1466",
800 "lastGoodVersion=Fred,0.7,1.0,1466"
802 fcpServer.writeLine(additionalLines);
803 fcpServer.writeLine("EndMessage");
808 public class PeerNoteCommands {
810 public class ListPeerNotes {
813 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
814 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
815 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
816 replyWithUnknownNodeIdentifier();
817 assertThat(peerNote.get().isPresent(), is(false));
821 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
822 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
823 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
825 replyWithEndListPeerNotes();
826 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
827 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
831 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
832 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
833 connectAndAssert(() -> matchesListPeerNotes("id1"));
835 replyWithEndListPeerNotes();
836 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
837 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
841 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
842 Future<Optional<PeerNote>> peerNote =
843 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
844 connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
846 replyWithEndListPeerNotes();
847 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
848 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
851 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
852 return matchesFcpMessage(
854 "NodeIdentifier=" + nodeIdentifier
858 private void replyWithEndListPeerNotes() throws IOException {
861 "Identifier=" + identifier,
866 private void replyWithPeerNote() throws IOException {
869 "Identifier=" + identifier,
870 "NodeIdentifier=Friend1",
871 "NoteText=RXhhbXBsZSBUZXh0Lg==",
879 public class ModifyPeerNotes {
882 public void byName() throws InterruptedException, ExecutionException, IOException {
883 Future<Boolean> noteUpdated =
884 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
885 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
887 assertThat(noteUpdated.get(), is(true));
891 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
892 Future<Boolean> noteUpdated =
893 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
894 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
895 replyWithUnknownNodeIdentifier();
896 assertThat(noteUpdated.get(), is(false));
900 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
901 throws InterruptedException, ExecutionException, IOException {
902 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
903 assertThat(noteUpdated.get(), is(false));
907 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
908 Future<Boolean> noteUpdated =
909 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
910 connectAndAssert(() -> matchesModifyPeerNote("id1"));
912 assertThat(noteUpdated.get(), is(true));
916 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
917 Future<Boolean> noteUpdated =
918 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
919 connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
921 assertThat(noteUpdated.get(), is(true));
924 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
925 return matchesFcpMessage(
927 "Identifier=" + identifier,
928 "NodeIdentifier=" + nodeIdentifier,
934 private void replyWithPeerNote() throws IOException {
937 "Identifier=" + identifier,
938 "NodeIdentifier=Friend1",
949 private void replyWithUnknownNodeIdentifier() throws IOException {
951 "UnknownNodeIdentifier",
952 "Identifier=" + identifier,
953 "NodeIdentifier=id2",
960 public class PluginCommands {
962 private static final String CLASS_NAME = "foo.plugin.Plugin";
964 private void replyWithPluginInfo() throws IOException {
967 "Identifier=" + identifier,
968 "PluginName=superPlugin",
972 "OriginUri=superPlugin",
978 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
979 throws InterruptedException, ExecutionException {
980 assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
981 assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
982 assertThat(pluginInfo.get().get().isTalkable(), is(true));
983 assertThat(pluginInfo.get().get().getVersion(), is("42"));
984 assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
985 assertThat(pluginInfo.get().get().isStarted(), is(true));
988 public class LoadPlugin {
990 public class OfficialPlugins {
993 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
994 Future<Optional<PluginInfo>> pluginInfo =
995 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
996 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
997 assertThat(lines, not(contains(startsWith("Store="))));
998 replyWithPluginInfo();
999 verifyPluginInfo(pluginInfo);
1003 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
1004 Future<Optional<PluginInfo>> pluginInfo =
1005 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
1006 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
1007 assertThat(lines, hasItem("Store=true"));
1008 replyWithPluginInfo();
1009 verifyPluginInfo(pluginInfo);
1013 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
1014 Future<Optional<PluginInfo>> pluginInfo =
1015 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
1016 connectAndAssert(() -> createMatcherForOfficialSource("https"));
1017 replyWithPluginInfo();
1018 verifyPluginInfo(pluginInfo);
1021 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
1022 return matchesFcpMessage(
1024 "Identifier=" + identifier,
1025 "PluginURL=superPlugin",
1027 "OfficialSource=" + officialSource
1033 public class FromOtherSources {
1035 private static final String FILE_PATH = "/path/to/plugin.jar";
1036 private static final String URL = "http://server.com/plugin.jar";
1037 private static final String KEY = "KSK@plugin.jar";
1040 public void fromFile() throws ExecutionException, InterruptedException, IOException {
1041 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
1042 connectAndAssert(() -> createMatcher("file", FILE_PATH));
1043 replyWithPluginInfo();
1044 verifyPluginInfo(pluginInfo);
1048 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
1049 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
1050 connectAndAssert(() -> createMatcher("url", URL));
1051 replyWithPluginInfo();
1052 verifyPluginInfo(pluginInfo);
1056 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
1057 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
1058 connectAndAssert(() -> createMatcher("freenet", KEY));
1059 replyWithPluginInfo();
1060 verifyPluginInfo(pluginInfo);
1063 private Matcher<List<String>> createMatcher(String urlType, String url) {
1064 return matchesFcpMessage(
1066 "Identifier=" + identifier,
1068 "URLType=" + urlType
1074 public class Failed {
1077 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
1078 Future<Optional<PluginInfo>> pluginInfo =
1079 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
1080 connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
1081 replyWithProtocolError();
1082 assertThat(pluginInfo.get().isPresent(), is(false));
1089 private void replyWithProtocolError() throws IOException {
1090 fcpServer.writeLine(
1092 "Identifier=" + identifier,
1097 public class ReloadPlugin {
1100 public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1101 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1102 connectAndAssert(this::matchReloadPluginMessage);
1103 replyWithPluginInfo();
1104 verifyPluginInfo(pluginInfo);
1108 public void reloadingPluginWithMaxWaitTimeWorks()
1109 throws InterruptedException, ExecutionException, IOException {
1110 Future<Optional<PluginInfo>> pluginInfo =
1111 fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1112 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1113 replyWithPluginInfo();
1114 verifyPluginInfo(pluginInfo);
1118 public void reloadingPluginWithPurgeWorks()
1119 throws InterruptedException, ExecutionException, IOException {
1120 Future<Optional<PluginInfo>> pluginInfo =
1121 fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1122 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1123 replyWithPluginInfo();
1124 verifyPluginInfo(pluginInfo);
1128 public void reloadingPluginWithStoreWorks()
1129 throws InterruptedException, ExecutionException, IOException {
1130 Future<Optional<PluginInfo>> pluginInfo =
1131 fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1132 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1133 replyWithPluginInfo();
1134 verifyPluginInfo(pluginInfo);
1137 private Matcher<List<String>> matchReloadPluginMessage() {
1138 return matchesFcpMessage(
1140 "Identifier=" + identifier,
1141 "PluginName=" + CLASS_NAME
1147 public class RemovePlugin {
1150 public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1151 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1152 connectAndAssert(this::matchPluginRemovedMessage);
1153 replyWithPluginRemoved();
1154 assertThat(pluginRemoved.get(), is(true));
1158 public void removingPluginWithMaxWaitTimeWorks()
1159 throws InterruptedException, ExecutionException, IOException {
1160 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1161 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1162 replyWithPluginRemoved();
1163 assertThat(pluginRemoved.get(), is(true));
1167 public void removingPluginWithPurgeWorks()
1168 throws InterruptedException, ExecutionException, IOException {
1169 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1170 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1171 replyWithPluginRemoved();
1172 assertThat(pluginRemoved.get(), is(true));
1175 private void replyWithPluginRemoved() throws IOException {
1176 fcpServer.writeLine(
1178 "Identifier=" + identifier,
1179 "PluginName=" + CLASS_NAME,
1184 private Matcher<List<String>> matchPluginRemovedMessage() {
1185 return matchesFcpMessage(
1187 "Identifier=" + identifier,
1188 "PluginName=" + CLASS_NAME
1194 public class GetPluginInfo {
1197 public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1198 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1199 connectAndAssert(this::matchGetPluginInfoMessage);
1200 replyWithPluginInfo();
1201 verifyPluginInfo(pluginInfo);
1205 public void gettingPluginInfoWithDetailsWorks()
1206 throws InterruptedException, ExecutionException, IOException {
1207 Future<Optional<PluginInfo>> pluginInfo =
1208 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1209 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1210 replyWithPluginInfo();
1211 verifyPluginInfo(pluginInfo);
1215 public void protocolErrorIsRecognizedAsFailure()
1216 throws InterruptedException, ExecutionException, IOException {
1217 Future<Optional<PluginInfo>> pluginInfo =
1218 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1219 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1220 replyWithProtocolError();
1221 assertThat(pluginInfo.get(), is(Optional.empty()));
1224 private Matcher<List<String>> matchGetPluginInfoMessage() {
1225 return matchesFcpMessage(
1227 "Identifier=" + identifier,
1228 "PluginName=" + CLASS_NAME
1236 public class UskSubscriptionCommands {
1238 private static final String URI = "USK@some,uri/file.txt";
1241 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1242 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1243 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1244 replyWithSubscribed();
1245 assertThat(uskSubscription.get().get().getUri(), is(URI));
1246 AtomicInteger edition = new AtomicInteger();
1247 CountDownLatch updated = new CountDownLatch(2);
1248 uskSubscription.get().get().onUpdate(e -> {
1250 updated.countDown();
1252 sendUpdateNotification(23);
1253 sendUpdateNotification(24);
1254 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1255 assertThat(edition.get(), is(24));
1259 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1260 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1261 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1262 replyWithSubscribed();
1263 assertThat(uskSubscription.get().get().getUri(), is(URI));
1264 AtomicInteger edition = new AtomicInteger();
1265 CountDownLatch updated = new CountDownLatch(2);
1266 uskSubscription.get().get().onUpdate(e -> {
1268 updated.countDown();
1270 uskSubscription.get().get().onUpdate(e -> updated.countDown());
1271 sendUpdateNotification(23);
1272 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1273 assertThat(edition.get(), is(23));
1277 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1278 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1279 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1280 replyWithSubscribed();
1281 assertThat(uskSubscription.get().get().getUri(), is(URI));
1282 AtomicBoolean updated = new AtomicBoolean();
1283 uskSubscription.get().get().onUpdate(e -> updated.set(true));
1284 uskSubscription.get().get().cancel();
1285 readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1286 sendUpdateNotification(23);
1287 assertThat(updated.get(), is(false));
1290 private void replyWithSubscribed() throws IOException {
1291 fcpServer.writeLine(
1293 "Identifier=" + identifier,
1300 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1301 fcpServer.writeLine(
1302 "SubscribedUSKUpdate",
1303 "Identifier=" + identifier,
1305 "Edition=" + edition
1307 fcpServer.writeLine(additionalLines);
1308 fcpServer.writeLine("EndMessage");
1313 public class ClientGet {
1316 public void works() throws InterruptedException, ExecutionException, IOException {
1317 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1318 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1319 replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1320 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1321 Optional<Data> data = dataFuture.get();
1326 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1327 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1328 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1329 replyWithGetFailed("not-test");
1330 replyWithGetFailed(identifier);
1331 Optional<Data> data = dataFuture.get();
1332 assertThat(data.isPresent(), is(false));
1336 public void getFailedForDifferentIdentifierIsIgnored()
1337 throws InterruptedException, ExecutionException, IOException {
1338 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1339 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1340 replyWithGetFailed("not-test");
1341 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1342 Optional<Data> data = dataFuture.get();
1346 @Test(expected = ExecutionException.class)
1347 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1348 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1349 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1355 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1356 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1357 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1361 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1362 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1363 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1367 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1368 throws InterruptedException, ExecutionException, IOException {
1369 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1370 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1374 public void clientGetWithPrioritySettingSendsCorrectCommands()
1375 throws InterruptedException, ExecutionException, IOException {
1376 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1377 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1381 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1382 throws InterruptedException, ExecutionException, IOException {
1383 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1384 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1388 public void clientGetWithGlobalSettingSendsCorrectCommands()
1389 throws InterruptedException, ExecutionException, IOException {
1390 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1391 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1394 private void replyWithGetFailed(String identifier) throws IOException {
1395 fcpServer.writeLine(
1397 "Identifier=" + identifier,
1403 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1404 fcpServer.writeLine(
1406 "Identifier=" + identifier,
1407 "DataLength=" + (text.length() + 1),
1408 "StartupTime=1435610539000",
1409 "CompletionTime=1435610540000",
1410 "Metadata.ContentType=" + contentType,
1416 private void verifyData(Optional<Data> data) throws IOException {
1417 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1418 assertThat(data.get().size(), is(6L));
1419 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1420 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1425 public class ClientPut {
1428 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1429 fcpClient.clientPut()
1430 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1435 readMessage("Hello", this::matchesDirectClientPut);
1439 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1440 Future<Optional<Key>> key = fcpClient.clientPut()
1441 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1446 readMessage("Hello", this::matchesDirectClientPut);
1447 replyWithPutFailed("not-the-right-one");
1448 replyWithPutSuccessful(identifier);
1449 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1453 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1454 Future<Optional<Key>> key = fcpClient.clientPut()
1455 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1460 readMessage("Hello", this::matchesDirectClientPut);
1461 replyWithPutSuccessful("not-the-right-one");
1462 replyWithPutFailed(identifier);
1463 assertThat(key.get().isPresent(), is(false));
1467 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1468 fcpClient.clientPut()
1469 .named("otherName.txt")
1470 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1475 readMessage("Hello", () -> allOf(
1476 hasHead("ClientPut"),
1477 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1479 hasTail("EndMessage", "Hello")
1484 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1485 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1486 connectAndAssert(() ->
1487 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
1491 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1492 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1493 connectAndAssert(() ->
1494 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
1499 private final File ddaFile;
1500 private final File fileToUpload;
1502 public DDA() throws IOException {
1503 ddaFile = createDdaFile();
1504 fileToUpload = new File(ddaFile.getParent(), "test.dat");
1507 private Matcher<List<String>> matchesFileClientPut(File file) {
1508 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
1512 public void completeDda() throws IOException, ExecutionException, InterruptedException {
1513 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1514 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1515 sendDdaRequired(identifier);
1516 readMessage(() -> matchesTestDDARequest(ddaFile));
1517 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1518 readMessage(() -> matchesTestDDAResponse(ddaFile));
1519 writeTestDDAComplete(ddaFile);
1520 readMessage(() -> matchesFileClientPut(fileToUpload));
1524 public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1525 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1526 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1527 sendDdaRequired(identifier);
1528 readMessage(() -> matchesTestDDARequest(ddaFile));
1529 sendTestDDAReply("/some-other-directory", ddaFile);
1530 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1531 readMessage(() -> matchesTestDDAResponse(ddaFile));
1535 public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
1536 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1537 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1538 sendDdaRequired(identifier);
1539 readMessage(() -> matchesTestDDARequest(ddaFile));
1540 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1541 readMessage(this::matchesFailedToReadResponse);
1545 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1546 throws IOException, ExecutionException, InterruptedException {
1547 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1549 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1550 String identifier = extractIdentifier(lines);
1551 fcpServer.writeLine(
1553 "Directory=/some-other-directory",
1556 sendDdaRequired(identifier);
1557 lines = fcpServer.collectUntil(is("EndMessage"));
1558 assertThat(lines, matchesFcpMessage(
1560 "Directory=" + ddaFile.getParent(),
1561 "WantReadDirectory=true",
1562 "WantWriteDirectory=false"
1566 private Matcher<List<String>> matchesFailedToReadResponse() {
1567 return matchesFcpMessage(
1569 "Directory=" + ddaFile.getParent(),
1570 "ReadContent=failed-to-read"
1574 private void writeTestDDAComplete(File tempFile) throws IOException {
1575 fcpServer.writeLine(
1577 "Directory=" + tempFile.getParent(),
1578 "ReadDirectoryAllowed=true",
1583 private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1584 return matchesFcpMessage(
1586 "Directory=" + tempFile.getParent(),
1587 "ReadContent=test-content"
1591 private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1592 fcpServer.writeLine(
1594 "Directory=" + directory,
1595 "ReadFilename=" + tempFile,
1600 private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1601 return matchesFcpMessage(
1603 "Directory=" + tempFile.getParent(),
1604 "WantReadDirectory=true",
1605 "WantWriteDirectory=false"
1609 private void sendDdaRequired(String identifier) throws IOException {
1610 fcpServer.writeLine(
1612 "Identifier=" + identifier,
1620 private void replyWithPutSuccessful(String identifier) throws IOException {
1621 fcpServer.writeLine(
1624 "Identifier=" + identifier,
1629 private void replyWithPutFailed(String identifier) throws IOException {
1630 fcpServer.writeLine(
1632 "Identifier=" + identifier,
1637 private Matcher<List<String>> matchesDirectClientPut() {
1639 hasHead("ClientPut"),
1640 hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
1641 hasTail("EndMessage", "Hello")
1645 private File createDdaFile() throws IOException {
1646 File tempFile = File.createTempFile("test-dda-", ".dat");
1647 tempFile.deleteOnExit();
1648 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
1653 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1654 throws InterruptedException, ExecutionException, IOException {
1655 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1657 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1658 String identifier = extractIdentifier(lines);
1659 fcpServer.writeLine(
1661 "Identifier=not-the-right-one",
1665 fcpServer.writeLine(
1667 "Identifier=" + identifier,
1671 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1675 public void clientPutAbortsOnProtocolErrorOtherThan25()
1676 throws InterruptedException, ExecutionException, IOException {
1677 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1679 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1680 String identifier = extractIdentifier(lines);
1681 fcpServer.writeLine(
1683 "Identifier=" + identifier,
1687 assertThat(key.get().isPresent(), is(false));
1691 public void clientPutSendsNotificationsForGeneratedKeys()
1692 throws InterruptedException, ExecutionException, IOException {
1693 List<String> generatedKeys = new CopyOnWriteArrayList<>();
1694 Future<Optional<Key>> key = fcpClient.clientPut()
1695 .onKeyGenerated(generatedKeys::add)
1696 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1701 List<String> lines = fcpServer.collectUntil(is("Hello"));
1702 String identifier = extractIdentifier(lines);
1703 fcpServer.writeLine(
1705 "Identifier=" + identifier,
1709 replyWithPutSuccessful(identifier);
1710 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1711 assertThat(generatedKeys, contains("KSK@foo.txt"));
1716 public class ConfigCommand {
1718 public class GetConfig {
1721 public void defaultFcpClientCanGetConfigWithoutDetails()
1722 throws InterruptedException, ExecutionException, IOException {
1723 Future<ConfigData> configData = fcpClient.getConfig().execute();
1724 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1725 replyWithConfigData();
1726 assertThat(configData.get(), notNullValue());
1730 public void defaultFcpClientCanGetConfigWithCurrent()
1731 throws InterruptedException, ExecutionException, IOException {
1732 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1733 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1734 replyWithConfigData("current.foo=bar");
1735 assertThat(configData.get().getCurrent("foo"), is("bar"));
1739 public void defaultFcpClientCanGetConfigWithDefaults()
1740 throws InterruptedException, ExecutionException, IOException {
1741 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1742 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1743 replyWithConfigData("default.foo=bar");
1744 assertThat(configData.get().getDefault("foo"), is("bar"));
1748 public void defaultFcpClientCanGetConfigWithSortOrder()
1749 throws InterruptedException, ExecutionException, IOException {
1750 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1751 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1752 replyWithConfigData("sortOrder.foo=17");
1753 assertThat(configData.get().getSortOrder("foo"), is(17));
1757 public void defaultFcpClientCanGetConfigWithExpertFlag()
1758 throws InterruptedException, ExecutionException, IOException {
1759 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1760 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1761 replyWithConfigData("expertFlag.foo=true");
1762 assertThat(configData.get().getExpertFlag("foo"), is(true));
1766 public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1767 throws InterruptedException, ExecutionException, IOException {
1768 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1769 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1770 replyWithConfigData("forceWriteFlag.foo=true");
1771 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
1775 public void defaultFcpClientCanGetConfigWithShortDescription()
1776 throws InterruptedException, ExecutionException, IOException {
1777 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1778 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1779 replyWithConfigData("shortDescription.foo=bar");
1780 assertThat(configData.get().getShortDescription("foo"), is("bar"));
1784 public void defaultFcpClientCanGetConfigWithLongDescription()
1785 throws InterruptedException, ExecutionException, IOException {
1786 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1787 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1788 replyWithConfigData("longDescription.foo=bar");
1789 assertThat(configData.get().getLongDescription("foo"), is("bar"));
1793 public void defaultFcpClientCanGetConfigWithDataTypes()
1794 throws InterruptedException, ExecutionException, IOException {
1795 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1796 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1797 replyWithConfigData("dataType.foo=number");
1798 assertThat(configData.get().getDataType("foo"), is("number"));
1801 private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1802 return matchesFcpMessage(
1804 "Identifier=" + identifier,
1805 additionalParameter + "=true"
1811 public class ModifyConfig {
1814 public void defaultFcpClientCanModifyConfigData()
1815 throws InterruptedException, ExecutionException, IOException {
1816 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1817 connectAndAssert(() -> matchesFcpMessage(
1819 "Identifier=" + identifier,
1822 replyWithConfigData("current.foo.bar=baz");
1823 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
1828 private void replyWithConfigData(String... additionalLines) throws IOException {
1829 fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1830 fcpServer.writeLine(additionalLines);
1831 fcpServer.writeLine("EndMessage");