import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* A pipeline is responsible for streaming audio data from a {@link Source} to
}
//
+ // ACCESSORS
+ //
+
+ /**
+ * Expose this pipeline’s source.
+ *
+ * @return This pipeline’s source
+ */
+ public Source source() {
+ return source;
+ }
+
+ /**
+ * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
+ * the given source.
+ *
+ * @param source
+ * The source to get the sinks for
+ * @return The sinks connected to the given source, or an empty list if the
+ * source does not exist in this pipeline
+ */
+ public Collection<Sink> sinks(Source source) {
+ return sinks.get(source);
+ }
+
+ //
// ACTIONS
//
this.source = source;
this.sinks = sinks;
if (sinks.size() == 1) {
- executorService = Executors.newSingleThreadExecutor();
+ executorService = MoreExecutors.sameThreadExecutor();
} else {
executorService = Executors.newCachedThreadPool();
}
try {
final Metadata lastMetadata = firstMetadata;
final Metadata metadata = firstMetadata = source.metadata();
- final byte[] buffer = source.get(4096);
+ final byte[] buffer;
+ try {
+ logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
+ buffer = source.get(4096);
+ logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
+ } catch (IOException ioe1) {
+ throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
+ }
List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
@Override
if (!metadata.equals(lastMetadata)) {
sink.metadataUpdated(metadata);
}
- sink.process(buffer);
+ try {
+ logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
+ sink.process(buffer);
+ logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
+ } catch (IOException ioe1) {
+ throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
+ }
return null;
}
};
} catch (IOException e) {
/* TODO */
e.printStackTrace();
+ break;
} catch (InterruptedException e) {
/* TODO */
e.printStackTrace();
+ break;
} catch (ExecutionException e) {
/* TODO */
e.printStackTrace();
+ break;
}
}
}