Create threads with decent names.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index 75536fc..063a0bc 100644 (file)
@@ -72,7 +72,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
                this.source = Preconditions.checkNotNull(source, "source must not be null");
                this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
                for (ControlledComponent component : Lists.reverse(components())) {
-                       logger.finest(String.format("Adding Listener to %s.", component));
+                       logger.finest(String.format("Adding Listener to %s.", component.name()));
                        component.addMetadataListener(new MetadataListener() {
                                @Override
                                public void metadataUpdated(ControlledComponent component, Metadata metadata) {
@@ -80,7 +80,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
                                                return;
                                        }
                                        for (ControlledComponent controlledComponent : sinks((Source) component)) {
-                                               logger.fine(String.format("Updating Metadata from %s to %s.", component, controlledComponent));
+                                               logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata));
                                                controlledComponent.metadataUpdated(metadata);
                                        }
                                }
@@ -159,6 +159,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
                        Collection<Sink> sinks = this.sinks.get(source);
                        connections.add(new Connection(source, sinks));
                        for (Sink sink : sinks) {
+                               logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
                                sink.open(source.metadata());
                                if (sink instanceof Filter) {
                                        sources.add((Source) sink);
@@ -166,8 +167,15 @@ public class Pipeline implements Iterable<ControlledComponent> {
                        }
                }
                for (Connection connection : connections) {
-                       logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
-                       new Thread(connection).start();
+                       String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
+
+                               @Override
+                               public String apply(Sink sink) {
+                                       return sink.name();
+                               }
+                       }));
+                       logger.info(String.format("Starting Thread: %s", threadName));
+                       new Thread(connection, threadName).start();
                }
        }
 
@@ -324,6 +332,9 @@ public class Pipeline implements Iterable<ControlledComponent> {
                /** The executor service. */
                private final ExecutorService executorService;
 
+               /** The time the connection was started. */
+               private long startTime;
+
                /** The number of copied bytes. */
                private long counter;
 
@@ -338,7 +349,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
                public Connection(Source source, Collection<Sink> sinks) {
                        this.source = source;
                        this.sinks = sinks;
-                       if (sinks.size() == 1) {
+                       if (sinks.size() < 2) {
                                executorService = MoreExecutors.sameThreadExecutor();
                        } else {
                                executorService = Executors.newCachedThreadPool();
@@ -350,6 +361,16 @@ public class Pipeline implements Iterable<ControlledComponent> {
                //
 
                /**
+                * Returns the time this connection was started.
+                *
+                * @return The time this connection was started (in milliseconds since Jan 1,
+                *         1970 UTC)
+                */
+               public long startTime() {
+                       return startTime;
+               }
+
+               /**
                 * Returns the number of bytes that this connection has received from its
                 * source during its lifetime.
                 *
@@ -374,7 +395,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
 
                @Override
                public void run() {
-                       Metadata firstMetadata = null;
+                       startTime = System.currentTimeMillis();
                        while (!stopped.get()) {
                                try {
                                        final byte[] buffer;