1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.Callable;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
12 import net.pterodactylus.fcp.AllData;
13 import net.pterodactylus.fcp.ClientGet;
14 import net.pterodactylus.fcp.FcpMessage;
15 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
16 import net.pterodactylus.fcp.GetFailed;
17 import net.pterodactylus.fcp.Priority;
18 import net.pterodactylus.fcp.ReturnType;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.ListeningExecutorService;
22 import com.google.common.util.concurrent.MoreExecutors;
25 * Implementation of the {@link ClientGetCommand}.
27 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
29 class ClientGetCommandImpl implements ClientGetCommand {
31 private final ListeningExecutorService threadPool;
32 private final ConnectionSupplier connectionSupplier;
34 private boolean ignoreDataStore;
35 private boolean dataStoreOnly;
37 private Priority priority;
38 private boolean realTime;
39 private boolean global;
41 public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
42 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
43 this.connectionSupplier = connectionSupplier;
47 public ClientGetCommand ignoreDataStore() {
48 ignoreDataStore = true;
53 public ClientGetCommand dataStoreOnly() {
59 public ClientGetCommand maxSize(long maxSize) {
60 this.maxSize = maxSize;
65 public ClientGetCommand priority(Priority priority) {
66 this.priority = priority;
71 public ClientGetCommand realTime() {
77 public ClientGetCommand global() {
83 public Executable<Optional<Data>> uri(String uri) {
84 return () -> threadPool.submit(() -> execute(uri));
87 private Optional<Data> execute(String uri) throws InterruptedException, ExecutionException, IOException {
88 ClientGet clientGet = createClientGetCommand(uri);
89 try (ClientGetReplySequence clientGetReplySequence = new ClientGetReplySequence()) {
90 return clientGetReplySequence.send(clientGet).get();
94 private ClientGet createClientGetCommand(String uri) {
95 String identifier = new RandomIdentifierGenerator().generate();
96 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
97 if (ignoreDataStore) {
98 clientGet.setIgnoreDataStore(true);
101 clientGet.setDataStoreOnly(true);
103 if (maxSize != null) {
104 clientGet.setMaxSize(maxSize);
106 if (priority != null) {
107 clientGet.setPriority(priority);
110 clientGet.setRealTimeFlag(true);
113 clientGet.setGlobal(true);
118 private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
120 private final AtomicBoolean finished = new AtomicBoolean();
121 private final AtomicBoolean failed = new AtomicBoolean();
123 private String contentType;
124 private long dataLength;
125 private InputStream payload;
127 public ClientGetReplySequence() throws IOException {
128 super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
132 protected boolean isFinished() {
133 return finished.get() || failed.get();
137 protected Optional<Data> getResult() {
138 return failed.get() ? Optional.empty() : Optional.of(new Data() {
140 public String getMimeType() {
150 public InputStream getInputStream() {
157 protected void consumeAllData(AllData allData) {
158 synchronized (this) {
159 contentType = allData.getContentType();
160 dataLength = allData.getDataLength();
162 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
164 } catch (IOException e) {
172 protected void consumeGetFailed(GetFailed getFailed) {