1 package net.pterodactylus.fcp.quelaton;
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.allOf;
5 import static org.hamcrest.Matchers.contains;
6 import static org.hamcrest.Matchers.containsInAnyOrder;
7 import static org.hamcrest.Matchers.hasItem;
8 import static org.hamcrest.Matchers.hasSize;
9 import static org.hamcrest.Matchers.is;
10 import static org.hamcrest.Matchers.not;
11 import static org.hamcrest.Matchers.notNullValue;
12 import static org.hamcrest.Matchers.startsWith;
14 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
18 import java.nio.charset.StandardCharsets;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.function.Supplier;
33 import java.util.stream.Collectors;
35 import net.pterodactylus.fcp.ARK;
36 import net.pterodactylus.fcp.ConfigData;
37 import net.pterodactylus.fcp.DSAGroup;
38 import net.pterodactylus.fcp.FcpKeyPair;
39 import net.pterodactylus.fcp.Key;
40 import net.pterodactylus.fcp.NodeData;
41 import net.pterodactylus.fcp.NodeRef;
42 import net.pterodactylus.fcp.Peer;
43 import net.pterodactylus.fcp.PeerNote;
44 import net.pterodactylus.fcp.PluginInfo;
45 import net.pterodactylus.fcp.Priority;
46 import net.pterodactylus.fcp.fake.FakeTcpServer;
47 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
49 import com.google.common.io.ByteStreams;
50 import com.google.common.io.Files;
51 import com.nitorcreations.junit.runners.NestedRunner;
52 import org.hamcrest.Description;
53 import org.hamcrest.Matcher;
54 import org.hamcrest.Matchers;
55 import org.hamcrest.TypeSafeDiagnosingMatcher;
56 import org.junit.After;
57 import org.junit.Assert;
58 import org.junit.Test;
59 import org.junit.runner.RunWith;
62 * Unit test for {@link DefaultFcpClient}.
64 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
66 @RunWith(NestedRunner.class)
67 public class DefaultFcpClientTest {
69 private static final String INSERT_URI =
70 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
71 private static final String REQUEST_URI =
72 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
74 private int threadCounter = 0;
75 private final ExecutorService threadPool =
76 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
77 private final FakeTcpServer fcpServer;
78 private final DefaultFcpClient fcpClient;
80 public DefaultFcpClientTest() throws IOException {
81 fcpServer = new FakeTcpServer(threadPool);
82 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
86 public void tearDown() throws IOException {
88 threadPool.shutdown();
91 private void connectNode() throws InterruptedException, ExecutionException, IOException {
92 fcpServer.connect().get();
93 fcpServer.collectUntil(is("EndMessage"));
94 fcpServer.writeLine("NodeHello",
95 "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
96 "Revision=build01466",
98 "Version=Fred,0.7,1.0,1466",
100 "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
104 "NodeLanguage=ENGLISH",
110 private String extractIdentifier(List<String> lines) {
111 return lines.stream()
112 .filter(s -> s.startsWith("Identifier="))
113 .map(s -> s.substring(s.indexOf('=') + 1))
118 private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
119 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
122 private Matcher<Iterable<String>> hasHead(String firstElement) {
123 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
125 protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
126 if (!iterable.iterator().hasNext()) {
127 mismatchDescription.appendText("is empty");
130 String element = iterable.iterator().next();
131 if (!element.equals(firstElement)) {
132 mismatchDescription.appendText("starts with ").appendValue(element);
139 public void describeTo(Description description) {
140 description.appendText("starts with ").appendValue(firstElement);
145 private Matcher<List<String>> matchesFcpMessageWithTerminator(
146 String name, String terminator, String... requiredLines) {
147 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
150 private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
151 return new TypeSafeDiagnosingMatcher<List<String>>() {
153 protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
154 if (item.size() < (ignoreStart + ignoreEnd)) {
155 mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
158 for (String line : lines) {
159 if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
160 mismatchDescription.appendText("does not contains ").appendValue(line);
168 public void describeTo(Description description) {
169 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
170 description.appendText(", ignoring the first ").appendValue(ignoreStart);
171 description.appendText(" and the last ").appendValue(ignoreEnd);
176 private Matcher<List<String>> hasTail(String... lastElements) {
177 return new TypeSafeDiagnosingMatcher<List<String>>() {
179 protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
180 if (list.size() < lastElements.length) {
181 mismatchDescription.appendText("is too small");
184 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
185 if (!tail.equals(Arrays.asList(lastElements))) {
186 mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
193 public void describeTo(Description description) {
194 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
200 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
201 Future<NodeData> nodeData = fcpClient.getNode().execute();
203 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
204 String identifier = extractIdentifier(lines);
205 assertThat(lines, matchesFcpMessage(
207 "Identifier=" + identifier,
208 "GiveOpennetRef=false",
214 "Identifier=" + identifier,
215 "ark.pubURI=SSK@3YEf.../ark",
218 "version=Fred,0.7,1.0,1466",
219 "lastGoodVersion=Fred,0.7,1.0,1466",
222 assertThat(nodeData.get(), notNullValue());
226 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
227 throws InterruptedException, ExecutionException, IOException {
228 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
230 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
231 String identifier = extractIdentifier(lines);
232 assertThat(lines, matchesFcpMessage(
234 "Identifier=" + identifier,
235 "GiveOpennetRef=true",
241 "Identifier=" + identifier,
243 "ark.pubURI=SSK@3YEf.../ark",
246 "version=Fred,0.7,1.0,1466",
247 "lastGoodVersion=Fred,0.7,1.0,1466",
250 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
254 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
255 throws InterruptedException, ExecutionException, IOException {
256 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
258 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
259 String identifier = extractIdentifier(lines);
260 assertThat(lines, matchesFcpMessage(
262 "Identifier=" + identifier,
263 "GiveOpennetRef=false",
269 "Identifier=" + identifier,
271 "ark.pubURI=SSK@3YEf.../ark",
274 "version=Fred,0.7,1.0,1466",
275 "lastGoodVersion=Fred,0.7,1.0,1466",
276 "ark.privURI=SSK@XdHMiRl",
279 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
283 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
284 throws InterruptedException, ExecutionException, IOException {
285 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
287 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
288 String identifier = extractIdentifier(lines);
289 assertThat(lines, matchesFcpMessage(
291 "Identifier=" + identifier,
292 "GiveOpennetRef=false",
298 "Identifier=" + identifier,
300 "ark.pubURI=SSK@3YEf.../ark",
303 "version=Fred,0.7,1.0,1466",
304 "lastGoodVersion=Fred,0.7,1.0,1466",
305 "volatile.freeJavaMemory=205706528",
308 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
312 public void defaultFcpClientCanGetConfigWithoutDetails()
313 throws InterruptedException, ExecutionException, IOException {
314 Future<ConfigData> configData = fcpClient.getConfig().execute();
316 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
317 String identifier = extractIdentifier(lines);
318 assertThat(lines, matchesFcpMessage(
320 "Identifier=" + identifier
324 "Identifier=" + identifier,
327 assertThat(configData.get(), notNullValue());
331 public void defaultFcpClientCanGetConfigWithCurrent()
332 throws InterruptedException, ExecutionException, IOException {
333 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
335 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
336 String identifier = extractIdentifier(lines);
337 assertThat(lines, matchesFcpMessage(
339 "Identifier=" + identifier,
344 "Identifier=" + identifier,
348 assertThat(configData.get().getCurrent("foo"), is("bar"));
352 public void defaultFcpClientCanGetConfigWithDefaults()
353 throws InterruptedException, ExecutionException, IOException {
354 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
356 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
357 String identifier = extractIdentifier(lines);
358 assertThat(lines, matchesFcpMessage(
360 "Identifier=" + identifier,
365 "Identifier=" + identifier,
369 assertThat(configData.get().getDefault("foo"), is("bar"));
373 public void defaultFcpClientCanGetConfigWithSortOrder()
374 throws InterruptedException, ExecutionException, IOException {
375 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
377 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
378 String identifier = extractIdentifier(lines);
379 assertThat(lines, matchesFcpMessage(
381 "Identifier=" + identifier,
386 "Identifier=" + identifier,
390 assertThat(configData.get().getSortOrder("foo"), is(17));
394 public void defaultFcpClientCanGetConfigWithExpertFlag()
395 throws InterruptedException, ExecutionException, IOException {
396 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
398 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
399 String identifier = extractIdentifier(lines);
400 assertThat(lines, matchesFcpMessage(
402 "Identifier=" + identifier,
403 "WithExpertFlag=true"
407 "Identifier=" + identifier,
408 "expertFlag.foo=true",
411 assertThat(configData.get().getExpertFlag("foo"), is(true));
415 public void defaultFcpClientCanGetConfigWithForceWriteFlag()
416 throws InterruptedException, ExecutionException, IOException {
417 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
419 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
420 String identifier = extractIdentifier(lines);
421 assertThat(lines, matchesFcpMessage(
423 "Identifier=" + identifier,
424 "WithForceWriteFlag=true"
428 "Identifier=" + identifier,
429 "forceWriteFlag.foo=true",
432 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
436 public void defaultFcpClientCanGetConfigWithShortDescription()
437 throws InterruptedException, ExecutionException, IOException {
438 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
440 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
441 String identifier = extractIdentifier(lines);
442 assertThat(lines, matchesFcpMessage(
444 "Identifier=" + identifier,
445 "WithShortDescription=true"
449 "Identifier=" + identifier,
450 "shortDescription.foo=bar",
453 assertThat(configData.get().getShortDescription("foo"), is("bar"));
457 public void defaultFcpClientCanGetConfigWithLongDescription()
458 throws InterruptedException, ExecutionException, IOException {
459 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
461 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
462 String identifier = extractIdentifier(lines);
463 assertThat(lines, matchesFcpMessage(
465 "Identifier=" + identifier,
466 "WithLongDescription=true"
470 "Identifier=" + identifier,
471 "longDescription.foo=bar",
474 assertThat(configData.get().getLongDescription("foo"), is("bar"));
478 public void defaultFcpClientCanGetConfigWithDataTypes()
479 throws InterruptedException, ExecutionException, IOException {
480 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
482 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
483 String identifier = extractIdentifier(lines);
484 assertThat(lines, matchesFcpMessage(
486 "Identifier=" + identifier,
491 "Identifier=" + identifier,
492 "dataType.foo=number",
495 assertThat(configData.get().getDataType("foo"), is("number"));
499 public void defaultFcpClientCanModifyConfigData() throws InterruptedException, ExecutionException, IOException {
500 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
502 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
503 String identifier = extractIdentifier(lines);
504 assertThat(lines, matchesFcpMessage(
506 "Identifier=" + identifier,
511 "Identifier=" + identifier,
512 "current.foo.bar=baz",
515 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
518 private List<String> lines;
519 private String identifier;
521 private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
522 throws InterruptedException, ExecutionException, IOException {
524 readMessage(requestMatcher);
527 private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
528 readMessage("EndMessage", requestMatcher);
531 private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
532 lines = fcpServer.collectUntil(is(terminator));
533 identifier = extractIdentifier(lines);
534 assertThat(lines, requestMatcher.get());
537 public class ConnectionsAndKeyPairs {
539 public class Connections {
541 @Test(expected = ExecutionException.class)
542 public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
543 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
544 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
546 "CloseConnectionDuplicateClientName",
552 @Test(expected = ExecutionException.class)
553 public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
554 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
555 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
561 public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
562 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
563 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
566 keyPair = fcpClient.generateKeypair().execute();
567 readMessage(() -> matchesFcpMessage("GenerateSSK"));
568 identifier = extractIdentifier(lines);
574 public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
575 throws InterruptedException, ExecutionException, IOException {
576 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
577 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
582 } catch (ExecutionException e) {
585 keyPair = fcpClient.generateKeypair().execute();
586 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
593 public class GenerateKeyPair {
596 public void defaultFcpClientCanGenerateKeypair()
597 throws ExecutionException, InterruptedException, IOException {
598 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
599 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
601 FcpKeyPair keyPair = keyPairFuture.get();
602 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
603 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
608 private void replyWithKeyPair() throws IOException {
609 fcpServer.writeLine("SSKKeypair",
610 "InsertURI=" + INSERT_URI + "",
611 "RequestURI=" + REQUEST_URI + "",
612 "Identifier=" + identifier,
620 public class PeerCommands {
622 public class ListPeer {
625 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
626 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
627 connectAndAssert(() -> matchesListPeer("id1"));
628 replyWithPeer("id1");
629 assertThat(peer.get().get().getIdentity(), is("id1"));
633 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
634 Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
635 connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
636 replyWithPeer("id1");
637 assertThat(peer.get().get().getIdentity(), is("id1"));
641 public void byName() throws InterruptedException, ExecutionException, IOException {
642 Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
643 connectAndAssert(() -> matchesListPeer("FriendNode"));
644 replyWithPeer("id1");
645 assertThat(peer.get().get().getIdentity(), is("id1"));
649 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
650 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
651 connectAndAssert(() -> matchesListPeer("id2"));
652 replyWithUnknownNodeIdentifier();
653 assertThat(peer.get().isPresent(), is(false));
656 private Matcher<List<String>> matchesListPeer(String nodeId) {
657 return matchesFcpMessage(
659 "Identifier=" + identifier,
660 "NodeIdentifier=" + nodeId
666 public class ListPeers {
669 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
670 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
671 connectAndAssert(() -> matchesListPeers(false, false));
672 replyWithPeer("id1");
673 replyWithPeer("id2");
675 assertThat(peers.get(), hasSize(2));
676 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
677 containsInAnyOrder("id1", "id2"));
681 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
682 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
683 connectAndAssert(() -> matchesListPeers(false, true));
684 replyWithPeer("id1", "metadata.foo=bar1");
685 replyWithPeer("id2", "metadata.foo=bar2");
687 assertThat(peers.get(), hasSize(2));
688 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
689 containsInAnyOrder("bar1", "bar2"));
693 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
694 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
695 connectAndAssert(() -> matchesListPeers(true, false));
696 replyWithPeer("id1", "volatile.foo=bar1");
697 replyWithPeer("id2", "volatile.foo=bar2");
699 assertThat(peers.get(), hasSize(2));
700 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
701 containsInAnyOrder("bar1", "bar2"));
704 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
705 return matchesFcpMessage(
707 "WithVolatile=" + withVolatile,
708 "WithMetadata=" + withMetadata
712 private void sendEndOfPeerList() throws IOException {
715 "Identifier=" + identifier,
722 public class AddPeer {
725 public void fromFile() throws InterruptedException, ExecutionException, IOException {
726 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
727 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
728 replyWithPeer("id1");
729 assertThat(peer.get().get().getIdentity(), is("id1"));
733 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
734 Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
735 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
736 replyWithPeer("id1");
737 assertThat(peer.get().get().getIdentity(), is("id1"));
741 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
742 NodeRef nodeRef = createNodeRef();
743 Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
744 connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
750 "dsaGroup.q=subprime",
751 "dsaPubKey.y=dsa-public",
752 "physical.udp=1.2.3.4:5678",
756 replyWithPeer("id1");
757 assertThat(peer.get().get().getIdentity(), is("id1"));
760 private NodeRef createNodeRef() {
761 NodeRef nodeRef = new NodeRef();
762 nodeRef.setIdentity("id1");
763 nodeRef.setName("name");
764 nodeRef.setARK(new ARK("public", "1"));
765 nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
766 nodeRef.setNegotiationTypes(new int[] { 3, 5 });
767 nodeRef.setPhysicalUDP("1.2.3.4:5678");
768 nodeRef.setDSAPublicKey("dsa-public");
769 nodeRef.setSignature("sig");
773 private Matcher<List<String>> matchesAddPeer() {
774 return matchesFcpMessage(
776 "Identifier=" + identifier
782 public class ModifyPeer {
785 public void defaultFcpClientCanEnablePeerByName()
786 throws InterruptedException, ExecutionException, IOException {
787 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
788 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
789 replyWithPeer("id1");
790 assertThat(peer.get().get().getIdentity(), is("id1"));
794 public void defaultFcpClientCanDisablePeerByName()
795 throws InterruptedException, ExecutionException, IOException {
796 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
797 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
798 replyWithPeer("id1");
799 assertThat(peer.get().get().getIdentity(), is("id1"));
803 public void defaultFcpClientCanEnablePeerByIdentity()
804 throws InterruptedException, ExecutionException, IOException {
805 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
806 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
807 replyWithPeer("id1");
808 assertThat(peer.get().get().getIdentity(), is("id1"));
812 public void defaultFcpClientCanEnablePeerByHostAndPort()
813 throws InterruptedException, ExecutionException, IOException {
814 Future<Optional<Peer>> peer =
815 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
816 connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
817 replyWithPeer("id1");
818 assertThat(peer.get().get().getIdentity(), is("id1"));
822 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
823 Future<Optional<Peer>> peer =
824 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
825 connectAndAssert(() -> allOf(
826 matchesModifyPeer("id1", "AllowLocalAddresses", true),
827 not(contains(startsWith("IsDisabled=")))
829 replyWithPeer("id1");
830 assertThat(peer.get().get().getIdentity(), is("id1"));
834 public void disallowLocalAddressesOfPeer()
835 throws InterruptedException, ExecutionException, IOException {
836 Future<Optional<Peer>> peer =
837 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
838 connectAndAssert(() -> allOf(
839 matchesModifyPeer("id1", "AllowLocalAddresses", false),
840 not(contains(startsWith("IsDisabled=")))
842 replyWithPeer("id1");
843 assertThat(peer.get().get().getIdentity(), is("id1"));
847 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
848 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
849 connectAndAssert(() -> allOf(
850 matchesModifyPeer("id1", "IsBurstOnly", true),
851 not(contains(startsWith("AllowLocalAddresses="))),
852 not(contains(startsWith("IsDisabled=")))
854 replyWithPeer("id1");
855 assertThat(peer.get().get().getIdentity(), is("id1"));
859 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
860 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
861 connectAndAssert(() -> allOf(
862 matchesModifyPeer("id1", "IsBurstOnly", false),
863 not(contains(startsWith("AllowLocalAddresses="))),
864 not(contains(startsWith("IsDisabled=")))
866 replyWithPeer("id1");
867 assertThat(peer.get().get().getIdentity(), is("id1"));
871 public void defaultFcpClientCanSetListenOnlyForPeer()
872 throws InterruptedException, ExecutionException, IOException {
873 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
874 connectAndAssert(() -> allOf(
875 matchesModifyPeer("id1", "IsListenOnly", true),
876 not(contains(startsWith("AllowLocalAddresses="))),
877 not(contains(startsWith("IsDisabled="))),
878 not(contains(startsWith("IsBurstOnly=")))
880 replyWithPeer("id1");
881 assertThat(peer.get().get().getIdentity(), is("id1"));
885 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
886 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
887 connectAndAssert(() -> allOf(
888 matchesModifyPeer("id1", "IsListenOnly", false),
889 not(contains(startsWith("AllowLocalAddresses="))),
890 not(contains(startsWith("IsDisabled="))),
891 not(contains(startsWith("IsBurstOnly=")))
893 replyWithPeer("id1");
894 assertThat(peer.get().get().getIdentity(), is("id1"));
898 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
899 Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
900 connectAndAssert(() -> allOf(
901 matchesModifyPeer("id1", "IgnoreSourcePort", true),
902 not(contains(startsWith("AllowLocalAddresses="))),
903 not(contains(startsWith("IsDisabled="))),
904 not(contains(startsWith("IsBurstOnly="))),
905 not(contains(startsWith("IsListenOnly=")))
907 replyWithPeer("id1");
908 assertThat(peer.get().get().getIdentity(), is("id1"));
912 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
913 Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
914 connectAndAssert(() -> allOf(
915 matchesModifyPeer("id1", "IgnoreSourcePort", false),
916 not(contains(startsWith("AllowLocalAddresses="))),
917 not(contains(startsWith("IsDisabled="))),
918 not(contains(startsWith("IsBurstOnly="))),
919 not(contains(startsWith("IsListenOnly=")))
921 replyWithPeer("id1");
922 assertThat(peer.get().get().getIdentity(), is("id1"));
926 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
927 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
928 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
929 replyWithUnknownNodeIdentifier();
930 assertThat(peer.get().isPresent(), is(false));
933 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
934 return matchesFcpMessage(
936 "Identifier=" + identifier,
937 "NodeIdentifier=" + nodeIdentifier,
938 setting + "=" + value
944 public class RemovePeer {
947 public void byName() throws InterruptedException, ExecutionException, IOException {
948 Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
949 connectAndAssert(() -> matchesRemovePeer("Friend1"));
950 replyWithPeerRemoved("Friend1");
951 assertThat(peer.get(), is(true));
955 public void invalidName() throws InterruptedException, ExecutionException, IOException {
956 Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
957 connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
958 replyWithUnknownNodeIdentifier();
959 assertThat(peer.get(), is(false));
963 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
964 Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
965 connectAndAssert(() -> matchesRemovePeer("id1"));
966 replyWithPeerRemoved("id1");
967 assertThat(peer.get(), is(true));
971 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
972 Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
973 connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
974 replyWithPeerRemoved("Friend1");
975 assertThat(peer.get(), is(true));
978 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
979 return matchesFcpMessage(
981 "Identifier=" + identifier,
982 "NodeIdentifier=" + nodeIdentifier
986 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
989 "Identifier=" + identifier,
990 "NodeIdentifier=" + nodeIdentifier,
997 private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
1000 "Identifier=" + identifier,
1001 "identity=" + peerId,
1003 "ark.pubURI=SSK@3YEf.../ark",
1006 "version=Fred,0.7,1.0,1466",
1007 "lastGoodVersion=Fred,0.7,1.0,1466"
1009 fcpServer.writeLine(additionalLines);
1010 fcpServer.writeLine("EndMessage");
1015 public class PeerNoteCommands {
1017 public class ListPeerNotes {
1020 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
1021 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
1022 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
1023 replyWithUnknownNodeIdentifier();
1024 assertThat(peerNote.get().isPresent(), is(false));
1028 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
1029 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
1030 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
1031 replyWithPeerNote();
1032 replyWithEndListPeerNotes();
1033 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1034 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
1038 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
1039 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
1040 connectAndAssert(() -> matchesListPeerNotes("id1"));
1041 replyWithPeerNote();
1042 replyWithEndListPeerNotes();
1043 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1044 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
1048 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
1049 Future<Optional<PeerNote>> peerNote =
1050 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
1051 connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
1052 replyWithPeerNote();
1053 replyWithEndListPeerNotes();
1054 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1055 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
1058 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
1059 return matchesFcpMessage(
1061 "NodeIdentifier=" + nodeIdentifier
1065 private void replyWithEndListPeerNotes() throws IOException {
1066 fcpServer.writeLine(
1068 "Identifier=" + identifier,
1073 private void replyWithPeerNote() throws IOException {
1074 fcpServer.writeLine(
1076 "Identifier=" + identifier,
1077 "NodeIdentifier=Friend1",
1078 "NoteText=RXhhbXBsZSBUZXh0Lg==",
1086 public class ModifyPeerNotes {
1089 public void byName() throws InterruptedException, ExecutionException, IOException {
1090 Future<Boolean> noteUpdated =
1091 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
1092 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
1093 replyWithPeerNote();
1094 assertThat(noteUpdated.get(), is(true));
1098 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
1099 Future<Boolean> noteUpdated =
1100 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
1101 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
1102 replyWithUnknownNodeIdentifier();
1103 assertThat(noteUpdated.get(), is(false));
1107 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
1108 throws InterruptedException, ExecutionException, IOException {
1109 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
1110 assertThat(noteUpdated.get(), is(false));
1114 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
1115 Future<Boolean> noteUpdated =
1116 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
1117 connectAndAssert(() -> matchesModifyPeerNote("id1"));
1118 replyWithPeerNote();
1119 assertThat(noteUpdated.get(), is(true));
1123 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
1124 Future<Boolean> noteUpdated =
1125 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
1126 connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
1127 replyWithPeerNote();
1128 assertThat(noteUpdated.get(), is(true));
1131 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
1132 return matchesFcpMessage(
1134 "Identifier=" + identifier,
1135 "NodeIdentifier=" + nodeIdentifier,
1141 private void replyWithPeerNote() throws IOException {
1142 fcpServer.writeLine(
1144 "Identifier=" + identifier,
1145 "NodeIdentifier=Friend1",
1156 private void replyWithUnknownNodeIdentifier() throws IOException {
1157 fcpServer.writeLine(
1158 "UnknownNodeIdentifier",
1159 "Identifier=" + identifier,
1160 "NodeIdentifier=id2",
1167 public class PluginCommands {
1169 private static final String CLASS_NAME = "foo.plugin.Plugin";
1171 private void replyWithPluginInfo() throws IOException {
1172 fcpServer.writeLine(
1174 "Identifier=" + identifier,
1175 "PluginName=superPlugin",
1177 "LongVersion=1.2.3",
1179 "OriginUri=superPlugin",
1185 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
1186 throws InterruptedException, ExecutionException {
1187 assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
1188 assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
1189 assertThat(pluginInfo.get().get().isTalkable(), is(true));
1190 assertThat(pluginInfo.get().get().getVersion(), is("42"));
1191 assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
1192 assertThat(pluginInfo.get().get().isStarted(), is(true));
1195 public class LoadPlugin {
1197 public class OfficialPlugins {
1200 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
1201 Future<Optional<PluginInfo>> pluginInfo =
1202 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
1203 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
1204 assertThat(lines, not(contains(startsWith("Store="))));
1205 replyWithPluginInfo();
1206 verifyPluginInfo(pluginInfo);
1210 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
1211 Future<Optional<PluginInfo>> pluginInfo =
1212 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
1213 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
1214 assertThat(lines, hasItem("Store=true"));
1215 replyWithPluginInfo();
1216 verifyPluginInfo(pluginInfo);
1220 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
1221 Future<Optional<PluginInfo>> pluginInfo =
1222 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
1223 connectAndAssert(() -> createMatcherForOfficialSource("https"));
1224 replyWithPluginInfo();
1225 verifyPluginInfo(pluginInfo);
1228 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
1229 return matchesFcpMessage(
1231 "Identifier=" + identifier,
1232 "PluginURL=superPlugin",
1234 "OfficialSource=" + officialSource
1240 public class FromOtherSources {
1242 private static final String FILE_PATH = "/path/to/plugin.jar";
1243 private static final String URL = "http://server.com/plugin.jar";
1244 private static final String KEY = "KSK@plugin.jar";
1247 public void fromFile() throws ExecutionException, InterruptedException, IOException {
1248 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
1249 connectAndAssert(() -> createMatcher("file", FILE_PATH));
1250 replyWithPluginInfo();
1251 verifyPluginInfo(pluginInfo);
1255 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
1256 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
1257 connectAndAssert(() -> createMatcher("url", URL));
1258 replyWithPluginInfo();
1259 verifyPluginInfo(pluginInfo);
1263 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
1264 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
1265 connectAndAssert(() -> createMatcher("freenet", KEY));
1266 replyWithPluginInfo();
1267 verifyPluginInfo(pluginInfo);
1270 private Matcher<List<String>> createMatcher(String urlType, String url) {
1271 return matchesFcpMessage(
1273 "Identifier=" + identifier,
1275 "URLType=" + urlType
1281 public class Failed {
1284 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
1285 Future<Optional<PluginInfo>> pluginInfo =
1286 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
1287 connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
1288 replyWithProtocolError();
1289 assertThat(pluginInfo.get().isPresent(), is(false));
1296 private void replyWithProtocolError() throws IOException {
1297 fcpServer.writeLine(
1299 "Identifier=" + identifier,
1304 public class ReloadPlugin {
1307 public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1308 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1309 connectAndAssert(this::matchReloadPluginMessage);
1310 replyWithPluginInfo();
1311 verifyPluginInfo(pluginInfo);
1315 public void reloadingPluginWithMaxWaitTimeWorks()
1316 throws InterruptedException, ExecutionException, IOException {
1317 Future<Optional<PluginInfo>> pluginInfo =
1318 fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1319 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1320 replyWithPluginInfo();
1321 verifyPluginInfo(pluginInfo);
1325 public void reloadingPluginWithPurgeWorks()
1326 throws InterruptedException, ExecutionException, IOException {
1327 Future<Optional<PluginInfo>> pluginInfo =
1328 fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1329 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1330 replyWithPluginInfo();
1331 verifyPluginInfo(pluginInfo);
1335 public void reloadingPluginWithStoreWorks()
1336 throws InterruptedException, ExecutionException, IOException {
1337 Future<Optional<PluginInfo>> pluginInfo =
1338 fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1339 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1340 replyWithPluginInfo();
1341 verifyPluginInfo(pluginInfo);
1344 private Matcher<List<String>> matchReloadPluginMessage() {
1345 return matchesFcpMessage(
1347 "Identifier=" + identifier,
1348 "PluginName=" + CLASS_NAME
1354 public class RemovePlugin {
1357 public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1358 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1359 connectAndAssert(this::matchPluginRemovedMessage);
1360 replyWithPluginRemoved();
1361 assertThat(pluginRemoved.get(), is(true));
1365 public void removingPluginWithMaxWaitTimeWorks()
1366 throws InterruptedException, ExecutionException, IOException {
1367 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1368 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1369 replyWithPluginRemoved();
1370 assertThat(pluginRemoved.get(), is(true));
1374 public void removingPluginWithPurgeWorks()
1375 throws InterruptedException, ExecutionException, IOException {
1376 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1377 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1378 replyWithPluginRemoved();
1379 assertThat(pluginRemoved.get(), is(true));
1382 private void replyWithPluginRemoved() throws IOException {
1383 fcpServer.writeLine(
1385 "Identifier=" + identifier,
1386 "PluginName=" + CLASS_NAME,
1391 private Matcher<List<String>> matchPluginRemovedMessage() {
1392 return matchesFcpMessage(
1394 "Identifier=" + identifier,
1395 "PluginName=" + CLASS_NAME
1401 public class GetPluginInfo {
1404 public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1405 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1406 connectAndAssert(this::matchGetPluginInfoMessage);
1407 replyWithPluginInfo();
1408 verifyPluginInfo(pluginInfo);
1412 public void gettingPluginInfoWithDetailsWorks()
1413 throws InterruptedException, ExecutionException, IOException {
1414 Future<Optional<PluginInfo>> pluginInfo =
1415 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1416 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1417 replyWithPluginInfo();
1418 verifyPluginInfo(pluginInfo);
1422 public void protocolErrorIsRecognizedAsFailure()
1423 throws InterruptedException, ExecutionException, IOException {
1424 Future<Optional<PluginInfo>> pluginInfo =
1425 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1426 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1427 replyWithProtocolError();
1428 assertThat(pluginInfo.get(), is(Optional.empty()));
1431 private Matcher<List<String>> matchGetPluginInfoMessage() {
1432 return matchesFcpMessage(
1434 "Identifier=" + identifier,
1435 "PluginName=" + CLASS_NAME
1443 public class UskSubscriptionCommands {
1445 private static final String URI = "USK@some,uri/file.txt";
1448 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1449 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1450 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1451 replyWithSubscribed();
1452 assertThat(uskSubscription.get().get().getUri(), is(URI));
1453 AtomicInteger edition = new AtomicInteger();
1454 CountDownLatch updated = new CountDownLatch(2);
1455 uskSubscription.get().get().onUpdate(e -> {
1457 updated.countDown();
1459 sendUpdateNotification(23);
1460 sendUpdateNotification(24);
1461 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1462 assertThat(edition.get(), is(24));
1466 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1467 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1468 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1469 replyWithSubscribed();
1470 assertThat(uskSubscription.get().get().getUri(), is(URI));
1471 AtomicInteger edition = new AtomicInteger();
1472 CountDownLatch updated = new CountDownLatch(2);
1473 uskSubscription.get().get().onUpdate(e -> {
1475 updated.countDown();
1477 uskSubscription.get().get().onUpdate(e -> updated.countDown());
1478 sendUpdateNotification(23);
1479 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1480 assertThat(edition.get(), is(23));
1484 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1485 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1486 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1487 replyWithSubscribed();
1488 assertThat(uskSubscription.get().get().getUri(), is(URI));
1489 AtomicBoolean updated = new AtomicBoolean();
1490 uskSubscription.get().get().onUpdate(e -> updated.set(true));
1491 uskSubscription.get().get().cancel();
1492 readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1493 sendUpdateNotification(23);
1494 assertThat(updated.get(), is(false));
1497 private void replyWithSubscribed() throws IOException {
1498 fcpServer.writeLine(
1500 "Identifier=" + identifier,
1507 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1508 fcpServer.writeLine(
1509 "SubscribedUSKUpdate",
1510 "Identifier=" + identifier,
1512 "Edition=" + edition
1514 fcpServer.writeLine(additionalLines);
1515 fcpServer.writeLine("EndMessage");
1520 public class ClientGet {
1523 public void works() throws InterruptedException, ExecutionException, IOException {
1524 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1525 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1526 replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1527 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1528 Optional<Data> data = dataFuture.get();
1533 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1534 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1535 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1536 replyWithGetFailed("not-test");
1537 replyWithGetFailed(identifier);
1538 Optional<Data> data = dataFuture.get();
1539 assertThat(data.isPresent(), is(false));
1543 public void getFailedForDifferentIdentifierIsIgnored()
1544 throws InterruptedException, ExecutionException, IOException {
1545 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1546 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1547 replyWithGetFailed("not-test");
1548 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1549 Optional<Data> data = dataFuture.get();
1553 @Test(expected = ExecutionException.class)
1554 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1555 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1556 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1562 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1563 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1564 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1568 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1569 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1570 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1574 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1575 throws InterruptedException, ExecutionException, IOException {
1576 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1577 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1581 public void clientGetWithPrioritySettingSendsCorrectCommands()
1582 throws InterruptedException, ExecutionException, IOException {
1583 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1584 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1588 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1589 throws InterruptedException, ExecutionException, IOException {
1590 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1591 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1595 public void clientGetWithGlobalSettingSendsCorrectCommands()
1596 throws InterruptedException, ExecutionException, IOException {
1597 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1598 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1601 private void replyWithGetFailed(String identifier) throws IOException {
1602 fcpServer.writeLine(
1604 "Identifier=" + identifier,
1610 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1611 fcpServer.writeLine(
1613 "Identifier=" + identifier,
1614 "DataLength=" + (text.length() + 1),
1615 "StartupTime=1435610539000",
1616 "CompletionTime=1435610540000",
1617 "Metadata.ContentType=" + contentType,
1623 private void verifyData(Optional<Data> data) throws IOException {
1624 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1625 assertThat(data.get().size(), is(6L));
1626 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1627 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1632 public class ClientPut {
1635 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1636 fcpClient.clientPut()
1637 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1642 readMessage("Hello", this::matchesDirectClientPut);
1646 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1647 Future<Optional<Key>> key = fcpClient.clientPut()
1648 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1653 readMessage("Hello", this::matchesDirectClientPut);
1654 replyWithPutFailed("not-the-right-one");
1655 replyWithPutSuccessful(identifier);
1656 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1660 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1661 Future<Optional<Key>> key = fcpClient.clientPut()
1662 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1667 readMessage("Hello", this::matchesDirectClientPut);
1668 replyWithPutSuccessful("not-the-right-one");
1669 replyWithPutFailed(identifier);
1670 assertThat(key.get().isPresent(), is(false));
1674 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1675 fcpClient.clientPut()
1676 .named("otherName.txt")
1677 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1682 readMessage("Hello", () -> allOf(
1683 hasHead("ClientPut"),
1684 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1686 hasTail("EndMessage", "Hello")
1691 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1692 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1693 connectAndAssert(() ->
1694 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
1698 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1699 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1700 connectAndAssert(() ->
1701 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
1706 private final File ddaFile;
1707 private final File fileToUpload;
1709 public DDA() throws IOException {
1710 ddaFile = createDdaFile();
1711 fileToUpload = new File(ddaFile.getParent(), "test.dat");
1714 private Matcher<List<String>> matchesFileClientPut(File file) {
1715 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
1719 public void completeDda() throws IOException, ExecutionException, InterruptedException {
1720 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1721 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1722 sendDdaRequired(identifier);
1723 readMessage(() -> matchesTestDDARequest(ddaFile));
1724 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1725 readMessage(() -> matchesTestDDAResponse(ddaFile));
1726 writeTestDDAComplete(ddaFile);
1727 readMessage(() -> matchesFileClientPut(fileToUpload));
1731 public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1732 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1733 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1734 sendDdaRequired(identifier);
1735 readMessage(() -> matchesTestDDARequest(ddaFile));
1736 sendTestDDAReply("/some-other-directory", ddaFile);
1737 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1738 readMessage(() -> matchesTestDDAResponse(ddaFile));
1742 public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
1743 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1744 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1745 sendDdaRequired(identifier);
1746 readMessage(() -> matchesTestDDARequest(ddaFile));
1747 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1748 readMessage(this::matchesFailedToReadResponse);
1752 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1753 throws IOException, ExecutionException, InterruptedException {
1754 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1756 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1757 String identifier = extractIdentifier(lines);
1758 fcpServer.writeLine(
1760 "Directory=/some-other-directory",
1763 sendDdaRequired(identifier);
1764 lines = fcpServer.collectUntil(is("EndMessage"));
1765 assertThat(lines, matchesFcpMessage(
1767 "Directory=" + ddaFile.getParent(),
1768 "WantReadDirectory=true",
1769 "WantWriteDirectory=false"
1773 private Matcher<List<String>> matchesFailedToReadResponse() {
1774 return matchesFcpMessage(
1776 "Directory=" + ddaFile.getParent(),
1777 "ReadContent=failed-to-read"
1781 private void writeTestDDAComplete(File tempFile) throws IOException {
1782 fcpServer.writeLine(
1784 "Directory=" + tempFile.getParent(),
1785 "ReadDirectoryAllowed=true",
1790 private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1791 return matchesFcpMessage(
1793 "Directory=" + tempFile.getParent(),
1794 "ReadContent=test-content"
1798 private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1799 fcpServer.writeLine(
1801 "Directory=" + directory,
1802 "ReadFilename=" + tempFile,
1807 private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1808 return matchesFcpMessage(
1810 "Directory=" + tempFile.getParent(),
1811 "WantReadDirectory=true",
1812 "WantWriteDirectory=false"
1816 private void sendDdaRequired(String identifier) throws IOException {
1817 fcpServer.writeLine(
1819 "Identifier=" + identifier,
1827 private void replyWithPutSuccessful(String identifier) throws IOException {
1828 fcpServer.writeLine(
1831 "Identifier=" + identifier,
1836 private void replyWithPutFailed(String identifier) throws IOException {
1837 fcpServer.writeLine(
1839 "Identifier=" + identifier,
1844 private Matcher<List<String>> matchesDirectClientPut() {
1846 hasHead("ClientPut"),
1847 hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
1848 hasTail("EndMessage", "Hello")
1852 private File createDdaFile() throws IOException {
1853 File tempFile = File.createTempFile("test-dda-", ".dat");
1854 tempFile.deleteOnExit();
1855 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
1860 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1861 throws InterruptedException, ExecutionException, IOException {
1862 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1864 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1865 String identifier = extractIdentifier(lines);
1866 fcpServer.writeLine(
1868 "Identifier=not-the-right-one",
1872 fcpServer.writeLine(
1874 "Identifier=" + identifier,
1878 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1882 public void clientPutAbortsOnProtocolErrorOtherThan25()
1883 throws InterruptedException, ExecutionException, IOException {
1884 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1886 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1887 String identifier = extractIdentifier(lines);
1888 fcpServer.writeLine(
1890 "Identifier=" + identifier,
1894 assertThat(key.get().isPresent(), is(false));
1898 public void clientPutSendsNotificationsForGeneratedKeys()
1899 throws InterruptedException, ExecutionException, IOException {
1900 List<String> generatedKeys = new CopyOnWriteArrayList<>();
1901 Future<Optional<Key>> key = fcpClient.clientPut()
1902 .onKeyGenerated(generatedKeys::add)
1903 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1908 List<String> lines = fcpServer.collectUntil(is("Hello"));
1909 String identifier = extractIdentifier(lines);
1910 fcpServer.writeLine(
1912 "Identifier=" + identifier,
1916 replyWithPutSuccessful(identifier);
1917 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1918 assertThat(generatedKeys, contains("KSK@foo.txt"));