X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClient.java;h=8d2afc0a5a06f22aa4bfbf56e11f108100d94e90;hb=ff2e4b93bfb1030fd11ec295f4ae6e9bd29b6966;hp=9c7a7056b4eb7dc17a1e01ea6f0b68e970b15418;hpb=0594300da2613fc58e794b655b2dd9b27c074b0b;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 9c7a705..8d2afc0 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -3,14 +3,13 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import net.pterodactylus.fcp.ClientHello; -import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; -import net.pterodactylus.fcp.NodeHello; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * Default {@link FcpClient} implementation. @@ -19,20 +18,17 @@ import net.pterodactylus.fcp.NodeHello; */ public class DefaultFcpClient implements FcpClient { - private final ExecutorService threadPool; + private final ListeningExecutorService threadPool; private final String hostname; private final int port; private final AtomicReference fcpConnection = new AtomicReference<>(); private final Supplier clientName; - private final Supplier expectedVersion; - public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName, - Supplier expectedVersion) { - this.threadPool = threadPool; + public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName) { + this.threadPool = MoreExecutors.listeningDecorator(threadPool); this.hostname = hostname; this.port = port; this.clientName = clientName; - this.expectedVersion = expectedVersion; } private FcpConnection connect() throws IOException { @@ -46,17 +42,16 @@ public class DefaultFcpClient implements FcpClient { } private FcpConnection createConnection() throws IOException { - FcpConnection connection = new FcpConnection(hostname, port); - connection.connect(); - FcpReplySequence nodeHelloSequence = new ClientHelloReplySequence(connection); - ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get()); try { - nodeHelloSequence.send(clientHello).get(); + return new ClientHelloImpl(threadPool, hostname, port).withName(clientName.get()).execute().get(); } catch (InterruptedException | ExecutionException e) { - connection.close(); - throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e); + throw new IOException(e); } - return connection; + } + + @Override + public GetNodeCommand getNode() { + return new GetNodeCommandImpl(threadPool, this::connect); } @Override @@ -69,33 +64,14 @@ public class DefaultFcpClient implements FcpClient { return new ClientGetCommandImpl(threadPool, this::connect); } - private class ClientHelloReplySequence extends FcpReplySequence { - - private final AtomicReference receivedNodeHello; - private final AtomicBoolean receivedClosed; - - public ClientHelloReplySequence(FcpConnection connection) { - super(DefaultFcpClient.this.threadPool, connection); - receivedNodeHello = new AtomicReference<>(); - receivedClosed = new AtomicBoolean(); - } - - @Override - protected boolean isFinished() { - return receivedNodeHello.get() != null || receivedClosed.get(); - } - - @Override - protected void consumeNodeHello(NodeHello nodeHello) { - receivedNodeHello.set(nodeHello); - } - - @Override - protected void consumeCloseConnectionDuplicateClientName( - CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - receivedClosed.set(true); - } + @Override + public ClientPutCommand clientPut() { + return new ClientPutCommandImpl(threadPool, this::connect); + } + @Override + public ListPeersCommand listPeers() { + return new ListPeersCommandImpl(threadPool, this::connect); } }