Add progress consumer to ClientPut command
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientPutCommandImpl.java
index df345f3..8dbaf0f 100644 (file)
@@ -4,13 +4,16 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import net.pterodactylus.fcp.ClientPut;
 import net.pterodactylus.fcp.FcpMessage;
@@ -18,18 +21,22 @@ import net.pterodactylus.fcp.Key;
 import net.pterodactylus.fcp.ProtocolError;
 import net.pterodactylus.fcp.PutFailed;
 import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.RequestProgress;
+import net.pterodactylus.fcp.SimpleProgress;
 import net.pterodactylus.fcp.TestDDAComplete;
 import net.pterodactylus.fcp.TestDDAReply;
 import net.pterodactylus.fcp.TestDDARequest;
 import net.pterodactylus.fcp.TestDDAResponse;
+import net.pterodactylus.fcp.URIGenerated;
 import net.pterodactylus.fcp.UploadFrom;
+import net.pterodactylus.fcp.Verbosity;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}.
+ * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -37,15 +44,31 @@ class ClientPutCommandImpl implements ClientPutCommand {
 
        private final ListeningExecutorService threadPool;
        private final ConnectionSupplier connectionSupplier;
+       private final Supplier<String> identifierGenerator;
        private final AtomicReference<String> redirectUri = new AtomicReference<>();
        private final AtomicReference<File> file = new AtomicReference<>();
        private final AtomicReference<InputStream> payload = new AtomicReference<>();
        private final AtomicLong length = new AtomicLong();
        private final AtomicReference<String> targetFilename = new AtomicReference<>();
+       private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
+       private final List<Consumer<String>> keyGenerateds = new CopyOnWriteArrayList<>();
 
-       public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
+       public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
                this.threadPool = MoreExecutors.listeningDecorator(threadPool);
                this.connectionSupplier = connectionSupplier;
+               this.identifierGenerator = identifierGenerator;
+       }
+
+       @Override
+       public ClientPutCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
+               requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
+               return this;
+       }
+
+       @Override
+       public ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated) {
+               keyGenerateds.add(keyGenerated);
+               return this;
        }
 
        @Override
@@ -55,24 +78,24 @@ class ClientPutCommandImpl implements ClientPutCommand {
        }
 
        @Override
-       public WithUri<Executable<Optional<Key>>> redirectTo(String uri) {
+       public WithUri redirectTo(String uri) {
                this.redirectUri.set(Objects.requireNonNull(uri, "uri must not be null"));
                return this::key;
        }
 
        @Override
-       public WithUri<Executable<Optional<Key>>> from(File file) {
+       public WithUri from(File file) {
                this.file.set(Objects.requireNonNull(file, "file must not be null"));
                return this::key;
        }
 
        @Override
-       public WithLength<WithUri<Executable<Optional<Key>>>> from(InputStream inputStream) {
+       public WithLength from(InputStream inputStream) {
                payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
                return this::length;
        }
 
-       private WithUri<Executable<Optional<Key>>> length(long length) {
+       private WithUri length(long length) {
                this.length.set(length);
                return this::key;
        }
@@ -82,10 +105,9 @@ class ClientPutCommandImpl implements ClientPutCommand {
        }
 
        private Optional<Key> execute(String uri) throws InterruptedException, ExecutionException, IOException {
-               String identifier = new RandomIdentifierGenerator().generate();
-               ClientPut clientPut = createClientPutCommand(uri, identifier);
-               try (ClientPutReplySequence clientPutReplySequence = new ClientPutReplySequence()) {
-                       return clientPutReplySequence.send(clientPut).get();
+               ClientPut clientPut = createClientPutCommand(uri, identifierGenerator.get());
+               try (ClientPutDialog clientPutDialog = new ClientPutDialog()) {
+                       return clientPutDialog.send(clientPut).get();
                }
        }
 
@@ -101,6 +123,9 @@ class ClientPutCommandImpl implements ClientPutCommand {
                if (targetFilename.get() != null) {
                        clientPut.setTargetFilename(targetFilename.get());
                }
+               if (!requestProgressConsumers.isEmpty()) {
+                       clientPut.setVerbosity(Verbosity.PROGRESS);
+               }
                return clientPut;
        }
 
@@ -123,25 +148,13 @@ class ClientPutCommandImpl implements ClientPutCommand {
                return clientPut;
        }
 
-       private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
+       private class ClientPutDialog extends FcpDialog<Optional<Key>> {
 
                private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
                private final AtomicReference<String> directory = new AtomicReference<>();
-               private final AtomicReference<Key> finalKey = new AtomicReference<>();
-               private final AtomicBoolean putFinished = new AtomicBoolean();
-
-               public ClientPutReplySequence() throws IOException {
-                       super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
-               }
 
-               @Override
-               protected boolean isFinished() {
-                       return putFinished.get();
-               }
-
-               @Override
-               protected Optional<Key> getResult() {
-                       return Optional.ofNullable(finalKey.get());
+               public ClientPutDialog() throws IOException {
+                       super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get(), Optional.<Key>empty());
                }
 
                @Override
@@ -155,14 +168,35 @@ class ClientPutCommandImpl implements ClientPutCommand {
                }
 
                @Override
+               protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+                       RequestProgress requestProgress = new RequestProgress(
+                               simpleProgress.getTotal(),
+                               simpleProgress.getRequired(),
+                               simpleProgress.getFailed(),
+                               simpleProgress.getFatallyFailed(),
+                               simpleProgress.getLastProgress(),
+                               simpleProgress.getSucceeded(),
+                               simpleProgress.isFinalizedTotal(),
+                               simpleProgress.getMinSuccessFetchBlocks()
+                       );
+                       requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
+               }
+
+               @Override
+               protected void consumeURIGenerated(URIGenerated uriGenerated) {
+                       for (Consumer<String> keyGenerated : keyGenerateds) {
+                               keyGenerated.accept(uriGenerated.getURI());
+                       }
+               }
+
+               @Override
                protected void consumePutSuccessful(PutSuccessful putSuccessful) {
-                       finalKey.set(new Key(putSuccessful.getURI()));
-                       putFinished.set(true);
+                       setResult(Optional.of(new Key(putSuccessful.getURI())));
                }
 
                @Override
                protected void consumePutFailed(PutFailed putFailed) {
-                       putFinished.set(true);
+                       finish();
                }
 
                @Override
@@ -171,7 +205,7 @@ class ClientPutCommandImpl implements ClientPutCommand {
                                setIdentifier(directory.get());
                                sendMessage(new TestDDARequest(directory.get(), true, false));
                        } else {
-                               putFinished.set(true);
+                               finish();
                        }
                }