X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FClientGetCommandImpl.java;h=575d94b353b885b8929e1671d5a907f8bc214d28;hb=dbd390e996a8b0d94d5e57906b996a95f4a22f4e;hp=b4e391ad7c6f9ed8830f9db6b7de93752f890c93;hpb=253654bb636b35c05ad7a5ca33dfb3d5d7dda969;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java index b4e391a..575d94b 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java @@ -4,16 +4,21 @@ import java.io.IOException; import java.io.InputStream; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import net.pterodactylus.fcp.AllData; import net.pterodactylus.fcp.ClientGet; +import net.pterodactylus.fcp.FcpMessage; import net.pterodactylus.fcp.FcpUtils.TempInputStream; import net.pterodactylus.fcp.GetFailed; import net.pterodactylus.fcp.Priority; import net.pterodactylus.fcp.ReturnType; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + /** * Implementation of the {@link ClientGetCommand}. * @@ -21,10 +26,9 @@ import net.pterodactylus.fcp.ReturnType; */ class ClientGetCommandImpl implements ClientGetCommand { - private final ExecutorService threadPool; + private final ListeningExecutorService threadPool; private final ConnectionSupplier connectionSupplier; - private String identifier; private boolean ignoreDataStore; private boolean dataStoreOnly; private Long maxSize; @@ -33,17 +37,11 @@ class ClientGetCommandImpl implements ClientGetCommand { private boolean global; public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) { - this.threadPool = threadPool; + this.threadPool = MoreExecutors.listeningDecorator(threadPool); this.connectionSupplier = connectionSupplier; } @Override - public ClientGetCommand identifier(String identifier) { - this.identifier = identifier; - return this; - } - - @Override public ClientGetCommand ignoreDataStore() { ignoreDataStore = true; return this; @@ -80,12 +78,13 @@ class ClientGetCommandImpl implements ClientGetCommand { } @Override - public Future> uri(String uri) { + public ListenableFuture> uri(String uri) { ClientGet clientGet = createClientGetCommand(uri); return threadPool.submit(() -> new ClientGetReplySequence().send(clientGet).get()); } private ClientGet createClientGetCommand(String uri) { + String identifier = new RandomIdentifierGenerator().generate(); ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); if (ignoreDataStore) { clientGet.setIgnoreDataStore(true); @@ -110,11 +109,10 @@ class ClientGetCommandImpl implements ClientGetCommand { private class ClientGetReplySequence extends FcpReplySequence> { + private final AtomicReference identifier = new AtomicReference<>(); private final AtomicBoolean finished = new AtomicBoolean(); private final AtomicBoolean failed = new AtomicBoolean(); - private final String identifier = ClientGetCommandImpl.this.identifier; - private String contentType; private long dataLength; private InputStream payload; @@ -150,7 +148,7 @@ class ClientGetCommandImpl implements ClientGetCommand { @Override protected void consumeAllData(AllData allData) { - if (allData.getIdentifier().equals(identifier)) { + if (allData.getIdentifier().equals(identifier.get())) { synchronized (this) { contentType = allData.getContentType(); dataLength = allData.getDataLength(); @@ -167,7 +165,7 @@ class ClientGetCommandImpl implements ClientGetCommand { @Override protected void consumeGetFailed(GetFailed getFailed) { - if (getFailed.getIdentifier().equals(identifier)) { + if (getFailed.getIdentifier().equals(identifier.get())) { failed.set(true); } } @@ -177,6 +175,12 @@ class ClientGetCommandImpl implements ClientGetCommand { failed.set(true); } + @Override + public ListenableFuture> send(FcpMessage fcpMessage) throws IOException { + identifier.set(fcpMessage.getField("Identifier")); + return super.send(fcpMessage); + } + } }