1 package net.pterodactylus.fcp.quelaton;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.util.Objects;
7 import java.util.Optional;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicLong;
11 import java.util.concurrent.atomic.AtomicReference;
13 import net.pterodactylus.fcp.ClientPut;
14 import net.pterodactylus.fcp.FcpMessage;
15 import net.pterodactylus.fcp.Key;
16 import net.pterodactylus.fcp.PutFailed;
17 import net.pterodactylus.fcp.PutSuccessful;
18 import net.pterodactylus.fcp.UploadFrom;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.ListeningExecutorService;
22 import com.google.common.util.concurrent.MoreExecutors;
25 * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}.
27 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
29 class ClientPutCommandImpl implements ClientPutCommand {
31 private final ListeningExecutorService threadPool;
32 private final ConnectionSupplier connectionSupplier;
33 private final AtomicReference<String> redirectUri = new AtomicReference<>();
34 private final AtomicReference<File> file = new AtomicReference<>();
35 private final AtomicReference<InputStream> payload = new AtomicReference<>();
36 private final AtomicLong length = new AtomicLong();
37 private final AtomicReference<String> targetFilename = new AtomicReference<>();
39 public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
40 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
41 this.connectionSupplier = connectionSupplier;
45 public ClientPutCommand named(String targetFilename) {
46 this.targetFilename.set(targetFilename);
51 public Keyed<Optional<Key>> redirectTo(Key key) {
52 this.redirectUri.set(Objects.requireNonNull(key, "key must not be null").getKey());
57 public Keyed<Optional<Key>> from(File file) {
58 this.file.set(Objects.requireNonNull(file, "file must not be null"));
63 public Lengthed<Keyed<Optional<Key>>> from(InputStream inputStream) {
64 payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
68 private Keyed<Optional<Key>> length(long length) {
69 this.length.set(length);
73 private ListenableFuture<Optional<Key>> key(Key key) {
74 String identifier = new RandomIdentifierGenerator().generate();
75 ClientPut clientPut = createClientPutCommand(key.getKey(), identifier);
76 return threadPool.submit(() -> new ClientPutReplySequence().send(clientPut).get());
79 private ClientPut createClientPutCommand(String uri, String identifier) {
81 if (file.get() != null) {
82 clientPut = createClientPutFromDisk(uri, identifier, file.get());
83 } else if (redirectUri.get() != null) {
84 clientPut = createClientPutRedirect(uri, identifier, redirectUri.get());
86 clientPut = createClientPutDirect(uri, identifier, length.get(), payload.get());
88 if (targetFilename.get() != null) {
89 clientPut.setTargetFilename(targetFilename.get());
94 private ClientPut createClientPutFromDisk(String uri, String identifier, File file) {
95 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.disk);
96 clientPut.setFilename(file.getAbsolutePath());
100 private ClientPut createClientPutRedirect(String uri, String identifier, String redirectUri) {
101 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.redirect);
102 clientPut.setTargetURI(redirectUri);
106 private ClientPut createClientPutDirect(String uri, String identifier, long length, InputStream payload) {
107 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.direct);
108 clientPut.setDataLength(length);
109 clientPut.setPayloadInputStream(payload);
113 private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
115 private final AtomicReference<String> identifier = new AtomicReference<>();
116 private final AtomicReference<Key> finalKey = new AtomicReference<>();
117 private final AtomicBoolean putFinished = new AtomicBoolean();
119 public ClientPutReplySequence() throws IOException {
120 super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
124 protected boolean isFinished() {
125 return putFinished.get();
129 protected Optional<Key> getResult() {
130 return Optional.ofNullable(finalKey.get());
134 public ListenableFuture<Optional<Key>> send(FcpMessage fcpMessage) throws IOException {
135 identifier.set(fcpMessage.getField("Identifier"));
136 return super.send(fcpMessage);
140 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
141 if (putSuccessful.getIdentifier().equals(identifier.get())) {
142 finalKey.set(new Key(putSuccessful.getURI()));
143 putFinished.set(true);
148 protected void consumePutFailed(PutFailed putFailed) {
149 if (putFailed.getIdentifier().equals(identifier.get())) {
150 putFinished.set(true);
155 protected void consumeConnectionClosed(Throwable throwable) {
156 putFinished.set(true);