Add ClientPut command implementation
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientPutCommandImpl.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.util.Objects;
7 import java.util.Optional;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicLong;
11 import java.util.concurrent.atomic.AtomicReference;
12
13 import net.pterodactylus.fcp.ClientPut;
14 import net.pterodactylus.fcp.FcpMessage;
15 import net.pterodactylus.fcp.Key;
16 import net.pterodactylus.fcp.PutFailed;
17 import net.pterodactylus.fcp.PutSuccessful;
18 import net.pterodactylus.fcp.UploadFrom;
19
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.ListeningExecutorService;
22 import com.google.common.util.concurrent.MoreExecutors;
23
24 /**
25  * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}.
26  *
27  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
28  */
29 class ClientPutCommandImpl implements ClientPutCommand {
30
31         private final ListeningExecutorService threadPool;
32         private final ConnectionSupplier connectionSupplier;
33         private final AtomicReference<String> redirectUri = new AtomicReference<>();
34         private final AtomicReference<File> file = new AtomicReference<>();
35         private final AtomicReference<InputStream> payload = new AtomicReference<>();
36         private final AtomicLong length = new AtomicLong();
37         private final AtomicReference<String> targetFilename = new AtomicReference<>();
38
39         public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
40                 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
41                 this.connectionSupplier = connectionSupplier;
42         }
43
44         @Override
45         public ClientPutCommand named(String targetFilename) {
46                 this.targetFilename.set(targetFilename);
47                 return this;
48         }
49
50         @Override
51         public Keyed<Optional<Key>> redirectTo(Key key) {
52                 this.redirectUri.set(Objects.requireNonNull(key, "key must not be null").getKey());
53                 return this::key;
54         }
55
56         @Override
57         public Keyed<Optional<Key>> from(File file) {
58                 this.file.set(Objects.requireNonNull(file, "file must not be null"));
59                 return this::key;
60         }
61
62         @Override
63         public Lengthed<Keyed<Optional<Key>>> from(InputStream inputStream) {
64                 payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
65                 return this::length;
66         }
67
68         private Keyed<Optional<Key>> length(long length) {
69                 this.length.set(length);
70                 return this::key;
71         }
72
73         private ListenableFuture<Optional<Key>> key(Key key) {
74                 String identifier = new RandomIdentifierGenerator().generate();
75                 ClientPut clientPut = createClientPutCommand(key.getKey(), identifier);
76                 return threadPool.submit(() -> new ClientPutReplySequence().send(clientPut).get());
77         }
78
79         private ClientPut createClientPutCommand(String uri, String identifier) {
80                 ClientPut clientPut;
81                 if (file.get() != null) {
82                         clientPut = createClientPutFromDisk(uri, identifier, file.get());
83                 } else if (redirectUri.get() != null) {
84                         clientPut = createClientPutRedirect(uri, identifier, redirectUri.get());
85                 } else {
86                         clientPut = createClientPutDirect(uri, identifier, length.get(), payload.get());
87                 }
88                 if (targetFilename.get() != null) {
89                         clientPut.setTargetFilename(targetFilename.get());
90                 }
91                 return clientPut;
92         }
93
94         private ClientPut createClientPutFromDisk(String uri, String identifier, File file) {
95                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.disk);
96                 clientPut.setFilename(file.getAbsolutePath());
97                 return clientPut;
98         }
99
100         private ClientPut createClientPutRedirect(String uri, String identifier, String redirectUri) {
101                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.redirect);
102                 clientPut.setTargetURI(redirectUri);
103                 return clientPut;
104         }
105
106         private ClientPut createClientPutDirect(String uri, String identifier, long length, InputStream payload) {
107                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.direct);
108                 clientPut.setDataLength(length);
109                 clientPut.setPayloadInputStream(payload);
110                 return clientPut;
111         }
112
113         private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
114
115                 private final AtomicReference<String> identifier = new AtomicReference<>();
116                 private final AtomicReference<Key> finalKey = new AtomicReference<>();
117                 private final AtomicBoolean putFinished = new AtomicBoolean();
118
119                 public ClientPutReplySequence() throws IOException {
120                         super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
121                 }
122
123                 @Override
124                 protected boolean isFinished() {
125                         return putFinished.get();
126                 }
127
128                 @Override
129                 protected Optional<Key> getResult() {
130                         return Optional.ofNullable(finalKey.get());
131                 }
132
133                 @Override
134                 public ListenableFuture<Optional<Key>> send(FcpMessage fcpMessage) throws IOException {
135                         identifier.set(fcpMessage.getField("Identifier"));
136                         return super.send(fcpMessage);
137                 }
138
139                 @Override
140                 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
141                         if (putSuccessful.getIdentifier().equals(identifier.get())) {
142                                 finalKey.set(new Key(putSuccessful.getURI()));
143                                 putFinished.set(true);
144                         }
145                 }
146
147                 @Override
148                 protected void consumePutFailed(PutFailed putFailed) {
149                         if (putFailed.getIdentifier().equals(identifier.get())) {
150                                 putFinished.set(true);
151                         }
152                 }
153
154                 @Override
155                 protected void consumeConnectionClosed(Throwable throwable) {
156                         putFinished.set(true);
157                 }
158         }
159
160 }