Rename Feeder to Connection.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
1 /*
2  * Sonitus - Pipeline.java - Copyright © 2013 David Roden
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17
18 package net.pterodactylus.sonitus.data;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.logging.Logger;
31
32 import com.google.common.base.Function;
33 import com.google.common.base.Preconditions;
34 import com.google.common.collect.ArrayListMultimap;
35 import com.google.common.collect.FluentIterable;
36 import com.google.common.collect.ImmutableMultimap;
37 import com.google.common.collect.ImmutableSet;
38 import com.google.common.collect.Lists;
39 import com.google.common.collect.Multimap;
40 import com.google.common.util.concurrent.MoreExecutors;
41
42 /**
43  * A pipeline is responsible for streaming audio data from a {@link Source} to
44  * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
45  *
46  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
47  */
48 public class Pipeline implements Iterable<Controlled> {
49
50         /** The logger. */
51         private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
52
53         /** The source of the audio stream. */
54         private final Source source;
55
56         /** The sinks for each source. */
57         private final Multimap<Source, Sink> sinks;
58
59         /** All started connections. */
60         private final List<Connection> connections = Lists.newArrayList();
61
62         /**
63          * Creates a new pipeline.
64          *
65          * @param source
66          *              The source of the audio stream
67          * @param sinks
68          *              The sinks for each source
69          */
70         private Pipeline(Source source, Multimap<Source, Sink> sinks) {
71                 this.source = Preconditions.checkNotNull(source, "source must not be null");
72                 this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
73         }
74
75         //
76         // ACCESSORS
77         //
78
79         /**
80          * Expose this pipeline’s source.
81          *
82          * @return This pipeline’s source
83          */
84         public Source source() {
85                 return source;
86         }
87
88         /**
89          * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
90          * the given source.
91          *
92          * @param source
93          *              The source to get the sinks for
94          * @return The sinks connected to the given source, or an empty list if the
95          *         source does not exist in this pipeline
96          */
97         public Collection<Sink> sinks(Source source) {
98                 return sinks.get(source);
99         }
100
101         //
102         // ACTIONS
103         //
104
105         /**
106          * Starts the pipeline.
107          *
108          * @throws IOException
109          *              if any of the sinks can not be opened
110          * @throws IllegalStateException
111          *              if the pipeline is already running
112          */
113         public void start() throws IOException, IllegalStateException {
114                 if (!connections.isEmpty()) {
115                         throw new IllegalStateException("Pipeline is already running!");
116                 }
117                 List<Source> sources = Lists.newArrayList();
118                 sources.add(source);
119                 /* collect all source->sink pairs. */
120                 while (!sources.isEmpty()) {
121                         Source source = sources.remove(0);
122                         Collection<Sink> sinks = this.sinks.get(source);
123                         connections.add(new Connection(source, sinks));
124                         for (Sink sink : sinks) {
125                                 sink.open(source.metadata());
126                                 if (sink instanceof Filter) {
127                                         sources.add((Source) sink);
128                                 }
129                         }
130                 }
131                 for (Connection connection : connections) {
132                         logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
133                         new Thread(connection).start();
134                 }
135         }
136
137         public void stop() {
138                 if (!connections.isEmpty()) {
139                         /* pipeline is not running. */
140                         return;
141                 }
142                 for (Connection connection : connections) {
143                         connection.stop();
144                 }
145         }
146
147         //
148         // ITERABLE METHODS
149         //
150
151         @Override
152         public Iterator<Controlled> iterator() {
153                 return ImmutableSet.<Controlled>builder().add(source).addAll(sinks.values()).build().iterator();
154         }
155
156         //
157         // STATIC METHODS
158         //
159
160         /**
161          * Returns a new pipeline builder.
162          *
163          * @param source
164          *              The source at which to start
165          * @return A builder for a new pipeline
166          */
167         public static Builder builder(Source source) {
168                 return new Builder(source);
169         }
170
171         /**
172          * A builder for a {@link Pipeline}.
173          *
174          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
175          */
176         public static class Builder {
177
178                 /** The source of the pipeline. */
179                 private final Source source;
180
181                 /** The sinks to which each source streams. */
182                 private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
183
184                 /** The last added source. */
185                 private Source lastSource;
186
187                 /**
188                  * Creates a new builder.
189                  *
190                  * @param source
191                  *              The source that starts the pipeline
192                  */
193                 private Builder(Source source) {
194                         this.source = source;
195                         lastSource = source;
196                 }
197
198                 /**
199                  * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
200                  * {@link Source}.
201                  *
202                  * @param sink
203                  *              The sink to add
204                  * @return This builder
205                  * @throws IllegalStateException
206                  *              if the last added {@link Sink} was not also a {@link Source}
207                  */
208                 public Builder to(Sink sink) {
209                         Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
210                         nextSinks.put(lastSource, sink);
211                         lastSource = (sink instanceof Filter) ? (Source) sink : null;
212                         return this;
213                 }
214
215                 /**
216                  * Locates the given source and sets it as the last added node so that the
217                  * next invocation of {@link #to(Sink)} can “fork” the pipeline.
218                  *
219                  * @param source
220                  *              The source to locate
221                  * @return This builder
222                  * @throws IllegalStateException
223                  *              if the given source was not previously added as a sink
224                  */
225                 public Builder find(Source source) {
226                         Preconditions.checkState(nextSinks.containsValue(source));
227                         lastSource = source;
228                         return this;
229                 }
230
231                 /**
232                  * Builds the pipeline.
233                  *
234                  * @return The created pipeline
235                  */
236                 public Pipeline build() {
237                         return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
238                 }
239
240         }
241
242         /**
243          * A connection is responsible for streaming audio from one {@link Source} to
244          * an arbitrary number of {@link Sink}s it is connected to. A connection is
245          * started by creating a {@link Thread} wrapping it and starting said thread.
246          *
247          * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
248          */
249         public class Connection implements Runnable {
250
251                 /** The source. */
252                 private final Source source;
253
254                 /** The sinks. */
255                 private final Collection<Sink> sinks;
256
257                 /** Whether the feeder was stopped. */
258                 private final AtomicBoolean stopped = new AtomicBoolean(false);
259
260                 /** The executor service. */
261                 private final ExecutorService executorService;
262
263                 /**
264                  * Creates a new connection.
265                  *
266                  * @param source
267                  *              The source of the stream
268                  * @param sinks
269                  *              The sinks to which to stream
270                  */
271                 public Connection(Source source, Collection<Sink> sinks) {
272                         this.source = source;
273                         this.sinks = sinks;
274                         if (sinks.size() == 1) {
275                                 executorService = MoreExecutors.sameThreadExecutor();
276                         } else {
277                                 executorService = Executors.newCachedThreadPool();
278                         }
279                 }
280
281                 //
282                 // ACTIONS
283                 //
284
285                 /** Stops this connection. */
286                 public void stop() {
287                         stopped.set(true);
288                 }
289
290                 //
291                 // RUNNABLE METHODS
292                 //
293
294                 @Override
295                 public void run() {
296                         Metadata firstMetadata = null;
297                         while (!stopped.get()) {
298                                 try {
299                                         final Metadata lastMetadata = firstMetadata;
300                                         final Metadata metadata = firstMetadata = source.metadata();
301                                         final byte[] buffer;
302                                         try {
303                                                 logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
304                                                 buffer = source.get(4096);
305                                                 logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
306                                         } catch (IOException ioe1) {
307                                                 throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
308                                         }
309                                         List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
310
311                                                 @Override
312                                                 public Callable<Void> apply(final Sink sink) {
313                                                         return new Callable<Void>() {
314
315                                                                 @Override
316                                                                 public Void call() throws Exception {
317                                                                         if (!metadata.equals(lastMetadata)) {
318                                                                                 sink.metadataUpdated(metadata);
319                                                                         }
320                                                                         try {
321                                                                                 logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
322                                                                                 sink.process(buffer);
323                                                                                 logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
324                                                                         } catch (IOException ioe1) {
325                                                                                 throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
326                                                                         }
327                                                                         return null;
328                                                                 }
329                                                         };
330                                                 }
331                                         }).toList());
332                                         /* check all threads for exceptions. */
333                                         for (Future<Void> future : futures) {
334                                                 future.get();
335                                         }
336                                 } catch (IOException e) {
337                                         /* TODO */
338                                         e.printStackTrace();
339                                         break;
340                                 } catch (InterruptedException e) {
341                                         /* TODO */
342                                         e.printStackTrace();
343                                         break;
344                                 } catch (ExecutionException e) {
345                                         /* TODO */
346                                         e.printStackTrace();
347                                         break;
348                                 }
349                         }
350                 }
351
352         }
353
354 }