X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClient.java;h=79c7f44f9d27672d06a3d5829293301f6219d6f9;hb=7980851d84be2f31d0db3837619eb1548c3847c7;hp=9cb098155e33c0c68bf4f1f619b3775d0ef9a8e6;hpb=aa659bcdaa77efb1e902a279a3505964d26e09ff;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 9cb0981..79c7f44 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -1,6 +1,8 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -8,13 +10,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.ClientGet; import net.pterodactylus.fcp.ClientHello; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; -import net.pterodactylus.fcp.FcpKeyPair; -import net.pterodactylus.fcp.GenerateSSK; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; +import net.pterodactylus.fcp.GetFailed; import net.pterodactylus.fcp.NodeHello; -import net.pterodactylus.fcp.SSKKeypair; +import net.pterodactylus.fcp.Priority; +import net.pterodactylus.fcp.ReturnType; /** * Default {@link FcpClient} implementation. @@ -39,26 +44,38 @@ public class DefaultFcpClient implements FcpClient { this.expectedVersion = expectedVersion; } - private void connect() throws IOException { - if (fcpConnection.get() != null) { - return; + private FcpConnection connect() throws IOException { + FcpConnection fcpConnection = this.fcpConnection.get(); + if (fcpConnection != null) { + return fcpConnection; } - fcpConnection.compareAndSet(null, createConnection()); + fcpConnection = createConnection(); + this.fcpConnection.compareAndSet(null, fcpConnection); + return fcpConnection; } private FcpConnection createConnection() throws IOException { FcpConnection connection = new FcpConnection(hostname, port); connection.connect(); - AtomicReference receivedNodeHello = new AtomicReference<>(); - AtomicBoolean receivedClosed = new AtomicBoolean(); - FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection); - nodeHelloSequence - .handle(NodeHello.class) - .with((nodeHello) -> receivedNodeHello.set(nodeHello)); - nodeHelloSequence - .handle(CloseConnectionDuplicateClientName.class) - .with((closeConnection) -> receivedClosed.set(true)); - nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get()); + FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection) { + private final AtomicReference receivedNodeHello = new AtomicReference<>(); + private final AtomicBoolean receivedClosed = new AtomicBoolean(); + @Override + protected boolean isFinished() { + return receivedNodeHello.get() != null || receivedClosed.get(); + } + + @Override + protected void consumeNodeHello(NodeHello nodeHello) { + receivedNodeHello.set(nodeHello); + } + + @Override + protected void consumeCloseConnectionDuplicateClientName( + CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { + receivedClosed.set(true); + } + }; ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get()); try { nodeHelloSequence.send(clientHello).get(); @@ -71,42 +88,157 @@ public class DefaultFcpClient implements FcpClient { @Override public GenerateKeypairCommand generateKeypair() { - return new GenerateKeypairCommandImpl(); + return new GenerateKeypairCommandImpl(threadPool, this::connect); } - private class GenerateKeypairCommandImpl implements GenerateKeypairCommand { + @Override + public ClientGetCommand clientGet() { + return new ClientGetCommandImpl(); + } + + private class ClientGetCommandImpl implements ClientGetCommand { + + private String identifier; + private boolean ignoreDataStore; + private boolean dataStoreOnly; + private Long maxSize; + private Priority priority; + private boolean realTime; + private boolean global; @Override - public Future execute() { - return threadPool.submit(() -> { - connect(); - Sequence sequence = new Sequence(); - FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get()); - replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair); - replySequence.waitFor(sequence::isFinished); - replySequence.send(new GenerateSSK()).get(); - return sequence.getKeyPair(); - }); + public ClientGetCommand identifier(String identifier) { + this.identifier = identifier; + return this; } - private class Sequence { + @Override + public ClientGetCommand ignoreDataStore() { + ignoreDataStore = true; + return this; + } - private AtomicReference keyPair = new AtomicReference<>(); + @Override + public ClientGetCommand dataStoreOnly() { + dataStoreOnly = true; + return this; + } - public void handleSSKKeypair(SSKKeypair sskKeypair) { - keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI())); - } + @Override + public ClientGetCommand maxSize(long maxSize) { + this.maxSize = maxSize; + return this; + } - public boolean isFinished() { - return keyPair.get() != null; - } + @Override + public ClientGetCommand priority(Priority priority) { + this.priority = priority; + return this; + } - public FcpKeyPair getKeyPair() { - return keyPair.get(); - } + @Override + public ClientGetCommand realTime() { + realTime = true; + return this; + } + @Override + public ClientGetCommand global() { + global = true; + return this; + } + + @Override + public Future> uri(String uri) { + ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); + if (ignoreDataStore) { + clientGet.setIgnoreDataStore(true); + } + if (dataStoreOnly) { + clientGet.setDataStoreOnly(true); + } + if (maxSize != null) { + clientGet.setMaxSize(maxSize); + } + if (priority != null) { + clientGet.setPriority(priority); + } + if (realTime) { + clientGet.setRealTimeFlag(true); + } + if (global) { + clientGet.setGlobal(true); + } + return threadPool.submit(() -> { + FcpReplySequence> replySequence = new FcpReplySequence>(threadPool, connect()) { + private final AtomicBoolean finished = new AtomicBoolean(); + private final AtomicBoolean failed = new AtomicBoolean(); + + private final String identifier = ClientGetCommandImpl.this.identifier; + + private String contentType; + private long dataLength; + private InputStream payload; + + @Override + protected boolean isFinished() { + return finished.get() || failed.get(); + } + + @Override + protected Optional getResult() { + return failed.get() ? Optional.empty() : Optional.of(new Data() { + @Override + public String getMimeType() { + return contentType; + } + + @Override + public long size() { + return dataLength; + } + + @Override + public InputStream getInputStream() { + return payload; + } + }); + } + + @Override + protected void consumeAllData(AllData allData) { + if (allData.getIdentifier().equals(identifier)) { + synchronized (this) { + contentType = allData.getContentType(); + dataLength = allData.getDataLength(); + try { + payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); + finished.set(true); + } catch (IOException e) { + // TODO – logging + failed.set(true); + } + } + } + } + + @Override + protected void consumeGetFailed(GetFailed getFailed) { + if (getFailed.getIdentifier().equals(identifier)) { + failed.set(true); + } + } + + @Override + protected void consumeConnectionClosed(Throwable throwable) { + failed.set(true); + } + }; + return replySequence.send(clientGet).get(); + }); } } } +