1487d138eecc927f07e91732bf6ac14614256c85
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.logging.Logger;
31
32 import com.google.common.base.Function;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Preconditions;
35 import com.google.common.collect.ArrayListMultimap;
36 import com.google.common.collect.FluentIterable;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableMultimap;
39 import com.google.common.collect.Lists;
40 import com.google.common.collect.Multimap;
41 import com.google.common.util.concurrent.MoreExecutors;
42
43 /**
44  * A pipeline is responsible for streaming audio data from a {@link Source} to
45  * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
46  *
47  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
48  */
49 public class Pipeline implements Iterable<ControlledComponent> {
50
51         /** The logger. */
52         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
53
54         /** The source of the audio stream. */
55         private final Source source;
56
57         /** The sinks for each source. */
58         private final Multimap<Source, Sink> sinks;
59
60         /** All started connections. */
61         private final List<Connection> connections = Lists.newArrayList();
62
63         /**
64          * Creates a new pipeline.
65          *
66          * @param source
67          *              The source of the audio stream
68          * @param sinks
69          *              The sinks for each source
70          */
71         private Pipeline(Source source, Multimap<Source, Sink> sinks) {
72                 this.source = Preconditions.checkNotNull(source, "source must not be null");
73                 this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
74         }
75
76         //
77         // ACCESSORS
78         //
79
80         /**
81          * Expose this pipeline’s source.
82          *
83          * @return This pipeline’s source
84          */
85         public Source source() {
86                 return source;
87         }
88
89         /**
90          * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
91          * the given source.
92          *
93          * @param source
94          *              The source to get the sinks for
95          * @return The sinks connected to the given source, or an empty list if the
96          *         source does not exist in this pipeline
97          */
98         public Collection<Sink> sinks(Source source) {
99                 return sinks.get(source);
100         }
101
102         /**
103          * Returns the traffic counters of the given controlled component.
104          *
105          * @param controlledComponent
106          *              The controlled component to get the traffic counters for
107          * @return The traffic counters for the given controlled component
108          */
109         public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
110                 long input = -1;
111                 long output = -1;
112                 for (Connection connection : connections) {
113                         /* the connection where the source matches knows the output. */
114                         if (connection.source.equals(controlledComponent)) {
115                                 output = connection.counter();
116                         } else if (connection.sinks.contains(controlledComponent)) {
117                                 input = connection.counter();
118                         }
119                 }
120                 return new TrafficCounter(input, output);
121         }
122
123         //
124         // ACTIONS
125         //
126
127         /**
128          * Starts the pipeline.
129          *
130          * @throws IOException
131          *              if any of the sinks can not be opened
132          * @throws IllegalStateException
133          *              if the pipeline is already running
134          */
135         public void start() throws IOException, IllegalStateException {
136                 if (!connections.isEmpty()) {
137                         throw new IllegalStateException("Pipeline is already running!");
138                 }
139                 List<Source> sources = Lists.newArrayList();
140                 sources.add(source);
141                 /* collect all source->sink pairs. */
142                 while (!sources.isEmpty()) {
143                         Source source = sources.remove(0);
144                         Collection<Sink> sinks = this.sinks.get(source);
145                         connections.add(new Connection(source, sinks));
146                         for (Sink sink : sinks) {
147                                 sink.open(source.metadata());
148                                 if (sink instanceof Filter) {
149                                         sources.add((Source) sink);
150                                 }
151                         }
152                 }
153                 for (Connection connection : connections) {
154                         logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
155                         new Thread(connection).start();
156                 }
157         }
158
159         public void stop() {
160                 if (!connections.isEmpty()) {
161                         /* pipeline is not running. */
162                         return;
163                 }
164                 for (Connection connection : connections) {
165                         connection.stop();
166                 }
167         }
168
169         //
170         // ITERABLE METHODS
171         //
172
173         @Override
174         public Iterator<ControlledComponent> iterator() {
175                 return components().iterator();
176         }
177
178         //
179         // PRIVATE METHODS
180         //
181
182         /**
183          * Returns all components of this pipeline, listed breadth-first, starting with
184          * the source.
185          *
186          * @return All components of this pipeline
187          */
188         public List<ControlledComponent> components() {
189                 ImmutableList.Builder<ControlledComponent> components = ImmutableList.builder();
190                 List<ControlledComponent> currentComponents = Lists.newArrayList();
191                 components.add(source);
192                 currentComponents.add(source);
193                 while (!currentComponents.isEmpty()) {
194                         Collection<Sink> sinks = this.sinks((Source) currentComponents.remove(0));
195                         for (Sink sink : sinks) {
196                                 components.add(sink);
197                                 if (sink instanceof Source) {
198                                         currentComponents.add(sink);
199                                 }
200                         }
201                 }
202                 return components.build();
203         }
204
205         //
206         // STATIC METHODS
207         //
208
209         /**
210          * Returns a new pipeline builder.
211          *
212          * @param source
213          *              The source at which to start
214          * @return A builder for a new pipeline
215          */
216         public static Builder builder(Source source) {
217                 return new Builder(source);
218         }
219
220         /**
221          * A builder for a {@link Pipeline}.
222          *
223          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
224          */
225         public static class Builder {
226
227                 /** The source of the pipeline. */
228                 private final Source source;
229
230                 /** The sinks to which each source streams. */
231                 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
232
233                 /** The last added source. */
234                 private Source lastSource;
235
236                 /**
237                  * Creates a new builder.
238                  *
239                  * @param source
240                  *              The source that starts the pipeline
241                  */
242                 private Builder(Source source) {
243                         this.source = source;
244                         lastSource = source;
245                 }
246
247                 /**
248                  * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
249                  * {@link Source}.
250                  *
251                  * @param sink
252                  *              The sink to add
253                  * @return This builder
254                  * @throws IllegalStateException
255                  *              if the last added {@link Sink} was not also a {@link Source}
256                  */
257                 public Builder to(Sink sink) {
258                         Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
259                         nextSinks.put(lastSource, sink);
260                         lastSource = (sink instanceof Filter) ? (Source) sink : null;
261                         return this;
262                 }
263
264                 /**
265                  * Locates the given source and sets it as the last added node so that the
266                  * next invocation of {@link #to(Sink)} can “fork” the pipeline.
267                  *
268                  * @param source
269                  *              The source to locate
270                  * @return This builder
271                  * @throws IllegalStateException
272                  *              if the given source was not previously added as a sink
273                  */
274                 public Builder find(Source source) {
275                         Preconditions.checkState(nextSinks.containsValue(source));
276                         lastSource = source;
277                         return this;
278                 }
279
280                 /**
281                  * Builds the pipeline.
282                  *
283                  * @return The created pipeline
284                  */
285                 public Pipeline build() {
286                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
287                 }
288
289         }
290
291         /**
292          * A connection is responsible for streaming audio from one {@link Source} to
293          * an arbitrary number of {@link Sink}s it is connected to. A connection is
294          * started by creating a {@link Thread} wrapping it and starting said thread.
295          *
296          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
297          */
298         public class Connection implements Runnable {
299
300                 /** The source. */
301                 private final Source source;
302
303                 /** The sinks. */
304                 private final Collection<Sink> sinks;
305
306                 /** Whether the feeder was stopped. */
307                 private final AtomicBoolean stopped = new AtomicBoolean(false);
308
309                 /** The executor service. */
310                 private final ExecutorService executorService;
311
312                 /** The number of copied bytes. */
313                 private long counter;
314
315                 /**
316                  * Creates a new connection.
317                  *
318                  * @param source
319                  *              The source of the stream
320                  * @param sinks
321                  *              The sinks to which to stream
322                  */
323                 public Connection(Source source, Collection<Sink> sinks) {
324                         this.source = source;
325                         this.sinks = sinks;
326                         if (sinks.size() == 1) {
327                                 executorService = MoreExecutors.sameThreadExecutor();
328                         } else {
329                                 executorService = Executors.newCachedThreadPool();
330                         }
331                 }
332
333                 //
334                 // ACCESSORS
335                 //
336
337                 /**
338                  * Returns the number of bytes that this connection has received from its
339                  * source during its lifetime.
340                  *
341                  * @return The number of processed input bytes
342                  */
343                 public long counter() {
344                         return counter;
345                 }
346
347                 //
348                 // ACTIONS
349                 //
350
351                 /** Stops this connection. */
352                 public void stop() {
353                         stopped.set(true);
354                 }
355
356                 //
357                 // RUNNABLE METHODS
358                 //
359
360                 @Override
361                 public void run() {
362                         Metadata firstMetadata = null;
363                         while (!stopped.get()) {
364                                 try {
365                                         final byte[] buffer;
366                                         try {
367                                                 logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
368                                                 buffer = source.get(4096);
369                                                 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
370                                         } catch (IOException ioe1) {
371                                                 throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
372                                         }
373                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
374
375                                                 @Override
376                                                 public Callable<Void> apply(final Sink sink) {
377                                                         return new Callable<Void>() {
378
379                                                                 @Override
380                                                                 public Void call() throws Exception {
381                                                                         try {
382                                                                                 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
383                                                                                 sink.process(buffer);
384                                                                                 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
385                                                                         } catch (IOException ioe1) {
386                                                                                 throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
387                                                                         }
388                                                                         return null;
389                                                                 }
390                                                         };
391                                                 }
392                                         }).toList());
393                                         /* check all threads for exceptions. */
394                                         for (Future<Void> future : futures) {
395                                                 future.get();
396                                         }
397                                         counter += buffer.length;
398                                 } catch (IOException e) {
399                                         /* TODO */
400                                         e.printStackTrace();
401                                         break;
402                                 } catch (InterruptedException e) {
403                                         /* TODO */
404                                         e.printStackTrace();
405                                         break;
406                                 } catch (ExecutionException e) {
407                                         /* TODO */
408                                         e.printStackTrace();
409                                         break;
410                                 }
411                         }
412                 }
413
414         }
415
416         /**
417          * Container for input and output counters.
418          *
419          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
420          */
421         public static class TrafficCounter {
422
423                 /** The number of input bytes. */
424                 private final long input;
425
426                 /** The number of output bytes. */
427                 private final long output;
428
429                 /**
430                  * Creates a new traffic counter.
431                  *
432                  * @param input
433                  *              The number of input bytes (may be {@code -1} to signify non-available
434                  *              input)
435                  * @param output
436                  *              The number of output bytes (may be {@code -1} to signify non-available
437                  *              output)
438                  */
439                 public TrafficCounter(long input, long output) {
440                         this.input = input;
441                         this.output = output;
442                 }
443
444                 //
445                 // ACCESSORS
446                 //
447
448                 /**
449                  * Returns the number of input bytes.
450                  *
451                  * @return The number of input bytes, or {@link Optional#absent()} if the
452                  *         component can not receive input
453                  */
454                 public Optional<Long> input() {
455                         return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
456                 }
457
458                 /**
459                  * Returns the number of output bytes.
460                  *
461                  * @return The number of output bytes, or {@link Optional#absent()} if the
462                  *         component can not send output
463                  */
464                 public Optional<Long> output() {
465                         return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
466                 }
467
468         }
469
470 }