4e6a457ad8c87704fbb4e4489e6170ad04e132d1
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.logging.Logger;
30
31 import com.google.common.base.Function;
32 import com.google.common.base.Preconditions;
33 import com.google.common.collect.ArrayListMultimap;
34 import com.google.common.collect.FluentIterable;
35 import com.google.common.collect.ImmutableMultimap;
36 import com.google.common.collect.Lists;
37 import com.google.common.collect.Multimap;
38 import com.google.common.util.concurrent.MoreExecutors;
39
40 /**
41  * A pipeline is responsible for streaming audio data from a {@link Source} to
42  * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
43  *
44  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
45  */
46 public class Pipeline {
47
48         /** The logger. */
49         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
50
51         /** The source of the audio stream. */
52         private final Source source;
53
54         /** The sinks for each source. */
55         private final Multimap<Source, Sink> sinks;
56
57         /** All started feeders. */
58         private final List<Feeder> feeders = Lists.newArrayList();
59
60         /**
61          * Creates a new pipeline.
62          *
63          * @param source
64          *              The source of the audio stream
65          * @param sinks
66          *              The sinks for each source
67          */
68         private Pipeline(Source source, Multimap<Source, Sink> sinks) {
69                 this.source = Preconditions.checkNotNull(source, "source must not be null");
70                 this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
71         }
72
73         //
74         // ACTIONS
75         //
76
77         /**
78          * Starts the pipeline.
79          *
80          * @throws IOException
81          *              if any of the sinks can not be opened
82          * @throws IllegalStateException
83          *              if the pipeline is already running
84          */
85         public void start() throws IOException, IllegalStateException {
86                 if (!feeders.isEmpty()) {
87                         throw new IllegalStateException("Pipeline is already running!");
88                 }
89                 List<Source> sources = Lists.newArrayList();
90                 sources.add(source);
91                 /* collect all source->sink pairs. */
92                 while (!sources.isEmpty()) {
93                         Source source = sources.remove(0);
94                         Collection<Sink> sinks = this.sinks.get(source);
95                         feeders.add(new Feeder(source, sinks));
96                         for (Sink sink : sinks) {
97                                 sink.open(source.metadata());
98                                 if (sink instanceof Filter) {
99                                         sources.add((Source) sink);
100                                 }
101                         }
102                 }
103                 for (Feeder feeder : feeders) {
104                         logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
105                         new Thread(feeder).start();
106                 }
107         }
108
109         public void stop() {
110                 if (!feeders.isEmpty()) {
111                         /* pipeline is not running. */
112                         return;
113                 }
114                 for (Feeder feeder : feeders) {
115                         feeder.stop();
116                 }
117         }
118
119         //
120         // STATIC METHODS
121         //
122
123         /**
124          * Returns a new pipeline builder.
125          *
126          * @param source
127          *              The source at which to start
128          * @return A builder for a new pipeline
129          */
130         public static Builder builder(Source source) {
131                 return new Builder(source);
132         }
133
134         /**
135          * A builder for a {@link Pipeline}.
136          *
137          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
138          */
139         public static class Builder {
140
141                 /** The source of the pipeline. */
142                 private final Source source;
143
144                 /** The sinks to which each source streams. */
145                 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
146
147                 /** The last added source. */
148                 private Source lastSource;
149
150                 /**
151                  * Creates a new builder.
152                  *
153                  * @param source
154                  *              The source that starts the pipeline
155                  */
156                 private Builder(Source source) {
157                         this.source = source;
158                         lastSource = source;
159                 }
160
161                 /**
162                  * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
163                  * {@link Source}.
164                  *
165                  * @param sink
166                  *              The sink to add
167                  * @return This builder
168                  * @throws IllegalStateException
169                  *              if the last added {@link Sink} was not also a {@link Source}
170                  */
171                 public Builder to(Sink sink) {
172                         Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
173                         nextSinks.put(lastSource, sink);
174                         lastSource = (sink instanceof Filter) ? (Source) sink : null;
175                         return this;
176                 }
177
178                 /**
179                  * Locates the given source and sets it as the last added node so that the
180                  * next invocation of {@link #to(Sink)} can “fork” the pipeline.
181                  *
182                  * @param source
183                  *              The source to locate
184                  * @return This builder
185                  * @throws IllegalStateException
186                  *              if the given source was not previously added as a sink
187                  */
188                 public Builder find(Source source) {
189                         Preconditions.checkState(nextSinks.containsValue(source));
190                         lastSource = source;
191                         return this;
192                 }
193
194                 /**
195                  * Builds the pipeline.
196                  *
197                  * @return The created pipeline
198                  */
199                 public Pipeline build() {
200                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
201                 }
202
203         }
204
205         /**
206          * A feeder is responsible for streaming audio from one {@link Source} to an
207          * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
208          * creating a {@link Thread} wrapping it and starting said thread.
209          *
210          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
211          */
212         private class Feeder implements Runnable {
213
214                 /** The source. */
215                 private final Source source;
216
217                 /** The sinks. */
218                 private final Collection<Sink> sinks;
219
220                 /** Whether the feeder was stopped. */
221                 private final AtomicBoolean stopped = new AtomicBoolean(false);
222
223                 /** The executor service. */
224                 private final ExecutorService executorService;
225
226                 /**
227                  * Creates a new feeder.
228                  *
229                  * @param source
230                  *              The source of the stream
231                  * @param sinks
232                  *              The sinks to which to stream
233                  */
234                 public Feeder(Source source, Collection<Sink> sinks) {
235                         this.source = source;
236                         this.sinks = sinks;
237                         if (sinks.size() == 1) {
238                                 executorService = MoreExecutors.sameThreadExecutor();
239                         } else {
240                                 executorService = Executors.newCachedThreadPool();
241                         }
242                 }
243
244                 //
245                 // ACTIONS
246                 //
247
248                 /** Stops this feeder. */
249                 public void stop() {
250                         stopped.set(true);
251                 }
252
253                 //
254                 // RUNNABLE METHODS
255                 //
256
257                 @Override
258                 public void run() {
259                         Metadata firstMetadata = null;
260                         while (!stopped.get()) {
261                                 try {
262                                         final Metadata lastMetadata = firstMetadata;
263                                         final Metadata metadata = firstMetadata = source.metadata();
264                                         final byte[] buffer;
265                                         try {
266                                                 logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
267                                                 buffer = source.get(4096);
268                                                 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
269                                         } catch (IOException ioe1) {
270                                                 throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
271                                         }
272                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
273
274                                                 @Override
275                                                 public Callable<Void> apply(final Sink sink) {
276                                                         return new Callable<Void>() {
277
278                                                                 @Override
279                                                                 public Void call() throws Exception {
280                                                                         if (!metadata.equals(lastMetadata)) {
281                                                                                 sink.metadataUpdated(metadata);
282                                                                         }
283                                                                         try {
284                                                                                 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
285                                                                                 sink.process(buffer);
286                                                                                 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
287                                                                         } catch (IOException ioe1) {
288                                                                                 throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
289                                                                         }
290                                                                         return null;
291                                                                 }
292                                                         };
293                                                 }
294                                         }).toList());
295                                         /* check all threads for exceptions. */
296                                         for (Future<Void> future : futures) {
297                                                 future.get();
298                                         }
299                                 } catch (IOException e) {
300                                         /* TODO */
301                                         e.printStackTrace();
302                                         break;
303                                 } catch (InterruptedException e) {
304                                         /* TODO */
305                                         e.printStackTrace();
306                                         break;
307                                 } catch (ExecutionException e) {
308                                         /* TODO */
309                                         e.printStackTrace();
310                                         break;
311                                 }
312                         }
313                 }
314
315         }
316
317 }