Add progress consumer to ClientPut command
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Thu, 15 Oct 2015 21:21:58 +0000 (23:21 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Thu, 15 Oct 2015 21:23:52 +0000 (23:23 +0200)
src/main/java/net/pterodactylus/fcp/RequestProgress.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/SimpleProgress.java
src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommand.java
src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java
src/test/java/net/pterodactylus/fcp/RequestProgressMatcher.java [new file with mode: 0644]
src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java

diff --git a/src/main/java/net/pterodactylus/fcp/RequestProgress.java b/src/main/java/net/pterodactylus/fcp/RequestProgress.java
new file mode 100644 (file)
index 0000000..2dcc109
--- /dev/null
@@ -0,0 +1,63 @@
+package net.pterodactylus.fcp;
+
+/**
+ * Progress information about a request.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class RequestProgress {
+
+       private final int total;
+       private final int required;
+       private final int failed;
+       private final int fatallyFailed;
+       private final long lastProgress;
+       private final int succeeded;
+       private final boolean finalizedTotal;
+       private final int minSuccessFetchBlocks;
+
+       public RequestProgress(int total, int required, int failed, int fatallyFailed, long lastProgress, int succeeded,
+               boolean finalizedTotal, int minSuccessFetchBlocks) {
+               this.total = total;
+               this.required = required;
+               this.failed = failed;
+               this.fatallyFailed = fatallyFailed;
+               this.lastProgress = lastProgress;
+               this.succeeded = succeeded;
+               this.finalizedTotal = finalizedTotal;
+               this.minSuccessFetchBlocks = minSuccessFetchBlocks;
+       }
+
+       public int getTotal() {
+               return total;
+       }
+
+       public int getRequired() {
+               return required;
+       }
+
+       public int getFailed() {
+               return failed;
+       }
+
+       public int getFatallyFailed() {
+               return fatallyFailed;
+       }
+
+       public long getLastProgress() {
+               return lastProgress;
+       }
+
+       public int getSucceeded() {
+               return succeeded;
+       }
+
+       public boolean isFinalizedTotal() {
+               return finalizedTotal;
+       }
+
+       public int getMinSuccessFetchBlocks() {
+               return minSuccessFetchBlocks;
+       }
+
+}
index 78017ad..69a3bf2 100644 (file)
@@ -99,6 +99,14 @@ public class SimpleProgress extends BaseMessage implements Identifiable {
                return Boolean.valueOf(getField("FinalizedTotal"));
        }
 
+       public long getLastProgress() {
+               return Long.valueOf(getField("LastProgress"));
+       }
+
+       public int getMinSuccessFetchBlocks() {
+               return Integer.valueOf(getField("MinSuccessFetchBlocks"));
+       }
+
        /**
         * Returns the identifier of the request.
         *
index 7e900f2..3fc5767 100644 (file)
@@ -6,6 +6,7 @@ import java.util.Optional;
 import java.util.function.Consumer;
 
 import net.pterodactylus.fcp.Key;
+import net.pterodactylus.fcp.RequestProgress;
 
 /**
  * FCP command that inserts data into Freenet.
@@ -14,6 +15,7 @@ import net.pterodactylus.fcp.Key;
  */
 public interface ClientPutCommand {
 
+       ClientPutCommand onProgress(Consumer<RequestProgress> requestProgressConsumer);
        ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated);
        ClientPutCommand named(String targetFilename);
        WithUri redirectTo(String uri);
index 982fd76..8dbaf0f 100644 (file)
@@ -21,12 +21,15 @@ import net.pterodactylus.fcp.Key;
 import net.pterodactylus.fcp.ProtocolError;
 import net.pterodactylus.fcp.PutFailed;
 import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.RequestProgress;
+import net.pterodactylus.fcp.SimpleProgress;
 import net.pterodactylus.fcp.TestDDAComplete;
 import net.pterodactylus.fcp.TestDDAReply;
 import net.pterodactylus.fcp.TestDDARequest;
 import net.pterodactylus.fcp.TestDDAResponse;
 import net.pterodactylus.fcp.URIGenerated;
 import net.pterodactylus.fcp.UploadFrom;
+import net.pterodactylus.fcp.Verbosity;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -47,6 +50,7 @@ class ClientPutCommandImpl implements ClientPutCommand {
        private final AtomicReference<InputStream> payload = new AtomicReference<>();
        private final AtomicLong length = new AtomicLong();
        private final AtomicReference<String> targetFilename = new AtomicReference<>();
+       private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
        private final List<Consumer<String>> keyGenerateds = new CopyOnWriteArrayList<>();
 
        public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
@@ -56,6 +60,12 @@ class ClientPutCommandImpl implements ClientPutCommand {
        }
 
        @Override
+       public ClientPutCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
+               requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
+               return this;
+       }
+
+       @Override
        public ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated) {
                keyGenerateds.add(keyGenerated);
                return this;
@@ -113,6 +123,9 @@ class ClientPutCommandImpl implements ClientPutCommand {
                if (targetFilename.get() != null) {
                        clientPut.setTargetFilename(targetFilename.get());
                }
+               if (!requestProgressConsumers.isEmpty()) {
+                       clientPut.setVerbosity(Verbosity.PROGRESS);
+               }
                return clientPut;
        }
 
@@ -155,6 +168,21 @@ class ClientPutCommandImpl implements ClientPutCommand {
                }
 
                @Override
+               protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+                       RequestProgress requestProgress = new RequestProgress(
+                               simpleProgress.getTotal(),
+                               simpleProgress.getRequired(),
+                               simpleProgress.getFailed(),
+                               simpleProgress.getFatallyFailed(),
+                               simpleProgress.getLastProgress(),
+                               simpleProgress.getSucceeded(),
+                               simpleProgress.isFinalizedTotal(),
+                               simpleProgress.getMinSuccessFetchBlocks()
+                       );
+                       requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
+               }
+
+               @Override
                protected void consumeURIGenerated(URIGenerated uriGenerated) {
                        for (Consumer<String> keyGenerated : keyGenerateds) {
                                keyGenerated.accept(uriGenerated.getURI());
diff --git a/src/test/java/net/pterodactylus/fcp/RequestProgressMatcher.java b/src/test/java/net/pterodactylus/fcp/RequestProgressMatcher.java
new file mode 100644 (file)
index 0000000..e2adf30
--- /dev/null
@@ -0,0 +1,71 @@
+package net.pterodactylus.fcp;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+
+/**
+ * Hamcrest matcher for {@link RequestProgress}.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class RequestProgressMatcher {
+
+       public static Matcher<RequestProgress> isRequestProgress(int total, int required, int failed, int fatallyFailed,
+               int succeeded, long lastProgress, boolean finalizedTotal, int minSuccessFetchBlocks) {
+               return new TypeSafeDiagnosingMatcher<RequestProgress>() {
+                       @Override
+                       protected boolean matchesSafely(RequestProgress requestProgress, Description mismatchDescription) {
+                               if (requestProgress.getTotal() != total) {
+                                       mismatchDescription.appendText("total is ").appendValue(requestProgress.getTotal());
+                                       return false;
+                               }
+                               if (requestProgress.getRequired() != required) {
+                                       mismatchDescription.appendText("required is ").appendValue(requestProgress.getRequired());
+                                       return false;
+                               }
+                               if (requestProgress.getFailed() != failed) {
+                                       mismatchDescription.appendText("failed is ").appendValue(requestProgress.getFailed());
+                                       return false;
+                               }
+                               if (requestProgress.getFatallyFailed() != fatallyFailed) {
+                                       mismatchDescription.appendText("fatally failed is ")
+                                               .appendValue(requestProgress.getFatallyFailed());
+                                       return false;
+                               }
+                               if (requestProgress.getSucceeded() != succeeded) {
+                                       mismatchDescription.appendText("succeeded is ").appendValue(requestProgress.getSucceeded());
+                                       return false;
+                               }
+                               if (requestProgress.getLastProgress() != lastProgress) {
+                                       mismatchDescription.appendText("last progress is ").appendValue(requestProgress.getLastProgress());
+                                       return false;
+                               }
+                               if (requestProgress.isFinalizedTotal() != finalizedTotal) {
+                                       mismatchDescription.appendText("finalized total is ")
+                                               .appendValue(requestProgress.isFinalizedTotal());
+                                       return false;
+                               }
+                               if (requestProgress.getMinSuccessFetchBlocks() != minSuccessFetchBlocks) {
+                                       mismatchDescription.appendText("min success fetch blocks is ")
+                                               .appendValue(requestProgress.getMinSuccessFetchBlocks());
+                                       return false;
+                               }
+                               return true;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText("total ").appendValue(total);
+                               description.appendText(", required ").appendValue(required);
+                               description.appendText(", failed ").appendValue(failed);
+                               description.appendText(", fatally failed ").appendValue(fatallyFailed);
+                               description.appendText(", succeeded ").appendValue(succeeded);
+                               description.appendText(", last progress ").appendValue(lastProgress);
+                               description.appendText(", finalized total ").appendValue(finalizedTotal);
+                               description.appendText(", min success fetch blocks ").appendValue(minSuccessFetchBlocks);
+                       }
+               };
+       }
+
+}
index 771cd25..a4dc5a9 100644 (file)
@@ -1,5 +1,6 @@
 package net.pterodactylus.fcp.quelaton;
 
+import static net.pterodactylus.fcp.RequestProgressMatcher.isRequestProgress;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.contains;
@@ -16,6 +17,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -43,6 +45,7 @@ import net.pterodactylus.fcp.Peer;
 import net.pterodactylus.fcp.PeerNote;
 import net.pterodactylus.fcp.PluginInfo;
 import net.pterodactylus.fcp.Priority;
+import net.pterodactylus.fcp.RequestProgress;
 import net.pterodactylus.fcp.fake.FakeTcpServer;
 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
 
@@ -1542,10 +1545,12 @@ public class DefaultFcpClientTest {
                        );
                }
 
-               private Matcher<List<String>> matchesDirectClientPut() {
+               private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
+                       List<String> lines = new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
+                       Arrays.asList(additionalLines).forEach(lines::add);
                        return allOf(
                                hasHead("ClientPut"),
-                               hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
+                               hasParameters(1, 2, lines.toArray(new String[lines.size()])),
                                hasTail("EndMessage", "Hello")
                        );
                }
@@ -1619,6 +1624,45 @@ public class DefaultFcpClientTest {
                        assertThat(generatedKeys, contains("KSK@foo.txt"));
                }
 
+               @Test
+               public void clientPutSendsNotificationOnProgress() throws InterruptedException, ExecutionException, IOException {
+                       List<RequestProgress> requestProgress = new ArrayList<>();
+                   Future<Optional<Key>> key = fcpClient.clientPut()
+                               .onProgress(requestProgress::add)
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
+                       replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
+                       replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
+                       replyWithPutSuccessful(identifier);
+                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+                       assertThat(requestProgress, contains(
+                               isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
+                               isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
+                       ));
+               }
+
+               private void replyWithSimpleProgress(
+                       int total, int required, int failed, int fatallyFailed, int succeeded, int lastProgress,
+                       boolean finalizedTotal, int minSuccessFetchBlocks) throws IOException {
+                       fcpServer.writeLine(
+                               "SimpleProgress",
+                               "Identifier=" + identifier,
+                               "Total=" + total,
+                               "Required=" + required,
+                               "Failed=" + failed,
+                               "FatallyFailed=" + fatallyFailed,
+                               "Succeeded=" + succeeded,
+                               "LastProgress=" + lastProgress,
+                               "FinalizedTotal=" + finalizedTotal,
+                               "MinSuccessFetchBlocks=" + minSuccessFetchBlocks,
+                               "EndMessage"
+                       );
+               }
+
        }
 
        public class ClientPutDiskDir {