Fix formatting.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.logging.Logger;
31
32 import com.google.common.base.Function;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Preconditions;
35 import com.google.common.collect.ArrayListMultimap;
36 import com.google.common.collect.FluentIterable;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableMultimap;
39 import com.google.common.collect.ListMultimap;
40 import com.google.common.collect.Lists;
41 import com.google.common.collect.Multimap;
42 import com.google.common.util.concurrent.MoreExecutors;
43
44 /**
45  * A pipeline is responsible for streaming audio data from a {@link Filter} to
46  * an arbitrary number of connected {@link Filter}s.
47  *
48  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
49  */
50 public class Pipeline implements Iterable<Filter> {
51
52         /** The logger. */
53         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
54
55         /** The source of the audio stream. */
56         private final Filter source;
57
58         /** The filters for each source. */
59         private final ListMultimap<Filter, Filter> filters;
60
61         /** All started connections. */
62         private final List<Connection> connections = Lists.newArrayList();
63
64         /**
65          * Creates a new pipeline.
66          *
67          * @param source
68          *              The source of the audio stream
69          * @param filters
70          *              The filters for each source
71          */
72         private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
73                 this.source = Preconditions.checkNotNull(source, "source must not be null");
74                 this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
75         }
76
77         //
78         // ACCESSORS
79         //
80
81         /**
82          * Expose this pipeline’s source.
83          *
84          * @return This pipeline’s source
85          */
86         public Filter source() {
87                 return source;
88         }
89
90         /**
91          * Returns all {@link Filter}s that are connected to the given filter.
92          *
93          * @param filter
94          *              The filter to get the connected filters for
95          * @return The filters connected to the given filter, or an empty list if the
96          *         filter does not exist in this pipeline, or is not connected to any
97          *         filters
98          */
99         public List<Filter> filters(Filter filter) {
100                 return filters.get(filter);
101         }
102
103         /**
104          * Returns the traffic counters of the given filter.
105          *
106          * @param filter
107          *              The filter to get the traffic counters for
108          * @return The traffic counters for the given filter
109          */
110         public TrafficCounter trafficCounter(Filter filter) {
111                 long input = -1;
112                 long output = -1;
113                 for (Connection connection : connections) {
114                         /* the connection where the source matches knows the output. */
115                         if (connection.source.equals(filter)) {
116                                 output = connection.counter();
117                         } else if (connection.sinks.contains(filter)) {
118                                 input = connection.counter();
119                         }
120                 }
121                 return new TrafficCounter(input, output);
122         }
123
124         //
125         // ACTIONS
126         //
127
128         /**
129          * Starts the pipeline.
130          *
131          * @throws IOException
132          *              if any of the filters can not be opened
133          * @throws IllegalStateException
134          *              if the pipeline is already running
135          */
136         public void start() throws IOException, IllegalStateException {
137                 if (!connections.isEmpty()) {
138                         throw new IllegalStateException("Pipeline is already running!");
139                 }
140                 List<Filter> filters = Lists.newArrayList();
141                 filters.add(source);
142                 Metadata currentMetadata = Metadata.UNKNOWN;
143                 /* collect all source->sink pairs. */
144                 while (!filters.isEmpty()) {
145                         Filter filter = filters.remove(0);
146                         logger.info(String.format("Opening %s with %s...", filter.name(), currentMetadata));
147                         filter.open(currentMetadata);
148                         currentMetadata = filter.metadata();
149                         Collection<Filter> sinks = this.filters.get(filter);
150                         connections.add(new Connection(filter, sinks));
151                         for (Filter sink : sinks) {
152                                 filters.add(sink);
153                         }
154                 }
155                 for (Connection connection : connections) {
156                         String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Filter, String>() {
157
158                                 @Override
159                                 public String apply(Filter sink) {
160                                         return sink.name();
161                                 }
162                         }));
163                         logger.info(String.format("Starting Thread: %s", threadName));
164                         new Thread(connection, threadName).start();
165                 }
166         }
167
168         public void stop() {
169                 if (!connections.isEmpty()) {
170                         /* pipeline is not running. */
171                         return;
172                 }
173                 for (Connection connection : connections) {
174                         connection.stop();
175                 }
176         }
177
178         //
179         // ITERABLE METHODS
180         //
181
182         @Override
183         public Iterator<Filter> iterator() {
184                 return filters().iterator();
185         }
186
187         //
188         // PRIVATE METHODS
189         //
190
191         /**
192          * Returns all filters of this pipeline, listed breadth-first, starting with
193          * the source.
194          *
195          * @return All filters of this pipeline
196          */
197         public List<Filter> filters() {
198                 ImmutableList.Builder<Filter> filters = ImmutableList.builder();
199                 List<Filter> remainingFilters = Lists.newArrayList();
200                 filters.add(source);
201                 remainingFilters.add(source);
202                 while (!remainingFilters.isEmpty()) {
203                         Collection<Filter> sinks = this.filters(remainingFilters.remove(0));
204                         for (Filter sink : sinks) {
205                                 filters.add(sink);
206                                 remainingFilters.add(sink);
207                         }
208                 }
209                 return filters.build();
210         }
211
212         //
213         // STATIC METHODS
214         //
215
216         /**
217          * Returns a new pipeline builder.
218          *
219          * @param source
220          *              The source at which to start
221          * @return A builder for a new pipeline
222          */
223         public static Builder builder(Filter source) {
224                 return new Builder(source);
225         }
226
227         /**
228          * A builder for a {@link Pipeline}.
229          *
230          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
231          */
232         public static class Builder {
233
234                 /** The source of the pipeline. */
235                 private final Filter source;
236
237                 /** The filters to which each source streams. */
238                 private Multimap<Filter, Filter> nextSinks = ArrayListMultimap.create();
239
240                 /** The last added source. */
241                 private Filter lastSource;
242
243                 /**
244                  * Creates a new builder.
245                  *
246                  * @param source
247                  *              The source that starts the pipeline
248                  */
249                 private Builder(Filter source) {
250                         this.source = source;
251                         lastSource = source;
252                 }
253
254                 /**
255                  * Adds a {@link Filter} as a recipient for the last added source.
256                  *
257                  * @param sink
258                  *              The sink to add
259                  * @return This builder
260                  */
261                 public Builder to(Filter sink) {
262                         nextSinks.put(lastSource, sink);
263                         lastSource = sink;
264                         return this;
265                 }
266
267                 /**
268                  * Locates the given source and sets it as the last added node so that the
269                  * next invocation of {@link #to(Filter)} can “fork” the pipeline.
270                  *
271                  * @param source
272                  *              The source to locate
273                  * @return This builder
274                  * @throws IllegalStateException
275                  *              if the given source was not previously added as a sink
276                  */
277                 public Builder find(Filter source) {
278                         Preconditions.checkState(nextSinks.containsValue(source));
279                         lastSource = source;
280                         return this;
281                 }
282
283                 /**
284                  * Builds the pipeline.
285                  *
286                  * @return The created pipeline
287                  */
288                 public Pipeline build() {
289                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
290                 }
291
292         }
293
294         /**
295          * A connection is responsible for streaming audio from one {@link Filter} to
296          * an arbitrary number of {@link Filter}s it is connected to. A connection is
297          * started by creating a {@link Thread} wrapping it and starting said thread.
298          *
299          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
300          */
301         public static class Connection implements Runnable {
302
303                 /** The logger. */
304                 private static final Logger logger = Logger.getLogger(Connection.class.getName());
305
306                 /** The source. */
307                 private final Filter source;
308
309                 /** The filters. */
310                 private final Collection<Filter> sinks;
311
312                 /** Whether the feeder was stopped. */
313                 private final AtomicBoolean stopped = new AtomicBoolean(false);
314
315                 /** The executor service. */
316                 private final ExecutorService executorService;
317
318                 /** The time the connection was started. */
319                 private long startTime;
320
321                 /** The number of copied bytes. */
322                 private long counter;
323
324                 /** The exception that was encountered, if any. */
325                 private Optional<IOException> ioException = Optional.absent();
326
327                 /**
328                  * Creates a new connection.
329                  *
330                  * @param source
331                  *              The source of the stream
332                  * @param sinks
333                  *              The filters to which to stream
334                  */
335                 public Connection(Filter source, Collection<Filter> sinks) {
336                         this.source = source;
337                         this.sinks = sinks;
338                         if (sinks.size() < 2) {
339                                 executorService = MoreExecutors.sameThreadExecutor();
340                         } else {
341                                 executorService = Executors.newCachedThreadPool();
342                         }
343                 }
344
345                 //
346                 // ACCESSORS
347                 //
348
349                 /**
350                  * Returns the source of this connection.
351                  *
352                  * @return The source of this connection
353                  */
354                 public Filter source() {
355                         return source;
356                 }
357
358                 /**
359                  * Returns the sinks of this connection.
360                  *
361                  * @return The sinks of this connection
362                  */
363                 public Collection<Filter> sinks() {
364                         return sinks;
365                 }
366
367                 /**
368                  * Returns the time this connection was started.
369                  *
370                  * @return The time this connection was started (in milliseconds since Jan 1,
371                  *         1970 UTC)
372                  */
373                 public long startTime() {
374                         return startTime;
375                 }
376
377                 /**
378                  * Returns the number of bytes that this connection has received from its
379                  * source during its lifetime.
380                  *
381                  * @return The number of processed input bytes
382                  */
383                 public long counter() {
384                         return counter;
385                 }
386
387                 /**
388                  * Returns the I/O exception that was encountered while processing this
389                  * connection.
390                  *
391                  * @return The I/O exception that occured, or {@link Optional#absent()} if no
392                  *         exception occured
393                  */
394                 public Optional<IOException> ioException() {
395                         return ioException;
396                 }
397
398                 //
399                 // ACTIONS
400                 //
401
402                 /** Stops this connection. */
403                 public void stop() {
404                         stopped.set(true);
405                 }
406
407                 //
408                 // RUNNABLE METHODS
409                 //
410
411                 @Override
412                 public void run() {
413                         startTime = System.currentTimeMillis();
414                         while (!stopped.get()) {
415                                 try {
416                                         final DataPacket dataPacket;
417                                         logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
418                                         dataPacket = source.get(4096);
419                                         logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
420                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Filter, Callable<Void>>() {
421
422                                                 @Override
423                                                 public Callable<Void> apply(final Filter sink) {
424                                                         return new Callable<Void>() {
425
426                                                                 @Override
427                                                                 public Void call() throws Exception {
428                                                                         logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
429                                                                         sink.process(dataPacket);
430                                                                         logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
431                                                                         return null;
432                                                                 }
433                                                         };
434                                                 }
435                                         }).toList());
436                                         /* check all threads for exceptions. */
437                                         for (Future<Void> future : futures) {
438                                                 future.get();
439                                         }
440                                         counter += dataPacket.buffer().length;
441                                 } catch (IOException e) {
442                                         ioException = Optional.of(e);
443                                         break;
444                                 } catch (InterruptedException e) {
445                                         /* TODO */
446                                         e.printStackTrace();
447                                         break;
448                                 } catch (ExecutionException e) {
449                                         /* TODO */
450                                         e.printStackTrace();
451                                         break;
452                                 }
453                         }
454                 }
455
456         }
457
458         /**
459          * Container for input and output counters.
460          *
461          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
462          */
463         public static class TrafficCounter {
464
465                 /** The number of input bytes. */
466                 private final long input;
467
468                 /** The number of output bytes. */
469                 private final long output;
470
471                 /**
472                  * Creates a new traffic counter.
473                  *
474                  * @param input
475                  *              The number of input bytes (may be {@code -1} to signify non-available
476                  *              input)
477                  * @param output
478                  *              The number of output bytes (may be {@code -1} to signify non-available
479                  *              output)
480                  */
481                 public TrafficCounter(long input, long output) {
482                         this.input = input;
483                         this.output = output;
484                 }
485
486                 //
487                 // ACCESSORS
488                 //
489
490                 /**
491                  * Returns the number of input bytes.
492                  *
493                  * @return The number of input bytes, or {@link Optional#absent()} if the
494                  *         filter did not receive input
495                  */
496                 public Optional<Long> input() {
497                         return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
498                 }
499
500                 /**
501                  * Returns the number of output bytes.
502                  *
503                  * @return The number of output bytes, or {@link Optional#absent()} if the
504                  *         filter did not send output
505                  */
506                 public Optional<Long> output() {
507                         return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
508                 }
509
510         }
511
512 }