Add progress consumer to ClientPut command
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static net.pterodactylus.fcp.RequestProgressMatcher.isRequestProgress;
4 import static org.hamcrest.MatcherAssert.assertThat;
5 import static org.hamcrest.Matchers.allOf;
6 import static org.hamcrest.Matchers.contains;
7 import static org.hamcrest.Matchers.containsInAnyOrder;
8 import static org.hamcrest.Matchers.hasItem;
9 import static org.hamcrest.Matchers.hasSize;
10 import static org.hamcrest.Matchers.is;
11 import static org.hamcrest.Matchers.not;
12 import static org.hamcrest.Matchers.notNullValue;
13 import static org.hamcrest.Matchers.startsWith;
14
15 import java.io.ByteArrayInputStream;
16 import java.io.File;
17 import java.io.IOException;
18 import java.net.URL;
19 import java.nio.charset.StandardCharsets;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.function.Supplier;
35 import java.util.stream.Collectors;
36
37 import net.pterodactylus.fcp.ARK;
38 import net.pterodactylus.fcp.ConfigData;
39 import net.pterodactylus.fcp.DSAGroup;
40 import net.pterodactylus.fcp.FcpKeyPair;
41 import net.pterodactylus.fcp.Key;
42 import net.pterodactylus.fcp.NodeData;
43 import net.pterodactylus.fcp.NodeRef;
44 import net.pterodactylus.fcp.Peer;
45 import net.pterodactylus.fcp.PeerNote;
46 import net.pterodactylus.fcp.PluginInfo;
47 import net.pterodactylus.fcp.Priority;
48 import net.pterodactylus.fcp.RequestProgress;
49 import net.pterodactylus.fcp.fake.FakeTcpServer;
50 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
51
52 import com.google.common.io.ByteStreams;
53 import com.google.common.io.Files;
54 import com.nitorcreations.junit.runners.NestedRunner;
55 import org.hamcrest.Description;
56 import org.hamcrest.Matcher;
57 import org.hamcrest.Matchers;
58 import org.hamcrest.TypeSafeDiagnosingMatcher;
59 import org.junit.After;
60 import org.junit.Assert;
61 import org.junit.Before;
62 import org.junit.Ignore;
63 import org.junit.Test;
64 import org.junit.rules.TemporaryFolder;
65 import org.junit.runner.RunWith;
66
67 /**
68  * Unit test for {@link DefaultFcpClient}.
69  *
70  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
71  */
72 @RunWith(NestedRunner.class)
73 public class DefaultFcpClientTest {
74
        /* Insert URI sent in the fake "SSKKeypair" reply; tests expect it back as the private key. */
        private static final String INSERT_URI =
                "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
        /* Request URI sent in the fake "SSKKeypair" reply; tests expect it back as the public key. */
        private static final String REQUEST_URI =
                "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
79
80         private int threadCounter = 0;
81         private final ExecutorService threadPool =
82                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
83         private final FakeTcpServer fcpServer;
84         private final DefaultFcpClient fcpClient;
85
86         public DefaultFcpClientTest() throws IOException {
87                 fcpServer = new FakeTcpServer(threadPool);
88                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
89         }
90
91         @After
92         public void tearDown() throws IOException {
93                 fcpServer.close();
94                 threadPool.shutdown();
95         }
96
        /**
         * Waits for the fake server's connection, consumes everything the client
         * sent up to the first {@code EndMessage} line (presumably its hello
         * message; the content is not checked) and answers with a canned
         * {@code NodeHello} message.
         *
         * @throws InterruptedException
         *         if waiting for the connection is interrupted
         * @throws ExecutionException
         *         if establishing the connection failed
         * @throws IOException
         *         if reading from or writing to the connection fails
         */
        private void connectNode() throws InterruptedException, ExecutionException, IOException {
                fcpServer.connect().get();
                /* the collected lines are discarded on purpose. */
                fcpServer.collectUntil(is("EndMessage"));
                fcpServer.writeLine("NodeHello",
                        "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
                        "Revision=build01466",
                        "Testnet=false",
                        "Version=Fred,0.7,1.0,1466",
                        "Build=1466",
                        "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
                        "Node=Fred",
                        "ExtBuild=29",
                        "FCPVersion=2.0",
                        "NodeLanguage=ENGLISH",
                        "ExtRevision=v29",
                        "EndMessage"
                );
        }
115
116         private String extractIdentifier(List<String> lines) {
117                 return lines.stream()
118                         .filter(s -> s.startsWith("Identifier="))
119                         .map(s -> s.substring(s.indexOf('=') + 1))
120                         .findFirst()
121                         .orElse("");
122         }
123
        /**
         * Creates a matcher for a message that is terminated by an
         * {@code EndMessage} line.
         *
         * @param name
         *         the expected first line of the message
         * @param requiredLines
         *         lines that must occur between the first and the last line
         * @return the matcher built by
         *         {@link #matchesFcpMessageWithTerminator(String, String, String...)}
         */
        private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
                return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
        }
127
128         private Matcher<Iterable<String>> hasHead(String firstElement) {
129                 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
130                         @Override
131                         protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
132                                 if (!iterable.iterator().hasNext()) {
133                                         mismatchDescription.appendText("is empty");
134                                         return false;
135                                 }
136                                 String element = iterable.iterator().next();
137                                 if (!element.equals(firstElement)) {
138                                         mismatchDescription.appendText("starts with ").appendValue(element);
139                                         return false;
140                                 }
141                                 return true;
142                         }
143
144                         @Override
145                         public void describeTo(Description description) {
146                                 description.appendText("starts with ").appendValue(firstElement);
147                         }
148                 };
149         }
150
        /**
         * Creates a matcher for a complete message: the first line must be the
         * message name, the last line must be the terminator, and every required
         * line must occur somewhere in between (the first and the last line are
         * excluded from that search).
         *
         * @param name
         *         the expected first line
         * @param terminator
         *         the expected last line
         * @param requiredLines
         *         lines that must occur between the first and the last line
         * @return a matcher combining all three conditions
         */
        private Matcher<List<String>> matchesFcpMessageWithTerminator(
                String name, String terminator, String... requiredLines) {
                return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
        }
155
156         private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
157                 return new TypeSafeDiagnosingMatcher<List<String>>() {
158                         @Override
159                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
160                                 if (item.size() < (ignoreStart + ignoreEnd)) {
161                                         mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
162                                         return false;
163                                 }
164                                 for (String line : lines) {
165                                         if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
166                                                 mismatchDescription.appendText("does not contains ").appendValue(line);
167                                                 return false;
168                                         }
169                                 }
170                                 return true;
171                         }
172
173                         @Override
174                         public void describeTo(Description description) {
175                                 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
176                                 description.appendText(", ignoring the first ").appendValue(ignoreStart);
177                                 description.appendText(" and the last ").appendValue(ignoreEnd);
178                         }
179                 };
180         }
181
182         private Matcher<List<String>> hasTail(String... lastElements) {
183                 return new TypeSafeDiagnosingMatcher<List<String>>() {
184                         @Override
185                         protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
186                                 if (list.size() < lastElements.length) {
187                                         mismatchDescription.appendText("is too small");
188                                         return false;
189                                 }
190                                 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
191                                 if (!tail.equals(Arrays.asList(lastElements))) {
192                                         mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
193                                         return false;
194                                 }
195                                 return true;
196                         }
197
198                         @Override
199                         public void describeTo(Description description) {
200                                 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
201                         }
202                 };
203         }
204
        /* Lines of the message most recently read by readMessage(). */
        private List<String> lines;
        /* Value of the "Identifier=" line of that message; empty if the message has none. */
        private String identifier;
