Remove debug logging
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import net.pterodactylus.fcp.FcpKeyPair;
import net.pterodactylus.fcp.Key;
import net.pterodactylus.fcp.Peer;
import net.pterodactylus.fcp.Priority;
import net.pterodactylus.fcp.fake.FakeTcpServer;
import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;

import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.After;
import org.junit.Test;
35
36 /**
37  * Unit test for {@link DefaultFcpClient}.
38  *
39  * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
40  */
41 public class DefaultFcpClientTest {
42
43         private static final String INSERT_URI =
44                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
45         private static final String REQUEST_URI =
46                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
47
48         private static int threadCounter = 0;
49         private final ExecutorService threadPool =
50                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
51         private final FakeTcpServer fcpServer;
52         private final DefaultFcpClient fcpClient;
53
54         public DefaultFcpClientTest() throws IOException {
55                 fcpServer = new FakeTcpServer(threadPool);
56                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
57         }
58
59         @After
60         public void tearDown() throws IOException {
61                 fcpServer.close();
62         }
63
64         @Test(expected = ExecutionException.class)
65         public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
66         throws IOException, ExecutionException, InterruptedException {
67                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
68                 fcpServer.connect().get();
69                 fcpServer.collectUntil(is("EndMessage"));
70                 fcpServer.writeLine(
71                         "CloseConnectionDuplicateClientName",
72                         "EndMessage"
73                 );
74                 keyPairFuture.get();
75         }
76
77         @Test(expected = ExecutionException.class)
78         public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
79         throws IOException, ExecutionException, InterruptedException {
80                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
81                 fcpServer.connect().get();
82                 fcpServer.collectUntil(is("EndMessage"));
83                 fcpServer.close();
84                 keyPairFuture.get();
85         }
86
87         @Test
88         public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
89                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
90                 connectNode();
91                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
92                 String identifier = extractIdentifier(lines);
93                 fcpServer.writeLine("SSKKeypair",
94                         "InsertURI=" + INSERT_URI + "",
95                         "RequestURI=" + REQUEST_URI + "",
96                         "Identifier=" + identifier,
97                         "EndMessage");
98                 FcpKeyPair keyPair = keyPairFuture.get();
99                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
100                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
101         }
102
103         private void connectNode() throws InterruptedException, ExecutionException, IOException {
104                 fcpServer.connect().get();
105                 fcpServer.collectUntil(is("EndMessage"));
106                 fcpServer.writeLine("NodeHello",
107                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
108                         "Revision=build01466",
109                         "Testnet=false",
110                         "Version=Fred,0.7,1.0,1466",
111                         "Build=1466",
112                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
113                         "Node=Fred",
114                         "ExtBuild=29",
115                         "FCPVersion=2.0",
116                         "NodeLanguage=ENGLISH",
117                         "ExtRevision=v29",
118                         "EndMessage"
119                 );
120         }
121
122         @Test
123         public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
124                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
125                 connectNode();
126                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
127                 assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
128                 String identifier = extractIdentifier(lines);
129                 fcpServer.writeLine(
130                         "AllData",
131                         "Identifier=" + identifier,
132                         "DataLength=6",
133                         "StartupTime=1435610539000",
134                         "CompletionTime=1435610540000",
135                         "Metadata.ContentType=text/plain;charset=utf-8",
136                         "Data",
137                         "Hello"
138                 );
139                 Optional<Data> data = dataFuture.get();
140                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
141                 assertThat(data.get().size(), is(6L));
142                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
143                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
144         }
145
146         private String extractIdentifier(List<String> lines) {
147                 return lines.stream()
148                         .filter(s -> s.startsWith("Identifier="))
149                         .map(s -> s.substring(s.indexOf('=') + 1))
150                         .findFirst()
151                         .orElse("");
152         }
153
154         @Test
155         public void clientGetDownloadsDataForCorrectIdentifier()
156         throws InterruptedException, ExecutionException, IOException {
157                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
158                 connectNode();
159                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
160                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
161                 String identifier = extractIdentifier(lines);
162                 fcpServer.writeLine(
163                         "AllData",
164                         "Identifier=not-test",
165                         "DataLength=12",
166                         "StartupTime=1435610539000",
167                         "CompletionTime=1435610540000",
168                         "Metadata.ContentType=text/plain;charset=latin-9",
169                         "Data",
170                         "Hello World"
171                 );
172                 fcpServer.writeLine(
173                         "AllData",
174                         "Identifier=" + identifier,
175                         "DataLength=6",
176                         "StartupTime=1435610539000",
177                         "CompletionTime=1435610540000",
178                         "Metadata.ContentType=text/plain;charset=utf-8",
179                         "Data",
180                         "Hello"
181                 );
182                 Optional<Data> data = dataFuture.get();
183                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
184                 assertThat(data.get().size(), is(6L));
185                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
186                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
187         }
188
189         @Test
190         public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
191                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
192                 connectNode();
193                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
194                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
195                 String identifier = extractIdentifier(lines);
196                 fcpServer.writeLine(
197                         "GetFailed",
198                         "Identifier=" + identifier,
199                         "Code=3",
200                         "EndMessage"
201                 );
202                 Optional<Data> data = dataFuture.get();
203                 assertThat(data.isPresent(), is(false));
204         }
205
206         @Test
207         public void clientGetRecognizesGetFailedForCorrectIdentifier()
208         throws InterruptedException, ExecutionException, IOException {
209                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
210                 connectNode();
211                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
212                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
213                 String identifier = extractIdentifier(lines);
214                 fcpServer.writeLine(
215                         "GetFailed",
216                         "Identifier=not-test",
217                         "Code=3",
218                         "EndMessage"
219                 );
220                 fcpServer.writeLine(
221                         "GetFailed",
222                         "Identifier=" + identifier,
223                         "Code=3",
224                         "EndMessage"
225                 );
226                 Optional<Data> data = dataFuture.get();
227                 assertThat(data.isPresent(), is(false));
228         }
229
230         @Test(expected = ExecutionException.class)
231         public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
232                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
233                 connectNode();
234                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
235                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
236                 fcpServer.close();
237                 dataFuture.get();
238         }
239
240         @Test
241         public void defaultFcpClientReusesConnection() throws InterruptedException, ExecutionException, IOException {
242                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
243                 connectNode();
244                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
245                 String identifier = extractIdentifier(lines);
246                 fcpServer.writeLine(
247                         "SSKKeypair",
248                         "InsertURI=" + INSERT_URI + "",
249                         "RequestURI=" + REQUEST_URI + "",
250                         "Identifier=" + identifier,
251                         "EndMessage"
252                 );
253                 keyPair.get();
254                 keyPair = fcpClient.generateKeypair().execute();
255                 lines = fcpServer.collectUntil(is("EndMessage"));
256                 identifier = extractIdentifier(lines);
257                 fcpServer.writeLine(
258                         "SSKKeypair",
259                         "InsertURI=" + INSERT_URI + "",
260                         "RequestURI=" + REQUEST_URI + "",
261                         "Identifier=" + identifier,
262                         "EndMessage"
263                 );
264                 keyPair.get();
265         }
266
267         @Test
268         public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands()
269         throws InterruptedException, ExecutionException, IOException {
270                 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt");
271                 connectNode();
272                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
273                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
274         }
275
276         @Test
277         public void clientGetWithDataStoreOnlySettingSendsCorrectCommands()
278         throws InterruptedException, ExecutionException, IOException {
279                 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt");
280                 connectNode();
281                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
282                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
283         }
284
285         @Test
286         public void clientGetWithMaxSizeSettingSendsCorrectCommands()
287         throws InterruptedException, ExecutionException, IOException {
288                 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt");
289                 connectNode();
290                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
291                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
292         }
293
294         @Test
295         public void clientGetWithPrioritySettingSendsCorrectCommands()
296         throws InterruptedException, ExecutionException, IOException {
297                 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt");
298                 connectNode();
299                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
300                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
301         }
302
303         @Test
304         public void clientGetWithRealTimeSettingSendsCorrectCommands()
305         throws InterruptedException, ExecutionException, IOException {
306                 fcpClient.clientGet().realTime().uri("KSK@foo.txt");
307                 connectNode();
308                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
309                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
310         }
311
312         @Test
313         public void clientGetWithGlobalSettingSendsCorrectCommands()
314         throws InterruptedException, ExecutionException, IOException {
315                 fcpClient.clientGet().global().uri("KSK@foo.txt");
316                 connectNode();
317                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
318                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
319         }
320
321         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
322                 return new TypeSafeDiagnosingMatcher<List<String>>() {
323                         @Override
324                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
325                                 if (!item.get(0).equals(name)) {
326                                         mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
327                                         return false;
328                                 }
329                                 for (String requiredLine : requiredLines) {
330                                         if (item.indexOf(requiredLine) < 1) {
331                                                 mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
332                                                 return false;
333                                         }
334                                 }
335                                 return true;
336                         }
337
338                         @Override
339                         public void describeTo(Description description) {
340                                 description.appendText("FCP message named ").appendValue(name);
341                                 description.appendValueList(", containing the lines ", ", ", "", requiredLines);
342                         }
343                 };
344         }
345
346         @Test
347         public void clientPutWithDirectDataSendsCorrectCommand()
348         throws IOException, ExecutionException, InterruptedException {
349                 fcpClient.clientPut()
350                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
351                         .length(6)
352                         .uri("KSK@foo.txt");
353                 connectNode();
354                 List<String> lines = fcpServer.collectUntil(is("Hello"));
355                 assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
356         }
357
358         @Test
359         public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
360         throws InterruptedException, ExecutionException, IOException {
361                 Future<Optional<Key>> key = fcpClient.clientPut()
362                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
363                         .length(6)
364                         .uri("KSK@foo.txt");
365                 connectNode();
366                 List<String> lines = fcpServer.collectUntil(is("Hello"));
367                 String identifier = extractIdentifier(lines);
368                 fcpServer.writeLine(
369                         "PutFailed",
370                         "Identifier=not-the-right-one",
371                         "EndMessage"
372                 );
373                 fcpServer.writeLine(
374                         "PutSuccessful",
375                         "URI=KSK@foo.txt",
376                         "Identifier=" + identifier,
377                         "EndMessage"
378                 );
379                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
380         }
381
382         @Test
383         public void clientPutWithDirectDataFailsOnCorrectIdentifier()
384         throws InterruptedException, ExecutionException, IOException {
385                 Future<Optional<Key>> key = fcpClient.clientPut()
386                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
387                         .length(6)
388                         .uri("KSK@foo.txt");
389                 connectNode();
390                 List<String> lines = fcpServer.collectUntil(is("Hello"));
391                 String identifier = extractIdentifier(lines);
392                 fcpServer.writeLine(
393                         "PutSuccessful",
394                         "Identifier=not-the-right-one",
395                         "URI=KSK@foo.txt",
396                         "EndMessage"
397                 );
398                 fcpServer.writeLine(
399                         "PutFailed",
400                         "Identifier=" + identifier,
401                         "EndMessage"
402                 );
403                 assertThat(key.get().isPresent(), is(false));
404         }
405
406         @Test
407         public void clientPutWithRenamedDirectDataSendsCorrectCommand()
408         throws InterruptedException, ExecutionException, IOException {
409                 fcpClient.clientPut()
410                         .named("otherName.txt")
411                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
412                         .length(6)
413                         .uri("KSK@foo.txt");
414                 connectNode();
415                 List<String> lines = fcpServer.collectUntil(is("Hello"));
416                 assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct",
417                         "DataLength=6", "URI=KSK@foo.txt"));
418         }
419
420         @Test
421         public void clientPutWithRedirectSendsCorrectCommand()
422         throws IOException, ExecutionException, InterruptedException {
423                 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt");
424                 connectNode();
425                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
426                 assertThat(lines,
427                         matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
428         }
429
430         @Test
431         public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
432                 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
433                 connectNode();
434                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
435                 assertThat(lines,
436                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
437         }
438
439         @Test
440         public void clientPutWithFileCanCompleteTestDdaSequence()
441         throws IOException, ExecutionException, InterruptedException {
442                 File tempFile = createTempFile();
443                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
444                 connectNode();
445                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
446                 String identifier = extractIdentifier(lines);
447                 fcpServer.writeLine(
448                         "ProtocolError",
449                         "Identifier=" + identifier,
450                         "Code=25",
451                         "EndMessage"
452                 );
453                 lines = fcpServer.collectUntil(is("EndMessage"));
454                 assertThat(lines, matchesFcpMessage(
455                         "TestDDARequest",
456                         "Directory=" + tempFile.getParent(),
457                         "WantReadDirectory=true",
458                         "WantWriteDirectory=false",
459                         "EndMessage"
460                 ));
461                 fcpServer.writeLine(
462                         "TestDDAReply",
463                         "Directory=" + tempFile.getParent(),
464                         "ReadFilename=" + tempFile,
465                         "EndMessage"
466                 );
467                 lines = fcpServer.collectUntil(is("EndMessage"));
468                 assertThat(lines, matchesFcpMessage(
469                         "TestDDAResponse",
470                         "Directory=" + tempFile.getParent(),
471                         "ReadContent=test-content",
472                         "EndMessage"
473                 ));
474                 fcpServer.writeLine(
475                         "TestDDAComplete",
476                         "Directory=" + tempFile.getParent(),
477                         "ReadDirectoryAllowed=true",
478                         "EndMessage"
479                 );
480                 lines = fcpServer.collectUntil(is("EndMessage"));
481                 assertThat(lines,
482                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
483                                 "Filename=" + new File(tempFile.getParent(), "test.dat")));
484         }
485
486         private File createTempFile() throws IOException {
487                 File tempFile = File.createTempFile("test-dda-", ".dat");
488                 tempFile.deleteOnExit();
489                 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
490                 return tempFile;
491         }
492
493         @Test
494         public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
495         throws InterruptedException, ExecutionException, IOException {
496                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
497                 connectNode();
498                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
499                 String identifier = extractIdentifier(lines);
500                 fcpServer.writeLine(
501                         "ProtocolError",
502                         "Identifier=not-the-right-one",
503                         "Code=25",
504                         "EndMessage"
505                 );
506                 fcpServer.writeLine(
507                         "PutSuccessful",
508                         "Identifier=" + identifier,
509                         "URI=KSK@foo.txt",
510                         "EndMessage"
511                 );
512                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
513         }
514
515         @Test
516         public void clientPutAbortsOnProtocolErrorOtherThan25()
517         throws InterruptedException, ExecutionException, IOException {
518                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
519                 connectNode();
520                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
521                 String identifier = extractIdentifier(lines);
522                 fcpServer.writeLine(
523                         "ProtocolError",
524                         "Identifier=" + identifier,
525                         "Code=1",
526                         "EndMessage"
527                 );
528                 assertThat(key.get().isPresent(), is(false));
529         }
530
531         @Test
532         public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
533         InterruptedException {
534                 File tempFile = createTempFile();
535                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
536                 connectNode();
537                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
538                 String identifier = extractIdentifier(lines);
539                 fcpServer.writeLine(
540                         "ProtocolError",
541                         "Identifier=" + identifier,
542                         "Code=25",
543                         "EndMessage"
544                 );
545                 lines = fcpServer.collectUntil(is("EndMessage"));
546                 assertThat(lines, matchesFcpMessage(
547                         "TestDDARequest",
548                         "Directory=" + tempFile.getParent(),
549                         "WantReadDirectory=true",
550                         "WantWriteDirectory=false",
551                         "EndMessage"
552                 ));
553                 fcpServer.writeLine(
554                         "TestDDAReply",
555                         "Directory=/some-other-directory",
556                         "ReadFilename=" + tempFile,
557                         "EndMessage"
558                 );
559                 fcpServer.writeLine(
560                         "TestDDAReply",
561                         "Directory=" + tempFile.getParent(),
562                         "ReadFilename=" + tempFile,
563                         "EndMessage"
564                 );
565                 lines = fcpServer.collectUntil(is("EndMessage"));
566                 assertThat(lines, matchesFcpMessage(
567                         "TestDDAResponse",
568                         "Directory=" + tempFile.getParent(),
569                         "ReadContent=test-content",
570                         "EndMessage"
571                 ));
572         }
573
574         @Test
575         public void clientPutSendsResponseEvenIfFileCanNotBeRead()
576         throws IOException, ExecutionException, InterruptedException {
577                 File tempFile = createTempFile();
578                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
579                 connectNode();
580                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
581                 String identifier = extractIdentifier(lines);
582                 fcpServer.writeLine(
583                         "ProtocolError",
584                         "Identifier=" + identifier,
585                         "Code=25",
586                         "EndMessage"
587                 );
588                 lines = fcpServer.collectUntil(is("EndMessage"));
589                 assertThat(lines, matchesFcpMessage(
590                         "TestDDARequest",
591                         "Directory=" + tempFile.getParent(),
592                         "WantReadDirectory=true",
593                         "WantWriteDirectory=false",
594                         "EndMessage"
595                 ));
596                 fcpServer.writeLine(
597                         "TestDDAReply",
598                         "Directory=" + tempFile.getParent(),
599                         "ReadFilename=" + tempFile + ".foo",
600                         "EndMessage"
601                 );
602                 lines = fcpServer.collectUntil(is("EndMessage"));
603                 assertThat(lines, matchesFcpMessage(
604                         "TestDDAResponse",
605                         "Directory=" + tempFile.getParent(),
606                         "ReadContent=failed-to-read",
607                         "EndMessage"
608                 ));
609         }
610
611         @Test
612         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
613         throws IOException, ExecutionException, InterruptedException {
614                 File tempFile = createTempFile();
615                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
616                 connectNode();
617                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
618                 String identifier = extractIdentifier(lines);
619                 fcpServer.writeLine(
620                         "TestDDAComplete",
621                         "Directory=/some-other-directory",
622                         "EndMessage"
623                 );
624                 fcpServer.writeLine(
625                         "ProtocolError",
626                         "Identifier=" + identifier,
627                         "Code=25",
628                         "EndMessage"
629                 );
630                 lines = fcpServer.collectUntil(is("EndMessage"));
631                 assertThat(lines, matchesFcpMessage(
632                         "TestDDARequest",
633                         "Directory=" + tempFile.getParent(),
634                         "WantReadDirectory=true",
635                         "WantWriteDirectory=false",
636                         "EndMessage"
637                 ));
638         }
639
640         @Test
641         public void clientCanListPeers() throws IOException, ExecutionException, InterruptedException {
642                 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
643                 connectNode();
644                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
645                 assertThat(lines, matchesFcpMessage(
646                         "ListPeers",
647                         "WithVolatile=false",
648                         "WithMetadata=false",
649                         "EndMessage"
650                 ));
651                 String identifier = extractIdentifier(lines);
652                 fcpServer.writeLine(
653                         "Peer",
654                         "Identifier=" + identifier,
655                         "identity=id1",
656                         "EndMessage"
657                 );
658                 fcpServer.writeLine(
659                         "Peer",
660                         "Identifier=" + identifier,
661                         "identity=id2",
662                         "EndMessage"
663                 );
664                 fcpServer.writeLine(
665                         "EndListPeers",
666                         "Identifier=" + identifier,
667                         "EndMessage"
668                 );
669                 assertThat(peers.get(), hasSize(2));
670                 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
671                         containsInAnyOrder("id1", "id2"));
672         }
673
674         @Test
675         public void clientCanListPeersWithMetadata() throws IOException, ExecutionException, InterruptedException {
676                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
677                 connectNode();
678                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
679                 assertThat(lines, matchesFcpMessage(
680                         "ListPeers",
681                         "WithVolatile=false",
682                         "WithMetadata=true",
683                         "EndMessage"
684                 ));
685                 String identifier = extractIdentifier(lines);
686                 fcpServer.writeLine(
687                         "Peer",
688                         "Identifier=" + identifier,
689                         "identity=id1",
690                         "metadata.foo=bar1",
691                         "EndMessage"
692                 );
693                 fcpServer.writeLine(
694                         "Peer",
695                         "Identifier=" + identifier,
696                         "identity=id2",
697                         "metadata.foo=bar2",
698                         "EndMessage"
699                 );
700                 fcpServer.writeLine(
701                         "EndListPeers",
702                         "Identifier=" + identifier,
703                         "EndMessage"
704                 );
705                 assertThat(peers.get(), hasSize(2));
706                 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
707                         containsInAnyOrder("bar1", "bar2"));
708         }
709
710         @Test
711         public void clientCanListPeersWithVolatiles() throws IOException, ExecutionException, InterruptedException {
712                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
713                 connectNode();
714                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
715                 assertThat(lines, matchesFcpMessage(
716                         "ListPeers",
717                         "WithVolatile=true",
718                         "WithMetadata=false",
719                         "EndMessage"
720                 ));
721                 String identifier = extractIdentifier(lines);
722                 fcpServer.writeLine(
723                         "Peer",
724                         "Identifier=" + identifier,
725                         "identity=id1",
726                         "volatile.foo=bar1",
727                         "EndMessage"
728                 );
729                 fcpServer.writeLine(
730                         "Peer",
731                         "Identifier=" + identifier,
732                         "identity=id2",
733                         "volatile.foo=bar2",
734                         "EndMessage"
735                 );
736                 fcpServer.writeLine(
737                         "EndListPeers",
738                         "Identifier=" + identifier,
739                         "EndMessage"
740                 );
741                 assertThat(peers.get(), hasSize(2));
742                 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
743                         containsInAnyOrder("bar1", "bar2"));
744         }
745
746 }