1 package net.pterodactylus.fcp.quelaton;
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.allOf;
5 import static org.hamcrest.Matchers.contains;
6 import static org.hamcrest.Matchers.containsInAnyOrder;
7 import static org.hamcrest.Matchers.hasItem;
8 import static org.hamcrest.Matchers.hasSize;
9 import static org.hamcrest.Matchers.is;
10 import static org.hamcrest.Matchers.not;
11 import static org.hamcrest.Matchers.notNullValue;
12 import static org.hamcrest.Matchers.startsWith;
14 import java.io.ByteArrayInputStream;
16 import java.io.IOException;
18 import java.nio.charset.StandardCharsets;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.function.Supplier;
33 import java.util.stream.Collectors;
35 import net.pterodactylus.fcp.ARK;
36 import net.pterodactylus.fcp.ConfigData;
37 import net.pterodactylus.fcp.DSAGroup;
38 import net.pterodactylus.fcp.FcpKeyPair;
39 import net.pterodactylus.fcp.Key;
40 import net.pterodactylus.fcp.NodeData;
41 import net.pterodactylus.fcp.NodeRef;
42 import net.pterodactylus.fcp.Peer;
43 import net.pterodactylus.fcp.PeerNote;
44 import net.pterodactylus.fcp.PluginInfo;
45 import net.pterodactylus.fcp.Priority;
46 import net.pterodactylus.fcp.fake.FakeTcpServer;
47 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
49 import com.google.common.io.ByteStreams;
50 import com.google.common.io.Files;
51 import com.nitorcreations.junit.runners.NestedRunner;
52 import org.hamcrest.Description;
53 import org.hamcrest.Matcher;
54 import org.hamcrest.Matchers;
55 import org.hamcrest.TypeSafeDiagnosingMatcher;
56 import org.junit.After;
57 import org.junit.Assert;
58 import org.junit.Test;
59 import org.junit.runner.RunWith;
62 * Unit test for {@link DefaultFcpClient}.
64 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
66 @RunWith(NestedRunner.class)
67 public class DefaultFcpClientTest {
69 private static final String INSERT_URI =
70 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
71 private static final String REQUEST_URI =
72 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
74 private int threadCounter = 0;
75 private final ExecutorService threadPool =
76 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
77 private final FakeTcpServer fcpServer;
78 private final DefaultFcpClient fcpClient;
80 public DefaultFcpClientTest() throws IOException {
81 fcpServer = new FakeTcpServer(threadPool);
82 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
86 public void tearDown() throws IOException {
88 threadPool.shutdown();
91 private void connectNode() throws InterruptedException, ExecutionException, IOException {
92 fcpServer.connect().get();
93 fcpServer.collectUntil(is("EndMessage"));
94 fcpServer.writeLine("NodeHello",
95 "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
96 "Revision=build01466",
98 "Version=Fred,0.7,1.0,1466",
100 "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
104 "NodeLanguage=ENGLISH",
110 private String extractIdentifier(List<String> lines) {
111 return lines.stream()
112 .filter(s -> s.startsWith("Identifier="))
113 .map(s -> s.substring(s.indexOf('=') + 1))
118 private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
119 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
122 private Matcher<Iterable<String>> hasHead(String firstElement) {
123 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
125 protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
126 if (!iterable.iterator().hasNext()) {
127 mismatchDescription.appendText("is empty");
130 String element = iterable.iterator().next();
131 if (!element.equals(firstElement)) {
132 mismatchDescription.appendText("starts with ").appendValue(element);
139 public void describeTo(Description description) {
140 description.appendText("starts with ").appendValue(firstElement);
145 private Matcher<List<String>> matchesFcpMessageWithTerminator(
146 String name, String terminator, String... requiredLines) {
147 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
150 private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
151 return new TypeSafeDiagnosingMatcher<List<String>>() {
153 protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
154 if (item.size() < (ignoreStart + ignoreEnd)) {
155 mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
158 for (String line : lines) {
159 if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
160 mismatchDescription.appendText("does not contains ").appendValue(line);
168 public void describeTo(Description description) {
169 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
170 description.appendText(", ignoring the first ").appendValue(ignoreStart);
171 description.appendText(" and the last ").appendValue(ignoreEnd);
176 private Matcher<List<String>> hasTail(String... lastElements) {
177 return new TypeSafeDiagnosingMatcher<List<String>>() {
179 protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
180 if (list.size() < lastElements.length) {
181 mismatchDescription.appendText("is too small");
184 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
185 if (!tail.equals(Arrays.asList(lastElements))) {
186 mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
193 public void describeTo(Description description) {
194 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
199 private List<String> lines;
200 private String identifier;
202 private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
203 throws InterruptedException, ExecutionException, IOException {
205 readMessage(requestMatcher);
208 private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
209 readMessage("EndMessage", requestMatcher);
212 private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
213 lines = fcpServer.collectUntil(is(terminator));
214 identifier = extractIdentifier(lines);
215 assertThat(lines, requestMatcher.get());
218 private void replyWithProtocolError() throws IOException {
221 "Identifier=" + identifier,
226 public class ConnectionsAndKeyPairs {
228 public class Connections {
230 @Test(expected = ExecutionException.class)
231 public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
232 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
233 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
235 "CloseConnectionDuplicateClientName",
241 @Test(expected = ExecutionException.class)
242 public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
243 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
244 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
250 public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
251 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
252 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
255 keyPair = fcpClient.generateKeypair().execute();
256 readMessage(() -> matchesFcpMessage("GenerateSSK"));
257 identifier = extractIdentifier(lines);
263 public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
264 throws InterruptedException, ExecutionException, IOException {
265 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
266 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
271 } catch (ExecutionException e) {
274 keyPair = fcpClient.generateKeypair().execute();
275 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
282 public class GenerateKeyPair {
285 public void defaultFcpClientCanGenerateKeypair()
286 throws ExecutionException, InterruptedException, IOException {
287 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
288 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
290 FcpKeyPair keyPair = keyPairFuture.get();
291 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
292 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
297 private void replyWithKeyPair() throws IOException {
298 fcpServer.writeLine("SSKKeypair",
299 "InsertURI=" + INSERT_URI + "",
300 "RequestURI=" + REQUEST_URI + "",
301 "Identifier=" + identifier,
309 public class PeerCommands {
311 public class ListPeer {
314 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
315 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
316 connectAndAssert(() -> matchesListPeer("id1"));
317 replyWithPeer("id1");
318 assertThat(peer.get().get().getIdentity(), is("id1"));
322 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
323 Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
324 connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
325 replyWithPeer("id1");
326 assertThat(peer.get().get().getIdentity(), is("id1"));
330 public void byName() throws InterruptedException, ExecutionException, IOException {
331 Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
332 connectAndAssert(() -> matchesListPeer("FriendNode"));
333 replyWithPeer("id1");
334 assertThat(peer.get().get().getIdentity(), is("id1"));
338 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
339 Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
340 connectAndAssert(() -> matchesListPeer("id2"));
341 replyWithUnknownNodeIdentifier();
342 assertThat(peer.get().isPresent(), is(false));
345 private Matcher<List<String>> matchesListPeer(String nodeId) {
346 return matchesFcpMessage(
348 "Identifier=" + identifier,
349 "NodeIdentifier=" + nodeId
355 public class ListPeers {
358 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
359 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
360 connectAndAssert(() -> matchesListPeers(false, false));
361 replyWithPeer("id1");
362 replyWithPeer("id2");
364 assertThat(peers.get(), hasSize(2));
365 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
366 containsInAnyOrder("id1", "id2"));
370 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
371 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
372 connectAndAssert(() -> matchesListPeers(false, true));
373 replyWithPeer("id1", "metadata.foo=bar1");
374 replyWithPeer("id2", "metadata.foo=bar2");
376 assertThat(peers.get(), hasSize(2));
377 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
378 containsInAnyOrder("bar1", "bar2"));
382 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
383 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
384 connectAndAssert(() -> matchesListPeers(true, false));
385 replyWithPeer("id1", "volatile.foo=bar1");
386 replyWithPeer("id2", "volatile.foo=bar2");
388 assertThat(peers.get(), hasSize(2));
389 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
390 containsInAnyOrder("bar1", "bar2"));
393 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
394 return matchesFcpMessage(
396 "WithVolatile=" + withVolatile,
397 "WithMetadata=" + withMetadata
401 private void sendEndOfPeerList() throws IOException {
404 "Identifier=" + identifier,
411 public class AddPeer {
414 public void fromFile() throws InterruptedException, ExecutionException, IOException {
415 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
416 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
417 replyWithPeer("id1");
418 assertThat(peer.get().get().getIdentity(), is("id1"));
422 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
423 Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
424 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
425 replyWithPeer("id1");
426 assertThat(peer.get().get().getIdentity(), is("id1"));
430 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
431 NodeRef nodeRef = createNodeRef();
432 Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
433 connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
439 "dsaGroup.q=subprime",
440 "dsaPubKey.y=dsa-public",
441 "physical.udp=1.2.3.4:5678",
445 replyWithPeer("id1");
446 assertThat(peer.get().get().getIdentity(), is("id1"));
450 public void protocolErrorEndsCommand() throws InterruptedException, ExecutionException, IOException {
451 Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
452 connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
453 replyWithProtocolError();
454 assertThat(peer.get().isPresent(), is(false));
457 private NodeRef createNodeRef() {
458 NodeRef nodeRef = new NodeRef();
459 nodeRef.setIdentity("id1");
460 nodeRef.setName("name");
461 nodeRef.setARK(new ARK("public", "1"));
462 nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
463 nodeRef.setNegotiationTypes(new int[] { 3, 5 });
464 nodeRef.setPhysicalUDP("1.2.3.4:5678");
465 nodeRef.setDSAPublicKey("dsa-public");
466 nodeRef.setSignature("sig");
470 private Matcher<List<String>> matchesAddPeer() {
471 return matchesFcpMessage(
473 "Identifier=" + identifier
479 public class ModifyPeer {
482 public void defaultFcpClientCanEnablePeerByName()
483 throws InterruptedException, ExecutionException, IOException {
484 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
485 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
486 replyWithPeer("id1");
487 assertThat(peer.get().get().getIdentity(), is("id1"));
491 public void defaultFcpClientCanDisablePeerByName()
492 throws InterruptedException, ExecutionException, IOException {
493 Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
494 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
495 replyWithPeer("id1");
496 assertThat(peer.get().get().getIdentity(), is("id1"));
500 public void defaultFcpClientCanEnablePeerByIdentity()
501 throws InterruptedException, ExecutionException, IOException {
502 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
503 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
504 replyWithPeer("id1");
505 assertThat(peer.get().get().getIdentity(), is("id1"));
509 public void defaultFcpClientCanEnablePeerByHostAndPort()
510 throws InterruptedException, ExecutionException, IOException {
511 Future<Optional<Peer>> peer =
512 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
513 connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
514 replyWithPeer("id1");
515 assertThat(peer.get().get().getIdentity(), is("id1"));
519 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
520 Future<Optional<Peer>> peer =
521 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
522 connectAndAssert(() -> allOf(
523 matchesModifyPeer("id1", "AllowLocalAddresses", true),
524 not(contains(startsWith("IsDisabled=")))
526 replyWithPeer("id1");
527 assertThat(peer.get().get().getIdentity(), is("id1"));
531 public void disallowLocalAddressesOfPeer()
532 throws InterruptedException, ExecutionException, IOException {
533 Future<Optional<Peer>> peer =
534 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
535 connectAndAssert(() -> allOf(
536 matchesModifyPeer("id1", "AllowLocalAddresses", false),
537 not(contains(startsWith("IsDisabled=")))
539 replyWithPeer("id1");
540 assertThat(peer.get().get().getIdentity(), is("id1"));
544 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
545 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
546 connectAndAssert(() -> allOf(
547 matchesModifyPeer("id1", "IsBurstOnly", true),
548 not(contains(startsWith("AllowLocalAddresses="))),
549 not(contains(startsWith("IsDisabled=")))
551 replyWithPeer("id1");
552 assertThat(peer.get().get().getIdentity(), is("id1"));
556 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
557 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
558 connectAndAssert(() -> allOf(
559 matchesModifyPeer("id1", "IsBurstOnly", false),
560 not(contains(startsWith("AllowLocalAddresses="))),
561 not(contains(startsWith("IsDisabled=")))
563 replyWithPeer("id1");
564 assertThat(peer.get().get().getIdentity(), is("id1"));
568 public void defaultFcpClientCanSetListenOnlyForPeer()
569 throws InterruptedException, ExecutionException, IOException {
570 Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
571 connectAndAssert(() -> allOf(
572 matchesModifyPeer("id1", "IsListenOnly", true),
573 not(contains(startsWith("AllowLocalAddresses="))),
574 not(contains(startsWith("IsDisabled="))),
575 not(contains(startsWith("IsBurstOnly=")))
577 replyWithPeer("id1");
578 assertThat(peer.get().get().getIdentity(), is("id1"));
582 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
583 Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
584 connectAndAssert(() -> allOf(
585 matchesModifyPeer("id1", "IsListenOnly", false),
586 not(contains(startsWith("AllowLocalAddresses="))),
587 not(contains(startsWith("IsDisabled="))),
588 not(contains(startsWith("IsBurstOnly=")))
590 replyWithPeer("id1");
591 assertThat(peer.get().get().getIdentity(), is("id1"));
595 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
596 Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
597 connectAndAssert(() -> allOf(
598 matchesModifyPeer("id1", "IgnoreSourcePort", true),
599 not(contains(startsWith("AllowLocalAddresses="))),
600 not(contains(startsWith("IsDisabled="))),
601 not(contains(startsWith("IsBurstOnly="))),
602 not(contains(startsWith("IsListenOnly=")))
604 replyWithPeer("id1");
605 assertThat(peer.get().get().getIdentity(), is("id1"));
609 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
610 Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
611 connectAndAssert(() -> allOf(
612 matchesModifyPeer("id1", "IgnoreSourcePort", false),
613 not(contains(startsWith("AllowLocalAddresses="))),
614 not(contains(startsWith("IsDisabled="))),
615 not(contains(startsWith("IsBurstOnly="))),
616 not(contains(startsWith("IsListenOnly=")))
618 replyWithPeer("id1");
619 assertThat(peer.get().get().getIdentity(), is("id1"));
623 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
624 Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
625 connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
626 replyWithUnknownNodeIdentifier();
627 assertThat(peer.get().isPresent(), is(false));
630 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
631 return matchesFcpMessage(
633 "Identifier=" + identifier,
634 "NodeIdentifier=" + nodeIdentifier,
635 setting + "=" + value
641 public class RemovePeer {
644 public void byName() throws InterruptedException, ExecutionException, IOException {
645 Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
646 connectAndAssert(() -> matchesRemovePeer("Friend1"));
647 replyWithPeerRemoved("Friend1");
648 assertThat(peer.get(), is(true));
652 public void invalidName() throws InterruptedException, ExecutionException, IOException {
653 Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
654 connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
655 replyWithUnknownNodeIdentifier();
656 assertThat(peer.get(), is(false));
660 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
661 Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
662 connectAndAssert(() -> matchesRemovePeer("id1"));
663 replyWithPeerRemoved("id1");
664 assertThat(peer.get(), is(true));
668 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
669 Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
670 connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
671 replyWithPeerRemoved("Friend1");
672 assertThat(peer.get(), is(true));
675 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
676 return matchesFcpMessage(
678 "Identifier=" + identifier,
679 "NodeIdentifier=" + nodeIdentifier
683 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
686 "Identifier=" + identifier,
687 "NodeIdentifier=" + nodeIdentifier,
694 private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
697 "Identifier=" + identifier,
698 "identity=" + peerId,
700 "ark.pubURI=SSK@3YEf.../ark",
703 "version=Fred,0.7,1.0,1466",
704 "lastGoodVersion=Fred,0.7,1.0,1466"
706 fcpServer.writeLine(additionalLines);
707 fcpServer.writeLine("EndMessage");
712 public class PeerNoteCommands {
714 public class ListPeerNotes {
717 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
718 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
719 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
720 replyWithUnknownNodeIdentifier();
721 assertThat(peerNote.get().isPresent(), is(false));
725 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
726 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
727 connectAndAssert(() -> matchesListPeerNotes("Friend1"));
729 replyWithEndListPeerNotes();
730 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
731 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
735 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
736 Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
737 connectAndAssert(() -> matchesListPeerNotes("id1"));
739 replyWithEndListPeerNotes();
740 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
741 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
745 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
746 Future<Optional<PeerNote>> peerNote =
747 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
748 connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
750 replyWithEndListPeerNotes();
751 assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
752 assertThat(peerNote.get().get().getPeerNoteType(), is(1));
755 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
756 return matchesFcpMessage(
758 "NodeIdentifier=" + nodeIdentifier
762 private void replyWithEndListPeerNotes() throws IOException {
765 "Identifier=" + identifier,
770 private void replyWithPeerNote() throws IOException {
773 "Identifier=" + identifier,
774 "NodeIdentifier=Friend1",
775 "NoteText=RXhhbXBsZSBUZXh0Lg==",
783 public class ModifyPeerNotes {
786 public void byName() throws InterruptedException, ExecutionException, IOException {
787 Future<Boolean> noteUpdated =
788 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
789 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
791 assertThat(noteUpdated.get(), is(true));
795 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
796 Future<Boolean> noteUpdated =
797 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
798 connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
799 replyWithUnknownNodeIdentifier();
800 assertThat(noteUpdated.get(), is(false));
804 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
805 throws InterruptedException, ExecutionException, IOException {
806 Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
807 assertThat(noteUpdated.get(), is(false));
811 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
812 Future<Boolean> noteUpdated =
813 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
814 connectAndAssert(() -> matchesModifyPeerNote("id1"));
816 assertThat(noteUpdated.get(), is(true));
820 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
821 Future<Boolean> noteUpdated =
822 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
823 connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
825 assertThat(noteUpdated.get(), is(true));
828 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
829 return matchesFcpMessage(
831 "Identifier=" + identifier,
832 "NodeIdentifier=" + nodeIdentifier,
838 private void replyWithPeerNote() throws IOException {
841 "Identifier=" + identifier,
842 "NodeIdentifier=Friend1",
853 private void replyWithUnknownNodeIdentifier() throws IOException {
855 "UnknownNodeIdentifier",
856 "Identifier=" + identifier,
857 "NodeIdentifier=id2",
864 public class PluginCommands {
866 private static final String CLASS_NAME = "foo.plugin.Plugin";
868 private void replyWithPluginInfo() throws IOException {
871 "Identifier=" + identifier,
872 "PluginName=superPlugin",
876 "OriginUri=superPlugin",
882 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
883 throws InterruptedException, ExecutionException {
884 assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
885 assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
886 assertThat(pluginInfo.get().get().isTalkable(), is(true));
887 assertThat(pluginInfo.get().get().getVersion(), is("42"));
888 assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
889 assertThat(pluginInfo.get().get().isStarted(), is(true));
892 public class LoadPlugin {
894 public class OfficialPlugins {
897 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
898 Future<Optional<PluginInfo>> pluginInfo =
899 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
900 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
901 assertThat(lines, not(contains(startsWith("Store="))));
902 replyWithPluginInfo();
903 verifyPluginInfo(pluginInfo);
907 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
908 Future<Optional<PluginInfo>> pluginInfo =
909 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
910 connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
911 assertThat(lines, hasItem("Store=true"));
912 replyWithPluginInfo();
913 verifyPluginInfo(pluginInfo);
917 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
918 Future<Optional<PluginInfo>> pluginInfo =
919 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
920 connectAndAssert(() -> createMatcherForOfficialSource("https"));
921 replyWithPluginInfo();
922 verifyPluginInfo(pluginInfo);
925 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
926 return matchesFcpMessage(
928 "Identifier=" + identifier,
929 "PluginURL=superPlugin",
931 "OfficialSource=" + officialSource
937 public class FromOtherSources {
939 private static final String FILE_PATH = "/path/to/plugin.jar";
940 private static final String URL = "http://server.com/plugin.jar";
941 private static final String KEY = "KSK@plugin.jar";
944 public void fromFile() throws ExecutionException, InterruptedException, IOException {
945 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
946 connectAndAssert(() -> createMatcher("file", FILE_PATH));
947 replyWithPluginInfo();
948 verifyPluginInfo(pluginInfo);
952 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
953 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
954 connectAndAssert(() -> createMatcher("url", URL));
955 replyWithPluginInfo();
956 verifyPluginInfo(pluginInfo);
960 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
961 Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
962 connectAndAssert(() -> createMatcher("freenet", KEY));
963 replyWithPluginInfo();
964 verifyPluginInfo(pluginInfo);
967 private Matcher<List<String>> createMatcher(String urlType, String url) {
968 return matchesFcpMessage(
970 "Identifier=" + identifier,
978 public class Failed {
981 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
982 Future<Optional<PluginInfo>> pluginInfo =
983 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
984 connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
985 replyWithProtocolError();
986 assertThat(pluginInfo.get().isPresent(), is(false));
// Tests for the reloadPlugin() command: the plain request and each optional flag.
993 public class ReloadPlugin {
// Plain reload: request for CLASS_NAME, plugin-info reply, result verified.
996 public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
997 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
998 connectAndAssert(this::matchReloadPluginMessage);
999 replyWithPluginInfo();
1000 verifyPluginInfo(pluginInfo);
// waitFor(1234) must add "MaxWaitTime=1234" to the base reload request.
1004 public void reloadingPluginWithMaxWaitTimeWorks()
1005 throws InterruptedException, ExecutionException, IOException {
1006 Future<Optional<PluginInfo>> pluginInfo =
1007 fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1008 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1009 replyWithPluginInfo();
1010 verifyPluginInfo(pluginInfo);
// purge() must add "Purge=true" to the base reload request.
1014 public void reloadingPluginWithPurgeWorks()
1015 throws InterruptedException, ExecutionException, IOException {
1016 Future<Optional<PluginInfo>> pluginInfo =
1017 fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1018 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1019 replyWithPluginInfo();
1020 verifyPluginInfo(pluginInfo);
// addToConfig() must add "Store=true" to the base reload request.
1024 public void reloadingPluginWithStoreWorks()
1025 throws InterruptedException, ExecutionException, IOException {
1026 Future<Optional<PluginInfo>> pluginInfo =
1027 fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1028 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1029 replyWithPluginInfo();
1030 verifyPluginInfo(pluginInfo);
// Base matcher shared by all tests above: requires the current identifier and
// the plugin name CLASS_NAME.
1033 private Matcher<List<String>> matchReloadPluginMessage() {
1034 return matchesFcpMessage(
1036 "Identifier=" + identifier,
1037 "PluginName=" + CLASS_NAME
// Tests for the removePlugin() command; the command's future resolves to a Boolean.
1043 public class RemovePlugin {
// Plain removal: request for CLASS_NAME, "plugin removed" reply, future yields true.
1046 public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1047 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1048 connectAndAssert(this::matchPluginRemovedMessage);
1049 replyWithPluginRemoved();
1050 assertThat(pluginRemoved.get(), is(true));
// waitFor(1234) must add "MaxWaitTime=1234" to the base removal request.
1054 public void removingPluginWithMaxWaitTimeWorks()
1055 throws InterruptedException, ExecutionException, IOException {
1056 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1057 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1058 replyWithPluginRemoved();
1059 assertThat(pluginRemoved.get(), is(true));
// purge() must add "Purge=true" to the base removal request.
1063 public void removingPluginWithPurgeWorks()
1064 throws InterruptedException, ExecutionException, IOException {
1065 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1066 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1067 replyWithPluginRemoved();
1068 assertThat(pluginRemoved.get(), is(true));
// Writes the server-side reply for a removed plugin, carrying the current
// identifier and CLASS_NAME.
1071 private void replyWithPluginRemoved() throws IOException {
1072 fcpServer.writeLine(
1074 "Identifier=" + identifier,
1075 "PluginName=" + CLASS_NAME,
// Base matcher for the removal request: current identifier plus plugin name.
// NOTE(review): despite its name this matches the outgoing request, not the
// reply written by replyWithPluginRemoved -- confirm the naming is intended.
1080 private Matcher<List<String>> matchPluginRemovedMessage() {
1081 return matchesFcpMessage(
1083 "Identifier=" + identifier,
1084 "PluginName=" + CLASS_NAME
// Tests for the getPluginInfo() command, including the protocol-error path.
1090 public class GetPluginInfo {
// Plain query: request for CLASS_NAME, plugin-info reply, result verified.
1093 public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1094 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1095 connectAndAssert(this::matchGetPluginInfoMessage);
1096 replyWithPluginInfo();
1097 verifyPluginInfo(pluginInfo);
// detailed() must add "Detailed=true" to the base request.
1101 public void gettingPluginInfoWithDetailsWorks()
1102 throws InterruptedException, ExecutionException, IOException {
1103 Future<Optional<PluginInfo>> pluginInfo =
1104 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1105 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1106 replyWithPluginInfo();
1107 verifyPluginInfo(pluginInfo);
// Same request as the detailed test, but the server answers with a protocol
// error; the future must then yield Optional.empty().
1111 public void protocolErrorIsRecognizedAsFailure()
1112 throws InterruptedException, ExecutionException, IOException {
1113 Future<Optional<PluginInfo>> pluginInfo =
1114 fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1115 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1116 replyWithProtocolError();
1117 assertThat(pluginInfo.get(), is(Optional.empty()));
// Base matcher shared by the tests above: current identifier plus plugin name.
1120 private Matcher<List<String>> matchGetPluginInfoMessage() {
1121 return matchesFcpMessage(
1123 "Identifier=" + identifier,
1124 "PluginName=" + CLASS_NAME
// Tests for subscribeUsk(): subscribing, receiving update notifications and cancelling.
1132 public class UskSubscriptionCommands {
// URI used by every subscription in this class.
1134 private static final String URI = "USK@some,uri/file.txt";
// Subscribes, confirms the subscription, then sends two update notifications
// (editions 23 and 24) and expects the listener to have seen edition 24 last.
1137 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1138 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1139 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1140 replyWithSubscribed();
1141 assertThat(uskSubscription.get().get().getUri(), is(URI));
1142 AtomicInteger edition = new AtomicInteger();
// Latch of 2: one count per notification sent below.
1143 CountDownLatch updated = new CountDownLatch(2);
1144 uskSubscription.get().get().onUpdate(e -> {
1146 updated.countDown();
1148 sendUpdateNotification(23);
1149 sendUpdateNotification(24);
// Bounded wait so a missing callback fails the test instead of hanging it.
1150 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1151 assertThat(edition.get(), is(24));
// Registers two listeners on one subscription and sends a single notification;
// the latch of 2 only opens if both listeners are invoked.
1155 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1156 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1157 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1158 replyWithSubscribed();
1159 assertThat(uskSubscription.get().get().getUri(), is(URI));
1160 AtomicInteger edition = new AtomicInteger();
1161 CountDownLatch updated = new CountDownLatch(2);
1162 uskSubscription.get().get().onUpdate(e -> {
1164 updated.countDown();
// Second listener only counts down; it does not touch the edition.
1166 uskSubscription.get().get().onUpdate(e -> updated.countDown());
1167 sendUpdateNotification(23);
1168 assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1169 assertThat(edition.get(), is(23));
// Cancels the subscription, expects an UnsubscribeUSK message with the current
// identifier, and asserts that a later notification does not reach the listener.
1173 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1174 Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1175 connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1176 replyWithSubscribed();
1177 assertThat(uskSubscription.get().get().getUri(), is(URI));
1178 AtomicBoolean updated = new AtomicBoolean();
1179 uskSubscription.get().get().onUpdate(e -> updated.set(true));
1180 uskSubscription.get().get().cancel();
1181 readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1182 sendUpdateNotification(23);
// NOTE(review): the flag is checked immediately after the notification is written,
// with no wait; a callback delivered late would not be detected -- confirm this is acceptable.
1183 assertThat(updated.get(), is(false));
// Writes the server-side confirmation of the subscription for the current identifier.
1186 private void replyWithSubscribed() throws IOException {
1187 fcpServer.writeLine(
1189 "Identifier=" + identifier,
// Writes a SubscribedUSKUpdate message for the current identifier and the given
// edition, followed by any additional lines and the terminating "EndMessage".
1196 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1197 fcpServer.writeLine(
1198 "SubscribedUSKUpdate",
1199 "Identifier=" + identifier,
1201 "Edition=" + edition
1203 fcpServer.writeLine(additionalLines);
1204 fcpServer.writeLine("EndMessage");
// Tests for clientGet(): successful retrieval, failure handling, and the request
// parameters produced by each builder option.
1209 public class ClientGet {
// Sends a reply for a foreign identifier ("not-test") first and then one for the
// request's own identifier; the future is then read.
1212 public void works() throws InterruptedException, ExecutionException, IOException {
1213 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1214 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1215 replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1216 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1217 Optional<Data> data = dataFuture.get();
// A GetFailed for a foreign identifier is followed by one for the request's own
// identifier; the result must be empty.
1222 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1223 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1224 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1225 replyWithGetFailed("not-test");
1226 replyWithGetFailed(identifier);
1227 Optional<Data> data = dataFuture.get();
1228 assertThat(data.isPresent(), is(false));
// A GetFailed for a foreign identifier is followed by data for the request's own
// identifier; the future is then read.
1232 public void getFailedForDifferentIdentifierIsIgnored()
1233 throws InterruptedException, ExecutionException, IOException {
1234 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1235 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1236 replyWithGetFailed("not-test");
1237 replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1238 Optional<Data> data = dataFuture.get();
// The annotation makes this test pass only if an ExecutionException is thrown.
1242 @Test(expected = ExecutionException.class)
1243 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1244 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1245 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
// ignoreDataStore() must produce "IgnoreDS=true".
1251 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1252 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1253 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
// dataStoreOnly() must produce "DSonly=true".
1257 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1258 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1259 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
// maxSize(1048576) must produce "MaxSize=1048576".
1263 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1264 throws InterruptedException, ExecutionException, IOException {
1265 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1266 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
// priority(Priority.interactive) must produce "PriorityClass=1".
1270 public void clientGetWithPrioritySettingSendsCorrectCommands()
1271 throws InterruptedException, ExecutionException, IOException {
1272 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1273 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
// realTime() must produce "RealTimeFlag=true".
1277 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1278 throws InterruptedException, ExecutionException, IOException {
1279 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1280 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
// global() must produce "Global=true".
1284 public void clientGetWithGlobalSettingSendsCorrectCommands()
1285 throws InterruptedException, ExecutionException, IOException {
1286 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1287 connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
// Writes a failure reply for the given identifier (the parameter shadows the
// outer identifier so that foreign identifiers can be sent as well).
1290 private void replyWithGetFailed(String identifier) throws IOException {
1291 fcpServer.writeLine(
1293 "Identifier=" + identifier,
// Writes a data reply for the given identifier with the given content type.
1299 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1300 fcpServer.writeLine(
1302 "Identifier=" + identifier,
// NOTE(review): String.length() counts chars, not encoded bytes; this equals the byte
// count only for ASCII text such as the callers here pass. The "+ 1" presumably
// accounts for a trailing newline (verifyData expects "Hello\n", size 6) -- confirm.
1303 "DataLength=" + (text.length() + 1),
1304 "StartupTime=1435610539000",
1305 "CompletionTime=1435610540000",
1306 "Metadata.ContentType=" + contentType,
// Asserts the MIME type, the size (6) and the exact bytes ("Hello\n" in UTF-8) of
// the retrieved data; data.get() is called without a presence check, so an empty
// Optional fails the test with an exception.
1312 private void verifyData(Optional<Data> data) throws IOException {
1313 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1314 assertThat(data.get().size(), is(6L));
1315 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1316 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
// Tests for clientPut(): direct uploads, redirects, disk uploads including the
// TestDDA handshake, protocol-error handling and generated-key notifications.
1321 public class ClientPut {
// Uploads "Hello\n" from a stream and checks the resulting direct ClientPut message.
1324 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1325 fcpClient.clientPut()
// NOTE(review): getBytes() without a charset uses the platform default charset; harmless
// for this ASCII payload, but StandardCharsets.UTF_8 (used elsewhere in this file)
// would make it explicit. The same applies to the other getBytes() calls in this class.
1326 .from(new ByteArrayInputStream("Hello\n".getBytes()))
// "Hello" is presumably the line up to which the message is read (payload follows EndMessage) -- TODO confirm.
1331 readMessage("Hello", this::matchesDirectClientPut);
// A failure for a foreign identifier is sent first, then success for the request's
// own identifier; the future must yield the key "KSK@foo.txt".
1335 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1336 Future<Optional<Key>> key = fcpClient.clientPut()
1337 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1342 readMessage("Hello", this::matchesDirectClientPut);
1343 replyWithPutFailed("not-the-right-one");
1344 replyWithPutSuccessful(identifier);
1345 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
// Mirror image of the test above: success for a foreign identifier, then failure
// for the request's own identifier; the future must yield an empty Optional.
1349 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1350 Future<Optional<Key>> key = fcpClient.clientPut()
1351 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1356 readMessage("Hello", this::matchesDirectClientPut);
1357 replyWithPutSuccessful("not-the-right-one");
1358 replyWithPutFailed(identifier);
1359 assertThat(key.get().isPresent(), is(false));
// named("otherName.txt") must show up as "TargetFilename=otherName.txt" in the request.
1363 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1364 fcpClient.clientPut()
1365 .named("otherName.txt")
1366 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1371 readMessage("Hello", () -> allOf(
1372 hasHead("ClientPut"),
1373 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1375 hasTail("EndMessage", "Hello")
// redirectTo(...) must produce an "UploadFrom=redirect" request carrying the target URI.
// NOTE(review): the method name misspells "Correctly".
1380 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1381 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1382 connectAndAssert(() ->
1383 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
// from(File) must produce an "UploadFrom=disk" request carrying the file name.
1387 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1388 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1389 connectAndAssert(() ->
1390 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
// Temporary file with known content, used for the TestDDA exchange.
1395 private final File ddaFile;
// File handed to clientPut(); it is only a path next to ddaFile.
1396 private final File fileToUpload;
// Creates the DDA file and derives fileToUpload as "test.dat" in the same directory.
1398 public DDA() throws IOException {
1399 ddaFile = createDdaFile();
1400 fileToUpload = new File(ddaFile.getParent(), "test.dat");
// Matcher for a disk-based ClientPut of the given file.
1403 private Matcher<List<String>> matchesFileClientPut(File file) {
1404 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
// Full handshake: ClientPut -> DDA required -> TestDDA request -> TestDDA reply ->
// TestDDA response -> TestDDA complete -> the original ClientPut is sent again.
1408 public void completeDda() throws IOException, ExecutionException, InterruptedException {
1409 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1410 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1411 sendDdaRequired(identifier);
1412 readMessage(() -> matchesTestDDARequest(ddaFile));
1413 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1414 readMessage(() -> matchesTestDDAResponse(ddaFile));
1415 writeTestDDAComplete(ddaFile);
1416 readMessage(() -> matchesFileClientPut(fileToUpload));
// A TestDDA reply for an unrelated directory precedes the reply for the right one;
// the response for the right directory is still expected.
1420 public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1421 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1422 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1423 sendDdaRequired(identifier);
1424 readMessage(() -> matchesTestDDARequest(ddaFile));
1425 sendTestDDAReply("/some-other-directory", ddaFile);
1426 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1427 readMessage(() -> matchesTestDDAResponse(ddaFile));
// The reply names ddaFile's path with a ".foo" suffix (presumably a file that does
// not exist); the client must answer with "ReadContent=failed-to-read".
1431 public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
1432 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1433 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1434 sendDdaRequired(identifier);
1435 readMessage(() -> matchesTestDDARequest(ddaFile));
1436 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1437 readMessage(this::matchesFailedToReadResponse);
// Sends a message for "/some-other-directory" before the DDA-required reply and then
// checks that the next message from the client is a request for ddaFile's directory.
1441 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1442 throws IOException, ExecutionException, InterruptedException {
1443 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1445 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
// Local variable taken from the received request; it shadows the outer identifier.
1446 String identifier = extractIdentifier(lines);
1447 fcpServer.writeLine(
1449 "Directory=/some-other-directory",
1452 sendDdaRequired(identifier);
1453 lines = fcpServer.collectUntil(is("EndMessage"));
1454 assertThat(lines, matchesFcpMessage(
1456 "Directory=" + ddaFile.getParent(),
1457 "WantReadDirectory=true",
1458 "WantWriteDirectory=false"
// Matcher for the client's response when the DDA test file could not be read.
1462 private Matcher<List<String>> matchesFailedToReadResponse() {
1463 return matchesFcpMessage(
1465 "Directory=" + ddaFile.getParent(),
1466 "ReadContent=failed-to-read"
// Writes the server message that completes the DDA test for tempFile's directory,
// granting read access.
1470 private void writeTestDDAComplete(File tempFile) throws IOException {
1471 fcpServer.writeLine(
1473 "Directory=" + tempFile.getParent(),
1474 "ReadDirectoryAllowed=true",
// Matcher for the client's response carrying the content written by createDdaFile.
1479 private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1480 return matchesFcpMessage(
1482 "Directory=" + tempFile.getParent(),
1483 "ReadContent=test-content"
// Writes the server's TestDDA reply naming the directory and the file to read.
1487 private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1488 fcpServer.writeLine(
1490 "Directory=" + directory,
1491 "ReadFilename=" + tempFile,
// Matcher for the client's TestDDA request: read access wanted, write access not.
1496 private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1497 return matchesFcpMessage(
1499 "Directory=" + tempFile.getParent(),
1500 "WantReadDirectory=true",
1501 "WantWriteDirectory=false"
// Writes the server reply that triggers the DDA handshake for the given identifier.
1505 private void sendDdaRequired(String identifier) throws IOException {
1506 fcpServer.writeLine(
1508 "Identifier=" + identifier,
// Writes a success reply for the given identifier.
1516 private void replyWithPutSuccessful(String identifier) throws IOException {
1517 fcpServer.writeLine(
1520 "Identifier=" + identifier,
// Writes a failure reply for the given identifier.
1525 private void replyWithPutFailed(String identifier) throws IOException {
1526 fcpServer.writeLine(
1528 "Identifier=" + identifier,
// Matcher for a direct-upload ClientPut of the 6-byte payload: message head,
// expected parameters, and the tail "EndMessage" followed by the payload line "Hello".
1533 private Matcher<List<String>> matchesDirectClientPut() {
1535 hasHead("ClientPut"),
1536 hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
1537 hasTail("EndMessage", "Hello")
// Creates a temporary "test-dda-*.dat" file, marks it for deletion on JVM exit and
// writes "test-content" into it as UTF-8.
1541 private File createDdaFile() throws IOException {
1542 File tempFile = File.createTempFile("test-dda-", ".dat");
1543 tempFile.deleteOnExit();
1544 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
// A message for the identifier "not-the-right-one" is written before one for the
// request's own identifier; the put must still yield the key "KSK@foo.txt".
1549 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1550 throws InterruptedException, ExecutionException, IOException {
1551 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1553 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1554 String identifier = extractIdentifier(lines);
1555 fcpServer.writeLine(
1557 "Identifier=not-the-right-one",
1561 fcpServer.writeLine(
1563 "Identifier=" + identifier,
1567 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
// A message for the request's own identifier is written and the put must yield an
// empty Optional (per the method name, a protocol error with a code other than 25).
1571 public void clientPutAbortsOnProtocolErrorOtherThan25()
1572 throws InterruptedException, ExecutionException, IOException {
1573 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1575 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1576 String identifier = extractIdentifier(lines);
1577 fcpServer.writeLine(
1579 "Identifier=" + identifier,
1583 assertThat(key.get().isPresent(), is(false));
// Registers an onKeyGenerated listener and asserts that it received exactly
// "KSK@foo.txt" once the put has completed successfully.
1587 public void clientPutSendsNotificationsForGeneratedKeys()
1588 throws InterruptedException, ExecutionException, IOException {
// Thread-safe list: the listener may be invoked on a thread other than the test's.
1589 List<String> generatedKeys = new CopyOnWriteArrayList<>();
1590 Future<Optional<Key>> key = fcpClient.clientPut()
1591 .onKeyGenerated(generatedKeys::add)
1592 .from(new ByteArrayInputStream("Hello\n".getBytes()))
// Direct uploads are collected up to the payload line rather than "EndMessage".
1597 List<String> lines = fcpServer.collectUntil(is("Hello"));
1598 String identifier = extractIdentifier(lines);
1599 fcpServer.writeLine(
1601 "Identifier=" + identifier,
1605 replyWithPutSuccessful(identifier);
1606 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1607 assertThat(generatedKeys, contains("KSK@foo.txt"));
// Tests for the configuration commands; groups GetConfig and ModifyConfig, which
// share the replyWithConfigData helper at the end.
1612 public class ConfigCommand {
// Each test enables one getConfig() option, expects the matching "<Option>=true"
// parameter in the request and reads the corresponding value back from the reply.
1614 public class GetConfig {
// No options: the request carries only the identifier, and a result must arrive.
1617 public void defaultFcpClientCanGetConfigWithoutDetails()
1618 throws InterruptedException, ExecutionException, IOException {
1619 Future<ConfigData> configData = fcpClient.getConfig().execute();
1620 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1621 replyWithConfigData();
1622 assertThat(configData.get(), notNullValue());
// withCurrent() -> "WithCurrent=true"; "current.foo=bar" is read via getCurrent("foo").
1626 public void defaultFcpClientCanGetConfigWithCurrent()
1627 throws InterruptedException, ExecutionException, IOException {
1628 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1629 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1630 replyWithConfigData("current.foo=bar");
1631 assertThat(configData.get().getCurrent("foo"), is("bar"));
// withDefaults() -> "WithDefaults=true"; "default.foo=bar" is read via getDefault("foo").
1635 public void defaultFcpClientCanGetConfigWithDefaults()
1636 throws InterruptedException, ExecutionException, IOException {
1637 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1638 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1639 replyWithConfigData("default.foo=bar");
1640 assertThat(configData.get().getDefault("foo"), is("bar"));
// withSortOrder() -> "WithSortOrder=true"; "sortOrder.foo=17" is read as the number 17.
1644 public void defaultFcpClientCanGetConfigWithSortOrder()
1645 throws InterruptedException, ExecutionException, IOException {
1646 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1647 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1648 replyWithConfigData("sortOrder.foo=17");
1649 assertThat(configData.get().getSortOrder("foo"), is(17));
// withExpertFlag() -> "WithExpertFlag=true"; "expertFlag.foo=true" is read as boolean true.
1653 public void defaultFcpClientCanGetConfigWithExpertFlag()
1654 throws InterruptedException, ExecutionException, IOException {
1655 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1656 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1657 replyWithConfigData("expertFlag.foo=true");
1658 assertThat(configData.get().getExpertFlag("foo"), is(true));
// withForceWriteFlag() -> "WithForceWriteFlag=true"; "forceWriteFlag.foo=true" is read as boolean true.
1662 public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1663 throws InterruptedException, ExecutionException, IOException {
1664 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1665 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1666 replyWithConfigData("forceWriteFlag.foo=true");
1667 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
// withShortDescription() -> "WithShortDescription=true"; value read via getShortDescription("foo").
1671 public void defaultFcpClientCanGetConfigWithShortDescription()
1672 throws InterruptedException, ExecutionException, IOException {
1673 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1674 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1675 replyWithConfigData("shortDescription.foo=bar");
1676 assertThat(configData.get().getShortDescription("foo"), is("bar"));
// withLongDescription() -> "WithLongDescription=true"; value read via getLongDescription("foo").
1680 public void defaultFcpClientCanGetConfigWithLongDescription()
1681 throws InterruptedException, ExecutionException, IOException {
1682 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1683 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1684 replyWithConfigData("longDescription.foo=bar");
1685 assertThat(configData.get().getLongDescription("foo"), is("bar"));
// withDataTypes() -> "WithDataTypes=true"; "dataType.foo=number" is read via getDataType("foo").
1689 public void defaultFcpClientCanGetConfigWithDataTypes()
1690 throws InterruptedException, ExecutionException, IOException {
1691 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1692 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1693 replyWithConfigData("dataType.foo=number");
1694 assertThat(configData.get().getDataType("foo"), is("number"));
// Matcher for a request with the current identifier and "<additionalParameter>=true".
1697 private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1698 return matchesFcpMessage(
1700 "Identifier=" + identifier,
1701 additionalParameter + "=true"
// Tests for modifyConfig().
1707 public class ModifyConfig {
// Sets "foo.bar" to "baz", checks the request and reads the new value back from
// the config data the server replies with.
1710 public void defaultFcpClientCanModifyConfigData()
1711 throws InterruptedException, ExecutionException, IOException {
1712 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1713 connectAndAssert(() -> matchesFcpMessage(
1715 "Identifier=" + identifier,
1718 replyWithConfigData("current.foo.bar=baz");
1719 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
// Writes a ConfigData message for the current identifier, followed by the given
// additional lines and the terminating "EndMessage".
1724 private void replyWithConfigData(String... additionalLines) throws IOException {
1725 fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1726 fcpServer.writeLine(additionalLines);
1727 fcpServer.writeLine("EndMessage");
// Tests for getNode(): each builder option must set exactly one of the three
// boolean request parameters checked by matchesGetNode.
1732 public class NodeInformation {
// No options: all three flags are false; the reply is parsed and the node
// reference is not an opennet reference.
1735 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
1736 Future<NodeData> nodeData = fcpClient.getNode().execute();
1737 connectAndAssert(() -> matchesGetNode(false, false, false));
1738 replyWithNodeData();
1739 assertThat(nodeData.get(), notNullValue());
1740 assertThat(nodeData.get().getNodeRef().isOpennet(), is(false));
// opennetRef() sets the first flag; the reply's "opennet=true" must be reflected in
// the node reference, and the version string from the base reply must round-trip.
1744 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
1745 throws InterruptedException, ExecutionException, IOException {
1746 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
1747 connectAndAssert(() -> matchesGetNode(true, false, false));
1748 replyWithNodeData("opennet=true");
1749 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
1750 assertThat(nodeData.get().getNodeRef().isOpennet(), is(true));
// includePrivate() sets the second flag; the private ARK URI is read from the reply.
1754 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
1755 throws InterruptedException, ExecutionException, IOException {
1756 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
1757 connectAndAssert(() -> matchesGetNode(false, true, false));
1758 replyWithNodeData("ark.privURI=SSK@XdHMiRl");
1759 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
// includeVolatile() sets the third flag; "volatile.freeJavaMemory" is read back
// through getVolatile("freeJavaMemory") as a string.
1763 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
1764 throws InterruptedException, ExecutionException, IOException {
1765 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
1766 connectAndAssert(() -> matchesGetNode(false, false, true));
1767 replyWithNodeData("volatile.freeJavaMemory=205706528");
1768 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
// Matcher for the request: current identifier plus the three flags rendered as
// "true"/"false" via string concatenation.
1771 private Matcher<List<String>> matchesGetNode(boolean withOpennetRef, boolean withPrivate, boolean withVolatile) {
1772 return matchesFcpMessage(
1774 "Identifier=" + identifier,
1775 "GiveOpennetRef=" + withOpennetRef,
1776 "WithPrivate=" + withPrivate,
1777 "WithVolatile=" + withVolatile
// Writes the base node-data reply (identifier, public ARK URI, version fields),
// then the caller's additional lines, then the terminating "EndMessage".
1781 private void replyWithNodeData(String... additionalLines) throws IOException {
1782 fcpServer.writeLine(
1784 "Identifier=" + identifier,
1785 "ark.pubURI=SSK@3YEf.../ark",
1788 "version=Fred,0.7,1.0,1466",
1789 "lastGoodVersion=Fred,0.7,1.0,1466"
1791 fcpServer.writeLine(additionalLines);
1792 fcpServer.writeLine("EndMessage");