From: David ‘Bombe’ Roden Date: Wed, 1 Jul 2015 19:29:21 +0000 (+0200) Subject: Add command that retrieves data from Freenet X-Git-Url: https://git.pterodactylus.net/?a=commitdiff_plain;h=b1cb058f5ab875ad39b1d8d506cb6019d28118b7;p=jFCPlib.git Add command that retrieves data from Freenet --- diff --git a/src/main/java/net/pterodactylus/fcp/ClientGet.java b/src/main/java/net/pterodactylus/fcp/ClientGet.java index 1126824..d340cf0 100644 --- a/src/main/java/net/pterodactylus/fcp/ClientGet.java +++ b/src/main/java/net/pterodactylus/fcp/ClientGet.java @@ -131,6 +131,10 @@ public class ClientGet extends FcpMessage { setField("PriorityClass", String.valueOf(priority)); } + public void setRealTimeFlag(boolean realTimeFlag) { + setField("RealTimeFlag", String.valueOf(realTimeFlag)); + } + /** * Sets the persistence of the request. * diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommand.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommand.java new file mode 100644 index 0000000..60a46d0 --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommand.java @@ -0,0 +1,34 @@ +package net.pterodactylus.fcp.quelaton; + +import java.io.InputStream; +import java.util.Optional; +import java.util.concurrent.Future; + +import net.pterodactylus.fcp.Priority; + +/** + * Command that retrieves data from Freenet. + * + * @author David ‘Bombe’ Roden + */ +public interface ClientGetCommand { + + ClientGetCommand identifier(String identifier); + ClientGetCommand ignoreDataStore(); + ClientGetCommand dataStoreOnly(); + ClientGetCommand maxSize(long maxSize); + ClientGetCommand priority(Priority priority); + ClientGetCommand realTime(); + ClientGetCommand global(); + + Future> uri(String uri); + + interface Data { + + String getMimeType(); + long size(); + InputStream getInputStream(); + + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 9cb0981..c37b40a 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -1,6 +1,9 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -8,14 +11,22 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.ClientGet; import net.pterodactylus.fcp.ClientHello; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; import net.pterodactylus.fcp.FcpKeyPair; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; import net.pterodactylus.fcp.GenerateSSK; +import net.pterodactylus.fcp.GetFailed; import net.pterodactylus.fcp.NodeHello; +import net.pterodactylus.fcp.Priority; +import net.pterodactylus.fcp.ReturnType; import net.pterodactylus.fcp.SSKKeypair; +import com.google.common.io.ByteStreams; + /** * Default {@link FcpClient} implementation. * @@ -109,4 +120,178 @@ public class DefaultFcpClient implements FcpClient { } + @Override + public ClientGetCommand clientGet() { + return new ClientGetCommandImpl(); + } + + private class ClientGetCommandImpl implements ClientGetCommand { + + private String identifier; + private boolean ignoreDataStore; + private boolean dataStoreOnly; + private Long maxSize; + private Priority priority; + private boolean realTime; + private boolean global; + + @Override + public ClientGetCommand identifier(String identifier) { + this.identifier = identifier; + return this; + } + + @Override + public ClientGetCommand ignoreDataStore() { + ignoreDataStore = true; + return this; + } + + @Override + public ClientGetCommand dataStoreOnly() { + dataStoreOnly = true; + return this; + } + + @Override + public ClientGetCommand maxSize(long maxSize) { + this.maxSize = maxSize; + return this; + } + + @Override + public ClientGetCommand priority(Priority priority) { + this.priority = priority; + return this; + } + + @Override + public ClientGetCommand realTime() { + realTime = true; + return this; + } + + @Override + public ClientGetCommand global() { + global = true; + return this; + } + + @Override + public Future> uri(String uri) { + return threadPool.submit(new Callable>() { + @Override + public Optional call() throws Exception { + DefaultFcpClient.this.connect(); + ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); + if (ignoreDataStore) { + clientGet.setIgnoreDataStore(true); + } + if (dataStoreOnly) { + clientGet.setDataStoreOnly(true); + } + if (maxSize != null) { + clientGet.setMaxSize(maxSize); + } + if (priority != null) { + clientGet.setPriority(priority); + } + if (realTime) { + clientGet.setRealTimeFlag(true); + } + if (global) { + clientGet.setGlobal(true); + } + try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) { + Sequence sequence = new Sequence(identifier); + replySequence.handle(AllData.class).with(sequence::allData); + replySequence.handle(GetFailed.class).with(sequence::getFailed); + replySequence.handleClose().with(sequence::disconnect); + replySequence.waitFor(sequence::isFinished); + replySequence.send(clientGet).get(); + return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty(); + } + } + }); + } + + private class Sequence { + + private final AtomicBoolean finished = new AtomicBoolean(); + private final AtomicBoolean failed = new AtomicBoolean(); + + private final String identifier; + + private String contentType; + private long dataLength; + private InputStream payload; + + private Sequence(String identifier) { + this.identifier = identifier; + } + + public boolean isFinished() { + return finished.get() || failed.get(); + } + + public boolean isSuccessful() { + return !failed.get(); + } + + public Data getData() { + return new Data() { + @Override + public String getMimeType() { + synchronized (Sequence.this) { + return contentType; + } + } + + @Override + public long size() { + synchronized (Sequence.this) { + return dataLength; + } + } + + @Override + public InputStream getInputStream() { + synchronized (Sequence.this) { + return payload; + } + } + }; + } + + public void allData(AllData allData) { + if (allData.getIdentifier().equals(identifier)) { + synchronized (this) { + contentType = allData.getContentType(); + dataLength = allData.getDataLength(); + try { + payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); + finished.set(true); + } catch (IOException e) { + // TODO – logging + failed.set(true); + } + } + } + } + + public void getFailed(GetFailed getFailed) { + if (getFailed.getIdentifier().equals(identifier)) { + failed.set(true); + } + } + + public void disconnect(Throwable t) { + failed.set(true); + } + + } + + } + } + diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java index 065f68a..905dd4f 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java @@ -8,5 +8,6 @@ package net.pterodactylus.fcp.quelaton; public interface FcpClient { GenerateKeypairCommand generateKeypair(); + ClientGetCommand clientGet(); } diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java index 802788c..5d7b441 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java @@ -1,17 +1,24 @@ package net.pterodactylus.fcp.quelaton; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import net.pterodactylus.fcp.FcpKeyPair; +import net.pterodactylus.fcp.Priority; import net.pterodactylus.fcp.fake.FakeTcpServer; +import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data; +import com.google.common.io.ByteStreams; import org.junit.After; import org.junit.Test; @@ -73,4 +80,161 @@ public class DefaultFcpClientTest { ); } + @Test + public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException { + Future> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "EndMessage" + )); + fcpServer.writeLine( + "AllData", + "Identifier=test", + "DataLength=6", + "StartupTime=1435610539000", + "CompletionTime=1435610540000", + "Metadata.ContentType=text/plain;charset=utf-8", + "Data", + "Hello" + ); + Optional data = dataFuture.get(); + assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8")); + assertThat(data.get().size(), is(6L)); + assertThat(ByteStreams.toByteArray(data.get().getInputStream()), is("Hello\n".getBytes(StandardCharsets.UTF_8))); + } + + @Test + public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException { + Future> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "EndMessage" + )); + fcpServer.writeLine( + "GetFailed", + "Identifier=test", + "Code=3", + "EndMessage" + ); + Optional data = dataFuture.get(); + assertThat(data.isPresent(), is(false)); + } + + @Test + public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException { + Future> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "EndMessage" + )); + fcpServer.close(); + Optional data = dataFuture.get(); + assertThat(data.isPresent(), is(false)); + } + + @Test + public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().ignoreDataStore().identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "IgnoreDS=true", + "EndMessage" + )); + } + + @Test + public void clientGetWithDataStoreOnlySettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().dataStoreOnly().identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "DSonly=true", + "EndMessage" + )); + } + + @Test + public void clientGetWithMaxSizeSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().maxSize(1048576).identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "MaxSize=1048576", + "EndMessage" + )); + } + + @Test + public void clientGetWithPrioritySettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().priority(Priority.interactive).identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "PriorityClass=1", + "EndMessage" + )); + } + + @Test + public void clientGetWithRealTimeSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().realTime().identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "RealTimeFlag=true", + "EndMessage" + )); + } + + @Test + public void clientGetWithGlobalSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().global().identifier("test").uri("KSK@foo.txt"); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, containsInAnyOrder( + "ClientGet", + "Identifier=test", + "ReturnType=direct", + "URI=KSK@foo.txt", + "Global=true", + "EndMessage" + )); + } + }