X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FFcpReplySequence.java;h=9b44b9a4e47283ea9f12be4952463b1e1a2c402f;hb=fc1c3f3719425dfafb42fedef9ecad05783dd32c;hp=25db84713e8689caa87fa28e38fa329672cf5447;hpb=28e284e354ca3825675010f36c2b16b11c5f70ef;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java index 25db847..9b44b9a 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -1,11 +1,17 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.BaseMessage; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.ConfigData; import net.pterodactylus.fcp.DataFound; @@ -46,6 +52,10 @@ import net.pterodactylus.fcp.URIGenerated; import net.pterodactylus.fcp.UnknownNodeIdentifier; import net.pterodactylus.fcp.UnknownPeerNoteType; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + /** * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies. * @@ -54,34 +64,60 @@ import net.pterodactylus.fcp.UnknownPeerNoteType; public abstract class FcpReplySequence implements AutoCloseable, FcpListener { private final Object syncObject = new Object(); - private final ExecutorService executorService; + private final ListeningExecutorService executorService; private final FcpConnection fcpConnection; + private final Queue messages = new ConcurrentLinkedQueue<>(); + private final AtomicReference identifier = new AtomicReference<>(); + private final AtomicBoolean connectionClosed = new AtomicBoolean(); + private final AtomicReference connectionFailureReason = new AtomicReference<>(); public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { - this.executorService = executorService; + this.executorService = MoreExecutors.listeningDecorator(executorService); this.fcpConnection = fcpConnection; } + protected void setIdentifier(String identifier) { + this.identifier.set(identifier); + } + protected abstract boolean isFinished(); - public Future send(FcpMessage fcpMessage) throws IOException { - try { + public ListenableFuture send(FcpMessage fcpMessage) throws IOException { + setIdentifier(fcpMessage.getField("Identifier")); fcpConnection.addFcpListener(this); - - } catch (Throwable throwable) { - throwable.printStackTrace(); - } - fcpConnection.sendMessage(fcpMessage); + messages.add(fcpMessage); return executorService.submit(() -> { synchronized (syncObject) { - while (!isFinished()) { + while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) { + while (messages.peek() != null) { + FcpMessage message = messages.poll(); + fcpConnection.sendMessage(message); + } + if (isFinished() || connectionClosed.get()) { + continue; + } syncObject.wait(); } } + Throwable throwable = connectionFailureReason.get(); + if (throwable != null) { + throw new ExecutionException(throwable); + } return getResult(); }); } + protected void sendMessage(FcpMessage fcpMessage) { + messages.add(fcpMessage); + notifySyncObject(); + } + + private void notifySyncObject() { + synchronized (syncObject) { + syncObject.notifyAll(); + } + } + protected R getResult() { return null; } @@ -91,25 +127,31 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener fcpConnection.removeFcpListener(this); } - private void consume(Consumer consumer, M message) { - consumer.accept(message); - synchronized (syncObject) { - syncObject.notifyAll(); + private void consume(Consumer consumer, M message) { + consume(consumer, message, "Identifier"); + } + + private void consume(Consumer consumer, M message, + String identifier) { + if (Objects.equals(message.getField(identifier), this.identifier.get())) { + consumeAlways(consumer, message); } } + private void consumeAlways(Consumer consumer, M message) { + consumer.accept(message); + notifySyncObject(); + } + private void consumeUnknown(FcpMessage fcpMessage) { consumeUnknownMessage(fcpMessage); - synchronized (syncObject) { - syncObject.notifyAll(); - } + notifySyncObject(); } private void consumeClose(Throwable throwable) { - consumeConnectionClosed(throwable); - synchronized (syncObject) { - syncObject.notifyAll(); - } + connectionFailureReason.set(throwable); + connectionClosed.set(true); + notifySyncObject(); } @Override @@ -122,11 +164,11 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener @Override public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - consume(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName); + connectionFailureReason.set(new IOException("duplicate client name")); + connectionClosed.set(true); + notifySyncObject(); } - protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { } - @Override public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { consume(this::consumeSSKKeypair, sskKeypair); @@ -178,14 +220,14 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener @Override public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { - consume(this::consumeTestDDAReply, testDDAReply); + consume(this::consumeTestDDAReply, testDDAReply, "Directory"); } protected void consumeTestDDAReply(TestDDAReply testDDAReply) { } @Override public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { - consume(this::consumeTestDDAComplete, testDDAComplete); + consume(this::consumeTestDDAComplete, testDDAComplete, "Directory"); } protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { } @@ -388,6 +430,4 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener consumeClose(throwable); } - protected void consumeConnectionClosed(Throwable throwable) { } - }