Open the source, too.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.logging.Logger;
31
32 import com.google.common.base.Function;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Preconditions;
35 import com.google.common.collect.ArrayListMultimap;
36 import com.google.common.collect.FluentIterable;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableMultimap;
39 import com.google.common.collect.ListMultimap;
40 import com.google.common.collect.Lists;
41 import com.google.common.collect.Multimap;
42 import com.google.common.util.concurrent.MoreExecutors;
43
44 /**
45  * A pipeline is responsible for streaming audio data from a {@link Filter} to
46  * an arbitrary number of connected {@link Filter}s.
47  *
48  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
49  */
50 public class Pipeline implements Iterable<Filter> {
51
52         /** The logger. */
53         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
54
55         /** The source of the audio stream. */
56         private final Filter source;
57
58         /** The filters for each source. */
59         private final ListMultimap<Filter, Filter> filters;
60
61         /** All started connections. */
62         private final List<Connection> connections = Lists.newArrayList();
63
64         /**
65          * Creates a new pipeline.
66          *
67          * @param source
68          *              The source of the audio stream
69          * @param filters
70          *              The filters for each source
71          */
72         private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
73                 this.source = Preconditions.checkNotNull(source, "source must not be null");
74                 this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
75                 for (Filter filter : Lists.reverse(filters())) {
76                         logger.finest(String.format("Adding Listener to %s.", filter.name()));
77                         filter.addMetadataListener(new MetadataListener() {
78
79                                 @Override
80                                 public void metadataUpdated(Filter filter, Metadata metadata) {
81                                         for (Filter sinks : filters(filter)) {
82                                                 logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
83                                                 sinks.metadataUpdated(metadata);
84                                         }
85                                 }
86                         });
87                 }
88         }
89
90         //
91         // ACCESSORS
92         //
93
94         /**
95          * Expose this pipeline’s source.
96          *
97          * @return This pipeline’s source
98          */
99         public Filter source() {
100                 return source;
101         }
102
103         /**
104          * Returns all {@link Filter}s that are connected to the given filter.
105          *
106          * @param filter
107          *              The filter to get the connected filters for
108          * @return The filters connected to the given filter, or an empty list if the
109          *         filter does not exist in this pipeline, or is not connected to any filters
110          */
111         public List<Filter> filters(Filter filter) {
112                 return filters.get(filter);
113         }
114
115         /**
116          * Returns the traffic counters of the given filter.
117          *
118          * @param filter
119          *              The filter to get the traffic counters for
120          * @return The traffic counters for the given filter
121          */
122         public TrafficCounter trafficCounter(Filter filter) {
123                 long input = -1;
124                 long output = -1;
125                 for (Connection connection : connections) {
126                         /* the connection where the source matches knows the output. */
127                         if (connection.source.equals(filter)) {
128                                 output = connection.counter();
129                         } else if (connection.sinks.contains(filter)) {
130                                 input = connection.counter();
131                         }
132                 }
133                 return new TrafficCounter(input, output);
134         }
135
136         //
137         // ACTIONS
138         //
139
140         /**
141          * Starts the pipeline.
142          *
143          * @throws IOException
144          *              if any of the filters can not be opened
145          * @throws IllegalStateException
146          *              if the pipeline is already running
147          */
148         public void start() throws IOException, IllegalStateException {
149                 if (!connections.isEmpty()) {
150                         throw new IllegalStateException("Pipeline is already running!");
151                 }
152                 List<Filter> filters = Lists.newArrayList();
153                 filters.add(source);
154                 source.open(Metadata.UNKNOWN);
155                 /* collect all source->sink pairs. */
156                 while (!filters.isEmpty()) {
157                         Filter filter = filters.remove(0);
158                         Collection<Filter> sinks = this.filters.get(filter);
159                         connections.add(new Connection(filter, sinks));
160                         for (Filter sink : sinks) {
161                                 logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
162                                 sink.open(filter.metadata());
163                                 filters.add(sink);
164                         }
165                 }
166                 for (Connection connection : connections) {
167                         String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Filter, String>() {
168
169                                 @Override
170                                 public String apply(Filter sink) {
171                                         return sink.name();
172                                 }
173                         }));
174                         logger.info(String.format("Starting Thread: %s", threadName));
175                         new Thread(connection, threadName).start();
176                 }
177         }
178
179         public void stop() {
180                 if (!connections.isEmpty()) {
181                         /* pipeline is not running. */
182                         return;
183                 }
184                 for (Connection connection : connections) {
185                         connection.stop();
186                 }
187         }
188
189         //
190         // ITERABLE METHODS
191         //
192
193         @Override
194         public Iterator<Filter> iterator() {
195                 return filters().iterator();
196         }
197
198         //
199         // PRIVATE METHODS
200         //
201
202         /**
203          * Returns all filters of this pipeline, listed breadth-first, starting with
204          * the source.
205          *
206          * @return All filters of this pipeline
207          */
208         public List<Filter> filters() {
209                 ImmutableList.Builder<Filter> filters = ImmutableList.builder();
210                 List<Filter> remainingFilters = Lists.newArrayList();
211                 filters.add(source);
212                 remainingFilters.add(source);
213                 while (!remainingFilters.isEmpty()) {
214                         Collection<Filter> sinks = this.filters(remainingFilters.remove(0));
215                         for (Filter sink : sinks) {
216                                 filters.add(sink);
217                                 remainingFilters.add(sink);
218                         }
219                 }
220                 return filters.build();
221         }
222
223         //
224         // STATIC METHODS
225         //
226
227         /**
228          * Returns a new pipeline builder.
229          *
230          * @param source
231          *              The source at which to start
232          * @return A builder for a new pipeline
233          */
234         public static Builder builder(Filter source) {
235                 return new Builder(source);
236         }
237
238         /**
239          * A builder for a {@link Pipeline}.
240          *
241          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
242          */
243         public static class Builder {
244
245                 /** The source of the pipeline. */
246                 private final Filter source;
247
248                 /** The filters to which each source streams. */
249                 private Multimap<Filter, Filter> nextSinks = ArrayListMultimap.create();
250
251                 /** The last added source. */
252                 private Filter lastSource;
253
254                 /**
255                  * Creates a new builder.
256                  *
257                  * @param source
258                  *              The source that starts the pipeline
259                  */
260                 private Builder(Filter source) {
261                         this.source = source;
262                         lastSource = source;
263                 }
264
265                 /**
266                  * Adds a {@link Filter} as a recipient for the last added source.
267                  *
268                  * @param sink
269                  *              The sink to add
270                  * @return This builder
271                  */
272                 public Builder to(Filter sink) {
273                         nextSinks.put(lastSource, sink);
274                         lastSource = sink;
275                         return this;
276                 }
277
278                 /**
279                  * Locates the given source and sets it as the last added node so that the
280                  * next invocation of {@link #to(Filter)} can “fork” the pipeline.
281                  *
282                  * @param source
283                  *              The source to locate
284                  * @return This builder
285                  * @throws IllegalStateException
286                  *              if the given source was not previously added as a sink
287                  */
288                 public Builder find(Filter source) {
289                         Preconditions.checkState(nextSinks.containsValue(source));
290                         lastSource = source;
291                         return this;
292                 }
293
294                 /**
295                  * Builds the pipeline.
296                  *
297                  * @return The created pipeline
298                  */
299                 public Pipeline build() {
300                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
301                 }
302
303         }
304
305         /**
306          * A connection is responsible for streaming audio from one {@link Filter} to
307          * an arbitrary number of {@link Filter}s it is connected to. A connection is
308          * started by creating a {@link Thread} wrapping it and starting said thread.
309          *
310          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
311          */
312         public class Connection implements Runnable {
313
314                 /** The source. */
315                 private final Filter source;
316
317                 /** The filters. */
318                 private final Collection<Filter> sinks;
319
320                 /** Whether the feeder was stopped. */
321                 private final AtomicBoolean stopped = new AtomicBoolean(false);
322
323                 /** The executor service. */
324                 private final ExecutorService executorService;
325
326                 /** The time the connection was started. */
327                 private long startTime;
328
329                 /** The number of copied bytes. */
330                 private long counter;
331
332                 /**
333                  * Creates a new connection.
334                  *
335                  * @param source
336                  *              The source of the stream
337                  * @param sinks
338                  *              The filters to which to stream
339                  */
340                 public Connection(Filter source, Collection<Filter> sinks) {
341                         this.source = source;
342                         this.sinks = sinks;
343                         if (sinks.size() < 2) {
344                                 executorService = MoreExecutors.sameThreadExecutor();
345                         } else {
346                                 executorService = Executors.newCachedThreadPool();
347                         }
348                 }
349
350                 //
351                 // ACCESSORS
352                 //
353
354                 /**
355                  * Returns the time this connection was started.
356                  *
357                  * @return The time this connection was started (in milliseconds since Jan 1,
358                  *         1970 UTC)
359                  */
360                 public long startTime() {
361                         return startTime;
362                 }
363
364                 /**
365                  * Returns the number of bytes that this connection has received from its
366                  * source during its lifetime.
367                  *
368                  * @return The number of processed input bytes
369                  */
370                 public long counter() {
371                         return counter;
372                 }
373
374                 //
375                 // ACTIONS
376                 //
377
378                 /** Stops this connection. */
379                 public void stop() {
380                         stopped.set(true);
381                 }
382
383                 //
384                 // RUNNABLE METHODS
385                 //
386
387                 @Override
388                 public void run() {
389                         startTime = System.currentTimeMillis();
390                         while (!stopped.get()) {
391                                 try {
392                                         final byte[] buffer;
393                                         try {
394                                                 logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
395                                                 buffer = source.get(4096);
396                                                 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
397                                         } catch (IOException ioe1) {
398                                                 throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
399                                         }
400                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Filter, Callable<Void>>() {
401
402                                                 @Override
403                                                 public Callable<Void> apply(final Filter sink) {
404                                                         return new Callable<Void>() {
405
406                                                                 @Override
407                                                                 public Void call() throws Exception {
408                                                                         try {
409                                                                                 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
410                                                                                 sink.process(buffer);
411                                                                                 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
412                                                                         } catch (IOException ioe1) {
413                                                                                 throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
414                                                                         }
415                                                                         return null;
416                                                                 }
417                                                         };
418                                                 }
419                                         }).toList());
420                                         /* check all threads for exceptions. */
421                                         for (Future<Void> future : futures) {
422                                                 future.get();
423                                         }
424                                         counter += buffer.length;
425                                 } catch (IOException e) {
426                                         /* TODO */
427                                         e.printStackTrace();
428                                         break;
429                                 } catch (InterruptedException e) {
430                                         /* TODO */
431                                         e.printStackTrace();
432                                         break;
433                                 } catch (ExecutionException e) {
434                                         /* TODO */
435                                         e.printStackTrace();
436                                         break;
437                                 }
438                         }
439                 }
440
441         }
442
443         /**
444          * Container for input and output counters.
445          *
446          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
447          */
448         public static class TrafficCounter {
449
450                 /** The number of input bytes. */
451                 private final long input;
452
453                 /** The number of output bytes. */
454                 private final long output;
455
456                 /**
457                  * Creates a new traffic counter.
458                  *
459                  * @param input
460                  *              The number of input bytes (may be {@code -1} to signify non-available
461                  *              input)
462                  * @param output
463                  *              The number of output bytes (may be {@code -1} to signify non-available
464                  *              output)
465                  */
466                 public TrafficCounter(long input, long output) {
467                         this.input = input;
468                         this.output = output;
469                 }
470
471                 //
472                 // ACCESSORS
473                 //
474
475                 /**
476                  * Returns the number of input bytes.
477                  *
478                  * @return The number of input bytes, or {@link Optional#absent()} if the
479                  *         filter did not receive input
480                  */
481                 public Optional<Long> input() {
482                         return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
483                 }
484
485                 /**
486                  * Returns the number of output bytes.
487                  *
488                  * @return The number of output bytes, or {@link Optional#absent()} if the
489                  *         filter did not send output
490                  */
491                 public Optional<Long> output() {
492                         return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
493                 }
494
495         }
496
497 }