X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClient.java;h=3dd0b7692ef25d5c7ba6ea9b1d7dbc6397d899ec;hb=e591c6488b692e3dfcf2efd1905d399f39c6067f;hp=6edce63cadb471aee6745a3edef2f415e7663ff6;hpb=8ba2bf0baac5fa3b872c4a1f5020a15d825e525c;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 6edce63..3dd0b76 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -1,28 +1,17 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; -import java.io.InputStream; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import net.pterodactylus.fcp.AllData; -import net.pterodactylus.fcp.ClientGet; -import net.pterodactylus.fcp.ClientHello; -import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; -import net.pterodactylus.fcp.FcpKeyPair; -import net.pterodactylus.fcp.FcpUtils.TempInputStream; -import net.pterodactylus.fcp.GenerateSSK; -import net.pterodactylus.fcp.GetFailed; -import net.pterodactylus.fcp.NodeHello; -import net.pterodactylus.fcp.Priority; -import net.pterodactylus.fcp.ReturnType; -import net.pterodactylus.fcp.SSKKeypair; +import net.pterodactylus.fcp.Peer; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * Default {@link FcpClient} implementation. @@ -31,245 +20,80 @@ import net.pterodactylus.fcp.SSKKeypair; */ public class DefaultFcpClient implements FcpClient { - private final ExecutorService threadPool; + private final ListeningExecutorService threadPool; private final String hostname; private final int port; private final AtomicReference fcpConnection = new AtomicReference<>(); private final Supplier clientName; - private final Supplier expectedVersion; - public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName, - Supplier expectedVersion) { - this.threadPool = threadPool; + public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName) { + this.threadPool = MoreExecutors.listeningDecorator(threadPool); this.hostname = hostname; this.port = port; this.clientName = clientName; - this.expectedVersion = expectedVersion; } private FcpConnection connect() throws IOException { FcpConnection fcpConnection = this.fcpConnection.get(); - if (fcpConnection != null) { + if ((fcpConnection != null) && !fcpConnection.isClosed()) { return fcpConnection; } fcpConnection = createConnection(); - this.fcpConnection.compareAndSet(null, fcpConnection); + this.fcpConnection.set(fcpConnection); return fcpConnection; } private FcpConnection createConnection() throws IOException { - FcpConnection connection = new FcpConnection(hostname, port); - connection.connect(); - FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection) { - private final AtomicReference receivedNodeHello = new AtomicReference<>(); - private final AtomicBoolean receivedClosed = new AtomicBoolean(); - @Override - protected boolean isFinished() { - return receivedNodeHello.get() != null || receivedClosed.get(); - } - - @Override - protected void consumeNodeHello(NodeHello nodeHello) { - receivedNodeHello.set(nodeHello); - } - - @Override - protected void consumeCloseConnectionDuplicateClientName( - CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - receivedClosed.set(true); - } - }; - ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get()); try { - nodeHelloSequence.send(clientHello).get(); + return new ClientHelloImpl(threadPool, hostname, port).withName(clientName.get()).execute().get(); } catch (InterruptedException | ExecutionException e) { - connection.close(); - throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e); + throw new IOException(e); } - return connection; } @Override - public GenerateKeypairCommand generateKeypair() { - return new GenerateKeypairCommandImpl(); + public GetNodeCommand getNode() { + return new GetNodeCommandImpl(threadPool, this::connect); } - private class GenerateKeypairCommandImpl implements GenerateKeypairCommand { - - @Override - public Future execute() { - return threadPool.submit(() -> { - connect(); - return new FcpReplySequence(threadPool, connect()) { - private AtomicReference keyPair = new AtomicReference<>(); - - @Override - protected boolean isFinished() { - return keyPair.get() != null; - } - - @Override - protected FcpKeyPair getResult() { - return keyPair.get(); - } - - @Override - protected void consumeSSKKeypair(SSKKeypair sskKeypair) { - keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI())); - } - }.send(new GenerateSSK()).get(); - }); - } - + @Override + public GenerateKeypairCommand generateKeypair() { + return new GenerateKeypairCommandImpl(threadPool, this::connect); } @Override public ClientGetCommand clientGet() { - return new ClientGetCommandImpl(); + return new ClientGetCommandImpl(threadPool, this::connect); } - private class ClientGetCommandImpl implements ClientGetCommand { - - private String identifier; - private boolean ignoreDataStore; - private boolean dataStoreOnly; - private Long maxSize; - private Priority priority; - private boolean realTime; - private boolean global; - - @Override - public ClientGetCommand identifier(String identifier) { - this.identifier = identifier; - return this; - } - - @Override - public ClientGetCommand ignoreDataStore() { - ignoreDataStore = true; - return this; - } - - @Override - public ClientGetCommand dataStoreOnly() { - dataStoreOnly = true; - return this; - } - - @Override - public ClientGetCommand maxSize(long maxSize) { - this.maxSize = maxSize; - return this; - } - - @Override - public ClientGetCommand priority(Priority priority) { - this.priority = priority; - return this; - } - - @Override - public ClientGetCommand realTime() { - realTime = true; - return this; - } - - @Override - public ClientGetCommand global() { - global = true; - return this; - } - - @Override - public Future> uri(String uri) { - ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct); - if (ignoreDataStore) { - clientGet.setIgnoreDataStore(true); - } - if (dataStoreOnly) { - clientGet.setDataStoreOnly(true); - } - if (maxSize != null) { - clientGet.setMaxSize(maxSize); - } - if (priority != null) { - clientGet.setPriority(priority); - } - if (realTime) { - clientGet.setRealTimeFlag(true); - } - if (global) { - clientGet.setGlobal(true); - } - return threadPool.submit(() -> { - FcpReplySequence> replySequence = new FcpReplySequence>(threadPool, connect()) { - private final AtomicBoolean finished = new AtomicBoolean(); - private final AtomicBoolean failed = new AtomicBoolean(); - - private final String identifier = ClientGetCommandImpl.this.identifier; - - private String contentType; - private long dataLength; - private InputStream payload; - - @Override - protected boolean isFinished() { - return finished.get() || failed.get(); - } - - @Override - protected Optional getResult() { - return failed.get() ? Optional.empty() : Optional.of(new Data() { - @Override - public String getMimeType() { - return contentType; - } - - @Override - public long size() { - return dataLength; - } + @Override + public ClientPutCommand clientPut() { + return new ClientPutCommandImpl(threadPool, this::connect); + } - @Override - public InputStream getInputStream() { - return payload; - } - }); - } + @Override + public ListPeerCommand listPeer() { + return new ListPeerCommandImpl(threadPool, this::connect); + } - @Override - protected void consumeAllData(AllData allData) { - if (allData.getIdentifier().equals(identifier)) { - synchronized (this) { - contentType = allData.getContentType(); - dataLength = allData.getDataLength(); - try { - payload = new TempInputStream(allData.getPayloadInputStream(), dataLength); - finished.set(true); - } catch (IOException e) { - // TODO – logging - failed.set(true); - } - } - } - } + @Override + public ListPeersCommand listPeers() { + return new ListPeersCommandImpl(threadPool, this::connect); + } - @Override - protected void consumeGetFailed(GetFailed getFailed) { - if (getFailed.getIdentifier().equals(identifier)) { - failed.set(true); - } - } + @Override + public AddPeerCommand addPeer() { + return new AddPeerCommandImpl(threadPool, this::connect); + } - @Override - protected void consumeConnectionClosed(Throwable throwable) { - failed.set(true); - } - }; - return replySequence.send(clientGet).get(); - }); - } + @Override + public ModifyPeerCommand modifyPeer() { + return new ModifyPeerCommandImpl(threadPool, this::connect); + } + @Override + public ListPeerNotesCommand listPeerNotes() { + return new ListPeerNotesCommandImpl(threadPool, this::connect); } }