X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FFcpReplySequence.java;h=9b44b9a4e47283ea9f12be4952463b1e1a2c402f;hb=fc1c3f3719425dfafb42fedef9ecad05783dd32c;hp=8bdaf5c81f3455ad2775cdd44fc8ba6484b70767;hpb=52ddba35dcbe01d9c18d33814bab1dce85897d7e;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java index 8bdaf5c..9b44b9a 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -6,6 +6,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -67,6 +68,7 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener private final FcpConnection fcpConnection; private final Queue messages = new ConcurrentLinkedQueue<>(); private final AtomicReference identifier = new AtomicReference<>(); + private final AtomicBoolean connectionClosed = new AtomicBoolean(); private final AtomicReference connectionFailureReason = new AtomicReference<>(); public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { @@ -86,12 +88,12 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener messages.add(fcpMessage); return executorService.submit(() -> { synchronized (syncObject) { - while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) { + while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) { while (messages.peek() != null) { FcpMessage message = messages.poll(); fcpConnection.sendMessage(message); } - if (isFinished() || (connectionFailureReason.get() != null)) { + if (isFinished() || connectionClosed.get()) { continue; } syncObject.wait(); @@ -142,14 +144,13 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener } private void consumeUnknown(FcpMessage fcpMessage) { - if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) { - consumeUnknownMessage(fcpMessage); - notifySyncObject(); - } + consumeUnknownMessage(fcpMessage); + notifySyncObject(); } private void consumeClose(Throwable throwable) { connectionFailureReason.set(throwable); + connectionClosed.set(true); notifySyncObject(); } @@ -163,11 +164,11 @@ public abstract class FcpReplySequence implements AutoCloseable, FcpListener @Override public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName); + connectionFailureReason.set(new IOException("duplicate client name")); + connectionClosed.set(true); + notifySyncObject(); } - protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { } - @Override public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { consume(this::consumeSSKKeypair, sskKeypair);