import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import net.pterodactylus.fcp.ClientPut;
import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.ProtocolError;
import net.pterodactylus.fcp.PutFailed;
import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.RequestProgress;
+import net.pterodactylus.fcp.SimpleProgress;
import net.pterodactylus.fcp.TestDDAComplete;
import net.pterodactylus.fcp.TestDDAReply;
import net.pterodactylus.fcp.TestDDARequest;
import net.pterodactylus.fcp.TestDDAResponse;
+import net.pterodactylus.fcp.URIGenerated;
import net.pterodactylus.fcp.UploadFrom;
+import net.pterodactylus.fcp.Verbosity;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
- * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}.
+ * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
private final ListeningExecutorService threadPool;
private final ConnectionSupplier connectionSupplier;
+ private final Supplier<String> identifierGenerator;
private final AtomicReference<String> redirectUri = new AtomicReference<>();
private final AtomicReference<File> file = new AtomicReference<>();
private final AtomicReference<InputStream> payload = new AtomicReference<>();
private final AtomicLong length = new AtomicLong();
private final AtomicReference<String> targetFilename = new AtomicReference<>();
+ private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
+ private final List<Consumer<String>> keyGenerateds = new CopyOnWriteArrayList<>();
- public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
+ public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
this.threadPool = MoreExecutors.listeningDecorator(threadPool);
this.connectionSupplier = connectionSupplier;
+ this.identifierGenerator = identifierGenerator;
+ }
+
+ @Override
+ public ClientPutCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
+ requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
+ return this;
+ }
+
+ @Override
+ public ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated) {
+ keyGenerateds.add(keyGenerated);
+ return this;
}
@Override
}
@Override
- public Keyed<Optional<Key>> redirectTo(String uri) {
+ public WithUri redirectTo(String uri) {
this.redirectUri.set(Objects.requireNonNull(uri, "uri must not be null"));
return this::key;
}
@Override
- public Keyed<Optional<Key>> from(File file) {
+ public WithUri from(File file) {
this.file.set(Objects.requireNonNull(file, "file must not be null"));
return this::key;
}
@Override
- public Lengthed<Keyed<Optional<Key>>> from(InputStream inputStream) {
+ public WithLength from(InputStream inputStream) {
payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
return this::length;
}
- private Keyed<Optional<Key>> length(long length) {
+ private WithUri length(long length) {
this.length.set(length);
return this::key;
}
- private ListenableFuture<Optional<Key>> key(Key key) {
- String identifier = new RandomIdentifierGenerator().generate();
- ClientPut clientPut = createClientPutCommand(key.getKey(), identifier);
- return threadPool.submit(() -> new ClientPutReplySequence().send(clientPut).get());
+ private Executable<Optional<Key>> key(String uri) {
+ return () -> threadPool.submit(() -> execute(uri));
+ }
+
+ private Optional<Key> execute(String uri) throws InterruptedException, ExecutionException, IOException {
+ ClientPut clientPut = createClientPutCommand(uri, identifierGenerator.get());
+ try (ClientPutDialog clientPutDialog = new ClientPutDialog()) {
+ return clientPutDialog.send(clientPut).get();
+ }
}
private ClientPut createClientPutCommand(String uri, String identifier) {
if (targetFilename.get() != null) {
clientPut.setTargetFilename(targetFilename.get());
}
+ if (!requestProgressConsumers.isEmpty()) {
+ clientPut.setVerbosity(Verbosity.PROGRESS);
+ }
return clientPut;
}
return clientPut;
}
- private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
+ private class ClientPutDialog extends FcpDialog<Optional<Key>> {
private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
- private final AtomicReference<String> identifier = new AtomicReference<>();
private final AtomicReference<String> directory = new AtomicReference<>();
- private final AtomicReference<Key> finalKey = new AtomicReference<>();
- private final AtomicBoolean putFinished = new AtomicBoolean();
- public ClientPutReplySequence() throws IOException {
- super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
- }
-
- @Override
- protected boolean isFinished() {
- return putFinished.get();
- }
-
- @Override
- protected Optional<Key> getResult() {
- return Optional.ofNullable(finalKey.get());
+ public ClientPutDialog() throws IOException {
+ super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get(), Optional.<Key>empty());
}
@Override
public ListenableFuture<Optional<Key>> send(FcpMessage fcpMessage) throws IOException {
originalClientPut.set(fcpMessage);
- identifier.set(fcpMessage.getField("Identifier"));
String filename = fcpMessage.getField("Filename");
if (filename != null) {
directory.set(new File(filename).getParent());
}
@Override
- protected void consumePutSuccessful(PutSuccessful putSuccessful) {
- if (putSuccessful.getIdentifier().equals(identifier.get())) {
- finalKey.set(new Key(putSuccessful.getURI()));
- putFinished.set(true);
+ protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+ RequestProgress requestProgress = new RequestProgress(
+ simpleProgress.getTotal(),
+ simpleProgress.getRequired(),
+ simpleProgress.getFailed(),
+ simpleProgress.getFatallyFailed(),
+ simpleProgress.getLastProgress(),
+ simpleProgress.getSucceeded(),
+ simpleProgress.isFinalizedTotal(),
+ simpleProgress.getMinSuccessFetchBlocks()
+ );
+ requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
+ }
+
+ @Override
+ protected void consumeURIGenerated(URIGenerated uriGenerated) {
+ for (Consumer<String> keyGenerated : keyGenerateds) {
+ keyGenerated.accept(uriGenerated.getURI());
}
}
@Override
+ protected void consumePutSuccessful(PutSuccessful putSuccessful) {
+ setResult(Optional.of(new Key(putSuccessful.getURI())));
+ }
+
+ @Override
protected void consumePutFailed(PutFailed putFailed) {
- if (putFailed.getIdentifier().equals(identifier.get())) {
- putFinished.set(true);
- }
+ finish();
}
@Override
protected void consumeProtocolError(ProtocolError protocolError) {
- if (protocolError.getIdentifier().equals(identifier.get())) {
- if (protocolError.getCode() == 25) {
- sendMessage(new TestDDARequest(directory.get(), true, false));
- } else {
- putFinished.set(true);
- }
+ if (protocolError.getCode() == 25) {
+ setIdentifier(directory.get());
+ sendMessage(new TestDDARequest(directory.get(), true, false));
+ } else {
+ finish();
}
}
@Override
protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
- if (testDDAReply.getDirectory().equals(directory.get())) {
- try {
- String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0);
- sendMessage(new TestDDAResponse(directory.get(), readContent));
- } catch (IOException e) {
- sendMessage(new TestDDAResponse(directory.get(), "failed-to-read"));
- }
+ try {
+ String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0);
+ sendMessage(new TestDDAResponse(directory.get(), readContent));
+ } catch (IOException e) {
+ sendMessage(new TestDDAResponse(directory.get(), "failed-to-read"));
}
}
@Override
protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
- if (testDDAComplete.getDirectory().equals(directory.get())) {
- sendMessage(originalClientPut.get());
- }
- }
-
- @Override
- protected void consumeConnectionClosed(Throwable throwable) {
- putFinished.set(true);
+ setIdentifier(originalClientPut.get().getField("Identifier"));
+ sendMessage(originalClientPut.get());
}
}