Use execute() to trigger execution of commands
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientGetCommandImpl.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.atomic.AtomicBoolean;
8 import java.util.concurrent.atomic.AtomicReference;
9
10 import net.pterodactylus.fcp.AllData;
11 import net.pterodactylus.fcp.ClientGet;
12 import net.pterodactylus.fcp.FcpMessage;
13 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
14 import net.pterodactylus.fcp.GetFailed;
15 import net.pterodactylus.fcp.Priority;
16 import net.pterodactylus.fcp.ReturnType;
17
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
21
22 /**
23  * Implementation of the {@link ClientGetCommand}.
24  *
25  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
26  */
27 class ClientGetCommandImpl implements ClientGetCommand {
28
29         private final ListeningExecutorService threadPool;
30         private final ConnectionSupplier connectionSupplier;
31
32         private boolean ignoreDataStore;
33         private boolean dataStoreOnly;
34         private Long maxSize;
35         private Priority priority;
36         private boolean realTime;
37         private boolean global;
38
39         public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
40                 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
41                 this.connectionSupplier = connectionSupplier;
42         }
43
44         @Override
45         public ClientGetCommand ignoreDataStore() {
46                 ignoreDataStore = true;
47                 return this;
48         }
49
50         @Override
51         public ClientGetCommand dataStoreOnly() {
52                 dataStoreOnly = true;
53                 return this;
54         }
55
56         @Override
57         public ClientGetCommand maxSize(long maxSize) {
58                 this.maxSize = maxSize;
59                 return this;
60         }
61
62         @Override
63         public ClientGetCommand priority(Priority priority) {
64                 this.priority = priority;
65                 return this;
66         }
67
68         @Override
69         public ClientGetCommand realTime() {
70                 realTime = true;
71                 return this;
72         }
73
74         @Override
75         public ClientGetCommand global() {
76                 global = true;
77                 return this;
78         }
79
80         @Override
81         public Executable<Optional<Data>> uri(String uri) {
82                 ClientGet clientGet = createClientGetCommand(uri);
83                 return () -> threadPool.submit(() -> new ClientGetReplySequence().send(clientGet).get());
84         }
85
86         private ClientGet createClientGetCommand(String uri) {
87                 String identifier = new RandomIdentifierGenerator().generate();
88                 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
89                 if (ignoreDataStore) {
90                         clientGet.setIgnoreDataStore(true);
91                 }
92                 if (dataStoreOnly) {
93                         clientGet.setDataStoreOnly(true);
94                 }
95                 if (maxSize != null) {
96                         clientGet.setMaxSize(maxSize);
97                 }
98                 if (priority != null) {
99                         clientGet.setPriority(priority);
100                 }
101                 if (realTime) {
102                         clientGet.setRealTimeFlag(true);
103                 }
104                 if (global) {
105                         clientGet.setGlobal(true);
106                 }
107                 return clientGet;
108         }
109
110         private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
111
112                 private final AtomicBoolean finished = new AtomicBoolean();
113                 private final AtomicBoolean failed = new AtomicBoolean();
114
115                 private String contentType;
116                 private long dataLength;
117                 private InputStream payload;
118
119                 public ClientGetReplySequence() throws IOException {
120                         super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
121                 }
122
123                 @Override
124                 protected boolean isFinished() {
125                         return finished.get() || failed.get();
126                 }
127
128                 @Override
129                 protected Optional<Data> getResult() {
130                         return failed.get() ? Optional.empty() : Optional.of(new Data() {
131                                 @Override
132                                 public String getMimeType() {
133                                         return contentType;
134                                 }
135
136                                 @Override
137                                 public long size() {
138                                         return dataLength;
139                                 }
140
141                                 @Override
142                                 public InputStream getInputStream() {
143                                         return payload;
144                                 }
145                         });
146                 }
147
148                 @Override
149                 protected void consumeAllData(AllData allData) {
150                         synchronized (this) {
151                                 contentType = allData.getContentType();
152                                 dataLength = allData.getDataLength();
153                                 try {
154                                         payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
155                                         finished.set(true);
156                                 } catch (IOException e) {
157                                         // TODO – logging
158                                         failed.set(true);
159                                 }
160                         }
161                 }
162
163                 @Override
164                 protected void consumeGetFailed(GetFailed getFailed) {
165                         failed.set(true);
166                 }
167
168         }
169
170 }