import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import net.pterodactylus.fcp.ProtocolError;
import net.pterodactylus.fcp.PutFailed;
import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.RequestProgress;
+import net.pterodactylus.fcp.SimpleProgress;
import net.pterodactylus.fcp.TestDDAComplete;
import net.pterodactylus.fcp.TestDDAReply;
import net.pterodactylus.fcp.TestDDARequest;
import net.pterodactylus.fcp.TestDDAResponse;
import net.pterodactylus.fcp.URIGenerated;
import net.pterodactylus.fcp.UploadFrom;
+import net.pterodactylus.fcp.Verbosity;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
private final AtomicReference<InputStream> payload = new AtomicReference<>();
private final AtomicLong length = new AtomicLong();
private final AtomicReference<String> targetFilename = new AtomicReference<>();
+ private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
private final List<Consumer<String>> keyGenerateds = new CopyOnWriteArrayList<>();
public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
}
@Override
+ public ClientPutCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
+ requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
+ return this;
+ }
+
+ @Override
public ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated) {
keyGenerateds.add(keyGenerated);
return this;
if (targetFilename.get() != null) {
clientPut.setTargetFilename(targetFilename.get());
}
+ if (!requestProgressConsumers.isEmpty()) {
+ clientPut.setVerbosity(Verbosity.PROGRESS);
+ }
return clientPut;
}
private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
private final AtomicReference<String> directory = new AtomicReference<>();
- private final AtomicReference<Key> finalKey = new AtomicReference<>();
- private final AtomicBoolean putFinished = new AtomicBoolean();
public ClientPutDialog() throws IOException {
- super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
- }
-
- @Override
- protected boolean isFinished() {
- return putFinished.get();
- }
-
- @Override
- protected Optional<Key> getResult() {
- return Optional.ofNullable(finalKey.get());
+ super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get(), Optional.<Key>empty());
}
@Override
}
@Override
+ protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+ RequestProgress requestProgress = new RequestProgress(
+ simpleProgress.getTotal(),
+ simpleProgress.getRequired(),
+ simpleProgress.getFailed(),
+ simpleProgress.getFatallyFailed(),
+ simpleProgress.getLastProgress(),
+ simpleProgress.getSucceeded(),
+ simpleProgress.isFinalizedTotal(),
+ simpleProgress.getMinSuccessFetchBlocks()
+ );
+ requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
+ }
+
+ @Override
protected void consumeURIGenerated(URIGenerated uriGenerated) {
for (Consumer<String> keyGenerated : keyGenerateds) {
keyGenerated.accept(uriGenerated.getURI());
@Override
protected void consumePutSuccessful(PutSuccessful putSuccessful) {
- finalKey.set(new Key(putSuccessful.getURI()));
- putFinished.set(true);
+ setResult(Optional.of(new Key(putSuccessful.getURI())));
}
@Override
protected void consumePutFailed(PutFailed putFailed) {
- putFinished.set(true);
+ finish();
}
@Override
setIdentifier(directory.get());
sendMessage(new TestDDARequest(directory.get(), true, false));
} else {
- putFinished.set(true);
+ finish();
}
}