1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Future;
8 import java.util.concurrent.atomic.AtomicBoolean;
9 import java.util.concurrent.atomic.AtomicReference;
11 import net.pterodactylus.fcp.AllData;
12 import net.pterodactylus.fcp.ClientGet;
13 import net.pterodactylus.fcp.FcpMessage;
14 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
15 import net.pterodactylus.fcp.GetFailed;
16 import net.pterodactylus.fcp.Priority;
17 import net.pterodactylus.fcp.ReturnType;
20 * Implementation of the {@link ClientGetCommand}.
22 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
24 class ClientGetCommandImpl implements ClientGetCommand {
// Executor that uri(String) submits the request task to; also handed to every ClientGetReplySequence.
26 private final ExecutorService threadPool;
// Queried once per ClientGetReplySequence for the connection passed to the FcpReplySequence constructor.
27 private final ConnectionSupplier connectionSupplier;
// Set to true by ignoreDataStore(); when true, createClientGetCommand calls setIgnoreDataStore(true).
29 private boolean ignoreDataStore;
// NOTE(review): no assignment is visible in this excerpt; presumably set by dataStoreOnly() - confirm.
30 private boolean dataStoreOnly;
// NOTE(review): a maxSize field is assigned in maxSize(long) and null-checked in createClientGetCommand,
// but its declaration is not visible in this excerpt.
// Assigned by priority(Priority); forwarded to the ClientGet only when non-null.
32 private Priority priority;
// NOTE(review): no assignment is visible in this excerpt; presumably set by realTime() - confirm.
33 private boolean realTime;
// NOTE(review): no assignment is visible in this excerpt; presumably set by global() - confirm.
34 private boolean global;
/**
 * Creates a new command that keeps the given thread pool and connection
 * supplier for later use by {@code uri(String)} and the reply sequence.
 *
 * @param threadPool
 *            the executor stored in the {@code threadPool} field
 * @param connectionSupplier
 *            the supplier stored in the {@code connectionSupplier} field
 */
36 public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
37 this.threadPool = threadPool;
38 this.connectionSupplier = connectionSupplier;
/**
 * Switches on the {@code ignoreDataStore} flag, which makes
 * {@code createClientGetCommand} call {@code setIgnoreDataStore(true)} on
 * the message it builds.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably this command is returned for chaining - confirm.
 */
42 public ClientGetCommand ignoreDataStore() {
43 ignoreDataStore = true;
/**
 * Option method for the data-store-only setting.
 *
 * NOTE(review): the body is not visible in this excerpt; presumably it sets
 * the {@code dataStoreOnly} field and returns this command - confirm.
 */
48 public ClientGetCommand dataStoreOnly() {
/**
 * Stores the given maximum size; {@code createClientGetCommand} forwards it
 * to {@code ClientGet.setMaxSize} when the stored value is not
 * {@code null}.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably this command is returned for chaining - confirm.
 *
 * @param maxSize
 *            the value to store (unit not stated in this file - TODO confirm)
 */
54 public ClientGetCommand maxSize(long maxSize) {
55 this.maxSize = maxSize;
/**
 * Stores the given priority; {@code createClientGetCommand} forwards it to
 * {@code ClientGet.setPriority} when the stored value is not {@code null}.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably this command is returned for chaining - confirm.
 *
 * @param priority
 *            the priority to store
 */
60 public ClientGetCommand priority(Priority priority) {
61 this.priority = priority;
/**
 * Option method for the real-time setting.
 *
 * NOTE(review): the body is not visible in this excerpt; presumably it sets
 * the {@code realTime} field and returns this command - confirm.
 */
66 public ClientGetCommand realTime() {
/**
 * Option method for the global setting.
 *
 * NOTE(review): the body is not visible in this excerpt; presumably it sets
 * the {@code global} field and returns this command - confirm.
 */
72 public ClientGetCommand global() {
/**
 * Builds a {@code ClientGet} message for the given URI and submits a task to
 * the thread pool that creates a {@code ClientGetReplySequence}, sends the
 * message through it and waits for that sequence's result.
 *
 * @param uri
 *            the URI passed to {@code createClientGetCommand}
 * @return the future returned by {@code threadPool.submit}
 */
78 public Future<Optional<Data>> uri(String uri) {
// The message is built on the calling thread, before the task is submitted.
79 ClientGet clientGet = createClientGetCommand(uri);
// NOTE(review): the submitted task blocks on get() while the reply sequence is handed the same
// threadPool; with a small bounded pool this could starve - confirm against FcpReplySequence.
80 return threadPool.submit(() -> new ClientGetReplySequence().send(clientGet).get());
/**
 * Creates the {@code ClientGet} message for the given URI, using
 * {@code ReturnType.direct}, and applies the options stored on this command.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the configured message is returned - confirm.
 *
 * @param uri
 *            the URI handed to the {@code ClientGet} constructor
 */
83 private ClientGet createClientGetCommand(String uri) {
// The identifier is produced by a new RandomIdentifierGenerator on every call.
84 String identifier = new RandomIdentifierGenerator().generate();
85 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
86 if (ignoreDataStore) {
87 clientGet.setIgnoreDataStore(true);
// NOTE(review): the guard for this call is not visible in this excerpt; presumably the dataStoreOnly flag - confirm.
90 clientGet.setDataStoreOnly(true);
// Forwarded only when a maximum size has been stored.
92 if (maxSize != null) {
93 clientGet.setMaxSize(maxSize);
// Forwarded only when a priority has been stored.
95 if (priority != null) {
96 clientGet.setPriority(priority);
// NOTE(review): the guards for the next two calls are not visible in this excerpt;
// presumably the realTime and global flags respectively - confirm.
99 clientGet.setRealTimeFlag(true);
102 clientGet.setGlobal(true);
// Reply sequence for a single ClientGet request; its result type is Optional<Data>.
107 private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
// Identifier of the request; set in send(FcpMessage) and compared against incoming AllData and GetFailed messages.
109 private final AtomicReference<String> identifier = new AtomicReference<>();
// Read by isFinished(). NOTE(review): where it is set is not visible in this excerpt.
110 private final AtomicBoolean finished = new AtomicBoolean();
// Read by isFinished() and getResult(). NOTE(review): where it is set is not visible in this excerpt.
111 private final AtomicBoolean failed = new AtomicBoolean();
// The three fields below are assigned in consumeAllData inside a synchronized (this) block.
113 private String contentType;
114 private long dataLength;
115 private InputStream payload;
/**
 * Creates a reply sequence that uses the enclosing command's thread pool and
 * a connection obtained from the enclosing command's connection supplier.
 *
 * @throws IOException
 *             declared by this constructor; NOTE(review): presumably raised
 *             when obtaining the connection fails - confirm
 */
117 public ClientGetReplySequence() throws IOException {
118 super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
/**
 * Reports whether this sequence is done.
 *
 * @return {@code true} if either the {@code finished} or the
 *         {@code failed} flag is set
 */
122 protected boolean isFinished() {
123 return finished.get() || failed.get();
/**
 * Produces the result of this sequence.
 *
 * @return an empty {@code Optional} if the {@code failed} flag is set,
 *         otherwise an {@code Optional} holding an anonymous {@code Data}
 */
127 protected Optional<Data> getResult() {
128 return failed.get() ? Optional.empty() : Optional.of(new Data() {
// NOTE(review): the bodies of the anonymous Data methods are not visible in this excerpt; presumably
// they expose the contentType and payload fields recorded by consumeAllData - confirm.
130 public String getMimeType() {
140 public InputStream getInputStream() {
/**
 * Handles an {@code AllData} message; messages whose identifier differs from
 * the one recorded by {@code send} are skipped by the guard below.
 *
 * @param allData
 *            the received message
 */
147 protected void consumeAllData(AllData allData) {
148 if (allData.getIdentifier().equals(identifier.get())) {
// The plain (non-atomic) fields are written while holding this sequence's monitor.
149 synchronized (this) {
150 contentType = allData.getContentType();
151 dataLength = allData.getDataLength();
// The payload stream is wrapped in a TempInputStream together with the data length.
// NOTE(review): the try line that opens this region is not visible in this excerpt.
153 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
// NOTE(review): the handler body is not visible in this excerpt - confirm how the failure is reported.
155 } catch (IOException e) {
/**
 * Handles a {@code GetFailed} message; messages whose identifier differs
 * from the one recorded by {@code send} are skipped by the guard below.
 *
 * NOTE(review): the guarded body is not visible in this excerpt; presumably
 * it sets the {@code failed} flag - confirm.
 *
 * @param getFailed
 *            the received message
 */
164 protected void consumeGetFailed(GetFailed getFailed) {
165 if (getFailed.getIdentifier().equals(identifier.get())) {
/**
 * Callback for a closed connection.
 *
 * NOTE(review): the body is not visible in this excerpt; presumably it sets
 * the {@code failed} flag so that {@code isFinished()} returns
 * {@code true} - confirm.
 *
 * @param throwable
 *            the throwable passed to this callback
 */
171 protected void consumeConnectionClosed(Throwable throwable) {
/**
 * Records the value of the message's {@code Identifier} field and then
 * delegates to {@code super.send}.
 *
 * @param fcpMessage
 *            the message to send
 * @return the future returned by {@code super.send}
 * @throws IOException
 *             declared by this method; NOTE(review): presumably propagated
 *             from {@code super.send} - confirm
 */
176 public Future<Optional<Data>> send(FcpMessage fcpMessage) throws IOException {
// Stored before sending; consumeAllData and consumeGetFailed compare incoming identifiers against it.
177 identifier.set(fcpMessage.getField("Identifier"));
178 return super.send(fcpMessage);