package net.pterodactylus.fcp.quelaton;
import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.ClientGet;
import net.pterodactylus.fcp.ClientHello;
import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
import net.pterodactylus.fcp.FcpConnection;
-import net.pterodactylus.fcp.FcpKeyPair;
-import net.pterodactylus.fcp.GenerateSSK;
+import net.pterodactylus.fcp.FcpUtils.TempInputStream;
+import net.pterodactylus.fcp.GetFailed;
import net.pterodactylus.fcp.NodeHello;
-import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.Priority;
+import net.pterodactylus.fcp.ReturnType;
/**
* Default {@link FcpClient} implementation.
this.expectedVersion = expectedVersion;
}
- private void connect() throws IOException {
- if (fcpConnection.get() != null) {
- return;
+ private FcpConnection connect() throws IOException {
+ FcpConnection fcpConnection = this.fcpConnection.get();
+ if (fcpConnection != null) {
+ return fcpConnection;
}
- fcpConnection.compareAndSet(null, createConnection());
+ fcpConnection = createConnection();
+ this.fcpConnection.compareAndSet(null, fcpConnection);
+ return fcpConnection;
}
private FcpConnection createConnection() throws IOException {
FcpConnection connection = new FcpConnection(hostname, port);
connection.connect();
- AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
- AtomicBoolean receivedClosed = new AtomicBoolean();
- FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
- nodeHelloSequence
- .handle(NodeHello.class)
- .with((nodeHello) -> receivedNodeHello.set(nodeHello));
- nodeHelloSequence
- .handle(CloseConnectionDuplicateClientName.class)
- .with((closeConnection) -> receivedClosed.set(true));
- nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
+ FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
+ private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
+ private final AtomicBoolean receivedClosed = new AtomicBoolean();
+ @Override
+ protected boolean isFinished() {
+ return receivedNodeHello.get() != null || receivedClosed.get();
+ }
+
+ @Override
+ protected void consumeNodeHello(NodeHello nodeHello) {
+ receivedNodeHello.set(nodeHello);
+ }
+
+ @Override
+ protected void consumeCloseConnectionDuplicateClientName(
+ CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+ receivedClosed.set(true);
+ }
+ };
ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
try {
nodeHelloSequence.send(clientHello).get();
@Override
public GenerateKeypairCommand generateKeypair() {
- return new GenerateKeypairCommandImpl();
+ return new GenerateKeypairCommandImpl(threadPool, this::connect);
}
- private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
+ @Override
+ public ClientGetCommand clientGet() {
+ return new ClientGetCommandImpl();
+ }
+
+ private class ClientGetCommandImpl implements ClientGetCommand {
+
+ private String identifier;
+ private boolean ignoreDataStore;
+ private boolean dataStoreOnly;
+ private Long maxSize;
+ private Priority priority;
+ private boolean realTime;
+ private boolean global;
@Override
- public Future<FcpKeyPair> execute() {
- return threadPool.submit(() -> {
- connect();
- Sequence sequence = new Sequence();
- FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
- replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
- replySequence.waitFor(sequence::isFinished);
- replySequence.send(new GenerateSSK()).get();
- return sequence.getKeyPair();
- });
+ public ClientGetCommand identifier(String identifier) {
+ this.identifier = identifier;
+ return this;
}
- private class Sequence {
+ @Override
+ public ClientGetCommand ignoreDataStore() {
+ ignoreDataStore = true;
+ return this;
+ }
- private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
+ @Override
+ public ClientGetCommand dataStoreOnly() {
+ dataStoreOnly = true;
+ return this;
+ }
- public void handleSSKKeypair(SSKKeypair sskKeypair) {
- keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
- }
+ @Override
+ public ClientGetCommand maxSize(long maxSize) {
+ this.maxSize = maxSize;
+ return this;
+ }
- public boolean isFinished() {
- return keyPair.get() != null;
- }
+ @Override
+ public ClientGetCommand priority(Priority priority) {
+ this.priority = priority;
+ return this;
+ }
- public FcpKeyPair getKeyPair() {
- return keyPair.get();
- }
+ @Override
+ public ClientGetCommand realTime() {
+ realTime = true;
+ return this;
+ }
+ @Override
+ public ClientGetCommand global() {
+ global = true;
+ return this;
+ }
+
+ @Override
+ public Future<Optional<Data>> uri(String uri) {
+ ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
+ if (ignoreDataStore) {
+ clientGet.setIgnoreDataStore(true);
+ }
+ if (dataStoreOnly) {
+ clientGet.setDataStoreOnly(true);
+ }
+ if (maxSize != null) {
+ clientGet.setMaxSize(maxSize);
+ }
+ if (priority != null) {
+ clientGet.setPriority(priority);
+ }
+ if (realTime) {
+ clientGet.setRealTimeFlag(true);
+ }
+ if (global) {
+ clientGet.setGlobal(true);
+ }
+ return threadPool.submit(() -> {
+ FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, connect()) {
+ private final AtomicBoolean finished = new AtomicBoolean();
+ private final AtomicBoolean failed = new AtomicBoolean();
+
+ private final String identifier = ClientGetCommandImpl.this.identifier;
+
+ private String contentType;
+ private long dataLength;
+ private InputStream payload;
+
+ @Override
+ protected boolean isFinished() {
+ return finished.get() || failed.get();
+ }
+
+ @Override
+ protected Optional<Data> getResult() {
+ return failed.get() ? Optional.empty() : Optional.of(new Data() {
+ @Override
+ public String getMimeType() {
+ return contentType;
+ }
+
+ @Override
+ public long size() {
+ return dataLength;
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return payload;
+ }
+ });
+ }
+
+ @Override
+ protected void consumeAllData(AllData allData) {
+ if (allData.getIdentifier().equals(identifier)) {
+ synchronized (this) {
+ contentType = allData.getContentType();
+ dataLength = allData.getDataLength();
+ try {
+ payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
+ finished.set(true);
+ } catch (IOException e) {
+ // TODO – logging
+ failed.set(true);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void consumeGetFailed(GetFailed getFailed) {
+ if (getFailed.getIdentifier().equals(identifier)) {
+ failed.set(true);
+ }
+ }
+
+ @Override
+ protected void consumeConnectionClosed(Throwable throwable) {
+ failed.set(true);
+ }
+ };
+ return replySequence.send(clientGet).get();
+ });
}
}
}
+