2 * Sonitus - Pipeline.java - Copyright © 2013 David Roden
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 package net.pterodactylus.sonitus.data;
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.logging.Logger;
32 import com.google.common.base.Function;
33 import com.google.common.base.Preconditions;
34 import com.google.common.collect.ArrayListMultimap;
35 import com.google.common.collect.FluentIterable;
36 import com.google.common.collect.ImmutableMultimap;
37 import com.google.common.collect.ImmutableSet;
38 import com.google.common.collect.Lists;
39 import com.google.common.collect.Multimap;
40 import com.google.common.util.concurrent.MoreExecutors;
43 * A pipeline is responsible for streaming audio data from a {@link Source} to
44 * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
46 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
48 public class Pipeline implements Iterable<Controlled> {
51 private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
53 /** The source of the audio stream. */
54 private final Source source;
56 /** The sinks for each source. */
57 private final Multimap<Source, Sink> sinks;
59 /** All started connections. */
60 private final List<Connection> connections = Lists.newArrayList();
63 * Creates a new pipeline.
66 * The source of the audio stream
68 * The sinks for each source
70 private Pipeline(Source source, Multimap<Source, Sink> sinks) {
71 this.source = Preconditions.checkNotNull(source, "source must not be null");
72 this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
80 * Expose this pipeline’s source.
82 * @return This pipeline’s source
84 public Source source() {
89 * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
93 * The source to get the sinks for
94 * @return The sinks connected to the given source, or an empty list if the
95 * source does not exist in this pipeline
97 public Collection<Sink> sinks(Source source) {
98 return sinks.get(source);
106 * Starts the pipeline.
108 * @throws IOException
109 * if any of the sinks can not be opened
110 * @throws IllegalStateException
111 * if the pipeline is already running
113 public void start() throws IOException, IllegalStateException {
114 if (!connections.isEmpty()) {
115 throw new IllegalStateException("Pipeline is already running!");
117 List<Source> sources = Lists.newArrayList();
119 /* collect all source->sink pairs. */
120 while (!sources.isEmpty()) {
121 Source source = sources.remove(0);
122 Collection<Sink> sinks = this.sinks.get(source);
123 connections.add(new Connection(source, sinks));
124 for (Sink sink : sinks) {
125 sink.open(source.metadata());
126 if (sink instanceof Filter) {
127 sources.add((Source) sink);
131 for (Connection connection : connections) {
132 logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
133 new Thread(connection).start();
138 if (!connections.isEmpty()) {
139 /* pipeline is not running. */
142 for (Connection connection : connections) {
152 public Iterator<Controlled> iterator() {
153 return ImmutableSet.<Controlled>builder().add(source).addAll(sinks.values()).build().iterator();
161 * Returns a new pipeline builder.
164 * The source at which to start
165 * @return A builder for a new pipeline
167 public static Builder builder(Source source) {
168 return new Builder(source);
172 * A builder for a {@link Pipeline}.
174 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
176 public static class Builder {
178 /** The source of the pipeline. */
179 private final Source source;
181 /** The sinks to which each source streams. */
182 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
184 /** The last added source. */
185 private Source lastSource;
188 * Creates a new builder.
191 * The source that starts the pipeline
193 private Builder(Source source) {
194 this.source = source;
199 * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
204 * @return This builder
205 * @throws IllegalStateException
206 * if the last added {@link Sink} was not also a {@link Source}
208 public Builder to(Sink sink) {
209 Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
210 nextSinks.put(lastSource, sink);
211 lastSource = (sink instanceof Filter) ? (Source) sink : null;
216 * Locates the given source and sets it as the last added node so that the
217 * next invocation of {@link #to(Sink)} can “fork” the pipeline.
220 * The source to locate
221 * @return This builder
222 * @throws IllegalStateException
223 * if the given source was not previously added as a sink
225 public Builder find(Source source) {
226 Preconditions.checkState(nextSinks.containsValue(source));
232 * Builds the pipeline.
234 * @return The created pipeline
236 public Pipeline build() {
237 return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
243 * A connection is responsible for streaming audio from one {@link Source} to
244 * an arbitrary number of {@link Sink}s it is connected to. A connection is
245 * started by creating a {@link Thread} wrapping it and starting said thread.
247 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
249 public class Connection implements Runnable {
252 private final Source source;
255 private final Collection<Sink> sinks;
257 /** Whether the feeder was stopped. */
258 private final AtomicBoolean stopped = new AtomicBoolean(false);
260 /** The executor service. */
261 private final ExecutorService executorService;
263 /** The number of copied bytes. */
264 private long counter;
267 * Creates a new connection.
270 * The source of the stream
272 * The sinks to which to stream
274 public Connection(Source source, Collection<Sink> sinks) {
275 this.source = source;
277 if (sinks.size() == 1) {
278 executorService = MoreExecutors.sameThreadExecutor();
280 executorService = Executors.newCachedThreadPool();
289 * Returns the number of bytes that this connection has received from its
290 * source during its lifetime.
292 * @return The number of processed input bytes
294 public long counter() {
302 /** Stops this connection. */
313 Metadata firstMetadata = null;
314 while (!stopped.get()) {
316 final Metadata lastMetadata = firstMetadata;
317 final Metadata metadata = firstMetadata = source.metadata();
320 logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
321 buffer = source.get(4096);
322 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
323 } catch (IOException ioe1) {
324 throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
326 List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
329 public Callable<Void> apply(final Sink sink) {
330 return new Callable<Void>() {
333 public Void call() throws Exception {
334 if (!metadata.equals(lastMetadata)) {
335 sink.metadataUpdated(metadata);
338 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
339 sink.process(buffer);
340 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
341 } catch (IOException ioe1) {
342 throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
349 /* check all threads for exceptions. */
350 for (Future<Void> future : futures) {
353 counter += buffer.length;
354 } catch (IOException e) {
358 } catch (InterruptedException e) {
362 } catch (ExecutionException e) {