Transport metadata inline.
authorDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Wed, 5 Jun 2013 04:29:53 +0000 (06:29 +0200)
committerDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Thu, 6 Jun 2013 20:28:12 +0000 (22:28 +0200)
12 files changed:
src/main/java/net/pterodactylus/sonitus/data/AbstractFilter.java
src/main/java/net/pterodactylus/sonitus/data/DataPacket.java [new file with mode: 0644]
src/main/java/net/pterodactylus/sonitus/data/Filter.java
src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java
src/main/java/net/pterodactylus/sonitus/data/filter/TimeCounterFilter.java
src/main/java/net/pterodactylus/sonitus/data/sink/AudioSink.java
src/main/java/net/pterodactylus/sonitus/data/sink/FileSink.java
src/main/java/net/pterodactylus/sonitus/data/sink/Icecast2Sink.java
src/main/java/net/pterodactylus/sonitus/data/source/FileSource.java
src/main/java/net/pterodactylus/sonitus/data/source/MultiSource.java
src/main/java/net/pterodactylus/sonitus/data/source/StreamSource.java

index 6fa6511..981f42e 100644 (file)
@@ -124,19 +124,22 @@ public abstract class AbstractFilter implements Filter {
        }
 
        @Override
-       public void process(byte[] buffer) throws IOException {
-               outputStream.write(buffer);
+       public void process(DataPacket dataPacket) throws IOException {
+               if (dataPacket.metadata().isPresent() && !dataPacket.metadata().get().equalsIgnoreComment(this.metadata.get())) {
+                       metadataUpdated(dataPacket.metadata().get());
+               }
+               outputStream.write(dataPacket.buffer());
                outputStream.flush();
        }
 
        @Override
-       public byte[] get(int bufferSize) throws IOException {
+       public DataPacket get(int bufferSize) throws IOException {
                byte[] buffer = new byte[bufferSize];
                int read = inputStream.read(buffer);
                if (read == -1) {
                        throw new EOFException();
                }
-               return Arrays.copyOf(buffer, read);
+               return new DataPacket(metadata(), Arrays.copyOf(buffer, read));
        }
 
        //
diff --git a/src/main/java/net/pterodactylus/sonitus/data/DataPacket.java b/src/main/java/net/pterodactylus/sonitus/data/DataPacket.java
new file mode 100644 (file)
index 0000000..707bb59
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Sonitus - DataPacket.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Optional;
+
+/**
+ * A data packet is a container for audio data and optional metadata.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class DataPacket {
+
+       /** The metadata. */
+       private final Optional<Metadata> metadata;
+
+       /** The audio data. */
+       private final byte[] buffer;
+
+       /**
+        * Creates a new data packet.
+        *
+        * @param metadata
+        *              The metadata (may be {@code null})
+        * @param buffer
+        *              The audio data
+        */
+       public DataPacket(Metadata metadata, byte[] buffer) {
+               this(Optional.fromNullable(metadata), buffer);
+       }
+
+       /**
+        * Creates a new data packet.
+        *
+        * @param metadata
+        *              The metadata
+        * @param buffer
+        *              The audio date
+        */
+       public DataPacket(Optional<Metadata> metadata, byte[] buffer) {
+               this.metadata = checkNotNull(metadata, "metadata must not be null");
+               this.buffer = checkNotNull(buffer, "buffer must not be null");
+       }
+
+       //
+       // ACCESSORS
+       //
+
+       /**
+        * Returns the metadata of this data packet.
+        *
+        * @return The metadata of this data packet
+        */
+       public Optional<Metadata> metadata() {
+               return metadata;
+       }
+
+       /**
+        * Returns the audio data of this data packet.
+        *
+        * @return The audio data of this data packet
+        */
+       public byte[] buffer() {
+               return buffer;
+       }
+
+       //
+       // OBJECT METHODS
+       //
+
+       @Override
+       public String toString() {
+               return String.format("%s (%d)", metadata, buffer.length);
+       }
+
+}
index ec8fb94..2b27e9f 100644 (file)
@@ -80,13 +80,15 @@ public interface Filter {
         *
         * @param bufferSize
         *              The maximum amount of bytes to retrieve from the audio stream
-        * @return A buffer filled with up to {@code bufferSize} bytes of data; the
+        * @return A data packet containing the metadata of the stream (optional) and
+        *         the buffer filled with up to {@code bufferSize} bytes of data; the
         *         returned buffer may contain less data than requested but will not
-        *         contain excess elements
+        *         contain excess elements (i.e. it can be smaller than the requested
+        *         size)
         * @throws IOException
         *              if an I/O error occurs
         */
-       byte[] get(int bufferSize) throws IOException;
+       DataPacket get(int bufferSize) throws IOException;
 
        /**
         * Opens this sink using the format parameters of the given metadata.
@@ -102,13 +104,13 @@ public interface Filter {
        void close();
 
        /**
-        * Processes the given buffer of data.
+        * Processes the given data packet.
         *
-        * @param buffer
+        * @param dataPacket
         *              The data to process
         * @throws IOException
         *              if an I/O error occurs
         */
-       void process(byte[] buffer) throws IOException;
+       void process(DataPacket dataPacket) throws IOException;
 
 }
index ae4924b..46d2374 100644 (file)
@@ -72,19 +72,6 @@ public class Pipeline implements Iterable<Filter> {
        private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
                this.source = Preconditions.checkNotNull(source, "source must not be null");
                this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
-               for (Filter filter : Lists.reverse(filters())) {
-                       logger.finest(String.format("Adding Listener to %s.", filter.name()));
-                       filter.addMetadataListener(new MetadataListener() {
-
-                               @Override
-                               public void metadataUpdated(Filter filter, Metadata metadata) {
-                                       for (Filter sinks : filters(filter)) {
-                                               logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
-                                               sinks.metadataUpdated(metadata);
-                                       }
-                               }
-                       });
-               }
        }
 
        //
@@ -389,11 +376,11 @@ public class Pipeline implements Iterable<Filter> {
                        startTime = System.currentTimeMillis();
                        while (!stopped.get()) {
                                try {
-                                       final byte[] buffer;
+                                       final DataPacket dataPacket;
                                        try {
                                                logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
-                                               buffer = source.get(4096);
-                                               logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
+                                               dataPacket = source.get(4096);
+                                               logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
                                        } catch (IOException ioe1) {
                                                throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
                                        }
@@ -406,9 +393,9 @@ public class Pipeline implements Iterable<Filter> {
                                                                @Override
                                                                public Void call() throws Exception {
                                                                        try {
-                                                                               logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
-                                                                               sink.process(buffer);
-                                                                               logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
+                                                                               logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
+                                                                               sink.process(dataPacket);
+                                                                               logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
                                                                        } catch (IOException ioe1) {
                                                                                throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
                                                                        }
@@ -421,7 +408,7 @@ public class Pipeline implements Iterable<Filter> {
                                        for (Future<Void> future : futures) {
                                                future.get();
                                        }
-                                       counter += buffer.length;
+                                       counter += dataPacket.buffer().length;
                                } catch (IOException e) {
                                        /* TODO */
                                        e.printStackTrace();
index 04ed2d1..609eb69 100644 (file)
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.logging.Logger;
 
 import net.pterodactylus.sonitus.data.AbstractFilter;
+import net.pterodactylus.sonitus.data.DataPacket;
 import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
 
@@ -83,10 +84,10 @@ public class RateLimitingFilter extends AbstractFilter implements Filter {
        }
 
        @Override
-       public void process(byte[] buffer) throws IOException {
-               super.process(buffer);
+       public void process(DataPacket dataPacket) throws IOException {
+               super.process(dataPacket);
                /* delay. */
-               counter += buffer.length;
+               counter += dataPacket.buffer().length;
                long waitTime = (long) (counter / (rate / 1000.0));
                while ((System.currentTimeMillis() - startTime) < waitTime) {
                        try {
index 0eb8281..4c58acc 100644 (file)
@@ -22,12 +22,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import net.pterodactylus.sonitus.data.AbstractFilter;
+import net.pterodactylus.sonitus.data.DataPacket;
 import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
 
 /**
  * {@link Filter} implementation that uses the number of bytes that have been
- * {@link #process(byte[]) processed} together with the {@link Metadata} to
+ * {@link #process(DataPacket) processed} together with the {@link Metadata} to
  * calculate how long a source is already playing.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
@@ -77,7 +78,7 @@ public class TimeCounterFilter extends AbstractFilter implements Filter {
 
        /**
         * Returns the number of milliseconds worth of data that has been passed into
-        * {@link #process(byte[])}. If no metadata has yet been set, {@code 0} is
+        * {@link #process(DataPacket)}. If no metadata has yet been set, {@code 0} is
         * returned.
         *
         * @return The number of milliseconds the current source is already playing
@@ -109,9 +110,9 @@ public class TimeCounterFilter extends AbstractFilter implements Filter {
        }
 
        @Override
-       public void process(byte[] buffer) throws IOException {
-               super.process(buffer);
-               counter.getAndAdd(buffer.length);
+       public void process(DataPacket dataPacket) throws IOException {
+               super.process(dataPacket);
+               counter.getAndAdd(dataPacket.buffer().length);
                updateTimestamp(false);
        }
 
index 6cace62..fda40fd 100644 (file)
@@ -36,6 +36,7 @@ import javax.sound.sampled.SourceDataLine;
 
 import net.pterodactylus.sonitus.data.AbstractFilter;
 import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
 import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
 import net.pterodactylus.sonitus.data.controller.Fader;
@@ -172,10 +173,10 @@ public class AudioSink extends AbstractFilter {
        }
 
        @Override
-       public void process(byte[] buffer) throws IOException {
-               sourceDataLineOutputStream.write(buffer);
-               super.process(buffer);
-               logger.finest(String.format("AudioSink: Wrote %d Bytes.", buffer.length));
+       public void process(DataPacket dataPacket) throws IOException {
+               sourceDataLineOutputStream.write(dataPacket.buffer());
+               super.process(dataPacket);
+               logger.finest(String.format("AudioSink: Wrote %d Bytes.", dataPacket.buffer().length));
        }
 
        //
index 5011321..e8c550f 100644 (file)
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.logging.Logger;
 
 import net.pterodactylus.sonitus.data.AbstractFilter;
+import net.pterodactylus.sonitus.data.DataPacket;
 import net.pterodactylus.sonitus.data.Metadata;
 
 /**
@@ -72,9 +73,9 @@ public class FileSink extends AbstractFilter {
        }
 
        @Override
-       public void process(byte[] buffer) throws IOException {
-               fileOutputStream.write(buffer);
-               logger.finest(String.format("FileSink: Wrote %d Bytes.", buffer.length));
+       public void process(DataPacket dataPacket) throws IOException {
+               fileOutputStream.write(dataPacket.buffer());
+               logger.finest(String.format("FileSink: Wrote %d Bytes.", dataPacket.buffer().length));
        }
 
 }
index f98fba5..5faa1ec 100644 (file)
@@ -30,6 +30,8 @@ import java.util.logging.Logger;
 
 import net.pterodactylus.sonitus.data.AbstractFilter;
 import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
+import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
 import net.pterodactylus.sonitus.io.InputStreamDrainer;
 
@@ -190,8 +192,8 @@ public class Icecast2Sink extends AbstractFilter {
        }
 
        @Override
-       public void process(byte[] buffer) throws IOException {
-               socketOutputStream.write(buffer);
+       public void process(DataPacket dataPacket) throws IOException {
+               socketOutputStream.write(dataPacket.buffer());
                socketOutputStream.flush();
        }
 
index 7251f65..01216cd 100644 (file)
@@ -29,6 +29,7 @@ import java.util.List;
 
 import net.pterodactylus.sonitus.data.AbstractFilter;
 import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
 import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
 import net.pterodactylus.sonitus.io.IdentifyingInputStream;
@@ -82,13 +83,13 @@ public class FileSource extends AbstractFilter {
        }
 
        @Override
-       public byte[] get(int bufferSize) throws IOException {
+       public DataPacket get(int bufferSize) throws IOException {
                byte[] buffer = new byte[bufferSize];
                int read = fileInputStream.read(buffer);
                if (read == -1) {
                        throw new EOFException();
                }
-               return Arrays.copyOf(buffer, read);
+               return new DataPacket(metadata(), Arrays.copyOf(buffer, read));
        }
 
        //
index b8afd82..0ff1126 100644 (file)
@@ -29,6 +29,7 @@ import javax.swing.event.EventListenerList;
 
 import net.pterodactylus.sonitus.data.AbstractFilter;
 import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
 import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
 
@@ -144,7 +145,7 @@ public class MultiSource extends AbstractFilter {
        }
 
        @Override
-       public byte[] get(int bufferSize) throws EOFException, IOException {
+       public DataPacket get(int bufferSize) throws EOFException, IOException {
                while (true) {
                        try {
                                return source.get().get(bufferSize);
index a66fb7a..2efc66d 100644 (file)
@@ -30,6 +30,7 @@ import java.util.logging.Logger;
 import net.pterodactylus.sonitus.data.AbstractFilter;
 import net.pterodactylus.sonitus.data.ContentMetadata;
 import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
 import net.pterodactylus.sonitus.data.FormatMetadata;
 import net.pterodactylus.sonitus.data.Metadata;
 import net.pterodactylus.sonitus.io.MetadataStream;
@@ -150,10 +151,10 @@ public class StreamSource extends AbstractFilter {
        }
 
        @Override
-       public byte[] get(int bufferSize) throws IOException {
+       public DataPacket get(int bufferSize) throws IOException {
                byte[] buffer = new byte[bufferSize];
                metadataStream.read(buffer);
-               return buffer;
+               return new DataPacket(metadata(), buffer);
        }
 
        //