X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=45d20bf313845264aee913ebd07f65a584491bb7;hb=f3d55e97b4d097b415a3577191e2d9030ba28258;hp=1487d138eecc927f07e91732bf6ac14614256c85;hpb=668abadea3b268169c839d8b88260295085a4306;p=sonitus.git diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index 1487d13..45d20bf 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -71,6 +71,21 @@ public class Pipeline implements Iterable { private Pipeline(Source source, Multimap sinks) { this.source = Preconditions.checkNotNull(source, "source must not be null"); this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null"); + for (ControlledComponent component : Lists.reverse(components())) { + logger.finest(String.format("Adding Listener to %s.", component.name())); + component.addMetadataListener(new MetadataListener() { + @Override + public void metadataUpdated(ControlledComponent component, Metadata metadata) { + if (!(component instanceof Source)) { + return; + } + for (ControlledComponent controlledComponent : sinks((Source) component)) { + logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata)); + controlledComponent.metadataUpdated(metadata); + } + } + }); + } } // @@ -144,6 +159,7 @@ public class Pipeline implements Iterable { Collection sinks = this.sinks.get(source); connections.add(new Connection(source, sinks)); for (Sink sink : sinks) { + logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata())); sink.open(source.metadata()); if (sink instanceof Filter) { sources.add((Source) sink); @@ -151,7 +167,12 @@ public class Pipeline implements Iterable { } } for (Connection connection : connections) { - logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks)); + logger.info(String.format("Starting Connection from %s to %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function() { + @Override + public String apply(Sink sink) { + return sink.name(); + } + }))); new Thread(connection).start(); } } @@ -309,6 +330,9 @@ public class Pipeline implements Iterable { /** The executor service. */ private final ExecutorService executorService; + /** The time the connection was started. */ + private long startTime; + /** The number of copied bytes. */ private long counter; @@ -335,6 +359,16 @@ public class Pipeline implements Iterable { // /** + * Returns the time this connection was started. + * + * @return The time this connection was started (in milliseconds since Jan 1, + * 1970 UTC) + */ + public long startTime() { + return startTime; + } + + /** * Returns the number of bytes that this connection has received from its * source during its lifetime. * @@ -359,7 +393,7 @@ public class Pipeline implements Iterable { @Override public void run() { - Metadata firstMetadata = null; + startTime = System.currentTimeMillis(); while (!stopped.get()) { try { final byte[] buffer;