From 9970ce4a2868da05e778ec998afb0a899a133959 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Mon, 6 Jul 2015 06:51:15 +0200 Subject: [PATCH] Move ClientGet implementation to its own class --- .../fcp/quelaton/ClientGetCommandImpl.java | 181 +++++++++++++++++++++ .../fcp/quelaton/DefaultFcpClient.java | 155 +----------------- 2 files changed, 182 insertions(+), 154 deletions(-) create mode 100644 src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java new file mode 100644 index 0000000..b08ff4e --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientGetCommandImpl.java @@ -0,0 +1,181 @@ +package net.pterodactylus.fcp.quelaton; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.ClientGet; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; +import net.pterodactylus.fcp.GetFailed; +import net.pterodactylus.fcp.Priority; +import net.pterodactylus.fcp.ReturnType; + +/** + * Implementation of the {@link ClientGetCommand}. + * + * @author David ‘Bombe’ Roden + */ +class ClientGetCommandImpl implements ClientGetCommand { + + private final ExecutorService threadPool; + private final ConnectionSupplier connectionSupplier; + + private String identifier; + private boolean ignoreDataStore; + private boolean dataStoreOnly; + private Long maxSize; + private Priority priority; + private boolean realTime; + private boolean global; + + public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) { + this.threadPool = threadPool; + this.connectionSupplier = connectionSupplier; + } + + @Override + public ClientGetCommand identifier(String identifier) { + this.identifier = identifier; + return this; + } + + @Override + public ClientGetCommand ignoreDataStore() { + ignoreDataStore = true; + return this; + } + + @Override + public ClientGetCommand dataStoreOnly() { + dataStoreOnly = true; + return this; + } + + @Override + public ClientGetCommand maxSize(long maxSize) { + this.maxSize = maxSize; + return this; + } + + @Override + public ClientGetCommand priority(Priority priority) { + this.priority = priority; + return this; + } + + @Override + public ClientGetCommand realTime() { + realTime = true; + return this; + } + + @Override + public ClientGetCommand global() { + global = true; + return this; + } + + @Override + public Future> uri(String uri) { + ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); + if (ignoreDataStore) { + clientGet.setIgnoreDataStore(true); + } + if (dataStoreOnly) { + clientGet.setDataStoreOnly(true); + } + if (maxSize != null) { + clientGet.setMaxSize(maxSize); + } + if (priority != null) { + clientGet.setPriority(priority); + } + if (realTime) { + clientGet.setRealTimeFlag(true); + } + if (global) { + clientGet.setGlobal(true); + } + return threadPool.submit(() -> { + FcpReplySequence> replySequence = + new ClientGetReplySequence(); + return replySequence.send(clientGet).get(); + }); + } + + private class ClientGetReplySequence extends FcpReplySequence> { + + private final AtomicBoolean finished = new AtomicBoolean(); + private final AtomicBoolean failed = new AtomicBoolean(); + + private final String identifier = ClientGetCommandImpl.this.identifier; + + private String contentType; + private long dataLength; + private InputStream payload; + + public ClientGetReplySequence() throws IOException { + super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get()); + } + + @Override + protected boolean isFinished() { + return finished.get() || failed.get(); + } + + @Override + protected Optional getResult() { + return failed.get() ? Optional.empty() : Optional.of(new Data() { + @Override + public String getMimeType() { + return contentType; + } + + @Override + public long size() { + return dataLength; + } + + @Override + public InputStream getInputStream() { + return payload; + } + }); + } + + @Override + protected void consumeAllData(AllData allData) { + if (allData.getIdentifier().equals(identifier)) { + synchronized (this) { + contentType = allData.getContentType(); + dataLength = allData.getDataLength(); + try { + payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); + finished.set(true); + } catch (IOException e) { + // TODO – logging + failed.set(true); + } + } + } + } + + @Override + protected void consumeGetFailed(GetFailed getFailed) { + if (getFailed.getIdentifier().equals(identifier)) { + failed.set(true); + } + } + + @Override + protected void consumeConnectionClosed(Throwable throwable) { + failed.set(true); + } + + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 79c7f44..30dac91 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -1,25 +1,16 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; -import java.io.InputStream; -import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import net.pterodactylus.fcp.AllData; -import net.pterodactylus.fcp.ClientGet; import net.pterodactylus.fcp.ClientHello; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; -import net.pterodactylus.fcp.FcpUtils.TempInputStream; -import net.pterodactylus.fcp.GetFailed; import net.pterodactylus.fcp.NodeHello; -import net.pterodactylus.fcp.Priority; -import net.pterodactylus.fcp.ReturnType; /** * Default {@link FcpClient} implementation. @@ -93,151 +84,7 @@ public class DefaultFcpClient implements FcpClient { @Override public ClientGetCommand clientGet() { - return new ClientGetCommandImpl(); - } - - private class ClientGetCommandImpl implements ClientGetCommand { - - private String identifier; - private boolean ignoreDataStore; - private boolean dataStoreOnly; - private Long maxSize; - private Priority priority; - private boolean realTime; - private boolean global; - - @Override - public ClientGetCommand identifier(String identifier) { - this.identifier = identifier; - return this; - } - - @Override - public ClientGetCommand ignoreDataStore() { - ignoreDataStore = true; - return this; - } - - @Override - public ClientGetCommand dataStoreOnly() { - dataStoreOnly = true; - return this; - } - - @Override - public ClientGetCommand maxSize(long maxSize) { - this.maxSize = maxSize; - return this; - } - - @Override - public ClientGetCommand priority(Priority priority) { - this.priority = priority; - return this; - } - - @Override - public ClientGetCommand realTime() { - realTime = true; - return this; - } - - @Override - public ClientGetCommand global() { - global = true; - return this; - } - - @Override - public Future> uri(String uri) { - ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); - if (ignoreDataStore) { - clientGet.setIgnoreDataStore(true); - } - if (dataStoreOnly) { - clientGet.setDataStoreOnly(true); - } - if (maxSize != null) { - clientGet.setMaxSize(maxSize); - } - if (priority != null) { - clientGet.setPriority(priority); - } - if (realTime) { - clientGet.setRealTimeFlag(true); - } - if (global) { - clientGet.setGlobal(true); - } - return threadPool.submit(() -> { - FcpReplySequence> replySequence = new FcpReplySequence>(threadPool, connect()) { - private final AtomicBoolean finished = new AtomicBoolean(); - private final AtomicBoolean failed = new AtomicBoolean(); - - private final String identifier = ClientGetCommandImpl.this.identifier; - - private String contentType; - private long dataLength; - private InputStream payload; - - @Override - protected boolean isFinished() { - return finished.get() || failed.get(); - } - - @Override - protected Optional getResult() { - return failed.get() ? Optional.empty() : Optional.of(new Data() { - @Override - public String getMimeType() { - return contentType; - } - - @Override - public long size() { - return dataLength; - } - - @Override - public InputStream getInputStream() { - return payload; - } - }); - } - - @Override - protected void consumeAllData(AllData allData) { - if (allData.getIdentifier().equals(identifier)) { - synchronized (this) { - contentType = allData.getContentType(); - dataLength = allData.getDataLength(); - try { - payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); - finished.set(true); - } catch (IOException e) { - // TODO – logging - failed.set(true); - } - } - } - } - - @Override - protected void consumeGetFailed(GetFailed getFailed) { - if (getFailed.getIdentifier().equals(identifier)) { - failed.set(true); - } - } - - @Override - protected void consumeConnectionClosed(Throwable throwable) { - failed.set(true); - } - }; - return replySequence.send(clientGet).get(); - }); - } - + return new ClientGetCommandImpl(threadPool, this::connect); } } -- 2.7.4