+ @Override
+ public ClientGetCommand global() {
+ global = true;
+ return this;
+ }
+
+ @Override
+ public Future<Optional<Data>> uri(String uri) {
+ ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
+ if (ignoreDataStore) {
+ clientGet.setIgnoreDataStore(true);
+ }
+ if (dataStoreOnly) {
+ clientGet.setDataStoreOnly(true);
+ }
+ if (maxSize != null) {
+ clientGet.setMaxSize(maxSize);
+ }
+ if (priority != null) {
+ clientGet.setPriority(priority);
+ }
+ if (realTime) {
+ clientGet.setRealTimeFlag(true);
+ }
+ if (global) {
+ clientGet.setGlobal(true);
+ }
+ return threadPool.submit(() -> {
+ FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, connect()) {
+ private final AtomicBoolean finished = new AtomicBoolean();
+ private final AtomicBoolean failed = new AtomicBoolean();
+
+ private final String identifier = ClientGetCommandImpl.this.identifier;
+
+ private String contentType;
+ private long dataLength;
+ private InputStream payload;
+
+ @Override
+ protected boolean isFinished() {
+ return finished.get() || failed.get();
+ }
+
+ @Override
+ protected Optional<Data> getResult() {
+ return failed.get() ? Optional.empty() : Optional.of(new Data() {
+ @Override
+ public String getMimeType() {
+ return contentType;
+ }
+
+ @Override
+ public long size() {
+ return dataLength;
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return payload;
+ }
+ });
+ }
+
+ @Override
+ protected void consumeAllData(AllData allData) {
+ if (allData.getIdentifier().equals(identifier)) {
+ synchronized (this) {
+ contentType = allData.getContentType();
+ dataLength = allData.getDataLength();
+ try {
+ payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
+ finished.set(true);
+ } catch (IOException e) {
+ // TODO – logging
+ failed.set(true);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void consumeGetFailed(GetFailed getFailed) {
+ if (getFailed.getIdentifier().equals(identifier)) {
+ failed.set(true);
+ }
+ }
+
+ @Override
+ protected void consumeConnectionClosed(Throwable throwable) {
+ failed.set(true);
+ }
+ };
+ return replySequence.send(clientGet).get();
+ });