Add test for reusing connections
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.containsInAnyOrder;
5 import static org.hamcrest.Matchers.hasSize;
6 import static org.hamcrest.Matchers.is;
7
8 import java.io.ByteArrayInputStream;
9 import java.io.File;
10 import java.io.IOException;
11 import java.nio.charset.StandardCharsets;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Optional;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19 import java.util.logging.Level;
20 import java.util.logging.Logger;
21 import java.util.stream.Collectors;
22
23 import net.pterodactylus.fcp.FcpKeyPair;
24 import net.pterodactylus.fcp.Key;
25 import net.pterodactylus.fcp.Peer;
26 import net.pterodactylus.fcp.Priority;
27 import net.pterodactylus.fcp.fake.FakeTcpServer;
28 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
29
30 import com.google.common.io.ByteStreams;
31 import com.google.common.io.Files;
32 import org.hamcrest.Description;
33 import org.hamcrest.Matcher;
34 import org.hamcrest.TypeSafeDiagnosingMatcher;
35 import org.junit.After;
36 import org.junit.Test;
37
38 /**
39  * Unit test for {@link DefaultFcpClient}.
40  *
41  * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
42  */
43 public class DefaultFcpClientTest {
44
45         private static final String INSERT_URI =
46                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
47         private static final String REQUEST_URI =
48                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
49
50         private static int threadCounter = 0;
51         private final ExecutorService threadPool =
52                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
53         private final FakeTcpServer fcpServer;
54         private final DefaultFcpClient fcpClient;
55
56         public DefaultFcpClientTest() throws IOException {
57                 fcpServer = new FakeTcpServer(threadPool);
58                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
59         }
60
61         @After
62         public void tearDown() throws IOException {
63                 fcpServer.close();
64         }
65
66         @Test(expected = ExecutionException.class)
67         public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
68         throws IOException, ExecutionException, InterruptedException {
69                 Logger.getAnonymousLogger().getParent().setLevel(Level.FINEST);
70                 Logger.getAnonymousLogger().getParent().getHandlers()[0].setLevel(Level.FINEST);
71                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
72                 fcpServer.connect().get();
73                 fcpServer.collectUntil(is("EndMessage"));
74                 fcpServer.writeLine(
75                         "CloseConnectionDuplicateClientName",
76                         "EndMessage"
77                 );
78                 keyPairFuture.get();
79         }
80
81         @Test(expected = ExecutionException.class)
82         public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
83         throws IOException, ExecutionException, InterruptedException {
84                 Logger.getAnonymousLogger().getParent().setLevel(Level.FINEST);
85                 Logger.getAnonymousLogger().getParent().getHandlers()[0].setLevel(Level.FINEST);
86                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
87                 fcpServer.connect().get();
88                 fcpServer.collectUntil(is("EndMessage"));
89                 fcpServer.close();
90                 keyPairFuture.get();
91         }
92
93         @Test
94         public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
95                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
96                 connectNode();
97                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
98                 String identifier = extractIdentifier(lines);
99                 fcpServer.writeLine("SSKKeypair",
100                         "InsertURI=" + INSERT_URI + "",
101                         "RequestURI=" + REQUEST_URI + "",
102                         "Identifier=" + identifier,
103                         "EndMessage");
104                 FcpKeyPair keyPair = keyPairFuture.get();
105                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
106                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
107         }
108
109         private void connectNode() throws InterruptedException, ExecutionException, IOException {
110                 fcpServer.connect().get();
111                 fcpServer.collectUntil(is("EndMessage"));
112                 fcpServer.writeLine("NodeHello",
113                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
114                         "Revision=build01466",
115                         "Testnet=false",
116                         "Version=Fred,0.7,1.0,1466",
117                         "Build=1466",
118                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
119                         "Node=Fred",
120                         "ExtBuild=29",
121                         "FCPVersion=2.0",
122                         "NodeLanguage=ENGLISH",
123                         "ExtRevision=v29",
124                         "EndMessage"
125                 );
126         }
127
128         @Test
129         public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
130                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
131                 connectNode();
132                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
133                 assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
134                 String identifier = extractIdentifier(lines);
135                 fcpServer.writeLine(
136                         "AllData",
137                         "Identifier=" + identifier,
138                         "DataLength=6",
139                         "StartupTime=1435610539000",
140                         "CompletionTime=1435610540000",
141                         "Metadata.ContentType=text/plain;charset=utf-8",
142                         "Data",
143                         "Hello"
144                 );
145                 Optional<Data> data = dataFuture.get();
146                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
147                 assertThat(data.get().size(), is(6L));
148                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
149                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
150         }
151
152         private String extractIdentifier(List<String> lines) {
153                 return lines.stream()
154                         .filter(s -> s.startsWith("Identifier="))
155                         .map(s -> s.substring(s.indexOf('=') + 1))
156                         .findFirst()
157                         .orElse("");
158         }
159
160         @Test
161         public void clientGetDownloadsDataForCorrectIdentifier()
162         throws InterruptedException, ExecutionException, IOException {
163                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
164                 connectNode();
165                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
166                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
167                 String identifier = extractIdentifier(lines);
168                 fcpServer.writeLine(
169                         "AllData",
170                         "Identifier=not-test",
171                         "DataLength=12",
172                         "StartupTime=1435610539000",
173                         "CompletionTime=1435610540000",
174                         "Metadata.ContentType=text/plain;charset=latin-9",
175                         "Data",
176                         "Hello World"
177                 );
178                 fcpServer.writeLine(
179                         "AllData",
180                         "Identifier=" + identifier,
181                         "DataLength=6",
182                         "StartupTime=1435610539000",
183                         "CompletionTime=1435610540000",
184                         "Metadata.ContentType=text/plain;charset=utf-8",
185                         "Data",
186                         "Hello"
187                 );
188                 Optional<Data> data = dataFuture.get();
189                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
190                 assertThat(data.get().size(), is(6L));
191                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
192                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
193         }
194
195         @Test
196         public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
197                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
198                 connectNode();
199                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
200                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
201                 String identifier = extractIdentifier(lines);
202                 fcpServer.writeLine(
203                         "GetFailed",
204                         "Identifier=" + identifier,
205                         "Code=3",
206                         "EndMessage"
207                 );
208                 Optional<Data> data = dataFuture.get();
209                 assertThat(data.isPresent(), is(false));
210         }
211
212         @Test
213         public void clientGetRecognizesGetFailedForCorrectIdentifier()
214         throws InterruptedException, ExecutionException, IOException {
215                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
216                 connectNode();
217                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
218                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
219                 String identifier = extractIdentifier(lines);
220                 fcpServer.writeLine(
221                         "GetFailed",
222                         "Identifier=not-test",
223                         "Code=3",
224                         "EndMessage"
225                 );
226                 fcpServer.writeLine(
227                         "GetFailed",
228                         "Identifier=" + identifier,
229                         "Code=3",
230                         "EndMessage"
231                 );
232                 Optional<Data> data = dataFuture.get();
233                 assertThat(data.isPresent(), is(false));
234         }
235
236         @Test(expected = ExecutionException.class)
237         public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
238                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
239                 connectNode();
240                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
241                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
242                 fcpServer.close();
243                 dataFuture.get();
244         }
245
246         @Test
247         public void defaultFcpClientReusesConnection() throws InterruptedException, ExecutionException, IOException {
248                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
249                 connectNode();
250                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
251                 String identifier = extractIdentifier(lines);
252                 fcpServer.writeLine(
253                         "SSKKeypair",
254                         "InsertURI=" + INSERT_URI + "",
255                         "RequestURI=" + REQUEST_URI + "",
256                         "Identifier=" + identifier,
257                         "EndMessage"
258                 );
259                 keyPair.get();
260                 keyPair = fcpClient.generateKeypair().execute();
261                 lines = fcpServer.collectUntil(is("EndMessage"));
262                 identifier = extractIdentifier(lines);
263                 fcpServer.writeLine(
264                         "SSKKeypair",
265                         "InsertURI=" + INSERT_URI + "",
266                         "RequestURI=" + REQUEST_URI + "",
267                         "Identifier=" + identifier,
268                         "EndMessage"
269                 );
270                 keyPair.get();
271         }
272
273         @Test
274         public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands()
275         throws InterruptedException, ExecutionException, IOException {
276                 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt");
277                 connectNode();
278                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
279                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
280         }
281
282         @Test
283         public void clientGetWithDataStoreOnlySettingSendsCorrectCommands()
284         throws InterruptedException, ExecutionException, IOException {
285                 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt");
286                 connectNode();
287                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
288                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
289         }
290
291         @Test
292         public void clientGetWithMaxSizeSettingSendsCorrectCommands()
293         throws InterruptedException, ExecutionException, IOException {
294                 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt");
295                 connectNode();
296                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
297                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
298         }
299
300         @Test
301         public void clientGetWithPrioritySettingSendsCorrectCommands()
302         throws InterruptedException, ExecutionException, IOException {
303                 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt");
304                 connectNode();
305                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
306                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
307         }
308
309         @Test
310         public void clientGetWithRealTimeSettingSendsCorrectCommands()
311         throws InterruptedException, ExecutionException, IOException {
312                 fcpClient.clientGet().realTime().uri("KSK@foo.txt");
313                 connectNode();
314                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
315                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
316         }
317
318         @Test
319         public void clientGetWithGlobalSettingSendsCorrectCommands()
320         throws InterruptedException, ExecutionException, IOException {
321                 fcpClient.clientGet().global().uri("KSK@foo.txt");
322                 connectNode();
323                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
324                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
325         }
326
327         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
328                 return new TypeSafeDiagnosingMatcher<List<String>>() {
329                         @Override
330                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
331                                 if (!item.get(0).equals(name)) {
332                                         mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
333                                         return false;
334                                 }
335                                 for (String requiredLine : requiredLines) {
336                                         if (item.indexOf(requiredLine) < 1) {
337                                                 mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
338                                                 return false;
339                                         }
340                                 }
341                                 return true;
342                         }
343
344                         @Override
345                         public void describeTo(Description description) {
346                                 description.appendText("FCP message named ").appendValue(name);
347                                 description.appendValueList(", containing the lines ", ", ", "", requiredLines);
348                         }
349                 };
350         }
351
352         @Test
353         public void clientPutWithDirectDataSendsCorrectCommand()
354         throws IOException, ExecutionException, InterruptedException {
355                 fcpClient.clientPut()
356                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
357                         .length(6)
358                         .uri("KSK@foo.txt");
359                 connectNode();
360                 List<String> lines = fcpServer.collectUntil(is("Hello"));
361                 assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
362         }
363
364         @Test
365         public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
366         throws InterruptedException, ExecutionException, IOException {
367                 Future<Optional<Key>> key = fcpClient.clientPut()
368                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
369                         .length(6)
370                         .uri("KSK@foo.txt");
371                 connectNode();
372                 List<String> lines = fcpServer.collectUntil(is("Hello"));
373                 String identifier = extractIdentifier(lines);
374                 fcpServer.writeLine(
375                         "PutFailed",
376                         "Identifier=not-the-right-one",
377                         "EndMessage"
378                 );
379                 fcpServer.writeLine(
380                         "PutSuccessful",
381                         "URI=KSK@foo.txt",
382                         "Identifier=" + identifier,
383                         "EndMessage"
384                 );
385                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
386         }
387
388         @Test
389         public void clientPutWithDirectDataFailsOnCorrectIdentifier()
390         throws InterruptedException, ExecutionException, IOException {
391                 Future<Optional<Key>> key = fcpClient.clientPut()
392                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
393                         .length(6)
394                         .uri("KSK@foo.txt");
395                 connectNode();
396                 List<String> lines = fcpServer.collectUntil(is("Hello"));
397                 String identifier = extractIdentifier(lines);
398                 fcpServer.writeLine(
399                         "PutSuccessful",
400                         "Identifier=not-the-right-one",
401                         "URI=KSK@foo.txt",
402                         "EndMessage"
403                 );
404                 fcpServer.writeLine(
405                         "PutFailed",
406                         "Identifier=" + identifier,
407                         "EndMessage"
408                 );
409                 assertThat(key.get().isPresent(), is(false));
410         }
411
412         @Test
413         public void clientPutWithRenamedDirectDataSendsCorrectCommand()
414         throws InterruptedException, ExecutionException, IOException {
415                 fcpClient.clientPut()
416                         .named("otherName.txt")
417                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
418                         .length(6)
419                         .uri("KSK@foo.txt");
420                 connectNode();
421                 List<String> lines = fcpServer.collectUntil(is("Hello"));
422                 assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct",
423                         "DataLength=6", "URI=KSK@foo.txt"));
424         }
425
426         @Test
427         public void clientPutWithRedirectSendsCorrectCommand()
428         throws IOException, ExecutionException, InterruptedException {
429                 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt");
430                 connectNode();
431                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
432                 assertThat(lines,
433                         matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
434         }
435
436         @Test
437         public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
438                 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
439                 connectNode();
440                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
441                 assertThat(lines,
442                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
443         }
444
445         @Test
446         public void clientPutWithFileCanCompleteTestDdaSequence()
447         throws IOException, ExecutionException, InterruptedException {
448                 File tempFile = createTempFile();
449                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
450                 connectNode();
451                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
452                 String identifier = extractIdentifier(lines);
453                 fcpServer.writeLine(
454                         "ProtocolError",
455                         "Identifier=" + identifier,
456                         "Code=25",
457                         "EndMessage"
458                 );
459                 lines = fcpServer.collectUntil(is("EndMessage"));
460                 assertThat(lines, matchesFcpMessage(
461                         "TestDDARequest",
462                         "Directory=" + tempFile.getParent(),
463                         "WantReadDirectory=true",
464                         "WantWriteDirectory=false",
465                         "EndMessage"
466                 ));
467                 fcpServer.writeLine(
468                         "TestDDAReply",
469                         "Directory=" + tempFile.getParent(),
470                         "ReadFilename=" + tempFile,
471                         "EndMessage"
472                 );
473                 lines = fcpServer.collectUntil(is("EndMessage"));
474                 assertThat(lines, matchesFcpMessage(
475                         "TestDDAResponse",
476                         "Directory=" + tempFile.getParent(),
477                         "ReadContent=test-content",
478                         "EndMessage"
479                 ));
480                 fcpServer.writeLine(
481                         "TestDDAComplete",
482                         "Directory=" + tempFile.getParent(),
483                         "ReadDirectoryAllowed=true",
484                         "EndMessage"
485                 );
486                 lines = fcpServer.collectUntil(is("EndMessage"));
487                 assertThat(lines,
488                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
489                                 "Filename=" + new File(tempFile.getParent(), "test.dat")));
490         }
491
492         private File createTempFile() throws IOException {
493                 File tempFile = File.createTempFile("test-dda-", ".dat");
494                 tempFile.deleteOnExit();
495                 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
496                 return tempFile;
497         }
498
499         @Test
500         public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
501         throws InterruptedException, ExecutionException, IOException {
502                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
503                 connectNode();
504                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
505                 String identifier = extractIdentifier(lines);
506                 fcpServer.writeLine(
507                         "ProtocolError",
508                         "Identifier=not-the-right-one",
509                         "Code=25",
510                         "EndMessage"
511                 );
512                 fcpServer.writeLine(
513                         "PutSuccessful",
514                         "Identifier=" + identifier,
515                         "URI=KSK@foo.txt",
516                         "EndMessage"
517                 );
518                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
519         }
520
521         @Test
522         public void clientPutAbortsOnProtocolErrorOtherThan25()
523         throws InterruptedException, ExecutionException, IOException {
524                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
525                 connectNode();
526                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
527                 String identifier = extractIdentifier(lines);
528                 fcpServer.writeLine(
529                         "ProtocolError",
530                         "Identifier=" + identifier,
531                         "Code=1",
532                         "EndMessage"
533                 );
534                 assertThat(key.get().isPresent(), is(false));
535         }
536
537         @Test
538         public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
539         InterruptedException {
540                 File tempFile = createTempFile();
541                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
542                 connectNode();
543                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
544                 String identifier = extractIdentifier(lines);
545                 fcpServer.writeLine(
546                         "ProtocolError",
547                         "Identifier=" + identifier,
548                         "Code=25",
549                         "EndMessage"
550                 );
551                 lines = fcpServer.collectUntil(is("EndMessage"));
552                 assertThat(lines, matchesFcpMessage(
553                         "TestDDARequest",
554                         "Directory=" + tempFile.getParent(),
555                         "WantReadDirectory=true",
556                         "WantWriteDirectory=false",
557                         "EndMessage"
558                 ));
559                 fcpServer.writeLine(
560                         "TestDDAReply",
561                         "Directory=/some-other-directory",
562                         "ReadFilename=" + tempFile,
563                         "EndMessage"
564                 );
565                 fcpServer.writeLine(
566                         "TestDDAReply",
567                         "Directory=" + tempFile.getParent(),
568                         "ReadFilename=" + tempFile,
569                         "EndMessage"
570                 );
571                 lines = fcpServer.collectUntil(is("EndMessage"));
572                 assertThat(lines, matchesFcpMessage(
573                         "TestDDAResponse",
574                         "Directory=" + tempFile.getParent(),
575                         "ReadContent=test-content",
576                         "EndMessage"
577                 ));
578         }
579
580         @Test
581         public void clientPutSendsResponseEvenIfFileCanNotBeRead()
582         throws IOException, ExecutionException, InterruptedException {
583                 File tempFile = createTempFile();
584                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
585                 connectNode();
586                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
587                 String identifier = extractIdentifier(lines);
588                 fcpServer.writeLine(
589                         "ProtocolError",
590                         "Identifier=" + identifier,
591                         "Code=25",
592                         "EndMessage"
593                 );
594                 lines = fcpServer.collectUntil(is("EndMessage"));
595                 assertThat(lines, matchesFcpMessage(
596                         "TestDDARequest",
597                         "Directory=" + tempFile.getParent(),
598                         "WantReadDirectory=true",
599                         "WantWriteDirectory=false",
600                         "EndMessage"
601                 ));
602                 fcpServer.writeLine(
603                         "TestDDAReply",
604                         "Directory=" + tempFile.getParent(),
605                         "ReadFilename=" + tempFile + ".foo",
606                         "EndMessage"
607                 );
608                 lines = fcpServer.collectUntil(is("EndMessage"));
609                 assertThat(lines, matchesFcpMessage(
610                         "TestDDAResponse",
611                         "Directory=" + tempFile.getParent(),
612                         "ReadContent=failed-to-read",
613                         "EndMessage"
614                 ));
615         }
616
617         @Test
618         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
619         throws IOException, ExecutionException, InterruptedException {
620                 File tempFile = createTempFile();
621                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
622                 connectNode();
623                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
624                 String identifier = extractIdentifier(lines);
625                 fcpServer.writeLine(
626                         "TestDDAComplete",
627                         "Directory=/some-other-directory",
628                         "EndMessage"
629                 );
630                 fcpServer.writeLine(
631                         "ProtocolError",
632                         "Identifier=" + identifier,
633                         "Code=25",
634                         "EndMessage"
635                 );
636                 lines = fcpServer.collectUntil(is("EndMessage"));
637                 assertThat(lines, matchesFcpMessage(
638                         "TestDDARequest",
639                         "Directory=" + tempFile.getParent(),
640                         "WantReadDirectory=true",
641                         "WantWriteDirectory=false",
642                         "EndMessage"
643                 ));
644         }
645
646         @Test
647         public void clientCanListPeers() throws IOException, ExecutionException, InterruptedException {
648                 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
649                 connectNode();
650                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
651                 assertThat(lines, matchesFcpMessage(
652                         "ListPeers",
653                         "WithVolatile=false",
654                         "WithMetadata=false",
655                         "EndMessage"
656                 ));
657                 String identifier = extractIdentifier(lines);
658                 fcpServer.writeLine(
659                         "Peer",
660                         "Identifier=" + identifier,
661                         "identity=id1",
662                         "EndMessage"
663                 );
664                 fcpServer.writeLine(
665                         "Peer",
666                         "Identifier=" + identifier,
667                         "identity=id2",
668                         "EndMessage"
669                 );
670                 fcpServer.writeLine(
671                         "EndListPeers",
672                         "Identifier=" + identifier,
673                         "EndMessage"
674                 );
675                 assertThat(peers.get(), hasSize(2));
676                 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
677                         containsInAnyOrder("id1", "id2"));
678         }
679
680         @Test
681         public void clientCanListPeersWithMetadata() throws IOException, ExecutionException, InterruptedException {
682                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
683                 connectNode();
684                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
685                 assertThat(lines, matchesFcpMessage(
686                         "ListPeers",
687                         "WithVolatile=false",
688                         "WithMetadata=true",
689                         "EndMessage"
690                 ));
691                 String identifier = extractIdentifier(lines);
692                 fcpServer.writeLine(
693                         "Peer",
694                         "Identifier=" + identifier,
695                         "identity=id1",
696                         "metadata.foo=bar1",
697                         "EndMessage"
698                 );
699                 fcpServer.writeLine(
700                         "Peer",
701                         "Identifier=" + identifier,
702                         "identity=id2",
703                         "metadata.foo=bar2",
704                         "EndMessage"
705                 );
706                 fcpServer.writeLine(
707                         "EndListPeers",
708                         "Identifier=" + identifier,
709                         "EndMessage"
710                 );
711                 assertThat(peers.get(), hasSize(2));
712                 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
713                         containsInAnyOrder("bar1", "bar2"));
714         }
715
716         @Test
717         public void clientCanListPeersWithVolatiles() throws IOException, ExecutionException, InterruptedException {
718                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
719                 connectNode();
720                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
721                 assertThat(lines, matchesFcpMessage(
722                         "ListPeers",
723                         "WithVolatile=true",
724                         "WithMetadata=false",
725                         "EndMessage"
726                 ));
727                 String identifier = extractIdentifier(lines);
728                 fcpServer.writeLine(
729                         "Peer",
730                         "Identifier=" + identifier,
731                         "identity=id1",
732                         "volatile.foo=bar1",
733                         "EndMessage"
734                 );
735                 fcpServer.writeLine(
736                         "Peer",
737                         "Identifier=" + identifier,
738                         "identity=id2",
739                         "volatile.foo=bar2",
740                         "EndMessage"
741                 );
742                 fcpServer.writeLine(
743                         "EndListPeers",
744                         "Identifier=" + identifier,
745                         "EndMessage"
746                 );
747                 assertThat(peers.get(), hasSize(2));
748                 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
749                         containsInAnyOrder("bar1", "bar2"));
750         }
751
752 }