Use execute() to trigger execution of commands
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import net.pterodactylus.fcp.FcpKeyPair;
import net.pterodactylus.fcp.Key;
import net.pterodactylus.fcp.Peer;
import net.pterodactylus.fcp.Priority;
import net.pterodactylus.fcp.fake.FakeTcpServer;
import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;

import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.After;
import org.junit.Test;
35
36 /**
37  * Unit test for {@link DefaultFcpClient}.
38  *
39  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
40  */
41 public class DefaultFcpClientTest {
42
43         private static final String INSERT_URI =
44                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
45         private static final String REQUEST_URI =
46                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
47
48         private static int threadCounter = 0;
49         private final ExecutorService threadPool =
50                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
51         private final FakeTcpServer fcpServer;
52         private final DefaultFcpClient fcpClient;
53
54         public DefaultFcpClientTest() throws IOException {
55                 fcpServer = new FakeTcpServer(threadPool);
56                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
57         }
58
59         @After
60         public void tearDown() throws IOException {
61                 fcpServer.close();
62         }
63
64         @Test(expected = ExecutionException.class)
65         public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
66         throws IOException, ExecutionException, InterruptedException {
67                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
68                 fcpServer.connect().get();
69                 fcpServer.collectUntil(is("EndMessage"));
70                 fcpServer.writeLine(
71                         "CloseConnectionDuplicateClientName",
72                         "EndMessage"
73                 );
74                 keyPairFuture.get();
75         }
76
77         @Test(expected = ExecutionException.class)
78         public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
79         throws IOException, ExecutionException, InterruptedException {
80                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
81                 fcpServer.connect().get();
82                 fcpServer.collectUntil(is("EndMessage"));
83                 fcpServer.close();
84                 keyPairFuture.get();
85         }
86
87         @Test
88         public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
89                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
90                 connectNode();
91                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
92                 String identifier = extractIdentifier(lines);
93                 fcpServer.writeLine("SSKKeypair",
94                         "InsertURI=" + INSERT_URI + "",
95                         "RequestURI=" + REQUEST_URI + "",
96                         "Identifier=" + identifier,
97                         "EndMessage");
98                 FcpKeyPair keyPair = keyPairFuture.get();
99                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
100                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
101         }
102
103         private void connectNode() throws InterruptedException, ExecutionException, IOException {
104                 fcpServer.connect().get();
105                 fcpServer.collectUntil(is("EndMessage"));
106                 fcpServer.writeLine("NodeHello",
107                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
108                         "Revision=build01466",
109                         "Testnet=false",
110                         "Version=Fred,0.7,1.0,1466",
111                         "Build=1466",
112                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
113                         "Node=Fred",
114                         "ExtBuild=29",
115                         "FCPVersion=2.0",
116                         "NodeLanguage=ENGLISH",
117                         "ExtRevision=v29",
118                         "EndMessage"
119                 );
120         }
121
122         @Test
123         public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
124                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
125                 connectNode();
126                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
127                 assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
128                 String identifier = extractIdentifier(lines);
129                 fcpServer.writeLine(
130                         "AllData",
131                         "Identifier=" + identifier,
132                         "DataLength=6",
133                         "StartupTime=1435610539000",
134                         "CompletionTime=1435610540000",
135                         "Metadata.ContentType=text/plain;charset=utf-8",
136                         "Data",
137                         "Hello"
138                 );
139                 Optional<Data> data = dataFuture.get();
140                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
141                 assertThat(data.get().size(), is(6L));
142                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
143                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
144         }
145
146         private String extractIdentifier(List<String> lines) {
147                 return lines.stream()
148                         .filter(s -> s.startsWith("Identifier="))
149                         .map(s -> s.substring(s.indexOf('=') + 1))
150                         .findFirst()
151                         .orElse("");
152         }
153
154         @Test
155         public void clientGetDownloadsDataForCorrectIdentifier()
156         throws InterruptedException, ExecutionException, IOException {
157                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
158                 connectNode();
159                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
160                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
161                 String identifier = extractIdentifier(lines);
162                 fcpServer.writeLine(
163                         "AllData",
164                         "Identifier=not-test",
165                         "DataLength=12",
166                         "StartupTime=1435610539000",
167                         "CompletionTime=1435610540000",
168                         "Metadata.ContentType=text/plain;charset=latin-9",
169                         "Data",
170                         "Hello World"
171                 );
172                 fcpServer.writeLine(
173                         "AllData",
174                         "Identifier=" + identifier,
175                         "DataLength=6",
176                         "StartupTime=1435610539000",
177                         "CompletionTime=1435610540000",
178                         "Metadata.ContentType=text/plain;charset=utf-8",
179                         "Data",
180                         "Hello"
181                 );
182                 Optional<Data> data = dataFuture.get();
183                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
184                 assertThat(data.get().size(), is(6L));
185                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
186                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
187         }
188
189         @Test
190         public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
191                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
192                 connectNode();
193                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
194                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
195                 String identifier = extractIdentifier(lines);
196                 fcpServer.writeLine(
197                         "GetFailed",
198                         "Identifier=" + identifier,
199                         "Code=3",
200                         "EndMessage"
201                 );
202                 Optional<Data> data = dataFuture.get();
203                 assertThat(data.isPresent(), is(false));
204         }
205
206         @Test
207         public void clientGetRecognizesGetFailedForCorrectIdentifier()
208         throws InterruptedException, ExecutionException, IOException {
209                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
210                 connectNode();
211                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
212                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
213                 String identifier = extractIdentifier(lines);
214                 fcpServer.writeLine(
215                         "GetFailed",
216                         "Identifier=not-test",
217                         "Code=3",
218                         "EndMessage"
219                 );
220                 fcpServer.writeLine(
221                         "GetFailed",
222                         "Identifier=" + identifier,
223                         "Code=3",
224                         "EndMessage"
225                 );
226                 Optional<Data> data = dataFuture.get();
227                 assertThat(data.isPresent(), is(false));
228         }
229
230         @Test(expected = ExecutionException.class)
231         public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
232                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
233                 connectNode();
234                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
235                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
236                 fcpServer.close();
237                 dataFuture.get();
238         }
239
240         @Test
241         public void defaultFcpClientReusesConnection() throws InterruptedException, ExecutionException, IOException {
242                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
243                 connectNode();
244                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
245                 String identifier = extractIdentifier(lines);
246                 fcpServer.writeLine(
247                         "SSKKeypair",
248                         "InsertURI=" + INSERT_URI + "",
249                         "RequestURI=" + REQUEST_URI + "",
250                         "Identifier=" + identifier,
251                         "EndMessage"
252                 );
253                 keyPair.get();
254                 keyPair = fcpClient.generateKeypair().execute();
255                 lines = fcpServer.collectUntil(is("EndMessage"));
256                 identifier = extractIdentifier(lines);
257                 fcpServer.writeLine(
258                         "SSKKeypair",
259                         "InsertURI=" + INSERT_URI + "",
260                         "RequestURI=" + REQUEST_URI + "",
261                         "Identifier=" + identifier,
262                         "EndMessage"
263                 );
264                 keyPair.get();
265         }
266
267         @Test
268         public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands()
269         throws InterruptedException, ExecutionException, IOException {
270                 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
271                 connectNode();
272                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
273                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
274         }
275
276         @Test
277         public void clientGetWithDataStoreOnlySettingSendsCorrectCommands()
278         throws InterruptedException, ExecutionException, IOException {
279                 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
280                 connectNode();
281                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
282                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
283         }
284
285         @Test
286         public void clientGetWithMaxSizeSettingSendsCorrectCommands()
287         throws InterruptedException, ExecutionException, IOException {
288                 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
289                 connectNode();
290                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
291                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
292         }
293
294         @Test
295         public void clientGetWithPrioritySettingSendsCorrectCommands()
296         throws InterruptedException, ExecutionException, IOException {
297                 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
298                 connectNode();
299                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
300                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
301         }
302
303         @Test
304         public void clientGetWithRealTimeSettingSendsCorrectCommands()
305         throws InterruptedException, ExecutionException, IOException {
306                 fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
307                 connectNode();
308                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
309                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
310         }
311
312         @Test
313         public void clientGetWithGlobalSettingSendsCorrectCommands()
314         throws InterruptedException, ExecutionException, IOException {
315                 fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
316                 connectNode();
317                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
318                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
319         }
320
321         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
322                 return new TypeSafeDiagnosingMatcher<List<String>>() {
323                         @Override
324                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
325                                 if (!item.get(0).equals(name)) {
326                                         mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
327                                         return false;
328                                 }
329                                 for (String requiredLine : requiredLines) {
330                                         if (item.indexOf(requiredLine) < 1) {
331                                                 mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
332                                                 return false;
333                                         }
334                                 }
335                                 return true;
336                         }
337
338                         @Override
339                         public void describeTo(Description description) {
340                                 description.appendText("FCP message named ").appendValue(name);
341                                 description.appendValueList(", containing the lines ", ", ", "", requiredLines);
342                         }
343                 };
344         }
345
346         @Test
347         public void clientPutWithDirectDataSendsCorrectCommand()
348         throws IOException, ExecutionException, InterruptedException {
349                 fcpClient.clientPut()
350                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
351                         .length(6)
352                         .uri("KSK@foo.txt")
353                         .execute();
354                 connectNode();
355                 List<String> lines = fcpServer.collectUntil(is("Hello"));
356                 assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
357         }
358
359         @Test
360         public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
361         throws InterruptedException, ExecutionException, IOException {
362                 Future<Optional<Key>> key = fcpClient.clientPut()
363                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
364                         .length(6)
365                         .uri("KSK@foo.txt")
366                         .execute();
367                 connectNode();
368                 List<String> lines = fcpServer.collectUntil(is("Hello"));
369                 String identifier = extractIdentifier(lines);
370                 fcpServer.writeLine(
371                         "PutFailed",
372                         "Identifier=not-the-right-one",
373                         "EndMessage"
374                 );
375                 fcpServer.writeLine(
376                         "PutSuccessful",
377                         "URI=KSK@foo.txt",
378                         "Identifier=" + identifier,
379                         "EndMessage"
380                 );
381                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
382         }
383
384         @Test
385         public void clientPutWithDirectDataFailsOnCorrectIdentifier()
386         throws InterruptedException, ExecutionException, IOException {
387                 Future<Optional<Key>> key = fcpClient.clientPut()
388                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
389                         .length(6)
390                         .uri("KSK@foo.txt")
391                         .execute();
392                 connectNode();
393                 List<String> lines = fcpServer.collectUntil(is("Hello"));
394                 String identifier = extractIdentifier(lines);
395                 fcpServer.writeLine(
396                         "PutSuccessful",
397                         "Identifier=not-the-right-one",
398                         "URI=KSK@foo.txt",
399                         "EndMessage"
400                 );
401                 fcpServer.writeLine(
402                         "PutFailed",
403                         "Identifier=" + identifier,
404                         "EndMessage"
405                 );
406                 assertThat(key.get().isPresent(), is(false));
407         }
408
409         @Test
410         public void clientPutWithRenamedDirectDataSendsCorrectCommand()
411         throws InterruptedException, ExecutionException, IOException {
412                 fcpClient.clientPut()
413                         .named("otherName.txt")
414                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
415                         .length(6)
416                         .uri("KSK@foo.txt")
417                         .execute();
418                 connectNode();
419                 List<String> lines = fcpServer.collectUntil(is("Hello"));
420                 assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct",
421                         "DataLength=6", "URI=KSK@foo.txt"));
422         }
423
424         @Test
425         public void clientPutWithRedirectSendsCorrectCommand()
426         throws IOException, ExecutionException, InterruptedException {
427                 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
428                 connectNode();
429                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
430                 assertThat(lines,
431                         matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
432         }
433
434         @Test
435         public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
436                 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
437                 connectNode();
438                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
439                 assertThat(lines,
440                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
441         }
442
443         @Test
444         public void clientPutWithFileCanCompleteTestDdaSequence()
445         throws IOException, ExecutionException, InterruptedException {
446                 File tempFile = createTempFile();
447                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
448                 connectNode();
449                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
450                 String identifier = extractIdentifier(lines);
451                 fcpServer.writeLine(
452                         "ProtocolError",
453                         "Identifier=" + identifier,
454                         "Code=25",
455                         "EndMessage"
456                 );
457                 lines = fcpServer.collectUntil(is("EndMessage"));
458                 assertThat(lines, matchesFcpMessage(
459                         "TestDDARequest",
460                         "Directory=" + tempFile.getParent(),
461                         "WantReadDirectory=true",
462                         "WantWriteDirectory=false",
463                         "EndMessage"
464                 ));
465                 fcpServer.writeLine(
466                         "TestDDAReply",
467                         "Directory=" + tempFile.getParent(),
468                         "ReadFilename=" + tempFile,
469                         "EndMessage"
470                 );
471                 lines = fcpServer.collectUntil(is("EndMessage"));
472                 assertThat(lines, matchesFcpMessage(
473                         "TestDDAResponse",
474                         "Directory=" + tempFile.getParent(),
475                         "ReadContent=test-content",
476                         "EndMessage"
477                 ));
478                 fcpServer.writeLine(
479                         "TestDDAComplete",
480                         "Directory=" + tempFile.getParent(),
481                         "ReadDirectoryAllowed=true",
482                         "EndMessage"
483                 );
484                 lines = fcpServer.collectUntil(is("EndMessage"));
485                 assertThat(lines,
486                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
487                                 "Filename=" + new File(tempFile.getParent(), "test.dat")));
488         }
489
490         private File createTempFile() throws IOException {
491                 File tempFile = File.createTempFile("test-dda-", ".dat");
492                 tempFile.deleteOnExit();
493                 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
494                 return tempFile;
495         }
496
497         @Test
498         public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
499         throws InterruptedException, ExecutionException, IOException {
500                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
501                 connectNode();
502                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
503                 String identifier = extractIdentifier(lines);
504                 fcpServer.writeLine(
505                         "ProtocolError",
506                         "Identifier=not-the-right-one",
507                         "Code=25",
508                         "EndMessage"
509                 );
510                 fcpServer.writeLine(
511                         "PutSuccessful",
512                         "Identifier=" + identifier,
513                         "URI=KSK@foo.txt",
514                         "EndMessage"
515                 );
516                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
517         }
518
519         @Test
520         public void clientPutAbortsOnProtocolErrorOtherThan25()
521         throws InterruptedException, ExecutionException, IOException {
522                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
523                 connectNode();
524                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
525                 String identifier = extractIdentifier(lines);
526                 fcpServer.writeLine(
527                         "ProtocolError",
528                         "Identifier=" + identifier,
529                         "Code=1",
530                         "EndMessage"
531                 );
532                 assertThat(key.get().isPresent(), is(false));
533         }
534
535         @Test
536         public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
537         InterruptedException {
538                 File tempFile = createTempFile();
539                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
540                 connectNode();
541                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
542                 String identifier = extractIdentifier(lines);
543                 fcpServer.writeLine(
544                         "ProtocolError",
545                         "Identifier=" + identifier,
546                         "Code=25",
547                         "EndMessage"
548                 );
549                 lines = fcpServer.collectUntil(is("EndMessage"));
550                 assertThat(lines, matchesFcpMessage(
551                         "TestDDARequest",
552                         "Directory=" + tempFile.getParent(),
553                         "WantReadDirectory=true",
554                         "WantWriteDirectory=false",
555                         "EndMessage"
556                 ));
557                 fcpServer.writeLine(
558                         "TestDDAReply",
559                         "Directory=/some-other-directory",
560                         "ReadFilename=" + tempFile,
561                         "EndMessage"
562                 );
563                 fcpServer.writeLine(
564                         "TestDDAReply",
565                         "Directory=" + tempFile.getParent(),
566                         "ReadFilename=" + tempFile,
567                         "EndMessage"
568                 );
569                 lines = fcpServer.collectUntil(is("EndMessage"));
570                 assertThat(lines, matchesFcpMessage(
571                         "TestDDAResponse",
572                         "Directory=" + tempFile.getParent(),
573                         "ReadContent=test-content",
574                         "EndMessage"
575                 ));
576         }
577
578         @Test
579         public void clientPutSendsResponseEvenIfFileCanNotBeRead()
580         throws IOException, ExecutionException, InterruptedException {
581                 File tempFile = createTempFile();
582                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
583                 connectNode();
584                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
585                 String identifier = extractIdentifier(lines);
586                 fcpServer.writeLine(
587                         "ProtocolError",
588                         "Identifier=" + identifier,
589                         "Code=25",
590                         "EndMessage"
591                 );
592                 lines = fcpServer.collectUntil(is("EndMessage"));
593                 assertThat(lines, matchesFcpMessage(
594                         "TestDDARequest",
595                         "Directory=" + tempFile.getParent(),
596                         "WantReadDirectory=true",
597                         "WantWriteDirectory=false",
598                         "EndMessage"
599                 ));
600                 fcpServer.writeLine(
601                         "TestDDAReply",
602                         "Directory=" + tempFile.getParent(),
603                         "ReadFilename=" + tempFile + ".foo",
604                         "EndMessage"
605                 );
606                 lines = fcpServer.collectUntil(is("EndMessage"));
607                 assertThat(lines, matchesFcpMessage(
608                         "TestDDAResponse",
609                         "Directory=" + tempFile.getParent(),
610                         "ReadContent=failed-to-read",
611                         "EndMessage"
612                 ));
613         }
614
615         @Test
616         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
617         throws IOException, ExecutionException, InterruptedException {
618                 File tempFile = createTempFile();
619                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
620                 connectNode();
621                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
622                 String identifier = extractIdentifier(lines);
623                 fcpServer.writeLine(
624                         "TestDDAComplete",
625                         "Directory=/some-other-directory",
626                         "EndMessage"
627                 );
628                 fcpServer.writeLine(
629                         "ProtocolError",
630                         "Identifier=" + identifier,
631                         "Code=25",
632                         "EndMessage"
633                 );
634                 lines = fcpServer.collectUntil(is("EndMessage"));
635                 assertThat(lines, matchesFcpMessage(
636                         "TestDDARequest",
637                         "Directory=" + tempFile.getParent(),
638                         "WantReadDirectory=true",
639                         "WantWriteDirectory=false",
640                         "EndMessage"
641                 ));
642         }
643
644         @Test
645         public void clientCanListPeers() throws IOException, ExecutionException, InterruptedException {
646                 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
647                 connectNode();
648                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
649                 assertThat(lines, matchesFcpMessage(
650                         "ListPeers",
651                         "WithVolatile=false",
652                         "WithMetadata=false",
653                         "EndMessage"
654                 ));
655                 String identifier = extractIdentifier(lines);
656                 fcpServer.writeLine(
657                         "Peer",
658                         "Identifier=" + identifier,
659                         "identity=id1",
660                         "EndMessage"
661                 );
662                 fcpServer.writeLine(
663                         "Peer",
664                         "Identifier=" + identifier,
665                         "identity=id2",
666                         "EndMessage"
667                 );
668                 fcpServer.writeLine(
669                         "EndListPeers",
670                         "Identifier=" + identifier,
671                         "EndMessage"
672                 );
673                 assertThat(peers.get(), hasSize(2));
674                 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
675                         containsInAnyOrder("id1", "id2"));
676         }
677
678         @Test
679         public void clientCanListPeersWithMetadata() throws IOException, ExecutionException, InterruptedException {
680                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
681                 connectNode();
682                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
683                 assertThat(lines, matchesFcpMessage(
684                         "ListPeers",
685                         "WithVolatile=false",
686                         "WithMetadata=true",
687                         "EndMessage"
688                 ));
689                 String identifier = extractIdentifier(lines);
690                 fcpServer.writeLine(
691                         "Peer",
692                         "Identifier=" + identifier,
693                         "identity=id1",
694                         "metadata.foo=bar1",
695                         "EndMessage"
696                 );
697                 fcpServer.writeLine(
698                         "Peer",
699                         "Identifier=" + identifier,
700                         "identity=id2",
701                         "metadata.foo=bar2",
702                         "EndMessage"
703                 );
704                 fcpServer.writeLine(
705                         "EndListPeers",
706                         "Identifier=" + identifier,
707                         "EndMessage"
708                 );
709                 assertThat(peers.get(), hasSize(2));
710                 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
711                         containsInAnyOrder("bar1", "bar2"));
712         }
713
714         @Test
715         public void clientCanListPeersWithVolatiles() throws IOException, ExecutionException, InterruptedException {
716                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
717                 connectNode();
718                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
719                 assertThat(lines, matchesFcpMessage(
720                         "ListPeers",
721                         "WithVolatile=true",
722                         "WithMetadata=false",
723                         "EndMessage"
724                 ));
725                 String identifier = extractIdentifier(lines);
726                 fcpServer.writeLine(
727                         "Peer",
728                         "Identifier=" + identifier,
729                         "identity=id1",
730                         "volatile.foo=bar1",
731                         "EndMessage"
732                 );
733                 fcpServer.writeLine(
734                         "Peer",
735                         "Identifier=" + identifier,
736                         "identity=id2",
737                         "volatile.foo=bar2",
738                         "EndMessage"
739                 );
740                 fcpServer.writeLine(
741                         "EndListPeers",
742                         "Identifier=" + identifier,
743                         "EndMessage"
744                 );
745                 assertThat(peers.get(), hasSize(2));
746                 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
747                         containsInAnyOrder("bar1", "bar2"));
748         }
749
750 }