Follow redirects in ClientGet
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static net.pterodactylus.fcp.RequestProgressMatcher.isRequestProgress;
4 import static org.hamcrest.MatcherAssert.assertThat;
5 import static org.hamcrest.Matchers.allOf;
6 import static org.hamcrest.Matchers.contains;
7 import static org.hamcrest.Matchers.containsInAnyOrder;
8 import static org.hamcrest.Matchers.hasItem;
9 import static org.hamcrest.Matchers.hasSize;
10 import static org.hamcrest.Matchers.is;
11 import static org.hamcrest.Matchers.not;
12 import static org.hamcrest.Matchers.notNullValue;
13 import static org.hamcrest.Matchers.startsWith;
14
15 import java.io.ByteArrayInputStream;
16 import java.io.File;
17 import java.io.IOException;
18 import java.net.URL;
19 import java.nio.charset.StandardCharsets;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.function.Supplier;
35 import java.util.stream.Collectors;
36
37 import net.pterodactylus.fcp.ARK;
38 import net.pterodactylus.fcp.ConfigData;
39 import net.pterodactylus.fcp.DSAGroup;
40 import net.pterodactylus.fcp.FcpKeyPair;
41 import net.pterodactylus.fcp.Key;
42 import net.pterodactylus.fcp.NodeData;
43 import net.pterodactylus.fcp.NodeRef;
44 import net.pterodactylus.fcp.Peer;
45 import net.pterodactylus.fcp.PeerNote;
46 import net.pterodactylus.fcp.PluginInfo;
47 import net.pterodactylus.fcp.Priority;
48 import net.pterodactylus.fcp.RequestProgress;
49 import net.pterodactylus.fcp.fake.FakeTcpServer;
50 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
51
52 import com.google.common.io.ByteStreams;
53 import com.google.common.io.Files;
54 import com.nitorcreations.junit.runners.NestedRunner;
55 import org.hamcrest.Description;
56 import org.hamcrest.Matcher;
57 import org.hamcrest.Matchers;
58 import org.hamcrest.TypeSafeDiagnosingMatcher;
59 import org.junit.After;
60 import org.junit.Assert;
61 import org.junit.Test;
62 import org.junit.runner.RunWith;
63
64 /**
65  * Unit test for {@link DefaultFcpClient}.
66  *
67  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
68  */
69 @RunWith(NestedRunner.class)
70 public class DefaultFcpClientTest {
71
72         private static final String INSERT_URI =
73                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
74         private static final String REQUEST_URI =
75                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
76
77         private int threadCounter = 0;
78         private final ExecutorService threadPool =
79                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
80         private final FakeTcpServer fcpServer;
81         private final DefaultFcpClient fcpClient;
82
83         public DefaultFcpClientTest() throws IOException {
84                 fcpServer = new FakeTcpServer(threadPool);
85                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
86         }
87
88         @After
89         public void tearDown() throws IOException {
90                 fcpServer.close();
91                 threadPool.shutdown();
92         }
93
94         private void connectNode() throws InterruptedException, ExecutionException, IOException {
95                 fcpServer.connect().get();
96                 fcpServer.collectUntil(is("EndMessage"));
97                 fcpServer.writeLine("NodeHello",
98                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
99                         "Revision=build01466",
100                         "Testnet=false",
101                         "Version=Fred,0.7,1.0,1466",
102                         "Build=1466",
103                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
104                         "Node=Fred",
105                         "ExtBuild=29",
106                         "FCPVersion=2.0",
107                         "NodeLanguage=ENGLISH",
108                         "ExtRevision=v29",
109                         "EndMessage"
110                 );
111         }
112
113         private String extractIdentifier(List<String> lines) {
114                 return lines.stream()
115                         .filter(s -> s.startsWith("Identifier="))
116                         .map(s -> s.substring(s.indexOf('=') + 1))
117                         .findFirst()
118                         .orElse("");
119         }
120
121         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
122                 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
123         }
124
125         private Matcher<Iterable<String>> hasHead(String firstElement) {
126                 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
127                         @Override
128                         protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
129                                 if (!iterable.iterator().hasNext()) {
130                                         mismatchDescription.appendText("is empty");
131                                         return false;
132                                 }
133                                 String element = iterable.iterator().next();
134                                 if (!element.equals(firstElement)) {
135                                         mismatchDescription.appendText("starts with ").appendValue(element);
136                                         return false;
137                                 }
138                                 return true;
139                         }
140
141                         @Override
142                         public void describeTo(Description description) {
143                                 description.appendText("starts with ").appendValue(firstElement);
144                         }
145                 };
146         }
147
148         private Matcher<List<String>> matchesFcpMessageWithTerminator(
149                 String name, String terminator, String... requiredLines) {
150                 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
151         }
152
153         private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
154                 return new TypeSafeDiagnosingMatcher<List<String>>() {
155                         @Override
156                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
157                                 if (item.size() < (ignoreStart + ignoreEnd)) {
158                                         mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
159                                         return false;
160                                 }
161                                 for (String line : lines) {
162                                         if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
163                                                 mismatchDescription.appendText("does not contains ").appendValue(line);
164                                                 return false;
165                                         }
166                                 }
167                                 return true;
168                         }
169
170                         @Override
171                         public void describeTo(Description description) {
172                                 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
173                                 description.appendText(", ignoring the first ").appendValue(ignoreStart);
174                                 description.appendText(" and the last ").appendValue(ignoreEnd);
175                         }
176                 };
177         }
178
179         private Matcher<List<String>> hasTail(String... lastElements) {
180                 return new TypeSafeDiagnosingMatcher<List<String>>() {
181                         @Override
182                         protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
183                                 if (list.size() < lastElements.length) {
184                                         mismatchDescription.appendText("is too small");
185                                         return false;
186                                 }
187                                 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
188                                 if (!tail.equals(Arrays.asList(lastElements))) {
189                                         mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
190                                         return false;
191                                 }
192                                 return true;
193                         }
194
195                         @Override
196                         public void describeTo(Description description) {
197                                 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
198                         }
199                 };
200         }
201
202         private List<String> lines;
203         private String identifier;
204
205         private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
206         throws InterruptedException, ExecutionException, IOException {
207                 connectNode();
208                 readMessage(requestMatcher);
209         }
210
211         private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
212                 readMessage("EndMessage", requestMatcher);
213         }
214
215         private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
216                 lines = fcpServer.collectUntil(is(terminator));
217                 identifier = extractIdentifier(lines);
218                 assertThat(lines, requestMatcher.get());
219         }
220
221         private void replyWithProtocolError() throws IOException {
222                 fcpServer.writeLine(
223                         "ProtocolError",
224                         "Identifier=" + identifier,
225                         "EndMessage"
226                 );
227         }
228
229         public class ConnectionsAndKeyPairs {
230
231                 public class Connections {
232
233                         @Test(expected = ExecutionException.class)
234                         public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
235                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
236                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
237                                 fcpServer.writeLine(
238                                         "CloseConnectionDuplicateClientName",
239                                         "EndMessage"
240                                 );
241                                 keyPairFuture.get();
242                         }
243
244                         @Test(expected = ExecutionException.class)
245                         public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
246                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
247                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
248                                 fcpServer.close();
249                                 keyPairFuture.get();
250                         }
251
252                         @Test
253                         public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
254                                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
255                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
256                                 replyWithKeyPair();
257                                 keyPair.get();
258                                 keyPair = fcpClient.generateKeypair().execute();
259                                 readMessage(() -> matchesFcpMessage("GenerateSSK"));
260                                 identifier = extractIdentifier(lines);
261                                 replyWithKeyPair();
262                                 keyPair.get();
263                         }
264
265                         @Test
266                         public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
267                         throws InterruptedException, ExecutionException, IOException {
268                                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
269                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
270                                 fcpServer.close();
271                                 try {
272                                         keyPair.get();
273                                         Assert.fail();
274                                 } catch (ExecutionException e) {
275                                         /* ignore. */
276                                 }
277                                 keyPair = fcpClient.generateKeypair().execute();
278                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
279                                 replyWithKeyPair();
280                                 keyPair.get();
281                         }
282
283                 }
284
285                 public class GenerateKeyPair {
286
287                         @Test
288                         public void defaultFcpClientCanGenerateKeypair()
289                         throws ExecutionException, InterruptedException, IOException {
290                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
291                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
292                                 replyWithKeyPair();
293                                 FcpKeyPair keyPair = keyPairFuture.get();
294                                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
295                                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
296                         }
297
298                 }
299
300                 private void replyWithKeyPair() throws IOException {
301                         fcpServer.writeLine("SSKKeypair",
302                                 "InsertURI=" + INSERT_URI + "",
303                                 "RequestURI=" + REQUEST_URI + "",
304                                 "Identifier=" + identifier,
305                                 "EndMessage");
306                 }
307
308         }
309
310         public class Peers {
311
312                 public class PeerCommands {
313
314                         public class ListPeer {
315
316                                 @Test
317                                 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
318                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
319                                         connectAndAssert(() -> matchesListPeer("id1"));
320                                         replyWithPeer("id1");
321                                         assertThat(peer.get().get().getIdentity(), is("id1"));
322                                 }
323
324                                 @Test
325                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
326                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
327                                         connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
328                                         replyWithPeer("id1");
329                                         assertThat(peer.get().get().getIdentity(), is("id1"));
330                                 }
331
332                                 @Test
333                                 public void byName() throws InterruptedException, ExecutionException, IOException {
334                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
335                                         connectAndAssert(() -> matchesListPeer("FriendNode"));
336                                         replyWithPeer("id1");
337                                         assertThat(peer.get().get().getIdentity(), is("id1"));
338                                 }
339
340                                 @Test
341                                 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
342                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
343                                         connectAndAssert(() -> matchesListPeer("id2"));
344                                         replyWithUnknownNodeIdentifier();
345                                         assertThat(peer.get().isPresent(), is(false));
346                                 }
347
348                                 private Matcher<List<String>> matchesListPeer(String nodeId) {
349                                         return matchesFcpMessage(
350                                                 "ListPeer",
351                                                 "Identifier=" + identifier,
352                                                 "NodeIdentifier=" + nodeId
353                                         );
354                                 }
355
356                         }
357
358                         public class ListPeers {
359
360                                 @Test
361                                 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
362                                         Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
363                                         connectAndAssert(() -> matchesListPeers(false, false));
364                                         replyWithPeer("id1");
365                                         replyWithPeer("id2");
366                                         sendEndOfPeerList();
367                                         assertThat(peers.get(), hasSize(2));
368                                         assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
369                                                 containsInAnyOrder("id1", "id2"));
370                                 }
371
372                                 @Test
373                                 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
374                                         Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
375                                         connectAndAssert(() -> matchesListPeers(false, true));
376                                         replyWithPeer("id1", "metadata.foo=bar1");
377                                         replyWithPeer("id2", "metadata.foo=bar2");
378                                         sendEndOfPeerList();
379                                         assertThat(peers.get(), hasSize(2));
380                                         assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
381                                                 containsInAnyOrder("bar1", "bar2"));
382                                 }
383
384                                 @Test
385                                 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
386                                         Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
387                                         connectAndAssert(() -> matchesListPeers(true, false));
388                                         replyWithPeer("id1", "volatile.foo=bar1");
389                                         replyWithPeer("id2", "volatile.foo=bar2");
390                                         sendEndOfPeerList();
391                                         assertThat(peers.get(), hasSize(2));
392                                         assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
393                                                 containsInAnyOrder("bar1", "bar2"));
394                                 }
395
396                                 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
397                                         return matchesFcpMessage(
398                                                 "ListPeers",
399                                                 "WithVolatile=" + withVolatile,
400                                                 "WithMetadata=" + withMetadata
401                                         );
402                                 }
403
404                                 private void sendEndOfPeerList() throws IOException {
405                                         fcpServer.writeLine(
406                                                 "EndListPeers",
407                                                 "Identifier=" + identifier,
408                                                 "EndMessage"
409                                         );
410                                 }
411
412                         }
413
414                         public class AddPeer {
415
416                                 @Test
417                                 public void fromFile() throws InterruptedException, ExecutionException, IOException {
418                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
419                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
420                                         replyWithPeer("id1");
421                                         assertThat(peer.get().get().getIdentity(), is("id1"));
422                                 }
423
424                                 @Test
425                                 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
426                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
427                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
428                                         replyWithPeer("id1");
429                                         assertThat(peer.get().get().getIdentity(), is("id1"));
430                                 }
431
432                                 @Test
433                                 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
434                                         NodeRef nodeRef = createNodeRef();
435                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
436                                         connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
437                                                 "myName=name",
438                                                 "ark.pubURI=public",
439                                                 "ark.number=1",
440                                                 "dsaGroup.g=base",
441                                                 "dsaGroup.p=prime",
442                                                 "dsaGroup.q=subprime",
443                                                 "dsaPubKey.y=dsa-public",
444                                                 "physical.udp=1.2.3.4:5678",
445                                                 "auth.negTypes=3;5",
446                                                 "sig=sig"
447                                         )));
448                                         replyWithPeer("id1");
449                                         assertThat(peer.get().get().getIdentity(), is("id1"));
450                                 }
451
452                                 @Test
453                                 public void protocolErrorEndsCommand() throws InterruptedException, ExecutionException, IOException {
454                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
455                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
456                                         replyWithProtocolError();
457                                         assertThat(peer.get().isPresent(), is(false));
458                                 }
459
460                                 private NodeRef createNodeRef() {
461                                         NodeRef nodeRef = new NodeRef();
462                                         nodeRef.setIdentity("id1");
463                                         nodeRef.setName("name");
464                                         nodeRef.setARK(new ARK("public", "1"));
465                                         nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
466                                         nodeRef.setNegotiationTypes(new int[] { 3, 5 });
467                                         nodeRef.setPhysicalUDP("1.2.3.4:5678");
468                                         nodeRef.setDSAPublicKey("dsa-public");
469                                         nodeRef.setSignature("sig");
470                                         return nodeRef;
471                                 }
472
473                                 private Matcher<List<String>> matchesAddPeer() {
474                                         return matchesFcpMessage(
475                                                 "AddPeer",
476                                                 "Identifier=" + identifier
477                                         );
478                                 }
479
480                         }
481
482                         public class ModifyPeer {
483
484                                 @Test
485                                 public void defaultFcpClientCanEnablePeerByName()
486                                 throws InterruptedException, ExecutionException, IOException {
487                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
488                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
489                                         replyWithPeer("id1");
490                                         assertThat(peer.get().get().getIdentity(), is("id1"));
491                                 }
492
493                                 @Test
494                                 public void defaultFcpClientCanDisablePeerByName()
495                                 throws InterruptedException, ExecutionException, IOException {
496                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
497                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
498                                         replyWithPeer("id1");
499                                         assertThat(peer.get().get().getIdentity(), is("id1"));
500                                 }
501
502                                 @Test
503                                 public void defaultFcpClientCanEnablePeerByIdentity()
504                                 throws InterruptedException, ExecutionException, IOException {
505                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
506                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
507                                         replyWithPeer("id1");
508                                         assertThat(peer.get().get().getIdentity(), is("id1"));
509                                 }
510
511                                 @Test
512                                 public void defaultFcpClientCanEnablePeerByHostAndPort()
513                                 throws InterruptedException, ExecutionException, IOException {
514                                         Future<Optional<Peer>> peer =
515                                                 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
516                                         connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
517                                         replyWithPeer("id1");
518                                         assertThat(peer.get().get().getIdentity(), is("id1"));
519                                 }
520
521                                 @Test
522                                 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
523                                         Future<Optional<Peer>> peer =
524                                                 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
525                                         connectAndAssert(() -> allOf(
526                                                 matchesModifyPeer("id1", "AllowLocalAddresses", true),
527                                                 not(contains(startsWith("IsDisabled=")))
528                                         ));
529                                         replyWithPeer("id1");
530                                         assertThat(peer.get().get().getIdentity(), is("id1"));
531                                 }
532
533                                 @Test
534                                 public void disallowLocalAddressesOfPeer()
535                                 throws InterruptedException, ExecutionException, IOException {
536                                         Future<Optional<Peer>> peer =
537                                                 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
538                                         connectAndAssert(() -> allOf(
539                                                 matchesModifyPeer("id1", "AllowLocalAddresses", false),
540                                                 not(contains(startsWith("IsDisabled=")))
541                                         ));
542                                         replyWithPeer("id1");
543                                         assertThat(peer.get().get().getIdentity(), is("id1"));
544                                 }
545
546                                 @Test
547                                 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
548                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
549                                         connectAndAssert(() -> allOf(
550                                                 matchesModifyPeer("id1", "IsBurstOnly", true),
551                                                 not(contains(startsWith("AllowLocalAddresses="))),
552                                                 not(contains(startsWith("IsDisabled=")))
553                                         ));
554                                         replyWithPeer("id1");
555                                         assertThat(peer.get().get().getIdentity(), is("id1"));
556                                 }
557
558                                 @Test
559                                 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
560                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
561                                         connectAndAssert(() -> allOf(
562                                                 matchesModifyPeer("id1", "IsBurstOnly", false),
563                                                 not(contains(startsWith("AllowLocalAddresses="))),
564                                                 not(contains(startsWith("IsDisabled=")))
565                                         ));
566                                         replyWithPeer("id1");
567                                         assertThat(peer.get().get().getIdentity(), is("id1"));
568                                 }
569
570                                 @Test
571                                 public void defaultFcpClientCanSetListenOnlyForPeer()
572                                 throws InterruptedException, ExecutionException, IOException {
573                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
574                                         connectAndAssert(() -> allOf(
575                                                 matchesModifyPeer("id1", "IsListenOnly", true),
576                                                 not(contains(startsWith("AllowLocalAddresses="))),
577                                                 not(contains(startsWith("IsDisabled="))),
578                                                 not(contains(startsWith("IsBurstOnly=")))
579                                         ));
580                                         replyWithPeer("id1");
581                                         assertThat(peer.get().get().getIdentity(), is("id1"));
582                                 }
583
584                                 @Test
585                                 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
586                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
587                                         connectAndAssert(() -> allOf(
588                                                 matchesModifyPeer("id1", "IsListenOnly", false),
589                                                 not(contains(startsWith("AllowLocalAddresses="))),
590                                                 not(contains(startsWith("IsDisabled="))),
591                                                 not(contains(startsWith("IsBurstOnly=")))
592                                         ));
593                                         replyWithPeer("id1");
594                                         assertThat(peer.get().get().getIdentity(), is("id1"));
595                                 }
596
597                                 @Test
598                                 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
599                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
600                                         connectAndAssert(() -> allOf(
601                                                 matchesModifyPeer("id1", "IgnoreSourcePort", true),
602                                                 not(contains(startsWith("AllowLocalAddresses="))),
603                                                 not(contains(startsWith("IsDisabled="))),
604                                                 not(contains(startsWith("IsBurstOnly="))),
605                                                 not(contains(startsWith("IsListenOnly=")))
606                                         ));
607                                         replyWithPeer("id1");
608                                         assertThat(peer.get().get().getIdentity(), is("id1"));
609                                 }
610
611                                 @Test
612                                 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
613                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
614                                         connectAndAssert(() -> allOf(
615                                                 matchesModifyPeer("id1", "IgnoreSourcePort", false),
616                                                 not(contains(startsWith("AllowLocalAddresses="))),
617                                                 not(contains(startsWith("IsDisabled="))),
618                                                 not(contains(startsWith("IsBurstOnly="))),
619                                                 not(contains(startsWith("IsListenOnly=")))
620                                         ));
621                                         replyWithPeer("id1");
622                                         assertThat(peer.get().get().getIdentity(), is("id1"));
623                                 }
624
625                                 @Test
626                                 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
627                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
628                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
629                                         replyWithUnknownNodeIdentifier();
630                                         assertThat(peer.get().isPresent(), is(false));
631                                 }
632
633                                 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
634                                         return matchesFcpMessage(
635                                                 "ModifyPeer",
636                                                 "Identifier=" + identifier,
637                                                 "NodeIdentifier=" + nodeIdentifier,
638                                                 setting + "=" + value
639                                         );
640                                 }
641
642                         }
643
644                         public class RemovePeer {
645
646                                 @Test
647                                 public void byName() throws InterruptedException, ExecutionException, IOException {
648                                         Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
649                                         connectAndAssert(() -> matchesRemovePeer("Friend1"));
650                                         replyWithPeerRemoved("Friend1");
651                                         assertThat(peer.get(), is(true));
652                                 }
653
654                                 @Test
655                                 public void invalidName() throws InterruptedException, ExecutionException, IOException {
656                                         Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
657                                         connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
658                                         replyWithUnknownNodeIdentifier();
659                                         assertThat(peer.get(), is(false));
660                                 }
661
662                                 @Test
663                                 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
664                                         Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
665                                         connectAndAssert(() -> matchesRemovePeer("id1"));
666                                         replyWithPeerRemoved("id1");
667                                         assertThat(peer.get(), is(true));
668                                 }
669
670                                 @Test
671                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
672                                         Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
673                                         connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
674                                         replyWithPeerRemoved("Friend1");
675                                         assertThat(peer.get(), is(true));
676                                 }
677
678                                 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
679                                         return matchesFcpMessage(
680                                                 "RemovePeer",
681                                                 "Identifier=" + identifier,
682                                                 "NodeIdentifier=" + nodeIdentifier
683                                         );
684                                 }
685
686                                 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
687                                         fcpServer.writeLine(
688                                                 "PeerRemoved",
689                                                 "Identifier=" + identifier,
690                                                 "NodeIdentifier=" + nodeIdentifier,
691                                                 "EndMessage"
692                                         );
693                                 }
694
695                         }
696
697                         private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
698                                 fcpServer.writeLine(
699                                         "Peer",
700                                         "Identifier=" + identifier,
701                                         "identity=" + peerId,
702                                         "opennet=false",
703                                         "ark.pubURI=SSK@3YEf.../ark",
704                                         "ark.number=78",
705                                         "auth.negTypes=2",
706                                         "version=Fred,0.7,1.0,1466",
707                                         "lastGoodVersion=Fred,0.7,1.0,1466"
708                                 );
709                                 fcpServer.writeLine(additionalLines);
710                                 fcpServer.writeLine("EndMessage");
711                         }
712
713                 }
714
715                 public class PeerNoteCommands {
716
717                         public class ListPeerNotes {
718
719                                 @Test
720                                 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
721                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
722                                         connectAndAssert(() -> matchesListPeerNotes("Friend1"));
723                                         replyWithUnknownNodeIdentifier();
724                                         assertThat(peerNote.get().isPresent(), is(false));
725                                 }
726
727                                 @Test
728                                 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
729                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
730                                         connectAndAssert(() -> matchesListPeerNotes("Friend1"));
731                                         replyWithPeerNote();
732                                         replyWithEndListPeerNotes();
733                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
734                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
735                                 }
736
737                                 @Test
738                                 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
739                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
740                                         connectAndAssert(() -> matchesListPeerNotes("id1"));
741                                         replyWithPeerNote();
742                                         replyWithEndListPeerNotes();
743                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
744                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
745                                 }
746
747                                 @Test
748                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
749                                         Future<Optional<PeerNote>> peerNote =
750                                                 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
751                                         connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
752                                         replyWithPeerNote();
753                                         replyWithEndListPeerNotes();
754                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
755                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
756                                 }
757
758                                 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
759                                         return matchesFcpMessage(
760                                                 "ListPeerNotes",
761                                                 "NodeIdentifier=" + nodeIdentifier
762                                         );
763                                 }
764
765                                 private void replyWithEndListPeerNotes() throws IOException {
766                                         fcpServer.writeLine(
767                                                 "EndListPeerNotes",
768                                                 "Identifier=" + identifier,
769                                                 "EndMessage"
770                                         );
771                                 }
772
773                                 private void replyWithPeerNote() throws IOException {
774                                         fcpServer.writeLine(
775                                                 "PeerNote",
776                                                 "Identifier=" + identifier,
777                                                 "NodeIdentifier=Friend1",
778                                                 "NoteText=RXhhbXBsZSBUZXh0Lg==",
779                                                 "PeerNoteType=1",
780                                                 "EndMessage"
781                                         );
782                                 }
783
784                         }
785
786                         public class ModifyPeerNotes {
787
788                                 @Test
789                                 public void byName() throws InterruptedException, ExecutionException, IOException {
790                                         Future<Boolean> noteUpdated =
791                                                 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
792                                         connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
793                                         replyWithPeerNote();
794                                         assertThat(noteUpdated.get(), is(true));
795                                 }
796
797                                 @Test
798                                 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
799                                         Future<Boolean> noteUpdated =
800                                                 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
801                                         connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
802                                         replyWithUnknownNodeIdentifier();
803                                         assertThat(noteUpdated.get(), is(false));
804                                 }
805
806                                 @Test
807                                 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
808                                 throws InterruptedException, ExecutionException, IOException {
809                                         Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
810                                         assertThat(noteUpdated.get(), is(false));
811                                 }
812
813                                 @Test
814                                 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
815                                         Future<Boolean> noteUpdated =
816                                                 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
817                                         connectAndAssert(() -> matchesModifyPeerNote("id1"));
818                                         replyWithPeerNote();
819                                         assertThat(noteUpdated.get(), is(true));
820                                 }
821
822                                 @Test
823                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
824                                         Future<Boolean> noteUpdated =
825                                                 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
826                                         connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
827                                         replyWithPeerNote();
828                                         assertThat(noteUpdated.get(), is(true));
829                                 }
830
831                                 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
832                                         return matchesFcpMessage(
833                                                 "ModifyPeerNote",
834                                                 "Identifier=" + identifier,
835                                                 "NodeIdentifier=" + nodeIdentifier,
836                                                 "PeerNoteType=1",
837                                                 "NoteText=Zm9v"
838                                         );
839                                 }
840
841                                 private void replyWithPeerNote() throws IOException {
842                                         fcpServer.writeLine(
843                                                 "PeerNote",
844                                                 "Identifier=" + identifier,
845                                                 "NodeIdentifier=Friend1",
846                                                 "NoteText=Zm9v",
847                                                 "PeerNoteType=1",
848                                                 "EndMessage"
849                                         );
850                                 }
851
852                         }
853
854                 }
855
856                 private void replyWithUnknownNodeIdentifier() throws IOException {
857                         fcpServer.writeLine(
858                                 "UnknownNodeIdentifier",
859                                 "Identifier=" + identifier,
860                                 "NodeIdentifier=id2",
861                                 "EndMessage"
862                         );
863                 }
864
865         }
866
867         public class PluginCommands {
868
869                 private static final String CLASS_NAME = "foo.plugin.Plugin";
870
871                 private void replyWithPluginInfo() throws IOException {
872                         fcpServer.writeLine(
873                                 "PluginInfo",
874                                 "Identifier=" + identifier,
875                                 "PluginName=superPlugin",
876                                 "IsTalkable=true",
877                                 "LongVersion=1.2.3",
878                                 "Version=42",
879                                 "OriginUri=superPlugin",
880                                 "Started=true",
881                                 "EndMessage"
882                         );
883                 }
884
885                 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
886                 throws InterruptedException, ExecutionException {
887                         assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
888                         assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
889                         assertThat(pluginInfo.get().get().isTalkable(), is(true));
890                         assertThat(pluginInfo.get().get().getVersion(), is("42"));
891                         assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
892                         assertThat(pluginInfo.get().get().isStarted(), is(true));
893                 }
894
895                 public class LoadPlugin {
896
897                         public class OfficialPlugins {
898
899                                 @Test
900                                 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
901                                         Future<Optional<PluginInfo>> pluginInfo =
902                                                 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
903                                         connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
904                                         assertThat(lines, not(contains(startsWith("Store="))));
905                                         replyWithPluginInfo();
906                                         verifyPluginInfo(pluginInfo);
907                                 }
908
909                                 @Test
910                                 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
911                                         Future<Optional<PluginInfo>> pluginInfo =
912                                                 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
913                                         connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
914                                         assertThat(lines, hasItem("Store=true"));
915                                         replyWithPluginInfo();
916                                         verifyPluginInfo(pluginInfo);
917                                 }
918
919                                 @Test
920                                 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
921                                         Future<Optional<PluginInfo>> pluginInfo =
922                                                 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
923                                         connectAndAssert(() -> createMatcherForOfficialSource("https"));
924                                         replyWithPluginInfo();
925                                         verifyPluginInfo(pluginInfo);
926                                 }
927
928                                 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
929                                         return matchesFcpMessage(
930                                                 "LoadPlugin",
931                                                 "Identifier=" + identifier,
932                                                 "PluginURL=superPlugin",
933                                                 "URLType=official",
934                                                 "OfficialSource=" + officialSource
935                                         );
936                                 }
937
938                         }
939
940                         public class FromOtherSources {
941
942                                 private static final String FILE_PATH = "/path/to/plugin.jar";
943                                 private static final String URL = "http://server.com/plugin.jar";
944                                 private static final String KEY = "KSK@plugin.jar";
945
946                                 @Test
947                                 public void fromFile() throws ExecutionException, InterruptedException, IOException {
948                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
949                                         connectAndAssert(() -> createMatcher("file", FILE_PATH));
950                                         replyWithPluginInfo();
951                                         verifyPluginInfo(pluginInfo);
952                                 }
953
954                                 @Test
955                                 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
956                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
957                                         connectAndAssert(() -> createMatcher("url", URL));
958                                         replyWithPluginInfo();
959                                         verifyPluginInfo(pluginInfo);
960                                 }
961
962                                 @Test
963                                 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
964                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
965                                         connectAndAssert(() -> createMatcher("freenet", KEY));
966                                         replyWithPluginInfo();
967                                         verifyPluginInfo(pluginInfo);
968                                 }
969
970                                 private Matcher<List<String>> createMatcher(String urlType, String url) {
971                                         return matchesFcpMessage(
972                                                 "LoadPlugin",
973                                                 "Identifier=" + identifier,
974                                                 "PluginURL=" + url,
975                                                 "URLType=" + urlType
976                                         );
977                                 }
978
979                         }
980
981                         public class Failed {
982
983                                 @Test
984                                 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
985                                         Future<Optional<PluginInfo>> pluginInfo =
986                                                 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
987                                         connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
988                                         replyWithProtocolError();
989                                         assertThat(pluginInfo.get().isPresent(), is(false));
990                                 }
991
992                         }
993
994                 }
995
996                 public class ReloadPlugin {
997
998                         @Test
999                         public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1000                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1001                                 connectAndAssert(this::matchReloadPluginMessage);
1002                                 replyWithPluginInfo();
1003                                 verifyPluginInfo(pluginInfo);
1004                         }
1005
1006                         @Test
1007                         public void reloadingPluginWithMaxWaitTimeWorks()
1008                         throws InterruptedException, ExecutionException, IOException {
1009                                 Future<Optional<PluginInfo>> pluginInfo =
1010                                         fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1011                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1012                                 replyWithPluginInfo();
1013                                 verifyPluginInfo(pluginInfo);
1014                         }
1015
1016                         @Test
1017                         public void reloadingPluginWithPurgeWorks()
1018                         throws InterruptedException, ExecutionException, IOException {
1019                                 Future<Optional<PluginInfo>> pluginInfo =
1020                                         fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1021                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1022                                 replyWithPluginInfo();
1023                                 verifyPluginInfo(pluginInfo);
1024                         }
1025
1026                         @Test
1027                         public void reloadingPluginWithStoreWorks()
1028                         throws InterruptedException, ExecutionException, IOException {
1029                                 Future<Optional<PluginInfo>> pluginInfo =
1030                                         fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1031                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1032                                 replyWithPluginInfo();
1033                                 verifyPluginInfo(pluginInfo);
1034                         }
1035
1036                         @Test
1037                         public void protocolErrorIsRecognizedAsFailure()
1038                         throws InterruptedException, ExecutionException, IOException {
1039                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1040                                 connectAndAssert(() -> matchReloadPluginMessage());
1041                                 replyWithProtocolError();
1042                                 assertThat(pluginInfo.get().isPresent(), is(false));
1043                         }
1044
1045                         private Matcher<List<String>> matchReloadPluginMessage() {
1046                                 return matchesFcpMessage(
1047                                         "ReloadPlugin",
1048                                         "Identifier=" + identifier,
1049                                         "PluginName=" + CLASS_NAME
1050                                 );
1051                         }
1052
1053                 }
1054
1055                 public class RemovePlugin {
1056
1057                         @Test
1058                         public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1059                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1060                                 connectAndAssert(this::matchPluginRemovedMessage);
1061                                 replyWithPluginRemoved();
1062                                 assertThat(pluginRemoved.get(), is(true));
1063                         }
1064
1065                         @Test
1066                         public void removingPluginWithMaxWaitTimeWorks()
1067                         throws InterruptedException, ExecutionException, IOException {
1068                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1069                                 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1070                                 replyWithPluginRemoved();
1071                                 assertThat(pluginRemoved.get(), is(true));
1072                         }
1073
1074                         @Test
1075                         public void removingPluginWithPurgeWorks()
1076                         throws InterruptedException, ExecutionException, IOException {
1077                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1078                                 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1079                                 replyWithPluginRemoved();
1080                                 assertThat(pluginRemoved.get(), is(true));
1081                         }
1082
1083                         private void replyWithPluginRemoved() throws IOException {
1084                                 fcpServer.writeLine(
1085                                         "PluginRemoved",
1086                                         "Identifier=" + identifier,
1087                                         "PluginName=" + CLASS_NAME,
1088                                         "EndMessage"
1089                                 );
1090                         }
1091
1092                         private Matcher<List<String>> matchPluginRemovedMessage() {
1093                                 return matchesFcpMessage(
1094                                         "RemovePlugin",
1095                                         "Identifier=" + identifier,
1096                                         "PluginName=" + CLASS_NAME
1097                                 );
1098                         }
1099
1100                 }
1101
1102                 public class GetPluginInfo {
1103
1104                         @Test
1105                         public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1106                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1107                                 connectAndAssert(this::matchGetPluginInfoMessage);
1108                                 replyWithPluginInfo();
1109                                 verifyPluginInfo(pluginInfo);
1110                         }
1111
1112                         @Test
1113                         public void gettingPluginInfoWithDetailsWorks()
1114                         throws InterruptedException, ExecutionException, IOException {
1115                                 Future<Optional<PluginInfo>> pluginInfo =
1116                                         fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1117                                 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1118                                 replyWithPluginInfo();
1119                                 verifyPluginInfo(pluginInfo);
1120                         }
1121
1122                         @Test
1123                         public void protocolErrorIsRecognizedAsFailure()
1124                         throws InterruptedException, ExecutionException, IOException {
1125                                 Future<Optional<PluginInfo>> pluginInfo =
1126                                         fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1127                                 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1128                                 replyWithProtocolError();
1129                                 assertThat(pluginInfo.get(), is(Optional.empty()));
1130                         }
1131
1132                         private Matcher<List<String>> matchGetPluginInfoMessage() {
1133                                 return matchesFcpMessage(
1134                                         "GetPluginInfo",
1135                                         "Identifier=" + identifier,
1136                                         "PluginName=" + CLASS_NAME
1137                                 );
1138                         }
1139
1140                 }
1141
1142         }
1143
1144         public class UskSubscriptionCommands {
1145
1146                 private static final String URI = "USK@some,uri/file.txt";
1147
1148                 @Test
1149                 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1150                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1151                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1152                         replyWithSubscribed();
1153                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1154                         AtomicInteger edition = new AtomicInteger();
1155                         CountDownLatch updated = new CountDownLatch(2);
1156                         uskSubscription.get().get().onUpdate(e -> {
1157                                 edition.set(e);
1158                                 updated.countDown();
1159                         });
1160                         sendUpdateNotification(23);
1161                         sendUpdateNotification(24);
1162                         assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1163                         assertThat(edition.get(), is(24));
1164                 }
1165
1166                 @Test
1167                 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1168                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1169                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1170                         replyWithSubscribed();
1171                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1172                         AtomicInteger edition = new AtomicInteger();
1173                         CountDownLatch updated = new CountDownLatch(2);
1174                         uskSubscription.get().get().onUpdate(e -> {
1175                                 edition.set(e);
1176                                 updated.countDown();
1177                         });
1178                         uskSubscription.get().get().onUpdate(e -> updated.countDown());
1179                         sendUpdateNotification(23);
1180                         assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1181                         assertThat(edition.get(), is(23));
1182                 }
1183
1184                 @Test
1185                 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1186                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1187                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1188                         replyWithSubscribed();
1189                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1190                         AtomicBoolean updated = new AtomicBoolean();
1191                         uskSubscription.get().get().onUpdate(e -> updated.set(true));
1192                         uskSubscription.get().get().cancel();
1193                         readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1194                         sendUpdateNotification(23);
1195                         assertThat(updated.get(), is(false));
1196                 }
1197
1198                 private void replyWithSubscribed() throws IOException {
1199                         fcpServer.writeLine(
1200                                 "SubscribedUSK",
1201                                 "Identifier=" + identifier,
1202                                 "URI=" + URI,
1203                                 "DontPoll=false",
1204                                 "EndMessage"
1205                         );
1206                 }
1207
1208                 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1209                         fcpServer.writeLine(
1210                                 "SubscribedUSKUpdate",
1211                                 "Identifier=" + identifier,
1212                                 "URI=" + URI,
1213                                 "Edition=" + edition
1214                         );
1215                         fcpServer.writeLine(additionalLines);
1216                         fcpServer.writeLine("EndMessage");
1217                 }
1218
1219         }
1220
1221         public class ClientGet {
1222
1223                 @Test
1224                 public void works() throws InterruptedException, ExecutionException, IOException {
1225                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1226                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1227                         replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1228                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1229                         Optional<Data> data = dataFuture.get();
1230                         verifyData(data);
1231                 }
1232
1233                 @Test
1234                 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1235                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1236                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1237                         replyWithGetFailed("not-test");
1238                         replyWithGetFailed(identifier);
1239                         Optional<Data> data = dataFuture.get();
1240                         assertThat(data.isPresent(), is(false));
1241                 }
1242
1243                 @Test
1244                 public void getFailedForDifferentIdentifierIsIgnored()
1245                 throws InterruptedException, ExecutionException, IOException {
1246                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1247                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1248                         replyWithGetFailed("not-test");
1249                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1250                         Optional<Data> data = dataFuture.get();
1251                         verifyData(data);
1252                 }
1253
1254                 @Test(expected = ExecutionException.class)
1255                 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1256                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1257                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1258                         fcpServer.close();
1259                         dataFuture.get();
1260                 }
1261
1262                 @Test
1263                 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1264                         fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1265                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1266                 }
1267
1268                 @Test
1269                 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1270                         fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1271                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1272                 }
1273
1274                 @Test
1275                 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1276                 throws InterruptedException, ExecutionException, IOException {
1277                         fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1278                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1279                 }
1280
1281                 @Test
1282                 public void clientGetWithPrioritySettingSendsCorrectCommands()
1283                 throws InterruptedException, ExecutionException, IOException {
1284                         fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1285                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1286                 }
1287
1288                 @Test
1289                 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1290                 throws InterruptedException, ExecutionException, IOException {
1291                         fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1292                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1293                 }
1294
1295                 @Test
1296                 public void clientGetWithGlobalSettingSendsCorrectCommands()
1297                 throws InterruptedException, ExecutionException, IOException {
1298                         fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1299                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1300                 }
1301
1302                 @Test
1303                 public void clientGetFollowsRedirect() throws InterruptedException, ExecutionException, IOException {
1304                     Future<Optional<Data>> data = fcpClient.clientGet().uri("USK@foo/bar").execute();
1305                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=USK@foo/bar"));
1306                         replyWithRedirect("USK@foo/baz");
1307                         readMessage(() -> matchesFcpMessage("ClientGet", "URI=USK@foo/baz"));
1308                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1309                         verifyData(data.get());
1310                 }
1311
1312                 @Test
1313                 public void clientGetNotifiesListenersOnRedirect() throws IOException, ExecutionException, InterruptedException {
1314                         List<String> redirects = new ArrayList<>();
1315                         Future<Optional<Data>> data = fcpClient.clientGet().onRedirect(redirects::add).uri("USK@foo/bar").execute();
1316                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=USK@foo/bar"));
1317                         replyWithRedirect("USK@foo/baz");
1318                         readMessage(() -> matchesFcpMessage("ClientGet", "URI=USK@foo/baz"));
1319                         replyWithRedirect("USK@foo/quux");
1320                         readMessage(() -> matchesFcpMessage("ClientGet", "URI=USK@foo/quux"));
1321                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1322                         verifyData(data.get());
1323                         assertThat(redirects, contains("USK@foo/baz", "USK@foo/quux"));
1324                 }
1325
1326                 private void replyWithGetFailed(String identifier) throws IOException {
1327                         fcpServer.writeLine(
1328                                 "GetFailed",
1329                                 "Identifier=" + identifier,
1330                                 "Code=3",
1331                                 "EndMessage"
1332                         );
1333                 }
1334
1335                 private void replyWithRedirect(String newUri) throws IOException {
1336                         fcpServer.writeLine(
1337                                 "GetFailed",
1338                                 "Identifier=" + identifier,
1339                                 "Code=27",
1340                                 "RedirectURI=" + newUri,
1341                                 "EndMessage"
1342                         );
1343                 }
1344
1345                 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1346                         fcpServer.writeLine(
1347                                 "AllData",
1348                                 "Identifier=" + identifier,
1349                                 "DataLength=" + (text.length() + 1),
1350                                 "StartupTime=1435610539000",
1351                                 "CompletionTime=1435610540000",
1352                                 "Metadata.ContentType=" + contentType,
1353                                 "Data",
1354                                 text
1355                         );
1356                 }
1357
1358                 private void verifyData(Optional<Data> data) throws IOException {
1359                         assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1360                         assertThat(data.get().size(), is(6L));
1361                         assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1362                                 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1363                 }
1364
1365         }
1366
1367         public class PutCommands {
1368
1369                 public class ClientPut {
1370
1371                         @Test
1372                         public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1373                                 fcpClient.clientPut()
1374                                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
1375                                         .length(6)
1376                                         .uri("KSK@foo.txt")
1377                                         .execute();
1378                                 connectNode();
1379                                 readMessage("Hello", this::matchesDirectClientPut);
1380                         }
1381
1382                         @Test
1383                         public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1384                                 Future<Optional<Key>> key = fcpClient.clientPut()
1385                                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
1386                                         .length(6)
1387                                         .uri("KSK@foo.txt")
1388                                         .execute();
1389                                 connectNode();
1390                                 readMessage("Hello", this::matchesDirectClientPut);
1391                                 replyWithPutFailed("not-the-right-one");
1392                                 replyWithPutSuccessful(identifier);
1393                                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1394                         }
1395
1396                         @Test
1397                         public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1398                                 Future<Optional<Key>> key = fcpClient.clientPut()
1399                                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
1400                                         .length(6)
1401                                         .uri("KSK@foo.txt")
1402                                         .execute();
1403                                 connectNode();
1404                                 readMessage("Hello", this::matchesDirectClientPut);
1405                                 replyWithPutSuccessful("not-the-right-one");
1406                                 replyWithPutFailed(identifier);
1407                                 assertThat(key.get().isPresent(), is(false));
1408                         }
1409
1410                         @Test
1411                         public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1412                                 fcpClient.clientPut()
1413                                         .named("otherName.txt")
1414                                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
1415                                         .length(6)
1416                                         .uri("KSK@foo.txt")
1417                                         .execute();
1418                                 connectNode();
1419                                 readMessage("Hello", () -> allOf(
1420                                         hasHead("ClientPut"),
1421                                         hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1422                                                 "URI=KSK@foo.txt"),
1423                                         hasTail("EndMessage", "Hello")
1424                                 ));
1425                         }
1426
1427                         @Test
1428                         public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1429                                 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1430                                 connectAndAssert(() ->
1431                                         matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
1432                         }
1433
1434                         @Test
1435                         public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1436                                 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1437                                 connectAndAssert(() ->
1438                                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
1439                         }
1440
1441                         public class DDA {
1442
1443                                 private final File ddaFile;
1444                                 private final File fileToUpload;
1445
1446                                 public DDA() throws IOException {
1447                                         ddaFile = createDdaFile();
1448                                         fileToUpload = new File(ddaFile.getParent(), "test.dat");
1449                                 }
1450
1451                                 private Matcher<List<String>> matchesFileClientPut(File file) {
1452                                         return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
1453                                 }
1454
1455                                 @Test
1456                                 public void completeDda() throws IOException, ExecutionException, InterruptedException {
1457                                         fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1458                                         connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1459                                         sendDdaRequired(identifier);
1460                                         readMessage(() -> matchesTestDDARequest(ddaFile));
1461                                         sendTestDDAReply(ddaFile.getParent(), ddaFile);
1462                                         readMessage(() -> matchesTestDDAResponse(ddaFile));
1463                                         writeTestDDAComplete(ddaFile);
1464                                         readMessage(() -> matchesFileClientPut(fileToUpload));
1465                                 }
1466
1467                                 @Test
1468                                 public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1469                                         fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1470                                         connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1471                                         sendDdaRequired(identifier);
1472                                         readMessage(() -> matchesTestDDARequest(ddaFile));
1473                                         sendTestDDAReply("/some-other-directory", ddaFile);
1474                                         sendTestDDAReply(ddaFile.getParent(), ddaFile);
1475                                         readMessage(() -> matchesTestDDAResponse(ddaFile));
1476                                 }
1477
1478                                 @Test
1479                                 public void sendResponseIfFileUnreadable()
1480                                 throws IOException, ExecutionException, InterruptedException {
1481                                         fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1482                                         connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1483                                         sendDdaRequired(identifier);
1484                                         readMessage(() -> matchesTestDDARequest(ddaFile));
1485                                         sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1486                                         readMessage(this::matchesFailedToReadResponse);
1487                                 }
1488
1489                                 @Test
1490                                 public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1491                                 throws IOException, ExecutionException, InterruptedException {
1492                                         fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1493                                         connectNode();
1494                                         List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1495                                         String identifier = extractIdentifier(lines);
1496                                         fcpServer.writeLine(
1497                                                 "TestDDAComplete",
1498                                                 "Directory=/some-other-directory",
1499                                                 "EndMessage"
1500                                         );
1501                                         sendDdaRequired(identifier);
1502                                         lines = fcpServer.collectUntil(is("EndMessage"));
1503                                         assertThat(lines, matchesFcpMessage(
1504                                                 "TestDDARequest",
1505                                                 "Directory=" + ddaFile.getParent(),
1506                                                 "WantReadDirectory=true",
1507                                                 "WantWriteDirectory=false"
1508                                         ));
1509                                 }
1510
1511                                 private Matcher<List<String>> matchesFailedToReadResponse() {
1512                                         return matchesFcpMessage(
1513                                                 "TestDDAResponse",
1514                                                 "Directory=" + ddaFile.getParent(),
1515                                                 "ReadContent=failed-to-read"
1516                                         );
1517                                 }
1518
1519                                 private void writeTestDDAComplete(File tempFile) throws IOException {
1520                                         fcpServer.writeLine(
1521                                                 "TestDDAComplete",
1522                                                 "Directory=" + tempFile.getParent(),
1523                                                 "ReadDirectoryAllowed=true",
1524                                                 "EndMessage"
1525                                         );
1526                                 }
1527
1528                                 private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1529                                         return matchesFcpMessage(
1530                                                 "TestDDAResponse",
1531                                                 "Directory=" + tempFile.getParent(),
1532                                                 "ReadContent=test-content"
1533                                         );
1534                                 }
1535
1536                                 private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1537                                         fcpServer.writeLine(
1538                                                 "TestDDAReply",
1539                                                 "Directory=" + directory,
1540                                                 "ReadFilename=" + tempFile,
1541                                                 "EndMessage"
1542                                         );
1543                                 }
1544
1545                                 private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1546                                         return matchesFcpMessage(
1547                                                 "TestDDARequest",
1548                                                 "Directory=" + tempFile.getParent(),
1549                                                 "WantReadDirectory=true",
1550                                                 "WantWriteDirectory=false"
1551                                         );
1552                                 }
1553
1554                                 private void sendDdaRequired(String identifier) throws IOException {
1555                                         fcpServer.writeLine(
1556                                                 "ProtocolError",
1557                                                 "Identifier=" + identifier,
1558                                                 "Code=25",
1559                                                 "EndMessage"
1560                                         );
1561                                 }
1562
1563                         }
1564
1565                         private void replyWithPutFailed(String identifier) throws IOException {
1566                                 fcpServer.writeLine(
1567                                         "PutFailed",
1568                                         "Identifier=" + identifier,
1569                                         "EndMessage"
1570                                 );
1571                         }
1572
1573                         private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
1574                                 List<String> lines =
1575                                         new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
1576                                 Arrays.asList(additionalLines).forEach(lines::add);
1577                                 return allOf(
1578                                         hasHead("ClientPut"),
1579                                         hasParameters(1, 2, lines.toArray(new String[lines.size()])),
1580                                         hasTail("EndMessage", "Hello")
1581                                 );
1582                         }
1583
1584                         private File createDdaFile() throws IOException {
1585                                 File tempFile = File.createTempFile("test-dda-", ".dat");
1586                                 tempFile.deleteOnExit();
1587                                 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
1588                                 return tempFile;
1589                         }
1590
1591                         @Test
1592                         public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1593                         throws InterruptedException, ExecutionException, IOException {
1594                                 Future<Optional<Key>> key =
1595                                         fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1596                                 connectNode();
1597                                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1598                                 String identifier = extractIdentifier(lines);
1599                                 fcpServer.writeLine(
1600                                         "ProtocolError",
1601                                         "Identifier=not-the-right-one",
1602                                         "Code=25",
1603                                         "EndMessage"
1604                                 );
1605                                 fcpServer.writeLine(
1606                                         "PutSuccessful",
1607                                         "Identifier=" + identifier,
1608                                         "URI=KSK@foo.txt",
1609                                         "EndMessage"
1610                                 );
1611                                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1612                         }
1613
1614                         @Test
1615                         public void clientPutAbortsOnProtocolErrorOtherThan25()
1616                         throws InterruptedException, ExecutionException, IOException {
1617                                 Future<Optional<Key>> key =
1618                                         fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1619                                 connectNode();
1620                                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1621                                 String identifier = extractIdentifier(lines);
1622                                 fcpServer.writeLine(
1623                                         "ProtocolError",
1624                                         "Identifier=" + identifier,
1625                                         "Code=1",
1626                                         "EndMessage"
1627                                 );
1628                                 assertThat(key.get().isPresent(), is(false));
1629                         }
1630
1631                         @Test
1632                         public void clientPutSendsNotificationsForGeneratedKeys()
1633                         throws InterruptedException, ExecutionException, IOException {
1634                                 List<String> generatedKeys = new CopyOnWriteArrayList<>();
1635                                 Future<Optional<Key>> key = fcpClient.clientPut()
1636                                         .onKeyGenerated(generatedKeys::add)
1637                                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
1638                                         .length(6)
1639                                         .uri("KSK@foo.txt")
1640                                         .execute();
1641                                 connectNode();
1642                                 readMessage("Hello", this::matchesDirectClientPut);
1643                                 replyWithGeneratedUri();
1644                                 replyWithPutSuccessful(identifier);
1645                                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1646                                 assertThat(generatedKeys, contains("KSK@foo.txt"));
1647                         }
1648
1649                         @Test
1650                         public void clientPutSendsNotificationOnProgress()
1651                         throws InterruptedException, ExecutionException, IOException {
1652                                 List<RequestProgress> requestProgress = new ArrayList<>();
1653                                 Future<Optional<Key>> key = fcpClient.clientPut()
1654                                         .onProgress(requestProgress::add)
1655                                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
1656                                         .length(6)
1657                                         .uri("KSK@foo.txt")
1658                                         .execute();
1659                                 connectNode();
1660                                 readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
1661                                 replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
1662                                 replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
1663                                 replyWithPutSuccessful(identifier);
1664                                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1665                                 assertThat(requestProgress, contains(
1666                                         isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
1667                                         isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
1668                                 ));
1669                         }
1670
1671                 }
1672
1673                 private void replyWithPutSuccessful(String identifier) throws IOException {
1674                         fcpServer.writeLine(
1675                                 "PutSuccessful",
1676                                 "URI=KSK@foo.txt",
1677                                 "Identifier=" + identifier,
1678                                 "EndMessage"
1679                         );
1680                 }
1681
1682                 private void replyWithGeneratedUri() throws IOException {
1683                         fcpServer.writeLine(
1684                                 "URIGenerated",
1685                                 "Identifier=" + identifier,
1686                                 "URI=KSK@foo.txt",
1687                                 "EndMessage"
1688                         );
1689                 }
1690
1691                 private void replyWithSimpleProgress(
1692                         int total, int required, int failed, int fatallyFailed, int succeeded, int lastProgress,
1693                         boolean finalizedTotal, int minSuccessFetchBlocks) throws IOException {
1694                         fcpServer.writeLine(
1695                                 "SimpleProgress",
1696                                 "Identifier=" + identifier,
1697                                 "Total=" + total,
1698                                 "Required=" + required,
1699                                 "Failed=" + failed,
1700                                 "FatallyFailed=" + fatallyFailed,
1701                                 "Succeeded=" + succeeded,
1702                                 "LastProgress=" + lastProgress,
1703                                 "FinalizedTotal=" + finalizedTotal,
1704                                 "MinSuccessFetchBlocks=" + minSuccessFetchBlocks,
1705                                 "EndMessage"
1706                         );
1707                 }
1708
1709                 public class ClientPutDiskDir {
1710
1711                         @Test
1712                         public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1713                                 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
1714                                 connectAndAssert(this::matchesClientPutDiskDir);
1715                                 fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
1716                                 assertThat(key.get().get().getKey(), is("CHK@abc"));
1717                         }
1718
1719                         @Test
1720                         public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
1721                                 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
1722                                 connectAndAssert(this::matchesClientPutDiskDir);
1723                                 replyWithProtocolError();
1724                                 assertThat(key.get().isPresent(), is(false));
1725                         }
1726
1727                         @Test
1728                         public void progressIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException {
1729                                 List<RequestProgress> requestProgress = new ArrayList<>();
1730                                 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().onProgress(requestProgress::add)
1731                                         .fromDirectory(new File("")).uri("CHK@").execute();
1732                                 connectAndAssert(() -> matchesClientPutDiskDir("Verbosity=1"));
1733                                 replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
1734                                 replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
1735                                 replyWithPutSuccessful(identifier);
1736                                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1737                                 assertThat(requestProgress, contains(
1738                                         isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
1739                                         isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
1740                                 ));
1741                         }
1742
1743                         @Test
1744                         public void generatedUriIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException {
1745                                 List<String> generatedKeys = new ArrayList<>();
1746                                 Future<Optional<Key>> key = fcpClient.clientPutDiskDir().onKeyGenerated(generatedKeys::add)
1747                                         .fromDirectory(new File("")).uri("CHK@").execute();
1748                                 connectAndAssert(this::matchesClientPutDiskDir);
1749                                 replyWithGeneratedUri();
1750                                 replyWithPutSuccessful(identifier);
1751                                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1752                                 assertThat(generatedKeys, contains("KSK@foo.txt"));
1753                         }
1754
1755                         private Matcher<List<String>> matchesClientPutDiskDir(String... additionalLines) {
1756                                 List<String> lines = new ArrayList<>(Arrays.asList("Identifier=" + identifier, "URI=CHK@", "Filename=" + new File("").getPath()));
1757                                 Arrays.asList(additionalLines).forEach(lines::add);
1758                                 return matchesFcpMessage("ClientPutDiskDir", lines.toArray(new String[lines.size()]));
1759                         }
1760
1761                 }
1762
1763         }
1764
1765         public class ConfigCommand {
1766
1767                 public class GetConfig {
1768
1769                         @Test
1770                         public void defaultFcpClientCanGetConfigWithoutDetails()
1771                         throws InterruptedException, ExecutionException, IOException {
1772                                 Future<ConfigData> configData = fcpClient.getConfig().execute();
1773                                 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1774                                 replyWithConfigData();
1775                                 assertThat(configData.get(), notNullValue());
1776                         }
1777
1778                         @Test
1779                         public void defaultFcpClientCanGetConfigWithCurrent()
1780                         throws InterruptedException, ExecutionException, IOException {
1781                                 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1782                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1783                                 replyWithConfigData("current.foo=bar");
1784                                 assertThat(configData.get().getCurrent("foo"), is("bar"));
1785                         }
1786
1787                         @Test
1788                         public void defaultFcpClientCanGetConfigWithDefaults()
1789                         throws InterruptedException, ExecutionException, IOException {
1790                                 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1791                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1792                                 replyWithConfigData("default.foo=bar");
1793                                 assertThat(configData.get().getDefault("foo"), is("bar"));
1794                         }
1795
1796                         @Test
1797                         public void defaultFcpClientCanGetConfigWithSortOrder()
1798                         throws InterruptedException, ExecutionException, IOException {
1799                                 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1800                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1801                                 replyWithConfigData("sortOrder.foo=17");
1802                                 assertThat(configData.get().getSortOrder("foo"), is(17));
1803                         }
1804
1805                         @Test
1806                         public void defaultFcpClientCanGetConfigWithExpertFlag()
1807                         throws InterruptedException, ExecutionException, IOException {
1808                                 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1809                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1810                                 replyWithConfigData("expertFlag.foo=true");
1811                                 assertThat(configData.get().getExpertFlag("foo"), is(true));
1812                         }
1813
1814                         @Test
1815                         public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1816                         throws InterruptedException, ExecutionException, IOException {
1817                                 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1818                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1819                                 replyWithConfigData("forceWriteFlag.foo=true");
1820                                 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
1821                         }
1822
1823                         @Test
1824                         public void defaultFcpClientCanGetConfigWithShortDescription()
1825                         throws InterruptedException, ExecutionException, IOException {
1826                                 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1827                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1828                                 replyWithConfigData("shortDescription.foo=bar");
1829                                 assertThat(configData.get().getShortDescription("foo"), is("bar"));
1830                         }
1831
1832                         @Test
1833                         public void defaultFcpClientCanGetConfigWithLongDescription()
1834                         throws InterruptedException, ExecutionException, IOException {
1835                                 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1836                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1837                                 replyWithConfigData("longDescription.foo=bar");
1838                                 assertThat(configData.get().getLongDescription("foo"), is("bar"));
1839                         }
1840
1841                         @Test
1842                         public void defaultFcpClientCanGetConfigWithDataTypes()
1843                         throws InterruptedException, ExecutionException, IOException {
1844                                 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1845                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1846                                 replyWithConfigData("dataType.foo=number");
1847                                 assertThat(configData.get().getDataType("foo"), is("number"));
1848                         }
1849
1850                         private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1851                                 return matchesFcpMessage(
1852                                         "GetConfig",
1853                                         "Identifier=" + identifier,
1854                                         additionalParameter + "=true"
1855                                 );
1856                         }
1857
1858                 }
1859
1860                 public class ModifyConfig {
1861
1862                         @Test
1863                         public void defaultFcpClientCanModifyConfigData()
1864                         throws InterruptedException, ExecutionException, IOException {
1865                                 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1866                                 connectAndAssert(() -> matchesFcpMessage(
1867                                         "ModifyConfig",
1868                                         "Identifier=" + identifier,
1869                                         "foo.bar=baz"
1870                                 ));
1871                                 replyWithConfigData("current.foo.bar=baz");
1872                                 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
1873                         }
1874
1875                 }
1876
1877                 private void replyWithConfigData(String... additionalLines) throws IOException {
1878                         fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1879                         fcpServer.writeLine(additionalLines);
1880                         fcpServer.writeLine("EndMessage");
1881                 }
1882
1883         }
1884
1885         public class NodeInformation {
1886
1887                 @Test
1888                 public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
1889                         Future<NodeData> nodeData = fcpClient.getNode().execute();
1890                         connectAndAssert(() -> matchesGetNode(false, false, false));
1891                         replyWithNodeData();
1892                         assertThat(nodeData.get(), notNullValue());
1893                         assertThat(nodeData.get().getNodeRef().isOpennet(), is(false));
1894                 }
1895
1896                 @Test
1897                 public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
1898                 throws InterruptedException, ExecutionException, IOException {
1899                         Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
1900                         connectAndAssert(() -> matchesGetNode(true, false, false));
1901                         replyWithNodeData("opennet=true");
1902                         assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
1903                         assertThat(nodeData.get().getNodeRef().isOpennet(), is(true));
1904                 }
1905
1906                 @Test
1907                 public void defaultFcpClientCanGetNodeInformationWithPrivateData()
1908                 throws InterruptedException, ExecutionException, IOException {
1909                         Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
1910                         connectAndAssert(() -> matchesGetNode(false, true, false));
1911                         replyWithNodeData("ark.privURI=SSK@XdHMiRl");
1912                         assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
1913                 }
1914
1915                 @Test
1916                 public void defaultFcpClientCanGetNodeInformationWithVolatileData()
1917                 throws InterruptedException, ExecutionException, IOException {
1918                         Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
1919                         connectAndAssert(() -> matchesGetNode(false, false, true));
1920                         replyWithNodeData("volatile.freeJavaMemory=205706528");
1921                         assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
1922                 }
1923
1924                 private Matcher<List<String>> matchesGetNode(boolean withOpennetRef, boolean withPrivate, boolean withVolatile) {
1925                         return matchesFcpMessage(
1926                                 "GetNode",
1927                                 "Identifier=" + identifier,
1928                                 "GiveOpennetRef=" + withOpennetRef,
1929                                 "WithPrivate=" + withPrivate,
1930                                 "WithVolatile=" + withVolatile
1931                         );
1932                 }
1933
1934                 private void replyWithNodeData(String... additionalLines) throws IOException {
1935                         fcpServer.writeLine(
1936                                 "NodeData",
1937                                 "Identifier=" + identifier,
1938                                 "ark.pubURI=SSK@3YEf.../ark",
1939                                 "ark.number=78",
1940                                 "auth.negTypes=2",
1941                                 "version=Fred,0.7,1.0,1466",
1942                                 "lastGoodVersion=Fred,0.7,1.0,1466"
1943                         );
1944                         fcpServer.writeLine(additionalLines);
1945                         fcpServer.writeLine("EndMessage");
1946                 }
1947
1948         }
1949
1950 }