Add command that retrieves data from Freenet
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Wed, 1 Jul 2015 19:29:21 +0000 (21:29 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Wed, 1 Jul 2015 19:29:48 +0000 (21:29 +0200)
src/main/java/net/pterodactylus/fcp/ClientGet.java
src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommand.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java
src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java
src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java

index 1126824..d340cf0 100644 (file)
@@ -131,6 +131,10 @@ public class ClientGet extends FcpMessage {
                setField("PriorityClass", String.valueOf(priority));
        }
 
+       public void setRealTimeFlag(boolean realTimeFlag) {
+               setField("RealTimeFlag", String.valueOf(realTimeFlag));
+       }
+
        /**
         * Sets the persistence of the request.
         *
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommand.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommand.java
new file mode 100644 (file)
index 0000000..60a46d0
--- /dev/null
@@ -0,0 +1,34 @@
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.concurrent.Future;
+
+import net.pterodactylus.fcp.Priority;
+
+/**
+ * Command that retrieves data from Freenet.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public interface ClientGetCommand {
+
+       ClientGetCommand identifier(String identifier);
+       ClientGetCommand ignoreDataStore();
+       ClientGetCommand dataStoreOnly();
+       ClientGetCommand maxSize(long maxSize);
+       ClientGetCommand priority(Priority priority);
+       ClientGetCommand realTime();
+       ClientGetCommand global();
+
+       Future<Optional<Data>> uri(String uri);
+
+       interface Data {
+
+               String getMimeType();
+               long size();
+               InputStream getInputStream();
+
+       }
+
+}
index 9cb0981..c37b40a 100644 (file)
@@ -1,6 +1,9 @@
 package net.pterodactylus.fcp.quelaton;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -8,14 +11,22 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.ClientGet;
 import net.pterodactylus.fcp.ClientHello;
 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
 import net.pterodactylus.fcp.FcpConnection;
 import net.pterodactylus.fcp.FcpKeyPair;
+import net.pterodactylus.fcp.FcpUtils.TempInputStream;
 import net.pterodactylus.fcp.GenerateSSK;
+import net.pterodactylus.fcp.GetFailed;
 import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Priority;
+import net.pterodactylus.fcp.ReturnType;
 import net.pterodactylus.fcp.SSKKeypair;
 
+import com.google.common.io.ByteStreams;
+
 /**
  * Default {@link FcpClient} implementation.
  *
@@ -109,4 +120,178 @@ public class DefaultFcpClient implements FcpClient {
 
        }
 
+       @Override
+       public ClientGetCommand clientGet() {
+               return new ClientGetCommandImpl();
+       }
+
+       private class ClientGetCommandImpl implements ClientGetCommand {
+
+               private String identifier;
+               private boolean ignoreDataStore;
+               private boolean dataStoreOnly;
+               private Long maxSize;
+               private Priority priority;
+               private boolean realTime;
+               private boolean global;
+
+               @Override
+               public ClientGetCommand identifier(String identifier) {
+                       this.identifier = identifier;
+                       return this;
+               }
+
+               @Override
+               public ClientGetCommand ignoreDataStore() {
+                       ignoreDataStore = true;
+                       return this;
+               }
+
+               @Override
+               public ClientGetCommand dataStoreOnly() {
+                       dataStoreOnly = true;
+                       return this;
+               }
+
+               @Override
+               public ClientGetCommand maxSize(long maxSize) {
+                       this.maxSize = maxSize;
+                       return this;
+               }
+
+               @Override
+               public ClientGetCommand priority(Priority priority) {
+                       this.priority = priority;
+                       return this;
+               }
+
+               @Override
+               public ClientGetCommand realTime() {
+                       realTime = true;
+                       return this;
+               }
+
+               @Override
+               public ClientGetCommand global() {
+                       global = true;
+                       return this;
+               }
+
+               @Override
+               public Future<Optional<Data>> uri(String uri) {
+                       return threadPool.submit(new Callable<Optional<Data>>() {
+                               @Override
+                               public Optional<Data> call() throws Exception {
+                                       DefaultFcpClient.this.connect();
+                                       ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
+                                       if (ignoreDataStore) {
+                                               clientGet.setIgnoreDataStore(true);
+                                       }
+                                       if (dataStoreOnly) {
+                                               clientGet.setDataStoreOnly(true);
+                                       }
+                                       if (maxSize != null) {
+                                               clientGet.setMaxSize(maxSize);
+                                       }
+                                       if (priority != null) {
+                                               clientGet.setPriority(priority);
+                                       }
+                                       if (realTime) {
+                                               clientGet.setRealTimeFlag(true);
+                                       }
+                                       if (global) {
+                                               clientGet.setGlobal(true);
+                                       }
+                                       try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) {
+                                               Sequence sequence = new Sequence(identifier);
+                                               replySequence.handle(AllData.class).with(sequence::allData);
+                                               replySequence.handle(GetFailed.class).with(sequence::getFailed);
+                                               replySequence.handleClose().with(sequence::disconnect);
+                                               replySequence.waitFor(sequence::isFinished);
+                                               replySequence.send(clientGet).get();
+                                               return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty();
+                                       }
+                               }
+                       });
+               }
+
+               private class Sequence {
+
+                       private final AtomicBoolean finished = new AtomicBoolean();
+                       private final AtomicBoolean failed = new AtomicBoolean();
+
+                       private final String identifier;
+
+                       private String contentType;
+                       private long dataLength;
+                       private InputStream payload;
+
+                       private Sequence(String identifier) {
+                               this.identifier = identifier;
+                       }
+
+                       public boolean isFinished() {
+                               return finished.get() || failed.get();
+                       }
+
+                       public boolean isSuccessful() {
+                               return !failed.get();
+                       }
+
+                       public Data getData() {
+                               return new Data() {
+                                       @Override
+                                       public String getMimeType() {
+                                               synchronized (Sequence.this) {
+                                                       return contentType;
+                                               }
+                                       }
+
+                                       @Override
+                                       public long size() {
+                                               synchronized (Sequence.this) {
+                                                       return dataLength;
+                                               }
+                                       }
+
+                                       @Override
+                                       public InputStream getInputStream() {
+                                               synchronized (Sequence.this) {
+                                                       return payload;
+                                               }
+                                       }
+                               };
+                       }
+
+                       public void allData(AllData allData) {
+                               if (allData.getIdentifier().equals(identifier)) {
+                                       synchronized (this) {
+                                               contentType = allData.getContentType();
+                                               dataLength = allData.getDataLength();
+                                               try {
+                                                       payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
+                                                       finished.set(true);
+                                               } catch (IOException e) {
+                                                       // TODO – logging
+                                                       failed.set(true);
+                                               }
+                                       }
+                               }
+                       }
+
+                       public void getFailed(GetFailed getFailed) {
+                               if (getFailed.getIdentifier().equals(identifier)) {
+                                       failed.set(true);
+                               }
+                       }
+
+                       public void disconnect(Throwable t) {
+                               failed.set(true);
+                       }
+
+               }
+
+       }
+
 }
+
index 065f68a..905dd4f 100644 (file)
@@ -8,5 +8,6 @@ package net.pterodactylus.fcp.quelaton;
 public interface FcpClient {
 
        GenerateKeypairCommand generateKeypair();
+       ClientGetCommand clientGet();
 
 }
index 802788c..5d7b441 100644 (file)
@@ -1,17 +1,24 @@
 package net.pterodactylus.fcp.quelaton;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import net.pterodactylus.fcp.FcpKeyPair;
+import net.pterodactylus.fcp.Priority;
 import net.pterodactylus.fcp.fake.FakeTcpServer;
+import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
 
+import com.google.common.io.ByteStreams;
 import org.junit.After;
 import org.junit.Test;
 
@@ -73,4 +80,161 @@ public class DefaultFcpClientTest {
                );
        }
 
+       @Test
+       public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
+               Future<Optional<Data>> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "EndMessage"
+               ));
+               fcpServer.writeLine(
+                       "AllData",
+                       "Identifier=test",
+                       "DataLength=6",
+                       "StartupTime=1435610539000",
+                       "CompletionTime=1435610540000",
+                       "Metadata.ContentType=text/plain;charset=utf-8",
+                       "Data",
+                       "Hello"
+               );
+               Optional<Data> data = dataFuture.get();
+               assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
+               assertThat(data.get().size(), is(6L));
+               assertThat(ByteStreams.toByteArray(data.get().getInputStream()), is("Hello\n".getBytes(StandardCharsets.UTF_8)));
+       }
+
+       @Test
+       public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
+               Future<Optional<Data>> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "EndMessage"
+               ));
+               fcpServer.writeLine(
+                       "GetFailed",
+                       "Identifier=test",
+                       "Code=3",
+                       "EndMessage"
+               );
+               Optional<Data> data = dataFuture.get();
+               assertThat(data.isPresent(), is(false));
+       }
+
+       @Test
+       public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
+               Future<Optional<Data>> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "EndMessage"
+               ));
+               fcpServer.close();
+               Optional<Data> data = dataFuture.get();
+               assertThat(data.isPresent(), is(false));
+       }
+
+       @Test
+       public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
+               fcpClient.clientGet().ignoreDataStore().identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "IgnoreDS=true",
+                       "EndMessage"
+               ));
+       }
+
+       @Test
+       public void clientGetWithDataStoreOnlySettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
+               fcpClient.clientGet().dataStoreOnly().identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "DSonly=true",
+                       "EndMessage"
+               ));
+       }
+
+       @Test
+       public void clientGetWithMaxSizeSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
+               fcpClient.clientGet().maxSize(1048576).identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "MaxSize=1048576",
+                       "EndMessage"
+               ));
+       }
+
+       @Test
+       public void clientGetWithPrioritySettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
+               fcpClient.clientGet().priority(Priority.interactive).identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "PriorityClass=1",
+                       "EndMessage"
+               ));
+       }
+
+       @Test
+       public void clientGetWithRealTimeSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
+               fcpClient.clientGet().realTime().identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "RealTimeFlag=true",
+                       "EndMessage"
+               ));
+       }
+
+       @Test
+       public void clientGetWithGlobalSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
+               fcpClient.clientGet().global().identifier("test").uri("KSK@foo.txt");
+               connectNode();
+               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+               assertThat(lines, containsInAnyOrder(
+                       "ClientGet",
+                       "Identifier=test",
+                       "ReturnType=direct",
+                       "URI=KSK@foo.txt",
+                       "Global=true",
+                       "EndMessage"
+               ));
+       }
+
 }