Change filter opening.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.logging.Logger;
31
32 import com.google.common.base.Function;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Preconditions;
35 import com.google.common.collect.ArrayListMultimap;
36 import com.google.common.collect.FluentIterable;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableMultimap;
39 import com.google.common.collect.ListMultimap;
40 import com.google.common.collect.Lists;
41 import com.google.common.collect.Multimap;
42 import com.google.common.util.concurrent.MoreExecutors;
43
44 /**
45  * A pipeline is responsible for streaming audio data from a {@link Filter} to
46  * an arbitrary number of connected {@link Filter}s.
47  *
48  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
49  */
50 public class Pipeline implements Iterable<Filter> {
51
52         /** The logger. */
53         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
54
55         /** The source of the audio stream. */
56         private final Filter source;
57
58         /** The filters for each source. */
59         private final ListMultimap<Filter, Filter> filters;
60
61         /** All started connections. */
62         private final List<Connection> connections = Lists.newArrayList();
63
64         /**
65          * Creates a new pipeline.
66          *
67          * @param source
68          *              The source of the audio stream
69          * @param filters
70          *              The filters for each source
71          */
72         private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
73                 this.source = Preconditions.checkNotNull(source, "source must not be null");
74                 this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
75         }
76
77         //
78         // ACCESSORS
79         //
80
81         /**
82          * Expose this pipeline’s source.
83          *
84          * @return This pipeline’s source
85          */
86         public Filter source() {
87                 return source;
88         }
89
90         /**
91          * Returns all {@link Filter}s that are connected to the given filter.
92          *
93          * @param filter
94          *              The filter to get the connected filters for
95          * @return The filters connected to the given filter, or an empty list if the
96          *         filter does not exist in this pipeline, or is not connected to any filters
97          */
98         public List<Filter> filters(Filter filter) {
99                 return filters.get(filter);
100         }
101
102         /**
103          * Returns the traffic counters of the given filter.
104          *
105          * @param filter
106          *              The filter to get the traffic counters for
107          * @return The traffic counters for the given filter
108          */
109         public TrafficCounter trafficCounter(Filter filter) {
110                 long input = -1;
111                 long output = -1;
112                 for (Connection connection : connections) {
113                         /* the connection where the source matches knows the output. */
114                         if (connection.source.equals(filter)) {
115                                 output = connection.counter();
116                         } else if (connection.sinks.contains(filter)) {
117                                 input = connection.counter();
118                         }
119                 }
120                 return new TrafficCounter(input, output);
121         }
122
123         //
124         // ACTIONS
125         //
126
127         /**
128          * Starts the pipeline.
129          *
130          * @throws IOException
131          *              if any of the filters can not be opened
132          * @throws IllegalStateException
133          *              if the pipeline is already running
134          */
135         public void start() throws IOException, IllegalStateException {
136                 if (!connections.isEmpty()) {
137                         throw new IllegalStateException("Pipeline is already running!");
138                 }
139                 List<Filter> filters = Lists.newArrayList();
140                 filters.add(source);
141                 Metadata currentMetadata = Metadata.UNKNOWN;
142                 /* collect all source->sink pairs. */
143                 while (!filters.isEmpty()) {
144                         Filter filter = filters.remove(0);
145                         logger.info(String.format("Opening %s with %s...", filter.name(), currentMetadata));
146                         filter.open(currentMetadata);
147                         currentMetadata = filter.metadata();
148                         Collection<Filter> sinks = this.filters.get(filter);
149                         connections.add(new Connection(filter, sinks));
150                         for (Filter sink : sinks) {
151                                 filters.add(sink);
152                         }
153                 }
154                 for (Connection connection : connections) {
155                         String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Filter, String>() {
156
157                                 @Override
158                                 public String apply(Filter sink) {
159                                         return sink.name();
160                                 }
161                         }));
162                         logger.info(String.format("Starting Thread: %s", threadName));
163                         new Thread(connection, threadName).start();
164                 }
165         }
166
167         public void stop() {
168                 if (!connections.isEmpty()) {
169                         /* pipeline is not running. */
170                         return;
171                 }
172                 for (Connection connection : connections) {
173                         connection.stop();
174                 }
175         }
176
177         //
178         // ITERABLE METHODS
179         //
180
181         @Override
182         public Iterator<Filter> iterator() {
183                 return filters().iterator();
184         }
185
186         //
187         // PRIVATE METHODS
188         //
189
190         /**
191          * Returns all filters of this pipeline, listed breadth-first, starting with
192          * the source.
193          *
194          * @return All filters of this pipeline
195          */
196         public List<Filter> filters() {
197                 ImmutableList.Builder<Filter> filters = ImmutableList.builder();
198                 List<Filter> remainingFilters = Lists.newArrayList();
199                 filters.add(source);
200                 remainingFilters.add(source);
201                 while (!remainingFilters.isEmpty()) {
202                         Collection<Filter> sinks = this.filters(remainingFilters.remove(0));
203                         for (Filter sink : sinks) {
204                                 filters.add(sink);
205                                 remainingFilters.add(sink);
206                         }
207                 }
208                 return filters.build();
209         }
210
211         //
212         // STATIC METHODS
213         //
214
215         /**
216          * Returns a new pipeline builder.
217          *
218          * @param source
219          *              The source at which to start
220          * @return A builder for a new pipeline
221          */
222         public static Builder builder(Filter source) {
223                 return new Builder(source);
224         }
225
226         /**
227          * A builder for a {@link Pipeline}.
228          *
229          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
230          */
231         public static class Builder {
232
233                 /** The source of the pipeline. */
234                 private final Filter source;
235
236                 /** The filters to which each source streams. */
237                 private Multimap<Filter, Filter> nextSinks = ArrayListMultimap.create();
238
239                 /** The last added source. */
240                 private Filter lastSource;
241
242                 /**
243                  * Creates a new builder.
244                  *
245                  * @param source
246                  *              The source that starts the pipeline
247                  */
248                 private Builder(Filter source) {
249                         this.source = source;
250                         lastSource = source;
251                 }
252
253                 /**
254                  * Adds a {@link Filter} as a recipient for the last added source.
255                  *
256                  * @param sink
257                  *              The sink to add
258                  * @return This builder
259                  */
260                 public Builder to(Filter sink) {
261                         nextSinks.put(lastSource, sink);
262                         lastSource = sink;
263                         return this;
264                 }
265
266                 /**
267                  * Locates the given source and sets it as the last added node so that the
268                  * next invocation of {@link #to(Filter)} can “fork” the pipeline.
269                  *
270                  * @param source
271                  *              The source to locate
272                  * @return This builder
273                  * @throws IllegalStateException
274                  *              if the given source was not previously added as a sink
275                  */
276                 public Builder find(Filter source) {
277                         Preconditions.checkState(nextSinks.containsValue(source));
278                         lastSource = source;
279                         return this;
280                 }
281
282                 /**
283                  * Builds the pipeline.
284                  *
285                  * @return The created pipeline
286                  */
287                 public Pipeline build() {
288                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
289                 }
290
291         }
292
293         /**
294          * A connection is responsible for streaming audio from one {@link Filter} to
295          * an arbitrary number of {@link Filter}s it is connected to. A connection is
296          * started by creating a {@link Thread} wrapping it and starting said thread.
297          *
298          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
299          */
300         public static class Connection implements Runnable {
301
302                 /** The source. */
303                 private final Filter source;
304
305                 /** The filters. */
306                 private final Collection<Filter> sinks;
307
308                 /** Whether the feeder was stopped. */
309                 private final AtomicBoolean stopped = new AtomicBoolean(false);
310
311                 /** The executor service. */
312                 private final ExecutorService executorService;
313
314                 /** The time the connection was started. */
315                 private long startTime;
316
317                 /** The number of copied bytes. */
318                 private long counter;
319
320                 /**
321                  * Creates a new connection.
322                  *
323                  * @param source
324                  *              The source of the stream
325                  * @param sinks
326                  *              The filters to which to stream
327                  */
328                 public Connection(Filter source, Collection<Filter> sinks) {
329                         this.source = source;
330                         this.sinks = sinks;
331                         if (sinks.size() < 2) {
332                                 executorService = MoreExecutors.sameThreadExecutor();
333                         } else {
334                                 executorService = Executors.newCachedThreadPool();
335                         }
336                 }
337
338                 //
339                 // ACCESSORS
340                 //
341
342                 /**
343                  * Returns the source of this connection.
344                  *
345                  * @return The source of this connection
346                  */
347                 public Filter source() {
348                         return source;
349                 }
350
351                 /**
352                  * Returns the sinks of this connection.
353                  *
354                  * @return The sinks of this connection
355                  */
356                 public Collection<Filter> sinks() {
357                         return sinks;
358                 }
359
360                 /**
361                  * Returns the time this connection was started.
362                  *
363                  * @return The time this connection was started (in milliseconds since Jan 1,
364                  *         1970 UTC)
365                  */
366                 public long startTime() {
367                         return startTime;
368                 }
369
370                 /**
371                  * Returns the number of bytes that this connection has received from its
372                  * source during its lifetime.
373                  *
374                  * @return The number of processed input bytes
375                  */
376                 public long counter() {
377                         return counter;
378                 }
379
380                 //
381                 // ACTIONS
382                 //
383
384                 /** Stops this connection. */
385                 public void stop() {
386                         stopped.set(true);
387                 }
388
389                 //
390                 // RUNNABLE METHODS
391                 //
392
393                 @Override
394                 public void run() {
395                         startTime = System.currentTimeMillis();
396                         while (!stopped.get()) {
397                                 try {
398                                         final DataPacket dataPacket;
399                                         try {
400                                                 logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
401                                                 dataPacket = source.get(4096);
402                                                 logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
403                                         } catch (IOException ioe1) {
404                                                 throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
405                                         }
406                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Filter, Callable<Void>>() {
407
408                                                 @Override
409                                                 public Callable<Void> apply(final Filter sink) {
410                                                         return new Callable<Void>() {
411
412                                                                 @Override
413                                                                 public Void call() throws Exception {
414                                                                         try {
415                                                                                 logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
416                                                                                 sink.process(dataPacket);
417                                                                                 logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
418                                                                         } catch (IOException ioe1) {
419                                                                                 throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
420                                                                         }
421                                                                         return null;
422                                                                 }
423                                                         };
424                                                 }
425                                         }).toList());
426                                         /* check all threads for exceptions. */
427                                         for (Future<Void> future : futures) {
428                                                 future.get();
429                                         }
430                                         counter += dataPacket.buffer().length;
431                                 } catch (IOException e) {
432                                         /* TODO */
433                                         e.printStackTrace();
434                                         break;
435                                 } catch (InterruptedException e) {
436                                         /* TODO */
437                                         e.printStackTrace();
438                                         break;
439                                 } catch (ExecutionException e) {
440                                         /* TODO */
441                                         e.printStackTrace();
442                                         break;
443                                 }
444                         }
445                 }
446
447         }
448
449         /**
450          * Container for input and output counters.
451          *
452          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
453          */
454         public static class TrafficCounter {
455
456                 /** The number of input bytes. */
457                 private final long input;
458
459                 /** The number of output bytes. */
460                 private final long output;
461
462                 /**
463                  * Creates a new traffic counter.
464                  *
465                  * @param input
466                  *              The number of input bytes (may be {@code -1} to signify non-available
467                  *              input)
468                  * @param output
469                  *              The number of output bytes (may be {@code -1} to signify non-available
470                  *              output)
471                  */
472                 public TrafficCounter(long input, long output) {
473                         this.input = input;
474                         this.output = output;
475                 }
476
477                 //
478                 // ACCESSORS
479                 //
480
481                 /**
482                  * Returns the number of input bytes.
483                  *
484                  * @return The number of input bytes, or {@link Optional#absent()} if the
485                  *         filter did not receive input
486                  */
487                 public Optional<Long> input() {
488                         return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
489                 }
490
491                 /**
492                  * Returns the number of output bytes.
493                  *
494                  * @return The number of output bytes, or {@link Optional#absent()} if the
495                  *         filter did not send output
496                  */
497                 public Optional<Long> output() {
498                         return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
499                 }
500
501         }
502
503 }