/*
 * Sonitus - Pipeline.java - Copyright © 2013 David Roden
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
18 package net.pterodactylus.sonitus.data;
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.logging.Logger;
31 import com.google.common.base.Function;
32 import com.google.common.base.Preconditions;
33 import com.google.common.collect.ArrayListMultimap;
34 import com.google.common.collect.FluentIterable;
35 import com.google.common.collect.ImmutableMultimap;
36 import com.google.common.collect.Lists;
37 import com.google.common.collect.Multimap;
38 import com.google.common.util.concurrent.MoreExecutors;
/**
 * A pipeline is responsible for streaming audio data from a {@link Source} to
 * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
 *
 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
 */
46 public class Pipeline {
/** The logger for this class. */
49 private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
51 /** The source of the audio stream, i.e. the head of the pipeline. */
52 private final Source source;
54 /** The sinks for each source, keyed by the source that feeds them. */
55 private final Multimap<Source, Sink> sinks;
57 /** All started feeders; start() refuses to run while this list is non-empty. */
58 private final List<Feeder> feeders = Lists.newArrayList();
/**
 * Creates a new pipeline. Instances are obtained through
 * {@link #builder(Source)}.
 *
 * @param source
 *            The source of the audio stream
 * @param sinks
 *            The sinks for each source
 * @throws NullPointerException
 *             if {@code source} or {@code sinks} is {@code null}
 */
private Pipeline(Source source, Multimap<Source, Sink> sinks) {
	/* validate both arguments up front, then store them. */
	Preconditions.checkNotNull(source, "source must not be null");
	Preconditions.checkNotNull(sinks, "sinks must not be null");
	this.source = source;
	this.sinks = sinks;
}
78 * Starts the pipeline.
81 * if any of the sinks can not be opened
82 * @throws IllegalStateException
83 * if the pipeline is already running
85 public void start() throws IOException, IllegalStateException {
86 if (!feeders.isEmpty()) {
87 throw new IllegalStateException("Pipeline is already running!");
89 List<Source> sources = Lists.newArrayList();
91 /* collect all source->sink pairs. */
92 while (!sources.isEmpty()) {
93 Source source = sources.remove(0);
94 Collection<Sink> sinks = this.sinks.get(source);
95 feeders.add(new Feeder(source, sinks));
96 for (Sink sink : sinks) {
97 sink.open(source.metadata());
98 if (sink instanceof Filter) {
99 sources.add((Source) sink);
103 for (Feeder feeder : feeders) {
104 logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
105 new Thread(feeder).start();
/*
 * NOTE(review): the guard below tests for a NON-empty feeder list, yet the
 * comment inside it reads "pipeline is not running", and start() treats a
 * non-empty list as "already running". The condition looks inverted
 * (presumably feeders.isEmpty() was intended) -- TODO confirm against the
 * full method, whose header and remaining statements are not visible here.
 */
110 if (!feeders.isEmpty()) {
111 /* pipeline is not running. */
/* visits every feeder created by start(); the per-feeder action is not visible in this excerpt. */
114 for (Feeder feeder : feeders) {
124 * Returns a new pipeline builder.
127 * The source at which to start
128 * @return A builder for a new pipeline
130 public static Builder builder(Source source) {
131 return new Builder(source);
135 * A builder for a {@link Pipeline}.
137 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
139 public static class Builder {
141 /** The source of the pipeline. */
142 private final Source source;
144 /** The sinks to which each source streams. */
145 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
147 /** The last added source. */
148 private Source lastSource;
151 * Creates a new builder.
154 * The source that starts the pipeline
156 private Builder(Source source) {
157 this.source = source;
162 * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
167 * @return This builder
168 * @throws IllegalStateException
169 * if the last added {@link Sink} was not also a {@link Source}
171 public Builder to(Sink sink) {
172 Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
173 nextSinks.put(lastSource, sink);
174 lastSource = (sink instanceof Filter) ? (Source) sink : null;
179 * Locates the given source and sets it as the last added node so that the
180 * next invocation of {@link #to(Sink)} can “fork” the pipeline.
183 * The source to locate
184 * @return This builder
185 * @throws IllegalStateException
186 * if the given source was not previously added as a sink
188 public Builder find(Source source) {
189 Preconditions.checkState(nextSinks.containsValue(source));
195 * Builds the pipeline.
197 * @return The created pipeline
199 public Pipeline build() {
200 return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
/**
 * A feeder is responsible for streaming audio from one {@link Source} to an
 * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
 * creating a {@link Thread} wrapping it and starting said thread.
 *
 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
 */
212 private class Feeder implements Runnable {
/** The source from which this feeder reads. */
215 private final Source source;
/** The sinks to which this feeder passes the data it read. */
218 private final Collection<Sink> sinks;
220 /** Whether the feeder was stopped; checked before every read from the source. */
221 private final AtomicBoolean stopped = new AtomicBoolean(false);
223 /** The executor service on which the per-sink tasks are run. */
224 private final ExecutorService executorService;
/**
 * Creates a new feeder.
 *
 * @param source
 *            The source of the stream
 * @param sinks
 *            The sinks to which to stream
 */
public Feeder(Source source, Collection<Sink> sinks) {
	this.source = source;
	this.sinks = sinks;
	if (sinks.size() == 1) {
		/* a single sink needs no extra threads; run its task on the calling thread. */
		executorService = MoreExecutors.sameThreadExecutor();
	} else {
		executorService = Executors.newCachedThreadPool();
	}
}
248 /** Stops this feeder. */
259 Metadata firstMetadata = null;
260 while (!stopped.get()) {
262 final Metadata lastMetadata = firstMetadata;
263 final Metadata metadata = firstMetadata = source.metadata();
266 logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
267 buffer = source.get(4096);
268 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
269 } catch (IOException ioe1) {
270 throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
272 List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
275 public Callable<Void> apply(final Sink sink) {
276 return new Callable<Void>() {
279 public Void call() throws Exception {
280 if (!metadata.equals(lastMetadata)) {
281 sink.metadataUpdated(metadata);
284 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
285 sink.process(buffer);
286 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
287 } catch (IOException ioe1) {
288 throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
295 /* check all threads for exceptions. */
296 for (Future<Void> future : futures) {
299 } catch (IOException e) {
303 } catch (InterruptedException e) {
307 } catch (ExecutionException e) {