X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClient.java;h=42522e049f7d73e05ce85060ca81f059cd2522fe;hb=e556f854462736f67977c60978a13cc1e1f7ee88;hp=30dac917bf451624dd386ab1d29b9216090e6956;hpb=9970ce4a2868da05e778ec998afb0a899a133959;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 30dac91..42522e0 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -12,6 +12,9 @@ import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; import net.pterodactylus.fcp.NodeHello; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + /** * Default {@link FcpClient} implementation. * @@ -19,7 +22,7 @@ import net.pterodactylus.fcp.NodeHello; */ public class DefaultFcpClient implements FcpClient { - private final ExecutorService threadPool; + private final ListeningExecutorService threadPool; private final String hostname; private final int port; private final AtomicReference fcpConnection = new AtomicReference<>(); @@ -27,8 +30,8 @@ public class DefaultFcpClient implements FcpClient { private final Supplier expectedVersion; public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName, - Supplier expectedVersion) { - this.threadPool = threadPool; + Supplier expectedVersion) { + this.threadPool = MoreExecutors.listeningDecorator(threadPool); this.hostname = hostname; this.port = port; this.clientName = clientName; @@ -48,25 +51,7 @@ public class DefaultFcpClient implements FcpClient { private FcpConnection createConnection() throws IOException { FcpConnection connection = new FcpConnection(hostname, port); connection.connect(); - FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection) { - private final AtomicReference receivedNodeHello = new AtomicReference<>(); - private final AtomicBoolean receivedClosed = new AtomicBoolean(); - @Override - protected boolean isFinished() { - return receivedNodeHello.get() != null || receivedClosed.get(); - } - - @Override - protected void consumeNodeHello(NodeHello nodeHello) { - receivedNodeHello.set(nodeHello); - } - - @Override - protected void consumeCloseConnectionDuplicateClientName( - CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - receivedClosed.set(true); - } - }; + FcpReplySequence nodeHelloSequence = new ClientHelloReplySequence(connection); ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get()); try { nodeHelloSequence.send(clientHello).get(); @@ -87,5 +72,39 @@ public class DefaultFcpClient implements FcpClient { return new ClientGetCommandImpl(threadPool, this::connect); } + @Override + public ClientPutCommand clientPut() { + return new ClientPutCommandImpl(threadPool, this::connect); + } + + private class ClientHelloReplySequence extends FcpReplySequence { + + private final AtomicReference receivedNodeHello; + private final AtomicBoolean receivedClosed; + + public ClientHelloReplySequence(FcpConnection connection) { + super(DefaultFcpClient.this.threadPool, connection); + receivedNodeHello = new AtomicReference<>(); + receivedClosed = new AtomicBoolean(); + } + + @Override + protected boolean isFinished() { + return receivedNodeHello.get() != null || receivedClosed.get(); + } + + @Override + protected void consumeNodeHello(NodeHello nodeHello) { + receivedNodeHello.set(nodeHello); + } + + @Override + protected void consumeCloseConnectionDuplicateClientName( + CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { + receivedClosed.set(true); + } + + } + }