X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClient.java;h=42522e049f7d73e05ce85060ca81f059cd2522fe;hb=e556f854462736f67977c60978a13cc1e1f7ee88;hp=e0c6e12528d2ed0dfdd044994479f7dc260f5fdd;hpb=075f351f114b8a58f82a8e2d2e295b987237d66e;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index e0c6e12..42522e0 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -3,7 +3,6 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -11,10 +10,10 @@ import java.util.function.Supplier; import net.pterodactylus.fcp.ClientHello; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.FcpConnection; -import net.pterodactylus.fcp.FcpKeyPair; -import net.pterodactylus.fcp.GenerateSSK; import net.pterodactylus.fcp.NodeHello; -import net.pterodactylus.fcp.SSKKeypair; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * Default {@link FcpClient} implementation. @@ -23,7 +22,7 @@ import net.pterodactylus.fcp.SSKKeypair; */ public class DefaultFcpClient implements FcpClient { - private final ExecutorService threadPool; + private final ListeningExecutorService threadPool; private final String hostname; private final int port; private final AtomicReference fcpConnection = new AtomicReference<>(); @@ -31,34 +30,28 @@ public class DefaultFcpClient implements FcpClient { private final Supplier expectedVersion; public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName, - Supplier expectedVersion) { - this.threadPool = threadPool; + Supplier expectedVersion) { + this.threadPool = MoreExecutors.listeningDecorator(threadPool); this.hostname = hostname; this.port = port; this.clientName = clientName; this.expectedVersion = expectedVersion; } - private void connect() throws IOException { - if (fcpConnection.get() != null) { - return; + private FcpConnection connect() throws IOException { + FcpConnection fcpConnection = this.fcpConnection.get(); + if (fcpConnection != null) { + return fcpConnection; } - fcpConnection.compareAndSet(null, createConnection()); + fcpConnection = createConnection(); + this.fcpConnection.compareAndSet(null, fcpConnection); + return fcpConnection; } private FcpConnection createConnection() throws IOException { FcpConnection connection = new FcpConnection(hostname, port); connection.connect(); - AtomicReference receivedNodeHello = new AtomicReference<>(); - AtomicBoolean receivedClosed = new AtomicBoolean(); - FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection); - nodeHelloSequence - .handle(NodeHello.class) - .with((nodeHello) -> receivedNodeHello.set(nodeHello)); - nodeHelloSequence - .handle(CloseConnectionDuplicateClientName.class) - .with((closeConnection) -> receivedClosed.set(true)); - nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get()); + FcpReplySequence nodeHelloSequence = new ClientHelloReplySequence(connection); ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get()); try { nodeHelloSequence.send(clientHello).get(); @@ -71,27 +64,47 @@ public class DefaultFcpClient implements FcpClient { @Override public GenerateKeypairCommand generateKeypair() { - return new GenerateKeypairCommandImpl(); + return new GenerateKeypairCommandImpl(threadPool, this::connect); } - private class GenerateKeypairCommandImpl implements GenerateKeypairCommand { + @Override + public ClientGetCommand clientGet() { + return new ClientGetCommandImpl(threadPool, this::connect); + } + + @Override + public ClientPutCommand clientPut() { + return new ClientPutCommandImpl(threadPool, this::connect); + } + + private class ClientHelloReplySequence extends FcpReplySequence { + + private final AtomicReference receivedNodeHello; + private final AtomicBoolean receivedClosed; + + public ClientHelloReplySequence(FcpConnection connection) { + super(DefaultFcpClient.this.threadPool, connection); + receivedNodeHello = new AtomicReference<>(); + receivedClosed = new AtomicBoolean(); + } + + @Override + protected boolean isFinished() { + return receivedNodeHello.get() != null || receivedClosed.get(); + } @Override - public Future execute() { - return threadPool.submit(() -> { - connect(); - GenerateSSK generateSSK = new GenerateSSK(); - AtomicReference keyPair = new AtomicReference<>(); - FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get()); - replySequence.handle(SSKKeypair.class) - .with((message) -> keyPair.set( - new FcpKeyPair(message.getRequestURI(), message.getInsertURI()))); - replySequence.waitFor(() -> keyPair.get() != null); - replySequence.send(generateSSK).get(); - return keyPair.get(); - }); + protected void consumeNodeHello(NodeHello nodeHello) { + receivedNodeHello.set(nodeHello); + } + + @Override + protected void consumeCloseConnectionDuplicateClientName( + CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { + receivedClosed.set(true); } } } +