X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=ab6500e01ef6dc5bae315903b634a06a569511f2;hb=f260375da81abdf84e48545a505be6014e75978a;hp=ee67eec2f1fd89da67853fb1727b5bfb7ba64f7c;hpb=fa5196e029c48636f0c318311244395cfae49953;p=sonitus.git diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index ee67eec..ab6500e 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -34,8 +34,8 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.MoreExecutors; @@ -71,6 +71,21 @@ public class Pipeline implements Iterable { private Pipeline(Source source, Multimap sinks) { this.source = Preconditions.checkNotNull(source, "source must not be null"); this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null"); + for (ControlledComponent component : Lists.reverse(components())) { + logger.finest(String.format("Adding Listener to %s.", component.name())); + component.addMetadataListener(new MetadataListener() { + @Override + public void metadataUpdated(ControlledComponent component, Metadata metadata) { + if (!(component instanceof Source)) { + return; + } + for (ControlledComponent controlledComponent : sinks((Source) component)) { + logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata)); + controlledComponent.metadataUpdated(metadata); + } + } + }); + } } // @@ -144,6 +159,7 @@ public class Pipeline implements Iterable { Collection sinks = this.sinks.get(source); connections.add(new Connection(source, sinks)); for (Sink sink : sinks) { + logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata())); sink.open(source.metadata()); if (sink instanceof Filter) { sources.add((Source) sink); @@ -151,8 +167,15 @@ public class Pipeline implements Iterable { } } for (Connection connection : connections) { - logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks)); - new Thread(connection).start(); + String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function() { + + @Override + public String apply(Sink sink) { + return sink.name(); + } + })); + logger.info(String.format("Starting Thread: %s", threadName)); + new Thread(connection, threadName).start(); } } @@ -172,7 +195,34 @@ public class Pipeline implements Iterable { @Override public Iterator iterator() { - return ImmutableSet.builder().add(source).addAll(sinks.values()).build().iterator(); + return components().iterator(); + } + + // + // PRIVATE METHODS + // + + /** + * Returns all components of this pipeline, listed breadth-first, starting with + * the source. + * + * @return All components of this pipeline + */ + public List components() { + ImmutableList.Builder components = ImmutableList.builder(); + List currentComponents = Lists.newArrayList(); + components.add(source); + currentComponents.add(source); + while (!currentComponents.isEmpty()) { + Collection sinks = this.sinks((Source) currentComponents.remove(0)); + for (Sink sink : sinks) { + components.add(sink); + if (sink instanceof Source) { + currentComponents.add(sink); + } + } + } + return components.build(); } // @@ -282,6 +332,9 @@ public class Pipeline implements Iterable { /** The executor service. */ private final ExecutorService executorService; + /** The time the connection was started. */ + private long startTime; + /** The number of copied bytes. */ private long counter; @@ -296,7 +349,7 @@ public class Pipeline implements Iterable { public Connection(Source source, Collection sinks) { this.source = source; this.sinks = sinks; - if (sinks.size() == 1) { + if (sinks.size() < 2) { executorService = MoreExecutors.sameThreadExecutor(); } else { executorService = Executors.newCachedThreadPool(); @@ -308,6 +361,16 @@ public class Pipeline implements Iterable { // /** + * Returns the time this connection was started. + * + * @return The time this connection was started (in milliseconds since Jan 1, + * 1970 UTC) + */ + public long startTime() { + return startTime; + } + + /** * Returns the number of bytes that this connection has received from its * source during its lifetime. * @@ -332,18 +395,16 @@ public class Pipeline implements Iterable { @Override public void run() { - Metadata firstMetadata = null; + startTime = System.currentTimeMillis(); while (!stopped.get()) { try { - final Metadata lastMetadata = firstMetadata; - final Metadata metadata = firstMetadata = source.metadata(); final byte[] buffer; try { - logger.finest(String.format("Getting %d bytes from %s...", 4096, source)); + logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name())); buffer = source.get(4096); - logger.finest(String.format("Got %d bytes from %s.", buffer.length, source)); + logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name())); } catch (IOException ioe1) { - throw new IOException(String.format("I/O error while reading from %s.", source), ioe1); + throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1); } List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() { @@ -353,15 +414,12 @@ public class Pipeline implements Iterable { @Override public Void call() throws Exception { - if (!metadata.equals(lastMetadata)) { - sink.metadataUpdated(metadata); - } try { - logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink)); + logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name())); sink.process(buffer); - logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink)); + logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name())); } catch (IOException ioe1) { - throw new IOException(String.format("I/O error while writing to %s", sink), ioe1); + throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1); } return null; }