1 package net.pterodactylus.fcp.quelaton;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.nio.file.Files;
8 import java.util.Objects;
9 import java.util.Optional;
10 import java.util.concurrent.CopyOnWriteArrayList;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicLong;
14 import java.util.concurrent.atomic.AtomicReference;
15 import java.util.function.Consumer;
16 import java.util.function.Supplier;
18 import net.pterodactylus.fcp.ClientPut;
19 import net.pterodactylus.fcp.FcpMessage;
20 import net.pterodactylus.fcp.Key;
21 import net.pterodactylus.fcp.ProtocolError;
22 import net.pterodactylus.fcp.PutFailed;
23 import net.pterodactylus.fcp.PutSuccessful;
24 import net.pterodactylus.fcp.RequestProgress;
25 import net.pterodactylus.fcp.SimpleProgress;
26 import net.pterodactylus.fcp.TestDDAComplete;
27 import net.pterodactylus.fcp.TestDDAReply;
28 import net.pterodactylus.fcp.TestDDARequest;
29 import net.pterodactylus.fcp.TestDDAResponse;
30 import net.pterodactylus.fcp.URIGenerated;
31 import net.pterodactylus.fcp.UploadFrom;
32 import net.pterodactylus.fcp.Verbosity;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.ListeningExecutorService;
36 import com.google.common.util.concurrent.MoreExecutors;
39 * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}.
41 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
43 class ClientPutCommandImpl implements ClientPutCommand {
45 private final ListeningExecutorService threadPool;
46 private final ConnectionSupplier connectionSupplier;
47 private final Supplier<String> identifierGenerator;
48 private final AtomicReference<String> redirectUri = new AtomicReference<>();
49 private final AtomicReference<File> file = new AtomicReference<>();
50 private final AtomicReference<InputStream> payload = new AtomicReference<>();
51 private final AtomicLong length = new AtomicLong();
52 private final AtomicReference<String> targetFilename = new AtomicReference<>();
53 private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
54 private final List<Consumer<String>> keyGenerateds = new CopyOnWriteArrayList<>();
56 public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
57 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
58 this.connectionSupplier = connectionSupplier;
59 this.identifierGenerator = identifierGenerator;
63 public ClientPutCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
64 requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
69 public ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated) {
70 keyGenerateds.add(keyGenerated);
75 public ClientPutCommand named(String targetFilename) {
76 this.targetFilename.set(targetFilename);
81 public WithUri redirectTo(String uri) {
82 this.redirectUri.set(Objects.requireNonNull(uri, "uri must not be null"));
87 public WithUri from(File file) {
88 this.file.set(Objects.requireNonNull(file, "file must not be null"));
93 public WithLength from(InputStream inputStream) {
94 payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
98 private WithUri length(long length) {
99 this.length.set(length);
103 private Executable<Optional<Key>> key(String uri) {
104 return () -> threadPool.submit(() -> execute(uri));
107 private Optional<Key> execute(String uri) throws InterruptedException, ExecutionException, IOException {
108 ClientPut clientPut = createClientPutCommand(uri, identifierGenerator.get());
109 try (ClientPutDialog clientPutDialog = new ClientPutDialog()) {
110 return clientPutDialog.send(clientPut).get();
114 private ClientPut createClientPutCommand(String uri, String identifier) {
116 if (file.get() != null) {
117 clientPut = createClientPutFromDisk(uri, identifier, file.get());
118 } else if (redirectUri.get() != null) {
119 clientPut = createClientPutRedirect(uri, identifier, redirectUri.get());
121 clientPut = createClientPutDirect(uri, identifier, length.get(), payload.get());
123 if (targetFilename.get() != null) {
124 clientPut.setTargetFilename(targetFilename.get());
126 if (!requestProgressConsumers.isEmpty()) {
127 clientPut.setVerbosity(Verbosity.PROGRESS);
132 private ClientPut createClientPutFromDisk(String uri, String identifier, File file) {
133 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.disk);
134 clientPut.setFilename(file.getAbsolutePath());
138 private ClientPut createClientPutRedirect(String uri, String identifier, String redirectUri) {
139 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.redirect);
140 clientPut.setTargetURI(redirectUri);
144 private ClientPut createClientPutDirect(String uri, String identifier, long length, InputStream payload) {
145 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.direct);
146 clientPut.setDataLength(length);
147 clientPut.setPayloadInputStream(payload);
151 private class ClientPutDialog extends FcpDialog<Optional<Key>> {
153 private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
154 private final AtomicReference<String> directory = new AtomicReference<>();
156 public ClientPutDialog() throws IOException {
157 super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get(), Optional.<Key>empty());
161 public ListenableFuture<Optional<Key>> send(FcpMessage fcpMessage) throws IOException {
162 originalClientPut.set(fcpMessage);
163 String filename = fcpMessage.getField("Filename");
164 if (filename != null) {
165 directory.set(new File(filename).getParent());
167 return super.send(fcpMessage);
171 protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
172 RequestProgress requestProgress = new RequestProgress(
173 simpleProgress.getTotal(),
174 simpleProgress.getRequired(),
175 simpleProgress.getFailed(),
176 simpleProgress.getFatallyFailed(),
177 simpleProgress.getLastProgress(),
178 simpleProgress.getSucceeded(),
179 simpleProgress.isFinalizedTotal(),
180 simpleProgress.getMinSuccessFetchBlocks()
182 requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
186 protected void consumeURIGenerated(URIGenerated uriGenerated) {
187 for (Consumer<String> keyGenerated : keyGenerateds) {
188 keyGenerated.accept(uriGenerated.getURI());
193 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
194 setResult(Optional.of(new Key(putSuccessful.getURI())));
198 protected void consumePutFailed(PutFailed putFailed) {
203 protected void consumeProtocolError(ProtocolError protocolError) {
204 if (protocolError.getCode() == 25) {
205 setIdentifier(directory.get());
206 sendMessage(new TestDDARequest(directory.get(), true, false));
213 protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
215 String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0);
216 sendMessage(new TestDDAResponse(directory.get(), readContent));
217 } catch (IOException e) {
218 sendMessage(new TestDDAResponse(directory.get(), "failed-to-read"));
223 protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
224 setIdentifier(originalClientPut.get().getField("Identifier"));
225 sendMessage(originalClientPut.get());