X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=9464f23491f897c745a1039e78c924acfb2e4e44;hb=ada071e03e74811d9d23af3fc388733a1c3ff12f;hp=3ee0c9da70e36d2440372e1121a0a0da88244002;hpb=7188da95cfb6dc2bf140eb8ac7e4dc99a0761a97;p=sonitus.git diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index 3ee0c9d..9464f23 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -35,6 +35,7 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.MoreExecutors; /** * A pipeline is responsible for streaming audio data from a {@link Source} to @@ -70,6 +71,32 @@ public class Pipeline { } // + // ACCESSORS + // + + /** + * Expose this pipeline’s source. + * + * @return This pipeline’s source + */ + public Source source() { + return source; + } + + /** + * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to + * the given source. + * + * @param source + * The source to get the sinks for + * @return The sinks connected to the given source, or an empty list if the + * source does not exist in this pipeline + */ + public Collection sinks(Source source) { + return sinks.get(source); + } + + // // ACTIONS // @@ -234,7 +261,7 @@ public class Pipeline { this.source = source; this.sinks = sinks; if (sinks.size() == 1) { - executorService = Executors.newSingleThreadExecutor(); + executorService = MoreExecutors.sameThreadExecutor(); } else { executorService = Executors.newCachedThreadPool(); } @@ -255,12 +282,19 @@ public class Pipeline { @Override public void run() { - Metadata firstMetadata = source.metadata(); + Metadata firstMetadata = null; while (!stopped.get()) { try { final Metadata lastMetadata = firstMetadata; final Metadata metadata = firstMetadata = source.metadata(); - final byte[] buffer = source.get(4096); + final byte[] buffer; + try { + logger.finest(String.format("Getting %d bytes from %s...", 4096, source)); + buffer = source.get(4096); + logger.finest(String.format("Got %d bytes from %s.", buffer.length, source)); + } catch (IOException ioe1) { + throw new IOException(String.format("I/O error while reading from %s.", source), ioe1); + } List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() { @Override @@ -272,7 +306,13 @@ public class Pipeline { if (!metadata.equals(lastMetadata)) { sink.metadataUpdated(metadata); } - sink.process(buffer); + try { + logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink)); + sink.process(buffer); + logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink)); + } catch (IOException ioe1) { + throw new IOException(String.format("I/O error while writing to %s", sink), ioe1); + } return null; } }; @@ -285,12 +325,15 @@ public class Pipeline { } catch (IOException e) { /* TODO */ e.printStackTrace(); + break; } catch (InterruptedException e) { /* TODO */ e.printStackTrace(); + break; } catch (ExecutionException e) { /* TODO */ e.printStackTrace(); + break; } } }