Create threads with decent names.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.logging.Logger;
31
32 import com.google.common.base.Function;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Preconditions;
35 import com.google.common.collect.ArrayListMultimap;
36 import com.google.common.collect.FluentIterable;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableMultimap;
39 import com.google.common.collect.Lists;
40 import com.google.common.collect.Multimap;
41 import com.google.common.util.concurrent.MoreExecutors;
42
43 /**
44  * A pipeline is responsible for streaming audio data from a {@link Source} to
45  * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
46  *
47  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
48  */
49 public class Pipeline implements Iterable<ControlledComponent> {
50
51         /** The logger. */
52         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
53
54         /** The source of the audio stream. */
55         private final Source source;
56
57         /** The sinks for each source. */
58         private final Multimap<Source, Sink> sinks;
59
60         /** All started connections. */
61         private final List<Connection> connections = Lists.newArrayList();
62
63         /**
64          * Creates a new pipeline.
65          *
66          * @param source
67          *              The source of the audio stream
68          * @param sinks
69          *              The sinks for each source
70          */
71         private Pipeline(Source source, Multimap<Source, Sink> sinks) {
72                 this.source = Preconditions.checkNotNull(source, "source must not be null");
73                 this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
74                 for (ControlledComponent component : Lists.reverse(components())) {
75                         logger.finest(String.format("Adding Listener to %s.", component.name()));
76                         component.addMetadataListener(new MetadataListener() {
77                                 @Override
78                                 public void metadataUpdated(ControlledComponent component, Metadata metadata) {
79                                         if (!(component instanceof Source)) {
80                                                 return;
81                                         }
82                                         for (ControlledComponent controlledComponent : sinks((Source) component)) {
83                                                 logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata));
84                                                 controlledComponent.metadataUpdated(metadata);
85                                         }
86                                 }
87                         });
88                 }
89         }
90
91         //
92         // ACCESSORS
93         //
94
95         /**
96          * Expose this pipeline’s source.
97          *
98          * @return This pipeline’s source
99          */
100         public Source source() {
101                 return source;
102         }
103
104         /**
105          * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
106          * the given source.
107          *
108          * @param source
109          *              The source to get the sinks for
110          * @return The sinks connected to the given source, or an empty list if the
111          *         source does not exist in this pipeline
112          */
113         public Collection<Sink> sinks(Source source) {
114                 return sinks.get(source);
115         }
116
117         /**
118          * Returns the traffic counters of the given controlled component.
119          *
120          * @param controlledComponent
121          *              The controlled component to get the traffic counters for
122          * @return The traffic counters for the given controlled component
123          */
124         public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
125                 long input = -1;
126                 long output = -1;
127                 for (Connection connection : connections) {
128                         /* the connection where the source matches knows the output. */
129                         if (connection.source.equals(controlledComponent)) {
130                                 output = connection.counter();
131                         } else if (connection.sinks.contains(controlledComponent)) {
132                                 input = connection.counter();
133                         }
134                 }
135                 return new TrafficCounter(input, output);
136         }
137
138         //
139         // ACTIONS
140         //
141
142         /**
143          * Starts the pipeline.
144          *
145          * @throws IOException
146          *              if any of the sinks can not be opened
147          * @throws IllegalStateException
148          *              if the pipeline is already running
149          */
150         public void start() throws IOException, IllegalStateException {
151                 if (!connections.isEmpty()) {
152                         throw new IllegalStateException("Pipeline is already running!");
153                 }
154                 List<Source> sources = Lists.newArrayList();
155                 sources.add(source);
156                 /* collect all source->sink pairs. */
157                 while (!sources.isEmpty()) {
158                         Source source = sources.remove(0);
159                         Collection<Sink> sinks = this.sinks.get(source);
160                         connections.add(new Connection(source, sinks));
161                         for (Sink sink : sinks) {
162                                 logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
163                                 sink.open(source.metadata());
164                                 if (sink instanceof Filter) {
165                                         sources.add((Source) sink);
166                                 }
167                         }
168                 }
169                 for (Connection connection : connections) {
170                         String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
171
172                                 @Override
173                                 public String apply(Sink sink) {
174                                         return sink.name();
175                                 }
176                         }));
177                         logger.info(String.format("Starting Thread: %s", threadName));
178                         new Thread(connection, threadName).start();
179                 }
180         }
181
182         public void stop() {
183                 if (!connections.isEmpty()) {
184                         /* pipeline is not running. */
185                         return;
186                 }
187                 for (Connection connection : connections) {
188                         connection.stop();
189                 }
190         }
191
192         //
193         // ITERABLE METHODS
194         //
195
196         @Override
197         public Iterator<ControlledComponent> iterator() {
198                 return components().iterator();
199         }
200
201         //
202         // PRIVATE METHODS
203         //
204
205         /**
206          * Returns all components of this pipeline, listed breadth-first, starting with
207          * the source.
208          *
209          * @return All components of this pipeline
210          */
211         public List<ControlledComponent> components() {
212                 ImmutableList.Builder<ControlledComponent> components = ImmutableList.builder();
213                 List<ControlledComponent> currentComponents = Lists.newArrayList();
214                 components.add(source);
215                 currentComponents.add(source);
216                 while (!currentComponents.isEmpty()) {
217                         Collection<Sink> sinks = this.sinks((Source) currentComponents.remove(0));
218                         for (Sink sink : sinks) {
219                                 components.add(sink);
220                                 if (sink instanceof Source) {
221                                         currentComponents.add(sink);
222                                 }
223                         }
224                 }
225                 return components.build();
226         }
227
228         //
229         // STATIC METHODS
230         //
231
232         /**
233          * Returns a new pipeline builder.
234          *
235          * @param source
236          *              The source at which to start
237          * @return A builder for a new pipeline
238          */
239         public static Builder builder(Source source) {
240                 return new Builder(source);
241         }
242
243         /**
244          * A builder for a {@link Pipeline}.
245          *
246          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
247          */
248         public static class Builder {
249
250                 /** The source of the pipeline. */
251                 private final Source source;
252
253                 /** The sinks to which each source streams. */
254                 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
255
256                 /** The last added source. */
257                 private Source lastSource;
258
259                 /**
260                  * Creates a new builder.
261                  *
262                  * @param source
263                  *              The source that starts the pipeline
264                  */
265                 private Builder(Source source) {
266                         this.source = source;
267                         lastSource = source;
268                 }
269
270                 /**
271                  * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
272                  * {@link Source}.
273                  *
274                  * @param sink
275                  *              The sink to add
276                  * @return This builder
277                  * @throws IllegalStateException
278                  *              if the last added {@link Sink} was not also a {@link Source}
279                  */
280                 public Builder to(Sink sink) {
281                         Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
282                         nextSinks.put(lastSource, sink);
283                         lastSource = (sink instanceof Filter) ? (Source) sink : null;
284                         return this;
285                 }
286
287                 /**
288                  * Locates the given source and sets it as the last added node so that the
289                  * next invocation of {@link #to(Sink)} can “fork” the pipeline.
290                  *
291                  * @param source
292                  *              The source to locate
293                  * @return This builder
294                  * @throws IllegalStateException
295                  *              if the given source was not previously added as a sink
296                  */
297                 public Builder find(Source source) {
298                         Preconditions.checkState(nextSinks.containsValue(source));
299                         lastSource = source;
300                         return this;
301                 }
302
303                 /**
304                  * Builds the pipeline.
305                  *
306                  * @return The created pipeline
307                  */
308                 public Pipeline build() {
309                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
310                 }
311
312         }
313
314         /**
315          * A connection is responsible for streaming audio from one {@link Source} to
316          * an arbitrary number of {@link Sink}s it is connected to. A connection is
317          * started by creating a {@link Thread} wrapping it and starting said thread.
318          *
319          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
320          */
321         public class Connection implements Runnable {
322
323                 /** The source. */
324                 private final Source source;
325
326                 /** The sinks. */
327                 private final Collection<Sink> sinks;
328
329                 /** Whether the feeder was stopped. */
330                 private final AtomicBoolean stopped = new AtomicBoolean(false);
331
332                 /** The executor service. */
333                 private final ExecutorService executorService;
334
335                 /** The time the connection was started. */
336                 private long startTime;
337
338                 /** The number of copied bytes. */
339                 private long counter;
340
341                 /**
342                  * Creates a new connection.
343                  *
344                  * @param source
345                  *              The source of the stream
346                  * @param sinks
347                  *              The sinks to which to stream
348                  */
349                 public Connection(Source source, Collection<Sink> sinks) {
350                         this.source = source;
351                         this.sinks = sinks;
352                         if (sinks.size() < 2) {
353                                 executorService = MoreExecutors.sameThreadExecutor();
354                         } else {
355                                 executorService = Executors.newCachedThreadPool();
356                         }
357                 }
358
359                 //
360                 // ACCESSORS
361                 //
362
363                 /**
364                  * Returns the time this connection was started.
365                  *
366                  * @return The time this connection was started (in milliseconds since Jan 1,
367                  *         1970 UTC)
368                  */
369                 public long startTime() {
370                         return startTime;
371                 }
372
373                 /**
374                  * Returns the number of bytes that this connection has received from its
375                  * source during its lifetime.
376                  *
377                  * @return The number of processed input bytes
378                  */
379                 public long counter() {
380                         return counter;
381                 }
382
383                 //
384                 // ACTIONS
385                 //
386
387                 /** Stops this connection. */
388                 public void stop() {
389                         stopped.set(true);
390                 }
391
392                 //
393                 // RUNNABLE METHODS
394                 //
395
396                 @Override
397                 public void run() {
398                         startTime = System.currentTimeMillis();
399                         while (!stopped.get()) {
400                                 try {
401                                         final byte[] buffer;
402                                         try {
403                                                 logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
404                                                 buffer = source.get(4096);
405                                                 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
406                                         } catch (IOException ioe1) {
407                                                 throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
408                                         }
409                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
410
411                                                 @Override
412                                                 public Callable<Void> apply(final Sink sink) {
413                                                         return new Callable<Void>() {
414
415                                                                 @Override
416                                                                 public Void call() throws Exception {
417                                                                         try {
418                                                                                 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
419                                                                                 sink.process(buffer);
420                                                                                 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
421                                                                         } catch (IOException ioe1) {
422                                                                                 throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
423                                                                         }
424                                                                         return null;
425                                                                 }
426                                                         };
427                                                 }
428                                         }).toList());
429                                         /* check all threads for exceptions. */
430                                         for (Future<Void> future : futures) {
431                                                 future.get();
432                                         }
433                                         counter += buffer.length;
434                                 } catch (IOException e) {
435                                         /* TODO */
436                                         e.printStackTrace();
437                                         break;
438                                 } catch (InterruptedException e) {
439                                         /* TODO */
440                                         e.printStackTrace();
441                                         break;
442                                 } catch (ExecutionException e) {
443                                         /* TODO */
444                                         e.printStackTrace();
445                                         break;
446                                 }
447                         }
448                 }
449
450         }
451
452         /**
453          * Container for input and output counters.
454          *
455          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
456          */
457         public static class TrafficCounter {
458
459                 /** The number of input bytes. */
460                 private final long input;
461
462                 /** The number of output bytes. */
463                 private final long output;
464
465                 /**
466                  * Creates a new traffic counter.
467                  *
468                  * @param input
469                  *              The number of input bytes (may be {@code -1} to signify non-available
470                  *              input)
471                  * @param output
472                  *              The number of output bytes (may be {@code -1} to signify non-available
473                  *              output)
474                  */
475                 public TrafficCounter(long input, long output) {
476                         this.input = input;
477                         this.output = output;
478                 }
479
480                 //
481                 // ACCESSORS
482                 //
483
484                 /**
485                  * Returns the number of input bytes.
486                  *
487                  * @return The number of input bytes, or {@link Optional#absent()} if the
488                  *         component can not receive input
489                  */
490                 public Optional<Long> input() {
491                         return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
492                 }
493
494                 /**
495                  * Returns the number of output bytes.
496                  *
497                  * @return The number of output bytes, or {@link Optional#absent()} if the
498                  *         component can not send output
499                  */
500                 public Optional<Long> output() {
501                         return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
502                 }
503
504         }
505
506 }