Expose source and sinks from the pipeline.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.logging.Logger;
30
31 import com.google.common.base.Function;
32 import com.google.common.base.Preconditions;
33 import com.google.common.collect.ArrayListMultimap;
34 import com.google.common.collect.FluentIterable;
35 import com.google.common.collect.ImmutableMultimap;
36 import com.google.common.collect.Lists;
37 import com.google.common.collect.Multimap;
38 import com.google.common.util.concurrent.MoreExecutors;
39
40 /**
41  * A pipeline is responsible for streaming audio data from a {@link Source} to
42  * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
43  *
44  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
45  */
46 public class Pipeline {
47
48         /** The logger. */
49         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
50
51         /** The source of the audio stream. */
52         private final Source source;
53
54         /** The sinks for each source. */
55         private final Multimap<Source, Sink> sinks;
56
57         /** All started feeders. */
58         private final List<Feeder> feeders = Lists.newArrayList();
59
60         /**
61          * Creates a new pipeline.
62          *
63          * @param source
64          *              The source of the audio stream
65          * @param sinks
66          *              The sinks for each source
67          */
68         private Pipeline(Source source, Multimap<Source, Sink> sinks) {
69                 this.source = Preconditions.checkNotNull(source, "source must not be null");
70                 this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
71         }
72
73         //
74         // ACCESSORS
75         //
76
77         /**
78          * Expose this pipeline’s source.
79          *
80          * @return This pipeline’s source
81          */
82         public Source source() {
83                 return source;
84         }
85
86         /**
87          * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
88          * the given source.
89          *
90          * @param source
91          *              The source to get the sinks for
92          * @return The sinks connected to the given source, or an empty list if the
93          *         source does not exist in this pipeline
94          */
95         public Collection<Sink> sinks(Source source) {
96                 return sinks.get(source);
97         }
98
99         //
100         // ACTIONS
101         //
102
103         /**
104          * Starts the pipeline.
105          *
106          * @throws IOException
107          *              if any of the sinks can not be opened
108          * @throws IllegalStateException
109          *              if the pipeline is already running
110          */
111         public void start() throws IOException, IllegalStateException {
112                 if (!feeders.isEmpty()) {
113                         throw new IllegalStateException("Pipeline is already running!");
114                 }
115                 List<Source> sources = Lists.newArrayList();
116                 sources.add(source);
117                 /* collect all source->sink pairs. */
118                 while (!sources.isEmpty()) {
119                         Source source = sources.remove(0);
120                         Collection<Sink> sinks = this.sinks.get(source);
121                         feeders.add(new Feeder(source, sinks));
122                         for (Sink sink : sinks) {
123                                 sink.open(source.metadata());
124                                 if (sink instanceof Filter) {
125                                         sources.add((Source) sink);
126                                 }
127                         }
128                 }
129                 for (Feeder feeder : feeders) {
130                         logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
131                         new Thread(feeder).start();
132                 }
133         }
134
135         public void stop() {
136                 if (!feeders.isEmpty()) {
137                         /* pipeline is not running. */
138                         return;
139                 }
140                 for (Feeder feeder : feeders) {
141                         feeder.stop();
142                 }
143         }
144
145         //
146         // STATIC METHODS
147         //
148
149         /**
150          * Returns a new pipeline builder.
151          *
152          * @param source
153          *              The source at which to start
154          * @return A builder for a new pipeline
155          */
156         public static Builder builder(Source source) {
157                 return new Builder(source);
158         }
159
160         /**
161          * A builder for a {@link Pipeline}.
162          *
163          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
164          */
165         public static class Builder {
166
167                 /** The source of the pipeline. */
168                 private final Source source;
169
170                 /** The sinks to which each source streams. */
171                 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
172
173                 /** The last added source. */
174                 private Source lastSource;
175
176                 /**
177                  * Creates a new builder.
178                  *
179                  * @param source
180                  *              The source that starts the pipeline
181                  */
182                 private Builder(Source source) {
183                         this.source = source;
184                         lastSource = source;
185                 }
186
187                 /**
188                  * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
189                  * {@link Source}.
190                  *
191                  * @param sink
192                  *              The sink to add
193                  * @return This builder
194                  * @throws IllegalStateException
195                  *              if the last added {@link Sink} was not also a {@link Source}
196                  */
197                 public Builder to(Sink sink) {
198                         Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
199                         nextSinks.put(lastSource, sink);
200                         lastSource = (sink instanceof Filter) ? (Source) sink : null;
201                         return this;
202                 }
203
204                 /**
205                  * Locates the given source and sets it as the last added node so that the
206                  * next invocation of {@link #to(Sink)} can “fork” the pipeline.
207                  *
208                  * @param source
209                  *              The source to locate
210                  * @return This builder
211                  * @throws IllegalStateException
212                  *              if the given source was not previously added as a sink
213                  */
214                 public Builder find(Source source) {
215                         Preconditions.checkState(nextSinks.containsValue(source));
216                         lastSource = source;
217                         return this;
218                 }
219
220                 /**
221                  * Builds the pipeline.
222                  *
223                  * @return The created pipeline
224                  */
225                 public Pipeline build() {
226                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
227                 }
228
229         }
230
231         /**
232          * A feeder is responsible for streaming audio from one {@link Source} to an
233          * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
234          * creating a {@link Thread} wrapping it and starting said thread.
235          *
236          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
237          */
238         private class Feeder implements Runnable {
239
240                 /** The source. */
241                 private final Source source;
242
243                 /** The sinks. */
244                 private final Collection<Sink> sinks;
245
246                 /** Whether the feeder was stopped. */
247                 private final AtomicBoolean stopped = new AtomicBoolean(false);
248
249                 /** The executor service. */
250                 private final ExecutorService executorService;
251
252                 /**
253                  * Creates a new feeder.
254                  *
255                  * @param source
256                  *              The source of the stream
257                  * @param sinks
258                  *              The sinks to which to stream
259                  */
260                 public Feeder(Source source, Collection<Sink> sinks) {
261                         this.source = source;
262                         this.sinks = sinks;
263                         if (sinks.size() == 1) {
264                                 executorService = MoreExecutors.sameThreadExecutor();
265                         } else {
266                                 executorService = Executors.newCachedThreadPool();
267                         }
268                 }
269
270                 //
271                 // ACTIONS
272                 //
273
274                 /** Stops this feeder. */
275                 public void stop() {
276                         stopped.set(true);
277                 }
278
279                 //
280                 // RUNNABLE METHODS
281                 //
282
283                 @Override
284                 public void run() {
285                         Metadata firstMetadata = null;
286                         while (!stopped.get()) {
287                                 try {
288                                         final Metadata lastMetadata = firstMetadata;
289                                         final Metadata metadata = firstMetadata = source.metadata();
290                                         final byte[] buffer;
291                                         try {
292                                                 logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
293                                                 buffer = source.get(4096);
294                                                 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
295                                         } catch (IOException ioe1) {
296                                                 throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
297                                         }
298                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
299
300                                                 @Override
301                                                 public Callable<Void> apply(final Sink sink) {
302                                                         return new Callable<Void>() {
303
304                                                                 @Override
305                                                                 public Void call() throws Exception {
306                                                                         if (!metadata.equals(lastMetadata)) {
307                                                                                 sink.metadataUpdated(metadata);
308                                                                         }
309                                                                         try {
310                                                                                 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
311                                                                                 sink.process(buffer);
312                                                                                 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
313                                                                         } catch (IOException ioe1) {
314                                                                                 throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
315                                                                         }
316                                                                         return null;
317                                                                 }
318                                                         };
319                                                 }
320                                         }).toList());
321                                         /* check all threads for exceptions. */
322                                         for (Future<Void> future : futures) {
323                                                 future.get();
324                                         }
325                                 } catch (IOException e) {
326                                         /* TODO */
327                                         e.printStackTrace();
328                                         break;
329                                 } catch (InterruptedException e) {
330                                         /* TODO */
331                                         e.printStackTrace();
332                                         break;
333                                 } catch (ExecutionException e) {
334                                         /* TODO */
335                                         e.printStackTrace();
336                                         break;
337                                 }
338                         }
339                 }
340
341         }
342
343 }