207
        /**
         * Performs the connection handshake and then reads the next message from
         * the client, asserting that it matches the supplied matcher.
         *
         * @param requestMatcher
         *         supplies the matcher for the message; it is only asked for the
         *         matcher after the message has been read
         * @throws InterruptedException
         *         if waiting for the connection is interrupted
         * @throws ExecutionException
         *         if establishing the connection failed
         * @throws IOException
         *         if communicating with the client fails
         */
        private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
        throws InterruptedException, ExecutionException, IOException {
                connectNode();
                readMessage(requestMatcher);
        }
213
        /**
         * Reads a message that is terminated by an {@code EndMessage} line and
         * asserts that it matches the supplied matcher.
         *
         * @param requestMatcher
         *         supplies the matcher for the message
         * @throws IOException
         *         if reading from the client fails
         */
        private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
                readMessage("EndMessage", requestMatcher);
        }
217
218         private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
219                 lines = fcpServer.collectUntil(is(terminator));
220                 identifier = extractIdentifier(lines);
221                 assertThat(lines, requestMatcher.get());
222         }
223
        /**
         * Sends a {@code ProtocolError} message carrying the identifier of the
         * most recently read request to the client.
         *
         * @throws IOException
         *         if writing to the client fails
         */
        private void replyWithProtocolError() throws IOException {
                fcpServer.writeLine(
                        "ProtocolError",
                        "Identifier=" + identifier,
                        "EndMessage"
                );
        }
