X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClient.java;h=c37b40aa13d605707d86ae0492018bd76dc936f0;hb=b1cb058f5ab875ad39b1d8d506cb6019d28118b7;hp=9cb098155e33c0c68bf4f1f619b3775d0ef9a8e6;hpb=aa659bcdaa77efb1e902a279a3505964d26e09ff;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 9cb0981..c37b40a 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -1,6 +1,9 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -8,14 +11,22 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.ClientGet; import net.pterodactylus.fcp.ClientHello; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; import net.pterodactylus.fcp.FcpKeyPair; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; import net.pterodactylus.fcp.GenerateSSK; +import net.pterodactylus.fcp.GetFailed; import net.pterodactylus.fcp.NodeHello; +import net.pterodactylus.fcp.Priority; +import net.pterodactylus.fcp.ReturnType; import net.pterodactylus.fcp.SSKKeypair; +import com.google.common.io.ByteStreams; + /** * Default {@link FcpClient} implementation. * @@ -109,4 +120,178 @@ public class DefaultFcpClient implements FcpClient { } + @Override + public ClientGetCommand clientGet() { + return new ClientGetCommandImpl(); + } + + private class ClientGetCommandImpl implements ClientGetCommand { + + private String identifier; + private boolean ignoreDataStore; + private boolean dataStoreOnly; + private Long maxSize; + private Priority priority; + private boolean realTime; + private boolean global; + + @Override + public ClientGetCommand identifier(String identifier) { + this.identifier = identifier; + return this; + } + + @Override + public ClientGetCommand ignoreDataStore() { + ignoreDataStore = true; + return this; + } + + @Override + public ClientGetCommand dataStoreOnly() { + dataStoreOnly = true; + return this; + } + + @Override + public ClientGetCommand maxSize(long maxSize) { + this.maxSize = maxSize; + return this; + } + + @Override + public ClientGetCommand priority(Priority priority) { + this.priority = priority; + return this; + } + + @Override + public ClientGetCommand realTime() { + realTime = true; + return this; + } + + @Override + public ClientGetCommand global() { + global = true; + return this; + } + + @Override + public Future> uri(String uri) { + return threadPool.submit(new Callable>() { + @Override + public Optional call() throws Exception { + DefaultFcpClient.this.connect(); + ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); + if (ignoreDataStore) { + clientGet.setIgnoreDataStore(true); + } + if (dataStoreOnly) { + clientGet.setDataStoreOnly(true); + } + if (maxSize != null) { + clientGet.setMaxSize(maxSize); + } + if (priority != null) { + clientGet.setPriority(priority); + } + if (realTime) { + clientGet.setRealTimeFlag(true); + } + if (global) { + clientGet.setGlobal(true); + } + try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) { + Sequence sequence = new Sequence(identifier); + replySequence.handle(AllData.class).with(sequence::allData); + replySequence.handle(GetFailed.class).with(sequence::getFailed); + replySequence.handleClose().with(sequence::disconnect); + replySequence.waitFor(sequence::isFinished); + replySequence.send(clientGet).get(); + return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty(); + } + } + }); + } + + private class Sequence { + + private final AtomicBoolean finished = new AtomicBoolean(); + private final AtomicBoolean failed = new AtomicBoolean(); + + private final String identifier; + + private String contentType; + private long dataLength; + private InputStream payload; + + private Sequence(String identifier) { + this.identifier = identifier; + } + + public boolean isFinished() { + return finished.get() || failed.get(); + } + + public boolean isSuccessful() { + return !failed.get(); + } + + public Data getData() { + return new Data() { + @Override + public String getMimeType() { + synchronized (Sequence.this) { + return contentType; + } + } + + @Override + public long size() { + synchronized (Sequence.this) { + return dataLength; + } + } + + @Override + public InputStream getInputStream() { + synchronized (Sequence.this) { + return payload; + } + } + }; + } + + public void allData(AllData allData) { + if (allData.getIdentifier().equals(identifier)) { + synchronized (this) { + contentType = allData.getContentType(); + dataLength = allData.getDataLength(); + try { + payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); + finished.set(true); + } catch (IOException e) { + // TODO – logging + failed.set(true); + } + } + } + } + + public void getFailed(GetFailed getFailed) { + if (getFailed.getIdentifier().equals(identifier)) { + failed.set(true); + } + } + + public void disconnect(Throwable t) { + failed.set(true); + } + + } + + } + } +