2 * Sonitus - Pipeline.java - Copyright © 2013 David Roden
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 package net.pterodactylus.sonitus.data;
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.logging.Logger;
32 import com.google.common.base.Function;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Preconditions;
35 import com.google.common.collect.ArrayListMultimap;
36 import com.google.common.collect.FluentIterable;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableMultimap;
39 import com.google.common.collect.ListMultimap;
40 import com.google.common.collect.Lists;
41 import com.google.common.collect.Multimap;
42 import com.google.common.util.concurrent.MoreExecutors;
45 * A pipeline is responsible for streaming audio data from a {@link Filter} to
46 * an arbitrary number of connected {@link Filter}s.
48 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
50 public class Pipeline implements Iterable<Filter> {
53 private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
55 /** The source of the audio stream. */
56 private final Filter source;
58 /** The filters for each source. */
59 private final ListMultimap<Filter, Filter> filters;
61 /** All started connections. */
62 private final List<Connection> connections = Lists.newArrayList();
65 * Creates a new pipeline.
68 * The source of the audio stream
70 * The filters for each source
72 private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
73 this.source = Preconditions.checkNotNull(source, "source must not be null");
74 this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
82 * Expose this pipeline’s source.
84 * @return This pipeline’s source
86 public Filter source() {
91 * Returns all {@link Filter}s that are connected to the given filter.
94 * The filter to get the connected filters for
95 * @return The filters connected to the given filter, or an empty list if the
96 * filter does not exist in this pipeline, or is not connected to any filters
98 public List<Filter> filters(Filter filter) {
99 return filters.get(filter);
103 * Returns the traffic counters of the given filter.
106 * The filter to get the traffic counters for
107 * @return The traffic counters for the given filter
109 public TrafficCounter trafficCounter(Filter filter) {
112 for (Connection connection : connections) {
113 /* the connection where the source matches knows the output. */
114 if (connection.source.equals(filter)) {
115 output = connection.counter();
116 } else if (connection.sinks.contains(filter)) {
117 input = connection.counter();
120 return new TrafficCounter(input, output);
128 * Starts the pipeline.
130 * @throws IOException
131 * if any of the filters can not be opened
132 * @throws IllegalStateException
133 * if the pipeline is already running
135 public void start() throws IOException, IllegalStateException {
136 if (!connections.isEmpty()) {
137 throw new IllegalStateException("Pipeline is already running!");
139 List<Filter> filters = Lists.newArrayList();
141 Metadata currentMetadata = Metadata.UNKNOWN;
142 /* collect all source->sink pairs. */
143 while (!filters.isEmpty()) {
144 Filter filter = filters.remove(0);
145 logger.info(String.format("Opening %s with %s...", filter.name(), currentMetadata));
146 filter.open(currentMetadata);
147 currentMetadata = filter.metadata();
148 Collection<Filter> sinks = this.filters.get(filter);
149 connections.add(new Connection(filter, sinks));
150 for (Filter sink : sinks) {
154 for (Connection connection : connections) {
155 String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Filter, String>() {
158 public String apply(Filter sink) {
162 logger.info(String.format("Starting Thread: %s", threadName));
163 new Thread(connection, threadName).start();
168 if (!connections.isEmpty()) {
169 /* pipeline is not running. */
172 for (Connection connection : connections) {
182 public Iterator<Filter> iterator() {
183 return filters().iterator();
191 * Returns all filters of this pipeline, listed breadth-first, starting with
194 * @return All filters of this pipeline
196 public List<Filter> filters() {
197 ImmutableList.Builder<Filter> filters = ImmutableList.builder();
198 List<Filter> remainingFilters = Lists.newArrayList();
200 remainingFilters.add(source);
201 while (!remainingFilters.isEmpty()) {
202 Collection<Filter> sinks = this.filters(remainingFilters.remove(0));
203 for (Filter sink : sinks) {
205 remainingFilters.add(sink);
208 return filters.build();
216 * Returns a new pipeline builder.
219 * The source at which to start
220 * @return A builder for a new pipeline
222 public static Builder builder(Filter source) {
223 return new Builder(source);
227 * A builder for a {@link Pipeline}.
229 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
231 public static class Builder {
233 /** The source of the pipeline. */
234 private final Filter source;
236 /** The filters to which each source streams. */
237 private Multimap<Filter, Filter> nextSinks = ArrayListMultimap.create();
239 /** The last added source. */
240 private Filter lastSource;
243 * Creates a new builder.
246 * The source that starts the pipeline
248 private Builder(Filter source) {
249 this.source = source;
254 * Adds a {@link Filter} as a recipient for the last added source.
258 * @return This builder
260 public Builder to(Filter sink) {
261 nextSinks.put(lastSource, sink);
267 * Locates the given source and sets it as the last added node so that the
268 * next invocation of {@link #to(Filter)} can “fork” the pipeline.
271 * The source to locate
272 * @return This builder
273 * @throws IllegalStateException
274 * if the given source was not previously added as a sink
276 public Builder find(Filter source) {
277 Preconditions.checkState(nextSinks.containsValue(source));
283 * Builds the pipeline.
285 * @return The created pipeline
287 public Pipeline build() {
288 return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
294 * A connection is responsible for streaming audio from one {@link Filter} to
295 * an arbitrary number of {@link Filter}s it is connected to. A connection is
296 * started by creating a {@link Thread} wrapping it and starting said thread.
298 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
300 public static class Connection implements Runnable {
303 private static final Logger logger = Logger.getLogger(Connection.class.getName());
306 private final Filter source;
309 private final Collection<Filter> sinks;
311 /** Whether the feeder was stopped. */
312 private final AtomicBoolean stopped = new AtomicBoolean(false);
314 /** The executor service. */
315 private final ExecutorService executorService;
317 /** The time the connection was started. */
318 private long startTime;
320 /** The number of copied bytes. */
321 private long counter;
323 /** The exception that was encountered, if any. */
324 private Optional<IOException> ioException = Optional.absent();
327 * Creates a new connection.
330 * The source of the stream
332 * The filters to which to stream
334 public Connection(Filter source, Collection<Filter> sinks) {
335 this.source = source;
337 if (sinks.size() < 2) {
338 executorService = MoreExecutors.sameThreadExecutor();
340 executorService = Executors.newCachedThreadPool();
349 * Returns the source of this connection.
351 * @return The source of this connection
353 public Filter source() {
358 * Returns the sinks of this connection.
360 * @return The sinks of this connection
362 public Collection<Filter> sinks() {
367 * Returns the time this connection was started.
369 * @return The time this connection was started (in milliseconds since Jan 1,
372 public long startTime() {
377 * Returns the number of bytes that this connection has received from its
378 * source during its lifetime.
380 * @return The number of processed input bytes
382 public long counter() {
387 * Returns the I/O exception that was encountered while processing this
390 * @return The I/O exception that occured, or {@link Optional#absent()} if no
393 public Optional<IOException> ioException() {
401 /** Stops this connection. */
412 startTime = System.currentTimeMillis();
413 while (!stopped.get()) {
415 final DataPacket dataPacket;
416 logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
417 dataPacket = source.get(4096);
418 logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
419 List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Filter, Callable<Void>>() {
422 public Callable<Void> apply(final Filter sink) {
423 return new Callable<Void>() {
426 public Void call() throws Exception {
427 logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
428 sink.process(dataPacket);
429 logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
435 /* check all threads for exceptions. */
436 for (Future<Void> future : futures) {
439 counter += dataPacket.buffer().length;
440 } catch (IOException e) {
441 ioException = Optional.of(e);
443 } catch (InterruptedException e) {
447 } catch (ExecutionException e) {
458 * Container for input and output counters.
460 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
462 public static class TrafficCounter {
464 /** The number of input bytes. */
465 private final long input;
467 /** The number of output bytes. */
468 private final long output;
471 * Creates a new traffic counter.
474 * The number of input bytes (may be {@code -1} to signify non-available
477 * The number of output bytes (may be {@code -1} to signify non-available
480 public TrafficCounter(long input, long output) {
482 this.output = output;
490 * Returns the number of input bytes.
492 * @return The number of input bytes, or {@link Optional#absent()} if the
493 * filter did not receive input
495 public Optional<Long> input() {
496 return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
500 * Returns the number of output bytes.
502 * @return The number of output bytes, or {@link Optional#absent()} if the
503 * filter did not send output
505 public Optional<Long> output() {
506 return (output == -1) ? Optional.<Long>absent() : Optional.of(output);