1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.atomic.AtomicBoolean;
8 import java.util.concurrent.atomic.AtomicReference;
10 import net.pterodactylus.fcp.AllData;
11 import net.pterodactylus.fcp.ClientGet;
12 import net.pterodactylus.fcp.FcpMessage;
13 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
14 import net.pterodactylus.fcp.GetFailed;
15 import net.pterodactylus.fcp.Priority;
16 import net.pterodactylus.fcp.ReturnType;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
23 * Implementation of the {@link ClientGetCommand}.
25 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
27 class ClientGetCommandImpl implements ClientGetCommand {
29 private final ListeningExecutorService threadPool;
30 private final ConnectionSupplier connectionSupplier;
32 private boolean ignoreDataStore;
33 private boolean dataStoreOnly;
35 private Priority priority;
36 private boolean realTime;
37 private boolean global;
39 public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
40 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
41 this.connectionSupplier = connectionSupplier;
45 public ClientGetCommand ignoreDataStore() {
46 ignoreDataStore = true;
51 public ClientGetCommand dataStoreOnly() {
57 public ClientGetCommand maxSize(long maxSize) {
58 this.maxSize = maxSize;
63 public ClientGetCommand priority(Priority priority) {
64 this.priority = priority;
69 public ClientGetCommand realTime() {
75 public ClientGetCommand global() {
81 public Executable<Optional<Data>> uri(String uri) {
82 ClientGet clientGet = createClientGetCommand(uri);
83 return () -> threadPool.submit(() -> new ClientGetReplySequence().send(clientGet).get());
86 private ClientGet createClientGetCommand(String uri) {
87 String identifier = new RandomIdentifierGenerator().generate();
88 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
89 if (ignoreDataStore) {
90 clientGet.setIgnoreDataStore(true);
93 clientGet.setDataStoreOnly(true);
95 if (maxSize != null) {
96 clientGet.setMaxSize(maxSize);
98 if (priority != null) {
99 clientGet.setPriority(priority);
102 clientGet.setRealTimeFlag(true);
105 clientGet.setGlobal(true);
110 private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
112 private final AtomicBoolean finished = new AtomicBoolean();
113 private final AtomicBoolean failed = new AtomicBoolean();
115 private String contentType;
116 private long dataLength;
117 private InputStream payload;
119 public ClientGetReplySequence() throws IOException {
120 super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
124 protected boolean isFinished() {
125 return finished.get() || failed.get();
129 protected Optional<Data> getResult() {
130 return failed.get() ? Optional.empty() : Optional.of(new Data() {
132 public String getMimeType() {
142 public InputStream getInputStream() {
149 protected void consumeAllData(AllData allData) {
150 synchronized (this) {
151 contentType = allData.getContentType();
152 dataLength = allData.getDataLength();
154 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
156 } catch (IOException e) {
164 protected void consumeGetFailed(GetFailed getFailed) {