package net.pterodactylus.fcp.quelaton;
import java.io.IOException;
+import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
import net.pterodactylus.fcp.ConfigData;
import net.pterodactylus.fcp.DataFound;
private final ListeningExecutorService executorService;
private final FcpConnection fcpConnection;
private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
+ private final AtomicReference<String> identifier = new AtomicReference<>();
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.fcpConnection = fcpConnection;
}
+ protected void setIdentifier(String identifier) {
+ this.identifier.set(identifier);
+ }
+
protected abstract boolean isFinished();
public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
+ setIdentifier(fcpMessage.getField("Identifier"));
fcpConnection.addFcpListener(this);
messages.add(fcpMessage);
return executorService.submit(() -> {
fcpConnection.removeFcpListener(this);
}
- private <M> void consume(Consumer<M> consumer, M message) {
- consumer.accept(message);
- notifySyncObject();
+ private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
+ consume(consumer, message, "Identifier");
+ }
+
+ private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
+ String identifier) {
+ if (Objects.equals(message.getField(identifier), this.identifier.get())) {
+ consumer.accept(message);
+ notifySyncObject();
+ }
}
private void consumeUnknown(FcpMessage fcpMessage) {
- consumeUnknownMessage(fcpMessage);
- notifySyncObject();
+ if (Objects.equals(fcpMessage.getField("Identifier"), identifier.get())) {
+ consumeUnknownMessage(fcpMessage);
+ notifySyncObject();
+ }
}
private void consumeClose(Throwable throwable) {
@Override
public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
- consume(this::consumeTestDDAReply, testDDAReply);
+ consume(this::consumeTestDDAReply, testDDAReply, "Directory");
}
protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
@Override
public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
- consume(this::consumeTestDDAComplete, testDDAComplete);
+ consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
}
protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
connectNode();
- fcpServer.collectUntil(is("EndMessage"));
+ List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+ String identifier = extractIdentifier(lines);
fcpServer.writeLine("SSKKeypair",
"InsertURI=" + INSERT_URI + "",
"RequestURI=" + REQUEST_URI + "",
- "Identifier=My Identifier from GenerateSSK",
+ "Identifier=" + identifier,
"EndMessage");
FcpKeyPair keyPair = keyPairFuture.get();
assertThat(keyPair.getPublicKey(), is(REQUEST_URI));