X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClient.java;h=c37b40aa13d605707d86ae0492018bd76dc936f0;hb=b1cb058f5ab875ad39b1d8d506cb6019d28118b7;hp=e0c6e12528d2ed0dfdd044994479f7dc260f5fdd;hpb=075f351f114b8a58f82a8e2d2e295b987237d66e;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index e0c6e12..c37b40a 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -1,6 +1,9 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -8,14 +11,22 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.ClientGet; import net.pterodactylus.fcp.ClientHello; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; import net.pterodactylus.fcp.FcpKeyPair; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; import net.pterodactylus.fcp.GenerateSSK; +import net.pterodactylus.fcp.GetFailed; import net.pterodactylus.fcp.NodeHello; +import net.pterodactylus.fcp.Priority; +import net.pterodactylus.fcp.ReturnType; import net.pterodactylus.fcp.SSKKeypair; +import com.google.common.io.ByteStreams; + /** * Default {@link FcpClient} implementation. * @@ -80,18 +91,207 @@ public class DefaultFcpClient implements FcpClient { public Future execute() { return threadPool.submit(() -> { connect(); - GenerateSSK generateSSK = new GenerateSSK(); - AtomicReference keyPair = new AtomicReference<>(); + Sequence sequence = new Sequence(); FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get()); - replySequence.handle(SSKKeypair.class) - .with((message) -> keyPair.set( - new FcpKeyPair(message.getRequestURI(), message.getInsertURI()))); - replySequence.waitFor(() -> keyPair.get() != null); - replySequence.send(generateSSK).get(); + replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair); + replySequence.waitFor(sequence::isFinished); + replySequence.send(new GenerateSSK()).get(); + return sequence.getKeyPair(); + }); + } + + private class Sequence { + + private AtomicReference keyPair = new AtomicReference<>(); + + public void handleSSKKeypair(SSKKeypair sskKeypair) { + keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI())); + } + + public boolean isFinished() { + return keyPair.get() != null; + } + + public FcpKeyPair getKeyPair() { return keyPair.get(); + } + + } + + } + + @Override + public ClientGetCommand clientGet() { + return new ClientGetCommandImpl(); + } + + private class ClientGetCommandImpl implements ClientGetCommand { + + private String identifier; + private boolean ignoreDataStore; + private boolean dataStoreOnly; + private Long maxSize; + private Priority priority; + private boolean realTime; + private boolean global; + + @Override + public ClientGetCommand identifier(String identifier) { + this.identifier = identifier; + return this; + } + + @Override + public ClientGetCommand ignoreDataStore() { + ignoreDataStore = true; + return this; + } + + @Override + public ClientGetCommand dataStoreOnly() { + dataStoreOnly = true; + return this; + } + + @Override + public ClientGetCommand maxSize(long maxSize) { + this.maxSize = maxSize; + return this; + } + + @Override + public ClientGetCommand priority(Priority priority) { + this.priority = priority; + return this; + } + + @Override + public ClientGetCommand realTime() { + realTime = true; + return this; + } + + @Override + public ClientGetCommand global() { + global = true; + return this; + } + + @Override + public Future> uri(String uri) { + return threadPool.submit(new Callable>() { + @Override + public Optional call() throws Exception { + DefaultFcpClient.this.connect(); + ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); + if (ignoreDataStore) { + clientGet.setIgnoreDataStore(true); + } + if (dataStoreOnly) { + clientGet.setDataStoreOnly(true); + } + if (maxSize != null) { + clientGet.setMaxSize(maxSize); + } + if (priority != null) { + clientGet.setPriority(priority); + } + if (realTime) { + clientGet.setRealTimeFlag(true); + } + if (global) { + clientGet.setGlobal(true); + } + try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) { + Sequence sequence = new Sequence(identifier); + replySequence.handle(AllData.class).with(sequence::allData); + replySequence.handle(GetFailed.class).with(sequence::getFailed); + replySequence.handleClose().with(sequence::disconnect); + replySequence.waitFor(sequence::isFinished); + replySequence.send(clientGet).get(); + return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty(); + } + } }); } + private class Sequence { + + private final AtomicBoolean finished = new AtomicBoolean(); + private final AtomicBoolean failed = new AtomicBoolean(); + + private final String identifier; + + private String contentType; + private long dataLength; + private InputStream payload; + + private Sequence(String identifier) { + this.identifier = identifier; + } + + public boolean isFinished() { + return finished.get() || failed.get(); + } + + public boolean isSuccessful() { + return !failed.get(); + } + + public Data getData() { + return new Data() { + @Override + public String getMimeType() { + synchronized (Sequence.this) { + return contentType; + } + } + + @Override + public long size() { + synchronized (Sequence.this) { + return dataLength; + } + } + + @Override + public InputStream getInputStream() { + synchronized (Sequence.this) { + return payload; + } + } + }; + } + + public void allData(AllData allData) { + if (allData.getIdentifier().equals(identifier)) { + synchronized (this) { + contentType = allData.getContentType(); + dataLength = allData.getDataLength(); + try { + payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); + finished.set(true); + } catch (IOException e) { + // TODO – logging + failed.set(true); + } + } + } + } + + public void getFailed(GetFailed getFailed) { + if (getFailed.getIdentifier().equals(identifier)) { + failed.set(true); + } + } + + public void disconnect(Throwable t) { + failed.set(true); + } + + } + } } +