/*
 * Sonitus - Pipeline.java - Copyright © 2013 David Roden
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package net.pterodactylus.sonitus.data;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.MoreExecutors;
44 * A pipeline is responsible for streaming audio data from a {@link Source} to
45 * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
47 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
49 public class Pipeline implements Iterable<ControlledComponent> {
52 private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
54 /** The source of the audio stream. */
55 private final Source source;
57 /** The sinks for each source. */
58 private final Multimap<Source, Sink> sinks;
60 /** All started connections. */
61 private final List<Connection> connections = Lists.newArrayList();
64 * Creates a new pipeline.
67 * The source of the audio stream
69 * The sinks for each source
71 private Pipeline(Source source, Multimap<Source, Sink> sinks) {
72 this.source = Preconditions.checkNotNull(source, "source must not be null");
73 this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
74 for (ControlledComponent component : Lists.reverse(components())) {
75 logger.finest(String.format("Adding Listener to %s.", component.name()));
76 component.addMetadataListener(new MetadataListener() {
78 public void metadataUpdated(ControlledComponent component, Metadata metadata) {
79 if (!(component instanceof Source)) {
82 for (ControlledComponent controlledComponent : sinks((Source) component)) {
83 logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata));
84 controlledComponent.metadataUpdated(metadata);
96 * Expose this pipeline’s source.
98 * @return This pipeline’s source
100 public Source source() {
105 * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
109 * The source to get the sinks for
110 * @return The sinks connected to the given source, or an empty list if the
111 * source does not exist in this pipeline
113 public Collection<Sink> sinks(Source source) {
114 return sinks.get(source);
118 * Returns the traffic counters of the given controlled component.
120 * @param controlledComponent
121 * The controlled component to get the traffic counters for
122 * @return The traffic counters for the given controlled component
124 public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
127 for (Connection connection : connections) {
128 /* the connection where the source matches knows the output. */
129 if (connection.source.equals(controlledComponent)) {
130 output = connection.counter();
131 } else if (connection.sinks.contains(controlledComponent)) {
132 input = connection.counter();
135 return new TrafficCounter(input, output);
143 * Starts the pipeline.
145 * @throws IOException
146 * if any of the sinks can not be opened
147 * @throws IllegalStateException
148 * if the pipeline is already running
150 public void start() throws IOException, IllegalStateException {
151 if (!connections.isEmpty()) {
152 throw new IllegalStateException("Pipeline is already running!");
154 List<Source> sources = Lists.newArrayList();
156 /* collect all source->sink pairs. */
157 while (!sources.isEmpty()) {
158 Source source = sources.remove(0);
159 Collection<Sink> sinks = this.sinks.get(source);
160 connections.add(new Connection(source, sinks));
161 for (Sink sink : sinks) {
162 logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
163 sink.open(source.metadata());
164 if (sink instanceof Filter) {
165 sources.add((Source) sink);
169 for (Connection connection : connections) {
170 String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
173 public String apply(Sink sink) {
177 logger.info(String.format("Starting Thread: %s", threadName));
178 new Thread(connection, threadName).start();
183 if (!connections.isEmpty()) {
184 /* pipeline is not running. */
187 for (Connection connection : connections) {
197 public Iterator<ControlledComponent> iterator() {
198 return components().iterator();
206 * Returns all components of this pipeline, listed breadth-first, starting with
209 * @return All components of this pipeline
211 public List<ControlledComponent> components() {
212 ImmutableList.Builder<ControlledComponent> components = ImmutableList.builder();
213 List<ControlledComponent> currentComponents = Lists.newArrayList();
214 components.add(source);
215 currentComponents.add(source);
216 while (!currentComponents.isEmpty()) {
217 Collection<Sink> sinks = this.sinks((Source) currentComponents.remove(0));
218 for (Sink sink : sinks) {
219 components.add(sink);
220 if (sink instanceof Source) {
221 currentComponents.add(sink);
225 return components.build();
233 * Returns a new pipeline builder.
236 * The source at which to start
237 * @return A builder for a new pipeline
239 public static Builder builder(Source source) {
240 return new Builder(source);
244 * A builder for a {@link Pipeline}.
246 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
248 public static class Builder {
250 /** The source of the pipeline. */
251 private final Source source;
253 /** The sinks to which each source streams. */
254 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
256 /** The last added source. */
257 private Source lastSource;
260 * Creates a new builder.
263 * The source that starts the pipeline
265 private Builder(Source source) {
266 this.source = source;
271 * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
276 * @return This builder
277 * @throws IllegalStateException
278 * if the last added {@link Sink} was not also a {@link Source}
280 public Builder to(Sink sink) {
281 Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
282 nextSinks.put(lastSource, sink);
283 lastSource = (sink instanceof Filter) ? (Source) sink : null;
288 * Locates the given source and sets it as the last added node so that the
289 * next invocation of {@link #to(Sink)} can “fork” the pipeline.
292 * The source to locate
293 * @return This builder
294 * @throws IllegalStateException
295 * if the given source was not previously added as a sink
297 public Builder find(Source source) {
298 Preconditions.checkState(nextSinks.containsValue(source));
304 * Builds the pipeline.
306 * @return The created pipeline
308 public Pipeline build() {
309 return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
315 * A connection is responsible for streaming audio from one {@link Source} to
316 * an arbitrary number of {@link Sink}s it is connected to. A connection is
317 * started by creating a {@link Thread} wrapping it and starting said thread.
319 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
321 public class Connection implements Runnable {
324 private final Source source;
327 private final Collection<Sink> sinks;
329 /** Whether the feeder was stopped. */
330 private final AtomicBoolean stopped = new AtomicBoolean(false);
332 /** The executor service. */
333 private final ExecutorService executorService;
335 /** The time the connection was started. */
336 private long startTime;
338 /** The number of copied bytes. */
339 private long counter;
342 * Creates a new connection.
345 * The source of the stream
347 * The sinks to which to stream
349 public Connection(Source source, Collection<Sink> sinks) {
350 this.source = source;
352 if (sinks.size() < 2) {
353 executorService = MoreExecutors.sameThreadExecutor();
355 executorService = Executors.newCachedThreadPool();
364 * Returns the time this connection was started.
366 * @return The time this connection was started (in milliseconds since Jan 1,
369 public long startTime() {
374 * Returns the number of bytes that this connection has received from its
375 * source during its lifetime.
377 * @return The number of processed input bytes
379 public long counter() {
387 /** Stops this connection. */
398 startTime = System.currentTimeMillis();
399 while (!stopped.get()) {
403 logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
404 buffer = source.get(4096);
405 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
406 } catch (IOException ioe1) {
407 throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
409 List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
412 public Callable<Void> apply(final Sink sink) {
413 return new Callable<Void>() {
416 public Void call() throws Exception {
418 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
419 sink.process(buffer);
420 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
421 } catch (IOException ioe1) {
422 throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
429 /* check all threads for exceptions. */
430 for (Future<Void> future : futures) {
433 counter += buffer.length;
434 } catch (IOException e) {
438 } catch (InterruptedException e) {
442 } catch (ExecutionException e) {
453 * Container for input and output counters.
455 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
457 public static class TrafficCounter {
459 /** The number of input bytes. */
460 private final long input;
462 /** The number of output bytes. */
463 private final long output;
466 * Creates a new traffic counter.
469 * The number of input bytes (may be {@code -1} to signify non-available
472 * The number of output bytes (may be {@code -1} to signify non-available
475 public TrafficCounter(long input, long output) {
477 this.output = output;
485 * Returns the number of input bytes.
487 * @return The number of input bytes, or {@link Optional#absent()} if the
488 * component can not receive input
490 public Optional<Long> input() {
491 return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
495 * Returns the number of output bytes.
497 * @return The number of output bytes, or {@link Optional#absent()} if the
498 * component can not send output
500 public Optional<Long> output() {
501 return (output == -1) ? Optional.<Long>absent() : Optional.of(output);