X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FClientGetCommandImpl.java;h=0c93fc32772a6112d7f12540ad6bda89fa946c19;hb=9f8674f7ad4b179b3a2cac9a24f1dc6152548bc1;hp=b08ff4e6846bcc8cd8af8c0ec1141bc1e69a5814;hpb=9970ce4a2868da05e778ec998afb0a899a133959;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java index b08ff4e..0c93fc3 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java @@ -3,8 +3,8 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; import java.io.InputStream; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import net.pterodactylus.fcp.AllData; @@ -14,6 +14,9 @@ import net.pterodactylus.fcp.GetFailed; import net.pterodactylus.fcp.Priority; import net.pterodactylus.fcp.ReturnType; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + /** * Implementation of the {@link ClientGetCommand}. * @@ -21,10 +24,9 @@ import net.pterodactylus.fcp.ReturnType; */ class ClientGetCommandImpl implements ClientGetCommand { - private final ExecutorService threadPool; + private final ListeningExecutorService threadPool; private final ConnectionSupplier connectionSupplier; - private String identifier; private boolean ignoreDataStore; private boolean dataStoreOnly; private Long maxSize; @@ -33,17 +35,11 @@ class ClientGetCommandImpl implements ClientGetCommand { private boolean global; public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) { - this.threadPool = threadPool; + this.threadPool = MoreExecutors.listeningDecorator(threadPool); this.connectionSupplier = connectionSupplier; } @Override - public ClientGetCommand identifier(String identifier) { - this.identifier = identifier; - return this; - } - - @Override public ClientGetCommand ignoreDataStore() { ignoreDataStore = true; return this; @@ -80,7 +76,19 @@ class ClientGetCommandImpl implements ClientGetCommand { } @Override - public Future> uri(String uri) { + public Executable> uri(String uri) { + return () -> threadPool.submit(() -> execute(uri)); + } + + private Optional execute(String uri) throws InterruptedException, ExecutionException, IOException { + ClientGet clientGet = createClientGetCommand(uri); + try (ClientGetDialog clientGetDialog = new ClientGetDialog()) { + return clientGetDialog.send(clientGet).get(); + } + } + + private ClientGet createClientGetCommand(String uri) { + String identifier = new RandomIdentifierGenerator().generate(); ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); if (ignoreDataStore) { clientGet.setIgnoreDataStore(true); @@ -100,25 +108,19 @@ class ClientGetCommandImpl implements ClientGetCommand { if (global) { clientGet.setGlobal(true); } - return threadPool.submit(() -> { - FcpReplySequence> replySequence = - new ClientGetReplySequence(); - return replySequence.send(clientGet).get(); - }); + return clientGet; } - private class ClientGetReplySequence extends FcpReplySequence> { + private class ClientGetDialog extends FcpDialog> { private final AtomicBoolean finished = new AtomicBoolean(); private final AtomicBoolean failed = new AtomicBoolean(); - private final String identifier = ClientGetCommandImpl.this.identifier; - private String contentType; private long dataLength; private InputStream payload; - public ClientGetReplySequence() throws IOException { + public ClientGetDialog() throws IOException { super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get()); } @@ -149,30 +151,21 @@ class ClientGetCommandImpl implements ClientGetCommand { @Override protected void consumeAllData(AllData allData) { - if (allData.getIdentifier().equals(identifier)) { - synchronized (this) { - contentType = allData.getContentType(); - dataLength = allData.getDataLength(); - try { - payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); - finished.set(true); - } catch (IOException e) { - // TODO – logging - failed.set(true); - } + synchronized (this) { + contentType = allData.getContentType(); + dataLength = allData.getDataLength(); + try { + payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); + finished.set(true); + } catch (IOException e) { + // TODO – logging + failed.set(true); } } } @Override protected void consumeGetFailed(GetFailed getFailed) { - if (getFailed.getIdentifier().equals(identifier)) { - failed.set(true); - } - } - - @Override - protected void consumeConnectionClosed(Throwable throwable) { failed.set(true); }