231
232         public class ConnectionsAndKeyPairs {
233
234                 public class Connections {
235
236                         @Test(expected = ExecutionException.class)
237                         public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
238                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
239                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
240                                 fcpServer.writeLine(
241                                         "CloseConnectionDuplicateClientName",
242                                         "EndMessage"
243                                 );
244                                 keyPairFuture.get();
245                         }
246
247                         @Test(expected = ExecutionException.class)
248                         public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
249                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
250                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
251                                 fcpServer.close();
252                                 keyPairFuture.get();
253                         }
254
255                         @Test
256                         public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
257                                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
258                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
259                                 replyWithKeyPair();
260                                 keyPair.get();
261                                 keyPair = fcpClient.generateKeypair().execute();
262                                 readMessage(() -> matchesFcpMessage("GenerateSSK"));
263                                 identifier = extractIdentifier(lines);
264                                 replyWithKeyPair();
265                                 keyPair.get();
266                         }
267
268                         @Test
269                         public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
270                         throws InterruptedException, ExecutionException, IOException {
271                                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
272                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
273                                 fcpServer.close();
274                                 try {
275                                         keyPair.get();
276                                         Assert.fail();
277                                 } catch (ExecutionException e) {
278                                         /* ignore. */
279                                 }
280                                 keyPair = fcpClient.generateKeypair().execute();
281                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
282                                 replyWithKeyPair();
283                                 keyPair.get();
284                         }
285
286                 }
287
288                 public class GenerateKeyPair {
289
290                         @Test
291                         public void defaultFcpClientCanGenerateKeypair()
292                         throws ExecutionException, InterruptedException, IOException {
293                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
294                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
295                                 replyWithKeyPair();
296                                 FcpKeyPair keyPair = keyPairFuture.get();
297                                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
298                                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
299                         }
300
301                 }
302
303                 private void replyWithKeyPair() throws IOException {
304                         fcpServer.writeLine("SSKKeypair",
305                                 "InsertURI=" + INSERT_URI + "",
306                                 "RequestURI=" + REQUEST_URI + "",
307                                 "Identifier=" + identifier,
308                                 "EndMessage");
309                 }
310
311         }
312
313         public class Peers {
314
315                 public class PeerCommands {
316
317                         public class ListPeer {
318
319                                 @Test
320                                 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
321                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
322                                         connectAndAssert(() -> matchesListPeer("id1"));
323                                         replyWithPeer("id1");
324                                         assertThat(peer.get().get().getIdentity(), is("id1"));
325                                 }
326
327                                 @Test
328                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
329                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
330                                         connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
331                                         replyWithPeer("id1");
332                                         assertThat(peer.get().get().getIdentity(), is("id1"));
333                                 }
334
335                                 @Test
336                                 public void byName() throws InterruptedException, ExecutionException, IOException {
337                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
338                                         connectAndAssert(() -> matchesListPeer("FriendNode"));
339                                         replyWithPeer("id1");
340                                         assertThat(peer.get().get().getIdentity(), is("id1"));
341                                 }
342
343                                 @Test
344                                 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
345                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
346                                         connectAndAssert(() -> matchesListPeer("id2"));
347                                         replyWithUnknownNodeIdentifier();
348                                         assertThat(peer.get().isPresent(), is(false));
349                                 }
350
351                                 private Matcher<List<String>> matchesListPeer(String nodeId) {
352                                         return matchesFcpMessage(
353                                                 "ListPeer",
354                                                 "Identifier=" + identifier,
355                                                 "NodeIdentifier=" + nodeId
356                                         );
357                                 }
358
359                         }
360
361                         public class ListPeers {
362
363                                 @Test
364                                 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
365                                         Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
366                                         connectAndAssert(() -> matchesListPeers(false, false));
367                                         replyWithPeer("id1");
368                                         replyWithPeer("id2");
369                                         sendEndOfPeerList();
370                                         assertThat(peers.get(), hasSize(2));
371                                         assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
372                                                 containsInAnyOrder("id1", "id2"));
373                                 }
374
375                                 @Test
376                                 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
377                                         Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
378                                         connectAndAssert(() -> matchesListPeers(false, true));
379                                         replyWithPeer("id1", "metadata.foo=bar1");
380                                         replyWithPeer("id2", "metadata.foo=bar2");
381                                         sendEndOfPeerList();
382                                         assertThat(peers.get(), hasSize(2));
383                                         assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
384                                                 containsInAnyOrder("bar1", "bar2"));
385                                 }
386
387                                 @Test
388                                 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
389                                         Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
390                                         connectAndAssert(() -> matchesListPeers(true, false));
391                                         replyWithPeer("id1", "volatile.foo=bar1");
392                                         replyWithPeer("id2", "volatile.foo=bar2");
393                                         sendEndOfPeerList();
394                                         assertThat(peers.get(), hasSize(2));
395                                         assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
396                                                 containsInAnyOrder("bar1", "bar2"));
397                                 }
398
399                                 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
400                                         return matchesFcpMessage(
401                                                 "ListPeers",
402                                                 "WithVolatile=" + withVolatile,
403                                                 "WithMetadata=" + withMetadata
404                                         );
405                                 }
406
407                                 private void sendEndOfPeerList() throws IOException {
408                                         fcpServer.writeLine(
409                                                 "EndListPeers",
410                                                 "Identifier=" + identifier,
411                                                 "EndMessage"
412                                         );
413                                 }
414
415                         }
416
417                         public class AddPeer {
418
419                                 @Test
420                                 public void fromFile() throws InterruptedException, ExecutionException, IOException {
421                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
422                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
423                                         replyWithPeer("id1");
424                                         assertThat(peer.get().get().getIdentity(), is("id1"));
425                                 }
426
427                                 @Test
428                                 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
429                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
430                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
431                                         replyWithPeer("id1");
432                                         assertThat(peer.get().get().getIdentity(), is("id1"));
433                                 }
434
435                                 @Test
436                                 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
437                                         NodeRef nodeRef = createNodeRef();
438                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
439                                         connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
440                                                 "myName=name",
441                                                 "ark.pubURI=public",
442                                                 "ark.number=1",
443                                                 "dsaGroup.g=base",
444                                                 "dsaGroup.p=prime",
445                                                 "dsaGroup.q=subprime",
446                                                 "dsaPubKey.y=dsa-public",
447                                                 "physical.udp=1.2.3.4:5678",
448                                                 "auth.negTypes=3;5",
449                                                 "sig=sig"
450                                         )));
451                                         replyWithPeer("id1");
452                                         assertThat(peer.get().get().getIdentity(), is("id1"));
453                                 }
454
455                                 @Test
456                                 public void protocolErrorEndsCommand() throws InterruptedException, ExecutionException, IOException {
457                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
458                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
459                                         replyWithProtocolError();
460                                         assertThat(peer.get().isPresent(), is(false));
461                                 }
462
463                                 private NodeRef createNodeRef() {
464                                         NodeRef nodeRef = new NodeRef();
465                                         nodeRef.setIdentity("id1");
466                                         nodeRef.setName("name");
467                                         nodeRef.setARK(new ARK("public", "1"));
468                                         nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
469                                         nodeRef.setNegotiationTypes(new int[] { 3, 5 });
470                                         nodeRef.setPhysicalUDP("1.2.3.4:5678");
471                                         nodeRef.setDSAPublicKey("dsa-public");
472                                         nodeRef.setSignature("sig");
473                                         return nodeRef;
474                                 }
475
476                                 private Matcher<List<String>> matchesAddPeer() {
477                                         return matchesFcpMessage(
478                                                 "AddPeer",
479                                                 "Identifier=" + identifier
480                                         );
481                                 }
482
483                         }
484
485                         public class ModifyPeer {
486
487                                 @Test
488                                 public void defaultFcpClientCanEnablePeerByName()
489                                 throws InterruptedException, ExecutionException, IOException {
490                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
491                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
492                                         replyWithPeer("id1");
493                                         assertThat(peer.get().get().getIdentity(), is("id1"));
494                                 }
495
496                                 @Test
497                                 public void defaultFcpClientCanDisablePeerByName()
498                                 throws InterruptedException, ExecutionException, IOException {
499                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
500                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
501                                         replyWithPeer("id1");
502                                         assertThat(peer.get().get().getIdentity(), is("id1"));
503                                 }
504
505                                 @Test
506                                 public void defaultFcpClientCanEnablePeerByIdentity()
507                                 throws InterruptedException, ExecutionException, IOException {
508                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
509                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
510                                         replyWithPeer("id1");
511                                         assertThat(peer.get().get().getIdentity(), is("id1"));
512                                 }
513
514                                 @Test
515                                 public void defaultFcpClientCanEnablePeerByHostAndPort()
516                                 throws InterruptedException, ExecutionException, IOException {
517                                         Future<Optional<Peer>> peer =
518                                                 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
519                                         connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
520                                         replyWithPeer("id1");
521                                         assertThat(peer.get().get().getIdentity(), is("id1"));
522                                 }
523
524                                 @Test
525                                 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
526                                         Future<Optional<Peer>> peer =
527                                                 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
528                                         connectAndAssert(() -> allOf(
529                                                 matchesModifyPeer("id1", "AllowLocalAddresses", true),
530                                                 not(contains(startsWith("IsDisabled=")))
531                                         ));
532                                         replyWithPeer("id1");
533                                         assertThat(peer.get().get().getIdentity(), is("id1"));
534                                 }
535
536                                 @Test
537                                 public void disallowLocalAddressesOfPeer()
538                                 throws InterruptedException, ExecutionException, IOException {
539                                         Future<Optional<Peer>> peer =
540                                                 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
541                                         connectAndAssert(() -> allOf(
542                                                 matchesModifyPeer("id1", "AllowLocalAddresses", false),
543                                                 not(contains(startsWith("IsDisabled=")))
544                                         ));
545                                         replyWithPeer("id1");
546                                         assertThat(peer.get().get().getIdentity(), is("id1"));
547                                 }
548
549                                 @Test
550                                 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
551                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
552                                         connectAndAssert(() -> allOf(
553                                                 matchesModifyPeer("id1", "IsBurstOnly", true),
554                                                 not(contains(startsWith("AllowLocalAddresses="))),
555                                                 not(contains(startsWith("IsDisabled=")))
556                                         ));
557                                         replyWithPeer("id1");
558                                         assertThat(peer.get().get().getIdentity(), is("id1"));
559                                 }
560
561                                 @Test
562                                 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
563                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
564                                         connectAndAssert(() -> allOf(
565                                                 matchesModifyPeer("id1", "IsBurstOnly", false),
566                                                 not(contains(startsWith("AllowLocalAddresses="))),
567                                                 not(contains(startsWith("IsDisabled=")))
568                                         ));
569                                         replyWithPeer("id1");
570                                         assertThat(peer.get().get().getIdentity(), is("id1"));
571                                 }
572
573                                 @Test
574                                 public void defaultFcpClientCanSetListenOnlyForPeer()
575                                 throws InterruptedException, ExecutionException, IOException {
576                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
577                                         connectAndAssert(() -> allOf(
578                                                 matchesModifyPeer("id1", "IsListenOnly", true),
579                                                 not(contains(startsWith("AllowLocalAddresses="))),
580                                                 not(contains(startsWith("IsDisabled="))),
581                                                 not(contains(startsWith("IsBurstOnly=")))
582                                         ));
583                                         replyWithPeer("id1");
584                                         assertThat(peer.get().get().getIdentity(), is("id1"));
585                                 }
586
587                                 @Test
588                                 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
589                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
590                                         connectAndAssert(() -> allOf(
591                                                 matchesModifyPeer("id1", "IsListenOnly", false),
592                                                 not(contains(startsWith("AllowLocalAddresses="))),
593                                                 not(contains(startsWith("IsDisabled="))),
594                                                 not(contains(startsWith("IsBurstOnly=")))
595                                         ));
596                                         replyWithPeer("id1");
597                                         assertThat(peer.get().get().getIdentity(), is("id1"));
598                                 }
599
600                                 @Test
601                                 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
602                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
603                                         connectAndAssert(() -> allOf(
604                                                 matchesModifyPeer("id1", "IgnoreSourcePort", true),
605                                                 not(contains(startsWith("AllowLocalAddresses="))),
606                                                 not(contains(startsWith("IsDisabled="))),
607                                                 not(contains(startsWith("IsBurstOnly="))),
608                                                 not(contains(startsWith("IsListenOnly=")))
609                                         ));
610                                         replyWithPeer("id1");
611                                         assertThat(peer.get().get().getIdentity(), is("id1"));
612                                 }
613
614                                 @Test
615                                 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
616                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
617                                         connectAndAssert(() -> allOf(
618                                                 matchesModifyPeer("id1", "IgnoreSourcePort", false),
619                                                 not(contains(startsWith("AllowLocalAddresses="))),
620                                                 not(contains(startsWith("IsDisabled="))),
621                                                 not(contains(startsWith("IsBurstOnly="))),
622                                                 not(contains(startsWith("IsListenOnly=")))
623                                         ));
624                                         replyWithPeer("id1");
625                                         assertThat(peer.get().get().getIdentity(), is("id1"));
626                                 }
627
628                                 @Test
629                                 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
630                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
631                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
632                                         replyWithUnknownNodeIdentifier();
633                                         assertThat(peer.get().isPresent(), is(false));
634                                 }
635
636                                 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
637                                         return matchesFcpMessage(
638                                                 "ModifyPeer",
639                                                 "Identifier=" + identifier,
640                                                 "NodeIdentifier=" + nodeIdentifier,
641                                                 setting + "=" + value
642                                         );
643                                 }
644
645                         }
646
647                         public class RemovePeer {
648
649                                 @Test
650                                 public void byName() throws InterruptedException, ExecutionException, IOException {
651                                         Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
652                                         connectAndAssert(() -> matchesRemovePeer("Friend1"));
653                                         replyWithPeerRemoved("Friend1");
654                                         assertThat(peer.get(), is(true));
655                                 }
656
657                                 @Test
658                                 public void invalidName() throws InterruptedException, ExecutionException, IOException {
659                                         Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
660                                         connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
661                                         replyWithUnknownNodeIdentifier();
662                                         assertThat(peer.get(), is(false));
663                                 }
664
665                                 @Test
666                                 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
667                                         Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
668                                         connectAndAssert(() -> matchesRemovePeer("id1"));
669                                         replyWithPeerRemoved("id1");
670                                         assertThat(peer.get(), is(true));
671                                 }
672
673                                 @Test
674                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
675                                         Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
676                                         connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
677                                         replyWithPeerRemoved("Friend1");
678                                         assertThat(peer.get(), is(true));
679                                 }
680
681                                 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
682                                         return matchesFcpMessage(
683                                                 "RemovePeer",
684                                                 "Identifier=" + identifier,
685                                                 "NodeIdentifier=" + nodeIdentifier
686                                         );
687                                 }
688
689                                 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
690                                         fcpServer.writeLine(
691                                                 "PeerRemoved",
692                                                 "Identifier=" + identifier,
693                                                 "NodeIdentifier=" + nodeIdentifier,
694                                                 "EndMessage"
695                                         );
696                                 }
697
698                         }
699
700                         private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
701                                 fcpServer.writeLine(
702                                         "Peer",
703                                         "Identifier=" + identifier,
704                                         "identity=" + peerId,
705                                         "opennet=false",
706                                         "ark.pubURI=SSK@3YEf.../ark",
707                                         "ark.number=78",
708                                         "auth.negTypes=2",
709                                         "version=Fred,0.7,1.0,1466",
710                                         "lastGoodVersion=Fred,0.7,1.0,1466"
711                                 );
712                                 fcpServer.writeLine(additionalLines);
713                                 fcpServer.writeLine("EndMessage");
714                         }
715
716                 }
717
718                 public class PeerNoteCommands {
719
720                         public class ListPeerNotes {
721
722                                 @Test
723                                 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
724                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
725                                         connectAndAssert(() -> matchesListPeerNotes("Friend1"));
726                                         replyWithUnknownNodeIdentifier();
727                                         assertThat(peerNote.get().isPresent(), is(false));
728                                 }
729
730                                 @Test
731                                 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
732                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
733                                         connectAndAssert(() -> matchesListPeerNotes("Friend1"));
734                                         replyWithPeerNote();
735                                         replyWithEndListPeerNotes();
736                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
737                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
738                                 }
739
740                                 @Test
741                                 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
742                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
743                                         connectAndAssert(() -> matchesListPeerNotes("id1"));
744                                         replyWithPeerNote();
745                                         replyWithEndListPeerNotes();
746                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
747                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
748                                 }
749
750                                 @Test
751                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
752                                         Future<Optional<PeerNote>> peerNote =
753                                                 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
754                                         connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
755                                         replyWithPeerNote();
756                                         replyWithEndListPeerNotes();
757                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
758                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
759                                 }
760
761                                 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
762                                         return matchesFcpMessage(
763                                                 "ListPeerNotes",
764                                                 "NodeIdentifier=" + nodeIdentifier
765                                         );
766                                 }
767
768                                 private void replyWithEndListPeerNotes() throws IOException {
769                                         fcpServer.writeLine(
770                                                 "EndListPeerNotes",
771                                                 "Identifier=" + identifier,
772                                                 "EndMessage"
773                                         );
774                                 }
775
776                                 private void replyWithPeerNote() throws IOException {
777                                         fcpServer.writeLine(
778                                                 "PeerNote",
779                                                 "Identifier=" + identifier,
780                                                 "NodeIdentifier=Friend1",
781                                                 "NoteText=RXhhbXBsZSBUZXh0Lg==",
782                                                 "PeerNoteType=1",
783                                                 "EndMessage"
784                                         );
785                                 }
786
787                         }
788
789                         public class ModifyPeerNotes {
790
791                                 @Test
792                                 public void byName() throws InterruptedException, ExecutionException, IOException {
793                                         Future<Boolean> noteUpdated =
794                                                 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
795                                         connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
796                                         replyWithPeerNote();
797                                         assertThat(noteUpdated.get(), is(true));
798                                 }
799
800                                 @Test
801                                 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
802                                         Future<Boolean> noteUpdated =
803                                                 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
804                                         connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
805                                         replyWithUnknownNodeIdentifier();
806                                         assertThat(noteUpdated.get(), is(false));
807                                 }
808
809                                 @Test
810                                 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
811                                 throws InterruptedException, ExecutionException, IOException {
812                                         Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
813                                         assertThat(noteUpdated.get(), is(false));
814                                 }
815
816                                 @Test
817                                 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
818                                         Future<Boolean> noteUpdated =
819                                                 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
820                                         connectAndAssert(() -> matchesModifyPeerNote("id1"));
821                                         replyWithPeerNote();
822                                         assertThat(noteUpdated.get(), is(true));
823                                 }
824
825                                 @Test
826                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
827                                         Future<Boolean> noteUpdated =
828                                                 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
829                                         connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
830                                         replyWithPeerNote();
831                                         assertThat(noteUpdated.get(), is(true));
832                                 }
833
834                                 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
835                                         return matchesFcpMessage(
836                                                 "ModifyPeerNote",
837                                                 "Identifier=" + identifier,
838                                                 "NodeIdentifier=" + nodeIdentifier,
839                                                 "PeerNoteType=1",
840                                                 "NoteText=Zm9v"
841                                         );
842                                 }
843
844                                 private void replyWithPeerNote() throws IOException {
845                                         fcpServer.writeLine(
846                                                 "PeerNote",
847                                                 "Identifier=" + identifier,
848                                                 "NodeIdentifier=Friend1",
849                                                 "NoteText=Zm9v",
850                                                 "PeerNoteType=1",
851                                                 "EndMessage"
852                                         );
853                                 }
854
855                         }
856
857                 }
858
859                 private void replyWithUnknownNodeIdentifier() throws IOException {
860                         fcpServer.writeLine(
861                                 "UnknownNodeIdentifier",
862                                 "Identifier=" + identifier,
863                                 "NodeIdentifier=id2",
864                                 "EndMessage"
865                         );
866                 }
867
868         }
869
870         public class PluginCommands {
871
872                 private static final String CLASS_NAME = "foo.plugin.Plugin";
873
874                 private void replyWithPluginInfo() throws IOException {
875                         fcpServer.writeLine(
876                                 "PluginInfo",
877                                 "Identifier=" + identifier,
878                                 "PluginName=superPlugin",
879                                 "IsTalkable=true",
880                                 "LongVersion=1.2.3",
881                                 "Version=42",
882                                 "OriginUri=superPlugin",
883                                 "Started=true",
884                                 "EndMessage"
885                         );
886                 }
887
888                 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
889                 throws InterruptedException, ExecutionException {
890                         assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
891                         assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
892                         assertThat(pluginInfo.get().get().isTalkable(), is(true));
893                         assertThat(pluginInfo.get().get().getVersion(), is("42"));
894                         assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
895                         assertThat(pluginInfo.get().get().isStarted(), is(true));
896                 }
897
898                 public class LoadPlugin {
899
900                         public class OfficialPlugins {
901
902                                 @Test
903                                 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
904                                         Future<Optional<PluginInfo>> pluginInfo =
905                                                 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
906                                         connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
907                                         assertThat(lines, not(contains(startsWith("Store="))));
908                                         replyWithPluginInfo();
909                                         verifyPluginInfo(pluginInfo);
910                                 }
911
912                                 @Test
913                                 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
914                                         Future<Optional<PluginInfo>> pluginInfo =
915                                                 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
916                                         connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
917                                         assertThat(lines, hasItem("Store=true"));
918                                         replyWithPluginInfo();
919                                         verifyPluginInfo(pluginInfo);
920                                 }
921
922                                 @Test
923                                 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
924                                         Future<Optional<PluginInfo>> pluginInfo =
925                                                 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
926                                         connectAndAssert(() -> createMatcherForOfficialSource("https"));
927                                         replyWithPluginInfo();
928                                         verifyPluginInfo(pluginInfo);
929                                 }
930
931                                 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
932                                         return matchesFcpMessage(
933                                                 "LoadPlugin",
934                                                 "Identifier=" + identifier,
935                                                 "PluginURL=superPlugin",
936                                                 "URLType=official",
937                                                 "OfficialSource=" + officialSource
938                                         );
939                                 }
940
941                         }
942
943                         public class FromOtherSources {
944
945                                 private static final String FILE_PATH = "/path/to/plugin.jar";
946                                 private static final String URL = "http://server.com/plugin.jar";
947                                 private static final String KEY = "KSK@plugin.jar";
948
949                                 @Test
950                                 public void fromFile() throws ExecutionException, InterruptedException, IOException {
951                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
952                                         connectAndAssert(() -> createMatcher("file", FILE_PATH));
953                                         replyWithPluginInfo();
954                                         verifyPluginInfo(pluginInfo);
955                                 }
956
957                                 @Test
958                                 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
959                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
960                                         connectAndAssert(() -> createMatcher("url", URL));
961                                         replyWithPluginInfo();
962                                         verifyPluginInfo(pluginInfo);
963                                 }
964
965                                 @Test
966                                 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
967                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
968                                         connectAndAssert(() -> createMatcher("freenet", KEY));
969                                         replyWithPluginInfo();
970                                         verifyPluginInfo(pluginInfo);
971                                 }
972
973                                 private Matcher<List<String>> createMatcher(String urlType, String url) {
974                                         return matchesFcpMessage(
975                                                 "LoadPlugin",
976                                                 "Identifier=" + identifier,
977                                                 "PluginURL=" + url,
978                                                 "URLType=" + urlType
979                                         );
980                                 }
981
982                         }
983
984                         public class Failed {
985
986                                 @Test
987                                 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
988                                         Future<Optional<PluginInfo>> pluginInfo =
989                                                 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
990                                         connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
991                                         replyWithProtocolError();
992                                         assertThat(pluginInfo.get().isPresent(), is(false));
993                                 }
994
995                         }
996
997                 }
998
999                 public class ReloadPlugin {
1000
1001                         @Test
1002                         public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1003                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1004                                 connectAndAssert(this::matchReloadPluginMessage);
1005                                 replyWithPluginInfo();
1006                                 verifyPluginInfo(pluginInfo);
1007                         }
1008
1009                         @Test
1010                         public void reloadingPluginWithMaxWaitTimeWorks()
1011                         throws InterruptedException, ExecutionException, IOException {
1012                                 Future<Optional<PluginInfo>> pluginInfo =
1013                                         fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1014                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1015                                 replyWithPluginInfo();
1016                                 verifyPluginInfo(pluginInfo);
1017                         }
1018
1019                         @Test
1020                         public void reloadingPluginWithPurgeWorks()
1021                         throws InterruptedException, ExecutionException, IOException {
1022                                 Future<Optional<PluginInfo>> pluginInfo =
1023                                         fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1024                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1025                                 replyWithPluginInfo();
1026                                 verifyPluginInfo(pluginInfo);
1027                         }
1028
1029                         @Test
1030                         public void reloadingPluginWithStoreWorks()
1031                         throws InterruptedException, ExecutionException, IOException {
1032                                 Future<Optional<PluginInfo>> pluginInfo =
1033                                         fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1034                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1035                                 replyWithPluginInfo();
1036                                 verifyPluginInfo(pluginInfo);
1037                         }
1038
1039                         @Test
1040                         public void protocolErrorIsRecognizedAsFailure()
1041                         throws InterruptedException, ExecutionException, IOException {
1042                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1043                                 connectAndAssert(() -> matchReloadPluginMessage());
1044                                 replyWithProtocolError();
1045                                 assertThat(pluginInfo.get().isPresent(), is(false));
1046                         }
1047
1048                         private Matcher<List<String>> matchReloadPluginMessage() {
1049                                 return matchesFcpMessage(
1050                                         "ReloadPlugin",
1051                                         "Identifier=" + identifier,
1052                                         "PluginName=" + CLASS_NAME
1053                                 );
1054                         }
1055
1056                 }
1057
1058                 public class RemovePlugin {
1059
1060                         @Test
1061                         public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1062                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1063                                 connectAndAssert(this::matchPluginRemovedMessage);
1064                                 replyWithPluginRemoved();
1065                                 assertThat(pluginRemoved.get(), is(true));
1066                         }
1067
1068                         @Test
1069                         public void removingPluginWithMaxWaitTimeWorks()
1070                         throws InterruptedException, ExecutionException, IOException {
1071                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1072                                 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1073                                 replyWithPluginRemoved();
1074                                 assertThat(pluginRemoved.get(), is(true));
1075                         }
1076
1077                         @Test
1078                         public void removingPluginWithPurgeWorks()
1079                         throws InterruptedException, ExecutionException, IOException {
1080                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1081                                 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1082                                 replyWithPluginRemoved();
1083                                 assertThat(pluginRemoved.get(), is(true));
1084                         }
1085
1086                         private void replyWithPluginRemoved() throws IOException {
1087                                 fcpServer.writeLine(
1088                                         "PluginRemoved",
1089                                         "Identifier=" + identifier,
1090                                         "PluginName=" + CLASS_NAME,
1091                                         "EndMessage"
1092                                 );
1093                         }
1094
1095                         private Matcher<List<String>> matchPluginRemovedMessage() {
1096                                 return matchesFcpMessage(
1097                                         "RemovePlugin",
1098                                         "Identifier=" + identifier,
1099                                         "PluginName=" + CLASS_NAME
1100                                 );
1101                         }
1102
1103                 }
1104
1105                 public class GetPluginInfo {
1106
1107                         @Test
1108                         public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1109                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1110                                 connectAndAssert(this::matchGetPluginInfoMessage);
1111                                 replyWithPluginInfo();
1112                                 verifyPluginInfo(pluginInfo);
1113                         }
1114
1115                         @Test
1116                         public void gettingPluginInfoWithDetailsWorks()
1117                         throws InterruptedException, ExecutionException, IOException {
1118                                 Future<Optional<PluginInfo>> pluginInfo =
1119                                         fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1120                                 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1121                                 replyWithPluginInfo();
1122                                 verifyPluginInfo(pluginInfo);
1123                         }
1124
1125                         @Test
1126                         public void protocolErrorIsRecognizedAsFailure()
1127                         throws InterruptedException, ExecutionException, IOException {
1128                                 Future<Optional<PluginInfo>> pluginInfo =
1129                                         fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1130                                 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1131                                 replyWithProtocolError();
1132                                 assertThat(pluginInfo.get(), is(Optional.empty()));
1133                         }
1134
1135                         private Matcher<List<String>> matchGetPluginInfoMessage() {
1136                                 return matchesFcpMessage(
1137                                         "GetPluginInfo",
1138                                         "Identifier=" + identifier,
1139                                         "PluginName=" + CLASS_NAME
1140                                 );
1141                         }
1142
1143                 }
1144
1145         }
1146
1147         public class UskSubscriptionCommands {
1148
1149                 private static final String URI = "USK@some,uri/file.txt";
1150
1151                 @Test
1152                 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1153                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1154                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1155                         replyWithSubscribed();
1156                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1157                         AtomicInteger edition = new AtomicInteger();
1158                         CountDownLatch updated = new CountDownLatch(2);
1159                         uskSubscription.get().get().onUpdate(e -> {
1160                                 edition.set(e);
1161                                 updated.countDown();
1162                         });
1163                         sendUpdateNotification(23);
1164                         sendUpdateNotification(24);
1165                         assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1166                         assertThat(edition.get(), is(24));
1167                 }
1168
1169                 @Test
1170                 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1171                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1172                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1173                         replyWithSubscribed();
1174                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1175                         AtomicInteger edition = new AtomicInteger();
1176                         CountDownLatch updated = new CountDownLatch(2);
1177                         uskSubscription.get().get().onUpdate(e -> {
1178                                 edition.set(e);
1179                                 updated.countDown();
1180                         });
1181                         uskSubscription.get().get().onUpdate(e -> updated.countDown());
1182                         sendUpdateNotification(23);
1183                         assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1184                         assertThat(edition.get(), is(23));
1185                 }
1186
1187                 @Test
1188                 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1189                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1190                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1191                         replyWithSubscribed();
1192                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1193                         AtomicBoolean updated = new AtomicBoolean();
1194                         uskSubscription.get().get().onUpdate(e -> updated.set(true));
1195                         uskSubscription.get().get().cancel();
1196                         readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1197                         sendUpdateNotification(23);
1198                         assertThat(updated.get(), is(false));
1199                 }
1200
1201                 private void replyWithSubscribed() throws IOException {
1202                         fcpServer.writeLine(
1203                                 "SubscribedUSK",
1204                                 "Identifier=" + identifier,
1205                                 "URI=" + URI,
1206                                 "DontPoll=false",
1207                                 "EndMessage"
1208                         );
1209                 }
1210
1211                 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1212                         fcpServer.writeLine(
1213                                 "SubscribedUSKUpdate",
1214                                 "Identifier=" + identifier,
1215                                 "URI=" + URI,
1216                                 "Edition=" + edition
1217                         );
1218                         fcpServer.writeLine(additionalLines);
1219                         fcpServer.writeLine("EndMessage");
1220                 }
1221
1222         }
1223
1224         public class ClientGet {
1225
1226                 @Test
1227                 public void works() throws InterruptedException, ExecutionException, IOException {
1228                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1229                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1230                         replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1231                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1232                         Optional<Data> data = dataFuture.get();
1233                         verifyData(data);
1234                 }
1235
1236                 @Test
1237                 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1238                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1239                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1240                         replyWithGetFailed("not-test");
1241                         replyWithGetFailed(identifier);
1242                         Optional<Data> data = dataFuture.get();
1243                         assertThat(data.isPresent(), is(false));
1244                 }
1245
1246                 @Test
1247                 public void getFailedForDifferentIdentifierIsIgnored()
1248                 throws InterruptedException, ExecutionException, IOException {
1249                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1250                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1251                         replyWithGetFailed("not-test");
1252                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1253                         Optional<Data> data = dataFuture.get();
1254                         verifyData(data);
1255                 }
1256
1257                 @Test(expected = ExecutionException.class)
1258                 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1259                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1260                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1261                         fcpServer.close();
1262                         dataFuture.get();
1263                 }
1264
1265                 @Test
1266                 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1267                         fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1268                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1269                 }
1270
1271                 @Test
1272                 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1273                         fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1274                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1275                 }
1276
1277                 @Test
1278                 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1279                 throws InterruptedException, ExecutionException, IOException {
1280                         fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1281                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1282                 }
1283
1284                 @Test
1285                 public void clientGetWithPrioritySettingSendsCorrectCommands()
1286                 throws InterruptedException, ExecutionException, IOException {
1287                         fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1288                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1289                 }
1290
1291                 @Test
1292                 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1293                 throws InterruptedException, ExecutionException, IOException {
1294                         fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1295                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1296                 }
1297
1298                 @Test
1299                 public void clientGetWithGlobalSettingSendsCorrectCommands()
1300                 throws InterruptedException, ExecutionException, IOException {
1301                         fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1302                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1303                 }
1304
1305                 private void replyWithGetFailed(String identifier) throws IOException {
1306                         fcpServer.writeLine(
1307                                 "GetFailed",
1308                                 "Identifier=" + identifier,
1309                                 "Code=3",
1310                                 "EndMessage"
1311                         );
1312                 }
1313
1314                 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1315                         fcpServer.writeLine(
1316                                 "AllData",
1317                                 "Identifier=" + identifier,
1318                                 "DataLength=" + (text.length() + 1),
1319                                 "StartupTime=1435610539000",
1320                                 "CompletionTime=1435610540000",
1321                                 "Metadata.ContentType=" + contentType,
1322                                 "Data",
1323                                 text
1324                         );
1325                 }
1326
1327                 private void verifyData(Optional<Data> data) throws IOException {
1328                         assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1329                         assertThat(data.get().size(), is(6L));
1330                         assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1331                                 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1332                 }
1333
1334         }
1335
1336         public class ClientPut {
1337
1338                 @Test
1339                 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1340                         fcpClient.clientPut()
1341                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1342                                 .length(6)
1343                                 .uri("KSK@foo.txt")
1344                                 .execute();
1345                         connectNode();
1346                         readMessage("Hello", this::matchesDirectClientPut);
1347                 }
1348
1349                 @Test
1350                 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1351                         Future<Optional<Key>> key = fcpClient.clientPut()
1352                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1353                                 .length(6)
1354                                 .uri("KSK@foo.txt")
1355                                 .execute();
1356                         connectNode();
1357                         readMessage("Hello", this::matchesDirectClientPut);
1358                         replyWithPutFailed("not-the-right-one");
1359                         replyWithPutSuccessful(identifier);
1360                         assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1361                 }
1362
1363                 @Test
1364                 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1365                         Future<Optional<Key>> key = fcpClient.clientPut()
1366                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1367                                 .length(6)
1368                                 .uri("KSK@foo.txt")
1369                                 .execute();
1370                         connectNode();
1371                         readMessage("Hello", this::matchesDirectClientPut);
1372                         replyWithPutSuccessful("not-the-right-one");
1373                         replyWithPutFailed(identifier);
1374                         assertThat(key.get().isPresent(), is(false));
1375                 }
1376
1377                 @Test
1378                 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1379                         fcpClient.clientPut()
1380                                 .named("otherName.txt")
1381                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1382                                 .length(6)
1383                                 .uri("KSK@foo.txt")
1384                                 .execute();
1385                         connectNode();
1386                         readMessage("Hello", () -> allOf(
1387                                 hasHead("ClientPut"),
1388                                 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1389                                         "URI=KSK@foo.txt"),
1390                                 hasTail("EndMessage", "Hello")
1391                         ));
1392                 }
1393
1394                 @Test
1395                 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1396                         fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1397                         connectAndAssert(() ->
1398                                 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
1399                 }
1400
1401                 @Test
1402                 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1403                         fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1404                         connectAndAssert(() ->
1405                                 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
1406                 }
1407
1408                 public class DDA {
1409
1410                         private final File ddaFile;
1411                         private final File fileToUpload;
1412
1413                         public DDA() throws IOException {
1414                                 ddaFile = createDdaFile();
1415                                 fileToUpload = new File(ddaFile.getParent(), "test.dat");
1416                         }
1417
1418                         private Matcher<List<String>> matchesFileClientPut(File file) {
1419                                 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
1420                         }
1421
1422                         @Test
1423                         public void completeDda() throws IOException, ExecutionException, InterruptedException {
1424                                 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1425                                 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1426                                 sendDdaRequired(identifier);
1427                                 readMessage(() -> matchesTestDDARequest(ddaFile));
1428                                 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1429                                 readMessage(() -> matchesTestDDAResponse(ddaFile));
1430                                 writeTestDDAComplete(ddaFile);
1431                                 readMessage(() -> matchesFileClientPut(fileToUpload));
1432                         }
1433
1434                         @Test
1435                         public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1436                                 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1437                                 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1438                                 sendDdaRequired(identifier);
1439                                 readMessage(() -> matchesTestDDARequest(ddaFile));
1440                                 sendTestDDAReply("/some-other-directory", ddaFile);
1441                                 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1442                                 readMessage(() -> matchesTestDDAResponse(ddaFile));
1443                         }
1444
1445                         @Test
1446                         public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
1447                                 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1448                                 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1449                                 sendDdaRequired(identifier);
1450                                 readMessage(() -> matchesTestDDARequest(ddaFile));
1451                                 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1452                                 readMessage(this::matchesFailedToReadResponse);
1453                         }
1454
1455                         @Test
1456                         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1457                         throws IOException, ExecutionException, InterruptedException {
1458                                 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1459                                 connectNode();
1460                                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1461                                 String identifier = extractIdentifier(lines);
1462                                 fcpServer.writeLine(
1463                                         "TestDDAComplete",
1464                                         "Directory=/some-other-directory",
1465                                         "EndMessage"
1466                                 );
1467                                 sendDdaRequired(identifier);
1468                                 lines = fcpServer.collectUntil(is("EndMessage"));
1469                                 assertThat(lines, matchesFcpMessage(
1470                                         "TestDDARequest",
1471                                         "Directory=" + ddaFile.getParent(),
1472                                         "WantReadDirectory=true",
1473                                         "WantWriteDirectory=false"
1474                                 ));
1475                         }
1476
1477                         private Matcher<List<String>> matchesFailedToReadResponse() {
1478                                 return matchesFcpMessage(
1479                                         "TestDDAResponse",
1480                                         "Directory=" + ddaFile.getParent(),
1481                                         "ReadContent=failed-to-read"
1482                                 );
1483                         }
1484
1485                         private void writeTestDDAComplete(File tempFile) throws IOException {
1486                                 fcpServer.writeLine(
1487                                         "TestDDAComplete",
1488                                         "Directory=" + tempFile.getParent(),
1489                                         "ReadDirectoryAllowed=true",
1490                                         "EndMessage"
1491                                 );
1492                         }
1493
1494                         private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1495                                 return matchesFcpMessage(
1496                                         "TestDDAResponse",
1497                                         "Directory=" + tempFile.getParent(),
1498                                         "ReadContent=test-content"
1499                                 );
1500                         }
1501
1502                         private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1503                                 fcpServer.writeLine(
1504                                         "TestDDAReply",
1505                                         "Directory=" + directory,
1506                                         "ReadFilename=" + tempFile,
1507                                         "EndMessage"
1508                                 );
1509                         }
1510
1511                         private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1512                                 return matchesFcpMessage(
1513                                         "TestDDARequest",
1514                                         "Directory=" + tempFile.getParent(),
1515                                         "WantReadDirectory=true",
1516                                         "WantWriteDirectory=false"
1517                                 );
1518                         }
1519
1520                         private void sendDdaRequired(String identifier) throws IOException {
1521                                 fcpServer.writeLine(
1522                                         "ProtocolError",
1523                                         "Identifier=" + identifier,
1524                                         "Code=25",
1525                                         "EndMessage"
1526                                 );
1527                         }
1528
1529                 }
1530
1531                 private void replyWithPutSuccessful(String identifier) throws IOException {
1532                         fcpServer.writeLine(
1533                                 "PutSuccessful",
1534                                 "URI=KSK@foo.txt",
1535                                 "Identifier=" + identifier,
1536                                 "EndMessage"
1537                         );
1538                 }
1539
1540                 private void replyWithPutFailed(String identifier) throws IOException {
1541                         fcpServer.writeLine(
1542                                 "PutFailed",
1543                                 "Identifier=" + identifier,
1544                                 "EndMessage"
1545                         );
1546                 }
1547
1548                 private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
1549                         List<String> lines = new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
1550                         Arrays.asList(additionalLines).forEach(lines::add);
1551                         return allOf(
1552                                 hasHead("ClientPut"),
1553                                 hasParameters(1, 2, lines.toArray(new String[lines.size()])),
1554                                 hasTail("EndMessage", "Hello")
1555                         );
1556                 }
1557
1558                 private File createDdaFile() throws IOException {
1559                         File tempFile = File.createTempFile("test-dda-", ".dat");
1560                         tempFile.deleteOnExit();
1561                         Files.write("test-content", tempFile, StandardCharsets.UTF_8);
1562                         return tempFile;
1563                 }
1564
1565                 @Test
1566                 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1567                 throws InterruptedException, ExecutionException, IOException {
1568                         Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1569                         connectNode();
1570                         List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1571                         String identifier = extractIdentifier(lines);
1572                         fcpServer.writeLine(
1573                                 "ProtocolError",
1574                                 "Identifier=not-the-right-one",
1575                                 "Code=25",
1576                                 "EndMessage"
1577                         );
1578                         fcpServer.writeLine(
1579                                 "PutSuccessful",
1580                                 "Identifier=" + identifier,
1581                                 "URI=KSK@foo.txt",
1582                                 "EndMessage"
1583                         );
1584                         assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1585                 }
1586
1587                 @Test
1588                 public void clientPutAbortsOnProtocolErrorOtherThan25()
1589                 throws InterruptedException, ExecutionException, IOException {
1590                         Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1591                         connectNode();
1592                         List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1593                         String identifier = extractIdentifier(lines);
1594                         fcpServer.writeLine(
1595                                 "ProtocolError",
1596                                 "Identifier=" + identifier,
1597                                 "Code=1",
1598                                 "EndMessage"
1599                         );
1600                         assertThat(key.get().isPresent(), is(false));
1601                 }
1602
1603                 @Test
1604                 public void clientPutSendsNotificationsForGeneratedKeys()
1605                 throws InterruptedException, ExecutionException, IOException {
1606                         List<String> generatedKeys = new CopyOnWriteArrayList<>();
1607                         Future<Optional<Key>> key = fcpClient.clientPut()
1608                                 .onKeyGenerated(generatedKeys::add)
1609                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1610                                 .length(6)
1611                                 .uri("KSK@foo.txt")
1612                                 .execute();
1613                         connectNode();
1614                         List<String> lines = fcpServer.collectUntil(is("Hello"));
1615                         String identifier = extractIdentifier(lines);
1616                         fcpServer.writeLine(
1617                                 "URIGenerated",
1618                                 "Identifier=" + identifier,
1619                                 "URI=KSK@foo.txt",
1620                                 "EndMessage"
1621                         );
1622                         replyWithPutSuccessful(identifier);
1623                         assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1624                         assertThat(generatedKeys, contains("KSK@foo.txt"));
1625                 }
1626
1627                 @Test
1628                 public void clientPutSendsNotificationOnProgress() throws InterruptedException, ExecutionException, IOException {
1629                         List<RequestProgress> requestProgress = new ArrayList<>();
1630                     Future<Optional<Key>> key = fcpClient.clientPut()
1631                                 .onProgress(requestProgress::add)
1632                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1633                                 .length(6)
1634                                 .uri("KSK@foo.txt")
1635                                 .execute();
1636                         connectNode();
1637                         readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
1638                         replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
1639                         replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
1640                         replyWithPutSuccessful(identifier);
1641                         assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1642                         assertThat(requestProgress, contains(
1643                                 isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
1644                                 isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
1645                         ));
1646                 }
1647
1648                 private void replyWithSimpleProgress(
1649                         int total, int required, int failed, int fatallyFailed, int succeeded, int lastProgress,
1650                         boolean finalizedTotal, int minSuccessFetchBlocks) throws IOException {
1651                         fcpServer.writeLine(
1652                                 "SimpleProgress",
1653                                 "Identifier=" + identifier,
1654                                 "Total=" + total,
1655                                 "Required=" + required,
1656                                 "Failed=" + failed,
1657                                 "FatallyFailed=" + fatallyFailed,
1658                                 "Succeeded=" + succeeded,
1659                                 "LastProgress=" + lastProgress,
1660                                 "FinalizedTotal=" + finalizedTotal,
1661                                 "MinSuccessFetchBlocks=" + minSuccessFetchBlocks,
1662                                 "EndMessage"
1663                         );
1664                 }
1665
1666         }
1667
1668         public class ClientPutDiskDir {
1669
1670                 private final TemporaryFolder folder = new TemporaryFolder();
1671
1672                 @Before
1673                 public void setup() throws IOException {
1674                         folder.create();
1675                         Files.write("file1\n", folder.newFile("file1.txt"), StandardCharsets.UTF_8);
1676                         Files.write("file2\n", folder.newFile("file2.txt"), StandardCharsets.UTF_8);
1677                         File directory = folder.newFolder("dir");
1678                         Files.write("file3\n", new File(directory, "file3.txt"), StandardCharsets.UTF_8);
1679                 }
1680
1681                 @After
1682                 public void removeFolder() {
1683                     folder.delete();
1684                 }
1685
1686                 @Test
1687                 public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1688                         Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(folder.getRoot()).uri("CHK@").execute();
1689                         connectAndAssert(this::matchesClientPutDiskDir);
1690                         fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
1691                         assertThat(key.get().get().getKey(), is("CHK@abc"));
1692                 }
1693
1694                 @Test
1695                 public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
1696                         Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(folder.getRoot()).uri("CHK@").execute();
1697                         connectAndAssert(this::matchesClientPutDiskDir);
1698                         replyWithProtocolError();
1699                         assertThat(key.get().isPresent(), is(false));
1700                 }
1701
1702                 private Matcher<List<String>> matchesClientPutDiskDir() {
1703                         return matchesFcpMessage(
1704                                 "ClientPutDiskDir",
1705                                 "Identifier=" + identifier,
1706                                 "URI=CHK@",
1707                                 "Filename=" + folder.getRoot().getPath()
1708                         );
1709                 }
1710
1711         }
1712
1713         public class ConfigCommand {
1714
1715                 public class GetConfig {
1716
1717                         @Test
1718                         public void defaultFcpClientCanGetConfigWithoutDetails()
1719                         throws InterruptedException, ExecutionException, IOException {
1720                                 Future<ConfigData> configData = fcpClient.getConfig().execute();
1721                                 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1722                                 replyWithConfigData();
1723                                 assertThat(configData.get(), notNullValue());
1724                         }
1725
1726                         @Test
1727                         public void defaultFcpClientCanGetConfigWithCurrent()
1728                         throws InterruptedException, ExecutionException, IOException {
1729                                 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1730                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1731                                 replyWithConfigData("current.foo=bar");
1732                                 assertThat(configData.get().getCurrent("foo"), is("bar"));
1733                         }
1734
1735                         @Test
1736                         public void defaultFcpClientCanGetConfigWithDefaults()
1737                         throws InterruptedException, ExecutionException, IOException {
1738                                 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1739                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1740                                 replyWithConfigData("default.foo=bar");
1741                                 assertThat(configData.get().getDefault("foo"), is("bar"));
1742                         }
1743
1744                         @Test
1745                         public void defaultFcpClientCanGetConfigWithSortOrder()
1746                         throws InterruptedException, ExecutionException, IOException {
1747                                 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1748                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1749                                 replyWithConfigData("sortOrder.foo=17");
1750                                 assertThat(configData.get().getSortOrder("foo"), is(17));
1751                         }
1752
1753                         @Test
1754                         public void defaultFcpClientCanGetConfigWithExpertFlag()
1755                         throws InterruptedException, ExecutionException, IOException {
1756                                 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1757                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1758                                 replyWithConfigData("expertFlag.foo=true");
1759                                 assertThat(configData.get().getExpertFlag("foo"), is(true));
1760                         }
1761
1762                         @Test
1763                         public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1764                         throws InterruptedException, ExecutionException, IOException {
1765                                 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1766                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1767                                 replyWithConfigData("forceWriteFlag.foo=true");
1768                                 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
1769                         }
1770
1771                         @Test
1772                         public void defaultFcpClientCanGetConfigWithShortDescription()
1773                         throws InterruptedException, ExecutionException, IOException {
1774                                 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1775                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1776                                 replyWithConfigData("shortDescription.foo=bar");
1777                                 assertThat(configData.get().getShortDescription("foo"), is("bar"));
1778                         }
1779
1780                         @Test
1781                         public void defaultFcpClientCanGetConfigWithLongDescription()
1782                         throws InterruptedException, ExecutionException, IOException {
1783                                 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1784                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1785                                 replyWithConfigData("longDescription.foo=bar");
1786                                 assertThat(configData.get().getLongDescription("foo"), is("bar"));
1787                         }
1788
1789                         @Test
1790                         public void defaultFcpClientCanGetConfigWithDataTypes()
1791                         throws InterruptedException, ExecutionException, IOException {
1792                                 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1793                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1794                                 replyWithConfigData("dataType.foo=number");
1795                                 assertThat(configData.get().getDataType("foo"), is("number"));
1796                         }
1797
1798                         private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1799                                 return matchesFcpMessage(
1800                                         "GetConfig",
1801                                         "Identifier=" + identifier,
1802                                         additionalParameter + "=true"
1803                                 );
1804                         }
1805
1806                 }
1807
1808                 public class ModifyConfig {
1809
1810                         @Test
1811                         public void defaultFcpClientCanModifyConfigData()
1812                         throws InterruptedException, ExecutionException, IOException {
1813                                 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1814                                 connectAndAssert(() -> matchesFcpMessage(
1815                                         "ModifyConfig",
1816                                         "Identifier=" + identifier,
1817                                         "foo.bar=baz"
1818                                 ));
1819                                 replyWithConfigData("current.foo.bar=baz");
1820                                 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
1821                         }
1822
1823                 }
1824
1825                 private void replyWithConfigData(String... additionalLines) throws IOException {
1826                         fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1827                         fcpServer.writeLine(additionalLines);
1828                         fcpServer.writeLine("EndMessage");
1829                 }
1830
1831         }
1832
1833         public class NodeInformation {
1834
1835                 @Test
1836                 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
1837                         Future<NodeData> nodeData = fcpClient.getNode().execute();
1838                         connectAndAssert(() -> matchesGetNode(false, false, false));
1839                         replyWithNodeData();
1840                         assertThat(nodeData.get(), notNullValue());
1841                         assertThat(nodeData.get().getNodeRef().isOpennet(), is(false));
1842                 }
1843
1844                 @Test
1845                 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
1846                 throws InterruptedException, ExecutionException, IOException {
1847                         Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
1848                         connectAndAssert(() -> matchesGetNode(true, false, false));
1849                         replyWithNodeData("opennet=true");
1850                         assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
1851                         assertThat(nodeData.get().getNodeRef().isOpennet(), is(true));
1852                 }
1853
1854                 @Test
1855                 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
1856                 throws InterruptedException, ExecutionException, IOException {
1857                         Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
1858                         connectAndAssert(() -> matchesGetNode(false, true, false));
1859                         replyWithNodeData("ark.privURI=SSK@XdHMiRl");
1860                         assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
1861                 }
1862
1863                 @Test
1864                 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
1865                 throws InterruptedException, ExecutionException, IOException {
1866                         Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
1867                         connectAndAssert(() -> matchesGetNode(false, false, true));
1868                         replyWithNodeData("volatile.freeJavaMemory=205706528");
1869                         assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
1870                 }
1871
1872                 private Matcher<List<String>> matchesGetNode(boolean withOpennetRef, boolean withPrivate, boolean withVolatile) {
1873                         return matchesFcpMessage(
1874                                 "GetNode",
1875                                 "Identifier=" + identifier,
1876                                 "GiveOpennetRef=" + withOpennetRef,
1877                                 "WithPrivate=" + withPrivate,
1878                                 "WithVolatile=" + withVolatile
1879                         );
1880                 }
1881
1882                 private void replyWithNodeData(String... additionalLines) throws IOException {
1883                         fcpServer.writeLine(
1884                                 "NodeData",
1885                                 "Identifier=" + identifier,
1886                                 "ark.pubURI=SSK@3YEf.../ark",
1887                                 "ark.number=78",
1888                                 "auth.negTypes=2",
1889                                 "version=Fred,0.7,1.0,1466",
1890                                 "lastGoodVersion=Fred,0.7,1.0,1466"
1891                         );
1892                         fcpServer.writeLine(additionalLines);
1893                         fcpServer.writeLine("EndMessage");
1894                 }
1895
1896         }
1897
1898 }