projects
/
jFCPlib.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Rename Keyed interface to WithUri
[jFCPlib.git]
/
src
/
main
/
java
/
net
/
pterodactylus
/
fcp
/
quelaton
/
FcpReplySequence.java
diff --git
a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
index
25db847
..
106bd8a
100644
(file)
--- a/
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
+++ b/
src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java
@@
-1,8
+1,9
@@
package net.pterodactylus.fcp.quelaton;
import java.io.IOException;
package net.pterodactylus.fcp.quelaton;
import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.function.Consumer;
import net.pterodactylus.fcp.AllData;
import java.util.function.Consumer;
import net.pterodactylus.fcp.AllData;
@@
-46,6
+47,10
@@
import net.pterodactylus.fcp.URIGenerated;
import net.pterodactylus.fcp.UnknownNodeIdentifier;
import net.pterodactylus.fcp.UnknownPeerNoteType;
import net.pterodactylus.fcp.UnknownNodeIdentifier;
import net.pterodactylus.fcp.UnknownPeerNoteType;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
/**
* An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
*
/**
* An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
*
@@
-54,27
+59,30
@@
import net.pterodactylus.fcp.UnknownPeerNoteType;
public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
private final Object syncObject = new Object();
public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
private final Object syncObject = new Object();
- private final ExecutorService executorService;
+ private final
Listening
ExecutorService executorService;
private final FcpConnection fcpConnection;
private final FcpConnection fcpConnection;
+ private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
- this.executorService =
executorService
;
+ this.executorService =
MoreExecutors.listeningDecorator(executorService)
;
this.fcpConnection = fcpConnection;
}
protected abstract boolean isFinished();
this.fcpConnection = fcpConnection;
}
protected abstract boolean isFinished();
- public Future<R> send(FcpMessage fcpMessage) throws IOException {
- try {
+ public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
fcpConnection.addFcpListener(this);
fcpConnection.addFcpListener(this);
-
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- }
- fcpConnection.sendMessage(fcpMessage);
+ messages.add(fcpMessage);
return executorService.submit(() -> {
synchronized (syncObject) {
return executorService.submit(() -> {
synchronized (syncObject) {
- while (!isFinished()) {
+ while (!isFinished() || !messages.isEmpty()) {
+ while (messages.peek() != null) {
+ FcpMessage message = messages.poll();
+ fcpConnection.sendMessage(message);
+ }
+ if (isFinished()) {
+ continue;
+ }
syncObject.wait();
}
}
syncObject.wait();
}
}
@@
-82,6
+90,17
@@
public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
});
}
});
}
+ protected void sendMessage(FcpMessage fcpMessage) {
+ messages.add(fcpMessage);
+ notifySyncObject();
+ }
+
+ private void notifySyncObject() {
+ synchronized (syncObject) {
+ syncObject.notifyAll();
+ }
+ }
+
protected R getResult() {
return null;
}
protected R getResult() {
return null;
}
@@
-93,23
+112,17
@@
public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener
private <M> void consume(Consumer<M> consumer, M message) {
consumer.accept(message);
private <M> void consume(Consumer<M> consumer, M message) {
consumer.accept(message);
- synchronized (syncObject) {
- syncObject.notifyAll();
- }
+ notifySyncObject();
}
private void consumeUnknown(FcpMessage fcpMessage) {
consumeUnknownMessage(fcpMessage);
}
private void consumeUnknown(FcpMessage fcpMessage) {
consumeUnknownMessage(fcpMessage);
- synchronized (syncObject) {
- syncObject.notifyAll();
- }
+ notifySyncObject();
}
private void consumeClose(Throwable throwable) {
consumeConnectionClosed(throwable);
}
private void consumeClose(Throwable throwable) {
consumeConnectionClosed(throwable);
- synchronized (syncObject) {
- syncObject.notifyAll();
- }
+ notifySyncObject();
}
@Override
}
@Override