Close FCP reply sequences after use
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientGetCommandImpl.java
1 package net.pterodactylus.fcp.quelaton;
2
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.pterodactylus.fcp.AllData;
import net.pterodactylus.fcp.ClientGet;
import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.FcpUtils.TempInputStream;
import net.pterodactylus.fcp.GetFailed;
import net.pterodactylus.fcp.Priority;
import net.pterodactylus.fcp.ReturnType;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
23
24 /**
25  * Implementation of the {@link ClientGetCommand}.
26  *
27  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
28  */
29 class ClientGetCommandImpl implements ClientGetCommand {
30
31         private final ListeningExecutorService threadPool;
32         private final ConnectionSupplier connectionSupplier;
33
34         private boolean ignoreDataStore;
35         private boolean dataStoreOnly;
36         private Long maxSize;
37         private Priority priority;
38         private boolean realTime;
39         private boolean global;
40
41         public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
42                 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
43                 this.connectionSupplier = connectionSupplier;
44         }
45
46         @Override
47         public ClientGetCommand ignoreDataStore() {
48                 ignoreDataStore = true;
49                 return this;
50         }
51
52         @Override
53         public ClientGetCommand dataStoreOnly() {
54                 dataStoreOnly = true;
55                 return this;
56         }
57
58         @Override
59         public ClientGetCommand maxSize(long maxSize) {
60                 this.maxSize = maxSize;
61                 return this;
62         }
63
64         @Override
65         public ClientGetCommand priority(Priority priority) {
66                 this.priority = priority;
67                 return this;
68         }
69
70         @Override
71         public ClientGetCommand realTime() {
72                 realTime = true;
73                 return this;
74         }
75
76         @Override
77         public ClientGetCommand global() {
78                 global = true;
79                 return this;
80         }
81
82         @Override
83         public Executable<Optional<Data>> uri(String uri) {
84                 return () -> threadPool.submit(() -> execute(uri));
85         }
86
87         private Optional<Data> execute(String uri) throws InterruptedException, ExecutionException, IOException {
88                 ClientGet clientGet = createClientGetCommand(uri);
89                 try (ClientGetReplySequence clientGetReplySequence = new ClientGetReplySequence()) {
90                         return clientGetReplySequence.send(clientGet).get();
91                 }
92         }
93
94         private ClientGet createClientGetCommand(String uri) {
95                 String identifier = new RandomIdentifierGenerator().generate();
96                 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
97                 if (ignoreDataStore) {
98                         clientGet.setIgnoreDataStore(true);
99                 }
100                 if (dataStoreOnly) {
101                         clientGet.setDataStoreOnly(true);
102                 }
103                 if (maxSize != null) {
104                         clientGet.setMaxSize(maxSize);
105                 }
106                 if (priority != null) {
107                         clientGet.setPriority(priority);
108                 }
109                 if (realTime) {
110                         clientGet.setRealTimeFlag(true);
111                 }
112                 if (global) {
113                         clientGet.setGlobal(true);
114                 }
115                 return clientGet;
116         }
117
118         private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
119
120                 private final AtomicBoolean finished = new AtomicBoolean();
121                 private final AtomicBoolean failed = new AtomicBoolean();
122
123                 private String contentType;
124                 private long dataLength;
125                 private InputStream payload;
126
127                 public ClientGetReplySequence() throws IOException {
128                         super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
129                 }
130
131                 @Override
132                 protected boolean isFinished() {
133                         return finished.get() || failed.get();
134                 }
135
136                 @Override
137                 protected Optional<Data> getResult() {
138                         return failed.get() ? Optional.empty() : Optional.of(new Data() {
139                                 @Override
140                                 public String getMimeType() {
141                                         return contentType;
142                                 }
143
144                                 @Override
145                                 public long size() {
146                                         return dataLength;
147                                 }
148
149                                 @Override
150                                 public InputStream getInputStream() {
151                                         return payload;
152                                 }
153                         });
154                 }
155
156                 @Override
157                 protected void consumeAllData(AllData allData) {
158                         synchronized (this) {
159                                 contentType = allData.getContentType();
160                                 dataLength = allData.getDataLength();
161                                 try {
162                                         payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
163                                         finished.set(true);
164                                 } catch (IOException e) {
165                                         // TODO – logging
166                                         failed.set(true);
167                                 }
168                         }
169                 }
170
171                 @Override
172                 protected void consumeGetFailed(GetFailed getFailed) {
173                         failed.set(true);
174                 }
175
176         }
177
178 }