1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.atomic.AtomicBoolean;
8 import java.util.concurrent.atomic.AtomicReference;
10 import net.pterodactylus.fcp.AllData;
11 import net.pterodactylus.fcp.ClientGet;
12 import net.pterodactylus.fcp.FcpMessage;
13 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
14 import net.pterodactylus.fcp.GetFailed;
15 import net.pterodactylus.fcp.Priority;
16 import net.pterodactylus.fcp.ReturnType;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
23 * Implementation of the {@link ClientGetCommand}.
25 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
27 class ClientGetCommandImpl implements ClientGetCommand {
29 private final ListeningExecutorService threadPool;
30 private final ConnectionSupplier connectionSupplier;
32 private boolean ignoreDataStore;
33 private boolean dataStoreOnly;
35 private Priority priority;
36 private boolean realTime;
37 private boolean global;
39 public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
40 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
41 this.connectionSupplier = connectionSupplier;
45 public ClientGetCommand ignoreDataStore() {
46 ignoreDataStore = true;
51 public ClientGetCommand dataStoreOnly() {
57 public ClientGetCommand maxSize(long maxSize) {
58 this.maxSize = maxSize;
63 public ClientGetCommand priority(Priority priority) {
64 this.priority = priority;
69 public ClientGetCommand realTime() {
75 public ClientGetCommand global() {
81 public ListenableFuture<Optional<Data>> uri(String uri) {
82 ClientGet clientGet = createClientGetCommand(uri);
83 return threadPool.submit(() -> new ClientGetReplySequence().send(clientGet).get());
86 private ClientGet createClientGetCommand(String uri) {
87 String identifier = new RandomIdentifierGenerator().generate();
88 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
89 if (ignoreDataStore) {
90 clientGet.setIgnoreDataStore(true);
93 clientGet.setDataStoreOnly(true);
95 if (maxSize != null) {
96 clientGet.setMaxSize(maxSize);
98 if (priority != null) {
99 clientGet.setPriority(priority);
102 clientGet.setRealTimeFlag(true);
105 clientGet.setGlobal(true);
110 private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
112 private final AtomicReference<String> identifier = new AtomicReference<>();
113 private final AtomicBoolean finished = new AtomicBoolean();
114 private final AtomicBoolean failed = new AtomicBoolean();
116 private String contentType;
117 private long dataLength;
118 private InputStream payload;
120 public ClientGetReplySequence() throws IOException {
121 super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
125 protected boolean isFinished() {
126 return finished.get() || failed.get();
130 protected Optional<Data> getResult() {
131 return failed.get() ? Optional.empty() : Optional.of(new Data() {
133 public String getMimeType() {
143 public InputStream getInputStream() {
150 protected void consumeAllData(AllData allData) {
151 if (allData.getIdentifier().equals(identifier.get())) {
152 synchronized (this) {
153 contentType = allData.getContentType();
154 dataLength = allData.getDataLength();
156 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
158 } catch (IOException e) {
167 protected void consumeGetFailed(GetFailed getFailed) {
168 if (getFailed.getIdentifier().equals(identifier.get())) {
174 protected void consumeConnectionClosed(Throwable throwable) {
179 public ListenableFuture<Optional<Data>> send(FcpMessage fcpMessage) throws IOException {
180 identifier.set(fcpMessage.getField("Identifier"));
181 return super.send(fcpMessage);