Send metadata change on start.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.logging.Logger;
30
31 import com.google.common.base.Function;
32 import com.google.common.base.Preconditions;
33 import com.google.common.collect.ArrayListMultimap;
34 import com.google.common.collect.FluentIterable;
35 import com.google.common.collect.ImmutableMultimap;
36 import com.google.common.collect.Lists;
37 import com.google.common.collect.Multimap;
38
39 /**
40  * A pipeline is responsible for streaming audio data from a {@link Source} to
41  * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
42  *
43  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
44  */
45 public class Pipeline {
46
47         /** The logger. */
48         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
49
50         /** The source of the audio stream. */
51         private final Source source;
52
53         /** The sinks for each source. */
54         private final Multimap<Source, Sink> sinks;
55
56         /** All started feeders. */
57         private final List<Feeder> feeders = Lists.newArrayList();
58
59         /**
60          * Creates a new pipeline.
61          *
62          * @param source
63          *              The source of the audio stream
64          * @param sinks
65          *              The sinks for each source
66          */
67         private Pipeline(Source source, Multimap<Source, Sink> sinks) {
68                 this.source = Preconditions.checkNotNull(source, "source must not be null");
69                 this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
70         }
71
72         //
73         // ACTIONS
74         //
75
76         /**
77          * Starts the pipeline.
78          *
79          * @throws IOException
80          *              if any of the sinks can not be opened
81          * @throws IllegalStateException
82          *              if the pipeline is already running
83          */
84         public void start() throws IOException, IllegalStateException {
85                 if (!feeders.isEmpty()) {
86                         throw new IllegalStateException("Pipeline is already running!");
87                 }
88                 List<Source> sources = Lists.newArrayList();
89                 sources.add(source);
90                 /* collect all source->sink pairs. */
91                 while (!sources.isEmpty()) {
92                         Source source = sources.remove(0);
93                         Collection<Sink> sinks = this.sinks.get(source);
94                         feeders.add(new Feeder(source, sinks));
95                         for (Sink sink : sinks) {
96                                 sink.open(source.metadata());
97                                 if (sink instanceof Filter) {
98                                         sources.add((Source) sink);
99                                 }
100                         }
101                 }
102                 for (Feeder feeder : feeders) {
103                         logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
104                         new Thread(feeder).start();
105                 }
106         }
107
108         public void stop() {
109                 if (!feeders.isEmpty()) {
110                         /* pipeline is not running. */
111                         return;
112                 }
113                 for (Feeder feeder : feeders) {
114                         feeder.stop();
115                 }
116         }
117
118         //
119         // STATIC METHODS
120         //
121
122         /**
123          * Returns a new pipeline builder.
124          *
125          * @param source
126          *              The source at which to start
127          * @return A builder for a new pipeline
128          */
129         public static Builder builder(Source source) {
130                 return new Builder(source);
131         }
132
133         /**
134          * A builder for a {@link Pipeline}.
135          *
136          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
137          */
138         public static class Builder {
139
140                 /** The source of the pipeline. */
141                 private final Source source;
142
143                 /** The sinks to which each source streams. */
144                 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
145
146                 /** The last added source. */
147                 private Source lastSource;
148
149                 /**
150                  * Creates a new builder.
151                  *
152                  * @param source
153                  *              The source that starts the pipeline
154                  */
155                 private Builder(Source source) {
156                         this.source = source;
157                         lastSource = source;
158                 }
159
160                 /**
161                  * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
162                  * {@link Source}.
163                  *
164                  * @param sink
165                  *              The sink to add
166                  * @return This builder
167                  * @throws IllegalStateException
168                  *              if the last added {@link Sink} was not also a {@link Source}
169                  */
170                 public Builder to(Sink sink) {
171                         Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
172                         nextSinks.put(lastSource, sink);
173                         lastSource = (sink instanceof Filter) ? (Source) sink : null;
174                         return this;
175                 }
176
177                 /**
178                  * Locates the given source and sets it as the last added node so that the
179                  * next invocation of {@link #to(Sink)} can “fork” the pipeline.
180                  *
181                  * @param source
182                  *              The source to locate
183                  * @return This builder
184                  * @throws IllegalStateException
185                  *              if the given source was not previously added as a sink
186                  */
187                 public Builder find(Source source) {
188                         Preconditions.checkState(nextSinks.containsValue(source));
189                         lastSource = source;
190                         return this;
191                 }
192
193                 /**
194                  * Builds the pipeline.
195                  *
196                  * @return The created pipeline
197                  */
198                 public Pipeline build() {
199                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
200                 }
201
202         }
203
204         /**
205          * A feeder is responsible for streaming audio from one {@link Source} to an
206          * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
207          * creating a {@link Thread} wrapping it and starting said thread.
208          *
209          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
210          */
211         private class Feeder implements Runnable {
212
213                 /** The source. */
214                 private final Source source;
215
216                 /** The sinks. */
217                 private final Collection<Sink> sinks;
218
219                 /** Whether the feeder was stopped. */
220                 private final AtomicBoolean stopped = new AtomicBoolean(false);
221
222                 /** The executor service. */
223                 private final ExecutorService executorService;
224
225                 /**
226                  * Creates a new feeder.
227                  *
228                  * @param source
229                  *              The source of the stream
230                  * @param sinks
231                  *              The sinks to which to stream
232                  */
233                 public Feeder(Source source, Collection<Sink> sinks) {
234                         this.source = source;
235                         this.sinks = sinks;
236                         if (sinks.size() == 1) {
237                                 executorService = Executors.newSingleThreadExecutor();
238                         } else {
239                                 executorService = Executors.newCachedThreadPool();
240                         }
241                 }
242
243                 //
244                 // ACTIONS
245                 //
246
247                 /** Stops this feeder. */
248                 public void stop() {
249                         stopped.set(true);
250                 }
251
252                 //
253                 // RUNNABLE METHODS
254                 //
255
256                 @Override
257                 public void run() {
258                         Metadata firstMetadata = null;
259                         while (!stopped.get()) {
260                                 try {
261                                         final Metadata lastMetadata = firstMetadata;
262                                         final Metadata metadata = firstMetadata = source.metadata();
263                                         final byte[] buffer = source.get(4096);
264                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
265
266                                                 @Override
267                                                 public Callable<Void> apply(final Sink sink) {
268                                                         return new Callable<Void>() {
269
270                                                                 @Override
271                                                                 public Void call() throws Exception {
272                                                                         if (!metadata.equals(lastMetadata)) {
273                                                                                 sink.metadataUpdated(metadata);
274                                                                         }
275                                                                         sink.process(buffer);
276                                                                         return null;
277                                                                 }
278                                                         };
279                                                 }
280                                         }).toList());
281                                         /* check all threads for exceptions. */
282                                         for (Future<Void> future : futures) {
283                                                 future.get();
284                                         }
285                                 } catch (IOException e) {
286                                         /* TODO */
287                                         e.printStackTrace();
288                                 } catch (InterruptedException e) {
289                                         /* TODO */
290                                         e.printStackTrace();
291                                 } catch (ExecutionException e) {
292                                         /* TODO */
293                                         e.printStackTrace();
294                                 }
295                         }
296                 }
297
298         }
299
300 }