X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=9b6d1607cc64c3c60c1a057b91a37b410ecb7e23;hb=d77d156488f12a0f141a9be98ed23c2b9bee4f7a;hp=93648bc56c18136449512f4336ae464e8748394f;hpb=940ea8e8a65ad0d08fc2938b84a09b0e4c6b518c;p=sonitus.git
diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
index 93648bc..9b6d160 100644
--- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
+++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
@@ -19,6 +19,7 @@ package net.pterodactylus.sonitus.data;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -33,6 +34,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.MoreExecutors;
@@ -43,7 +45,7 @@ import com.google.common.util.concurrent.MoreExecutors;
*
* @author David âBombeâ Roden
*/
-public class Pipeline {
+public class Pipeline implements Iterable {
/** The logger. */
private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
@@ -71,6 +73,32 @@ public class Pipeline {
}
//
+ // ACCESSORS
+ //
+
+ /**
+ * Expose this pipelineâs source.
+ *
+ * @return This pipelineâs source
+ */
+ public Source source() {
+ return source;
+ }
+
+ /**
+ * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
+ * the given source.
+ *
+ * @param source
+ * The source to get the sinks for
+ * @return The sinks connected to the given source, or an empty list if the
+ * source does not exist in this pipeline
+ */
+ public Collection sinks(Source source) {
+ return sinks.get(source);
+ }
+
+ //
// ACTIONS
//
@@ -117,6 +145,15 @@ public class Pipeline {
}
//
+ // ITERABLE METHODS
+ //
+
+ @Override
+ public Iterator iterator() {
+ return ImmutableSet.builder().add(source).addAll(sinks.values()).build().iterator();
+ }
+
+ //
// STATIC METHODS
//
@@ -261,7 +298,14 @@ public class Pipeline {
try {
final Metadata lastMetadata = firstMetadata;
final Metadata metadata = firstMetadata = source.metadata();
- final byte[] buffer = source.get(4096);
+ final byte[] buffer;
+ try {
+ logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
+ buffer = source.get(4096);
+ logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
+ } catch (IOException ioe1) {
+ throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
+ }
List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() {
@Override
@@ -273,7 +317,13 @@ public class Pipeline {
if (!metadata.equals(lastMetadata)) {
sink.metadataUpdated(metadata);
}
- sink.process(buffer);
+ try {
+ logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
+ sink.process(buffer);
+ logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
+ } catch (IOException ioe1) {
+ throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
+ }
return null;
}
};