import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import net.pterodactylus.fcp.AllData;
import net.pterodactylus.fcp.ClientGet;
+import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.FcpUtils.TempInputStream;
import net.pterodactylus.fcp.GetFailed;
import net.pterodactylus.fcp.Priority;
private final ExecutorService threadPool;
private final ConnectionSupplier connectionSupplier;
- private String identifier;
private boolean ignoreDataStore;
private boolean dataStoreOnly;
private Long maxSize;
}
@Override
- public ClientGetCommand identifier(String identifier) {
- this.identifier = identifier;
- return this;
- }
-
- @Override
public ClientGetCommand ignoreDataStore() {
ignoreDataStore = true;
return this;
}
private ClientGet createClientGetCommand(String uri) {
+ String identifier = new RandomIdentifierGenerator().generate();
ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
if (ignoreDataStore) {
clientGet.setIgnoreDataStore(true);
private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
+ private final AtomicReference<String> identifier = new AtomicReference<>();
private final AtomicBoolean finished = new AtomicBoolean();
private final AtomicBoolean failed = new AtomicBoolean();
- private final String identifier = ClientGetCommandImpl.this.identifier;
-
private String contentType;
private long dataLength;
private InputStream payload;
@Override
protected void consumeAllData(AllData allData) {
- if (allData.getIdentifier().equals(identifier)) {
+ if (allData.getIdentifier().equals(identifier.get())) {
synchronized (this) {
contentType = allData.getContentType();
dataLength = allData.getDataLength();
@Override
protected void consumeGetFailed(GetFailed getFailed) {
- if (getFailed.getIdentifier().equals(identifier)) {
+ if (getFailed.getIdentifier().equals(identifier.get())) {
failed.set(true);
}
}
failed.set(true);
}
+ @Override
+ public Future<Optional<Data>> send(FcpMessage fcpMessage) throws IOException {
+ identifier.set(fcpMessage.getField("Identifier"));
+ return super.send(fcpMessage);
+ }
+
}
}
import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
import com.google.common.io.ByteStreams;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.After;
import org.junit.Test;
@Test
public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
- Future<Optional<Data>> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt");
+ Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
+ String identifier = extractIdentifier(lines);
fcpServer.writeLine(
"AllData",
- "Identifier=test",
+ "Identifier=" + identifier,
"DataLength=6",
"StartupTime=1435610539000",
"CompletionTime=1435610540000",
assertThat(ByteStreams.toByteArray(data.get().getInputStream()), is("Hello\n".getBytes(StandardCharsets.UTF_8)));
}
+ private String extractIdentifier(List<String> lines) {
+ return lines.stream().filter(s -> s.startsWith("Identifier=")).map(s -> s.substring(s.indexOf('=') + 1)).findFirst().orElse("");
+ }
+
@Test
public void clientGetDownloadsDataForCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
- Future<Optional<Data>> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt");
+ Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
+ String identifier = extractIdentifier(lines);
fcpServer.writeLine(
"AllData",
"Identifier=not-test",
);
fcpServer.writeLine(
"AllData",
- "Identifier=test",
+ "Identifier=" + identifier,
"DataLength=6",
"StartupTime=1435610539000",
"CompletionTime=1435610540000",
@Test
public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
- Future<Optional<Data>> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt");
+ Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
+ String identifier = extractIdentifier(lines);
fcpServer.writeLine(
"GetFailed",
- "Identifier=test",
+ "Identifier=" + identifier,
"Code=3",
"EndMessage"
);
@Test
public void clientGetRecognizesGetFailedForCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
- Future<Optional<Data>> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt");
+ Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
+ String identifier = extractIdentifier(lines);
fcpServer.writeLine(
"GetFailed",
"Identifier=not-test",
);
fcpServer.writeLine(
"GetFailed",
- "Identifier=test",
+ "Identifier=" + identifier,
"Code=3",
"EndMessage"
);
@Test
public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
- Future<Optional<Data>> dataFuture = fcpClient.clientGet().identifier("test").uri("KSK@foo.txt");
+ Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
fcpServer.close();
Optional<Data> data = dataFuture.get();
assertThat(data.isPresent(), is(false));
@Test
public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
- fcpClient.clientGet().ignoreDataStore().identifier("test").uri("KSK@foo.txt");
+ fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "IgnoreDS=true",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
}
@Test
public void clientGetWithDataStoreOnlySettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
- fcpClient.clientGet().dataStoreOnly().identifier("test").uri("KSK@foo.txt");
+ fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "DSonly=true",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
}
@Test
public void clientGetWithMaxSizeSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
- fcpClient.clientGet().maxSize(1048576).identifier("test").uri("KSK@foo.txt");
+ fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "MaxSize=1048576",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
}
@Test
public void clientGetWithPrioritySettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
- fcpClient.clientGet().priority(Priority.interactive).identifier("test").uri("KSK@foo.txt");
+ fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "PriorityClass=1",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
}
@Test
public void clientGetWithRealTimeSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
- fcpClient.clientGet().realTime().identifier("test").uri("KSK@foo.txt");
+ fcpClient.clientGet().realTime().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "RealTimeFlag=true",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
}
@Test
public void clientGetWithGlobalSettingSendsCorrectCommands() throws InterruptedException, ExecutionException, IOException {
- fcpClient.clientGet().global().identifier("test").uri("KSK@foo.txt");
+ fcpClient.clientGet().global().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
- assertThat(lines, containsInAnyOrder(
- "ClientGet",
- "Identifier=test",
- "ReturnType=direct",
- "URI=KSK@foo.txt",
- "Global=true",
- "EndMessage"
- ));
+ assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
+ }
+
+ private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
+ return new TypeSafeDiagnosingMatcher<List<String>>() {
+ @Override
+ protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
+ if (!item.get(0).equals(name)) {
+ mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
+ return false;
+ }
+ for (String requiredLine : requiredLines) {
+ if (item.indexOf(requiredLine) < 1) {
+ mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("FCP message named ").appendValue(name);
+ description.appendValueList(", containing the lines", ", ", "", requiredLines);
+ }
+ };
}
}