package net.pterodactylus.fcp.quelaton;
import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
private final Object syncObject = new Object();
private final ListeningExecutorService executorService;
private final FcpConnection fcpConnection;
+ private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
this.executorService = MoreExecutors.listeningDecorator(executorService);
public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
fcpConnection.addFcpListener(this);
- fcpConnection.sendMessage(fcpMessage);
+ messages.add(fcpMessage);
return executorService.submit(() -> {
synchronized (syncObject) {
- while (!isFinished()) {
+ while (!isFinished() || !messages.isEmpty()) {
+ while (messages.peek() != null) {
+ FcpMessage message = messages.poll();
+ fcpConnection.sendMessage(message);
+ }
+ if (isFinished()) {
+ continue;
+ }
syncObject.wait();
}
}
});
}
+ protected void sendMessage(FcpMessage fcpMessage) {
+ messages.add(fcpMessage);
+ notifySyncObject();
+ }
+
+ private void notifySyncObject() {
+ synchronized (syncObject) {
+ syncObject.notifyAll();
+ }
+ }
+
protected R getResult() {
return null;
}
private <M> void consume(Consumer<M> consumer, M message) {
consumer.accept(message);
- synchronized (syncObject) {
- syncObject.notifyAll();
- }
+ notifySyncObject();
}
private void consumeUnknown(FcpMessage fcpMessage) {
consumeUnknownMessage(fcpMessage);
- synchronized (syncObject) {
- syncObject.notifyAll();
- }
+ notifySyncObject();
}
private void consumeClose(Throwable throwable) {
consumeConnectionClosed(throwable);
- synchronized (syncObject) {
- syncObject.notifyAll();
- }
+ notifySyncObject();
}
@Override