}
@Override
- public void process(byte[] buffer) throws IOException {
- outputStream.write(buffer);
+ public void process(DataPacket dataPacket) throws IOException {
+ if (dataPacket.metadata().isPresent() && !dataPacket.metadata().get().equalsIgnoreComment(this.metadata.get())) {
+ metadataUpdated(dataPacket.metadata().get());
+ }
+ outputStream.write(dataPacket.buffer());
outputStream.flush();
}
@Override
- public byte[] get(int bufferSize) throws IOException {
+ public DataPacket get(int bufferSize) throws IOException {
byte[] buffer = new byte[bufferSize];
int read = inputStream.read(buffer);
if (read == -1) {
throw new EOFException();
}
- return Arrays.copyOf(buffer, read);
+ return new DataPacket(metadata(), Arrays.copyOf(buffer, read));
}
//
--- /dev/null
+/*
+ * Sonitus - DataPacket.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Optional;
+
+/**
+ * A data packet is a container for audio data and optional metadata.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class DataPacket {
+
+ /** The metadata. */
+ private final Optional<Metadata> metadata;
+
+ /** The audio data. */
+ private final byte[] buffer;
+
+ /**
+ * Creates a new data packet.
+ *
+ * @param metadata
+ * The metadata (may be {@code null})
+ * @param buffer
+ * The audio data
+ */
+ public DataPacket(Metadata metadata, byte[] buffer) {
+ this(Optional.fromNullable(metadata), buffer);
+ }
+
+ /**
+ * Creates a new data packet.
+ *
+ * @param metadata
+ * The metadata
+ * @param buffer
+ * The audio date
+ */
+ public DataPacket(Optional<Metadata> metadata, byte[] buffer) {
+ this.metadata = checkNotNull(metadata, "metadata must not be null");
+ this.buffer = checkNotNull(buffer, "buffer must not be null");
+ }
+
+ //
+ // ACCESSORS
+ //
+
+ /**
+ * Returns the metadata of this data packet.
+ *
+ * @return The metadata of this data packet
+ */
+ public Optional<Metadata> metadata() {
+ return metadata;
+ }
+
+ /**
+ * Returns the audio data of this data packet.
+ *
+ * @return The audio data of this data packet
+ */
+ public byte[] buffer() {
+ return buffer;
+ }
+
+ //
+ // OBJECT METHODS
+ //
+
+ @Override
+ public String toString() {
+ return String.format("%s (%d)", metadata, buffer.length);
+ }
+
+}
*
* @param bufferSize
* The maximum amount of bytes to retrieve from the audio stream
- * @return A buffer filled with up to {@code bufferSize} bytes of data; the
+ * @return A data packet containing the metadata of the stream (optional) and
+ * the buffer filled with up to {@code bufferSize} bytes of data; the
* returned buffer may contain less data than requested but will not
- * contain excess elements
+ * contain excess elements (i.e. it can be smaller than the requested
+ * size)
* @throws IOException
* if an I/O error occurs
*/
- byte[] get(int bufferSize) throws IOException;
+ DataPacket get(int bufferSize) throws IOException;
/**
* Opens this sink using the format parameters of the given metadata.
void close();
/**
- * Processes the given buffer of data.
+ * Processes the given data packet.
*
- * @param buffer
+ * @param dataPacket
* The data to process
* @throws IOException
* if an I/O error occurs
*/
- void process(byte[] buffer) throws IOException;
+ void process(DataPacket dataPacket) throws IOException;
}
private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
this.source = Preconditions.checkNotNull(source, "source must not be null");
this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
- for (Filter filter : Lists.reverse(filters())) {
- logger.finest(String.format("Adding Listener to %s.", filter.name()));
- filter.addMetadataListener(new MetadataListener() {
-
- @Override
- public void metadataUpdated(Filter filter, Metadata metadata) {
- for (Filter sinks : filters(filter)) {
- logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
- sinks.metadataUpdated(metadata);
- }
- }
- });
- }
}
//
startTime = System.currentTimeMillis();
while (!stopped.get()) {
try {
- final byte[] buffer;
+ final DataPacket dataPacket;
try {
logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
- buffer = source.get(4096);
- logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
+ dataPacket = source.get(4096);
+ logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
} catch (IOException ioe1) {
throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
}
@Override
public Void call() throws Exception {
try {
- logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
- sink.process(buffer);
- logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
+ logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
+ sink.process(dataPacket);
+ logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
} catch (IOException ioe1) {
throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
}
for (Future<Void> future : futures) {
future.get();
}
- counter += buffer.length;
+ counter += dataPacket.buffer().length;
} catch (IOException e) {
/* TODO */
e.printStackTrace();
import java.util.logging.Logger;
import net.pterodactylus.sonitus.data.AbstractFilter;
+import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
}
@Override
- public void process(byte[] buffer) throws IOException {
- super.process(buffer);
+ public void process(DataPacket dataPacket) throws IOException {
+ super.process(dataPacket);
/* delay. */
- counter += buffer.length;
+ counter += dataPacket.buffer().length;
long waitTime = (long) (counter / (rate / 1000.0));
while ((System.currentTimeMillis() - startTime) < waitTime) {
try {
import java.util.concurrent.atomic.AtomicReference;
import net.pterodactylus.sonitus.data.AbstractFilter;
+import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
/**
* {@link Filter} implementation that uses the number of bytes that have been
- * {@link #process(byte[]) processed} together with the {@link Metadata} to
+ * {@link #process(DataPacket) processed} together with the {@link Metadata} to
* calculate how long a source is already playing.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
/**
* Returns the number of milliseconds worth of data that has been passed into
- * {@link #process(byte[])}. If no metadata has yet been set, {@code 0} is
+ * {@link #process(DataPacket)}. If no metadata has yet been set, {@code 0} is
* returned.
*
* @return The number of milliseconds the current source is already playing
}
@Override
- public void process(byte[] buffer) throws IOException {
- super.process(buffer);
- counter.getAndAdd(buffer.length);
+ public void process(DataPacket dataPacket) throws IOException {
+ super.process(dataPacket);
+ counter.getAndAdd(dataPacket.buffer().length);
updateTimestamp(false);
}
import net.pterodactylus.sonitus.data.AbstractFilter;
import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
import net.pterodactylus.sonitus.data.controller.Fader;
}
@Override
- public void process(byte[] buffer) throws IOException {
- sourceDataLineOutputStream.write(buffer);
- super.process(buffer);
- logger.finest(String.format("AudioSink: Wrote %d Bytes.", buffer.length));
+ public void process(DataPacket dataPacket) throws IOException {
+ sourceDataLineOutputStream.write(dataPacket.buffer());
+ super.process(dataPacket);
+ logger.finest(String.format("AudioSink: Wrote %d Bytes.", dataPacket.buffer().length));
}
//
import java.util.logging.Logger;
import net.pterodactylus.sonitus.data.AbstractFilter;
+import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.Metadata;
/**
}
@Override
- public void process(byte[] buffer) throws IOException {
- fileOutputStream.write(buffer);
- logger.finest(String.format("FileSink: Wrote %d Bytes.", buffer.length));
+ public void process(DataPacket dataPacket) throws IOException {
+ fileOutputStream.write(dataPacket.buffer());
+ logger.finest(String.format("FileSink: Wrote %d Bytes.", dataPacket.buffer().length));
}
}
import net.pterodactylus.sonitus.data.AbstractFilter;
import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
+import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
import net.pterodactylus.sonitus.io.InputStreamDrainer;
}
@Override
- public void process(byte[] buffer) throws IOException {
- socketOutputStream.write(buffer);
+ public void process(DataPacket dataPacket) throws IOException {
+ socketOutputStream.write(dataPacket.buffer());
socketOutputStream.flush();
}
import net.pterodactylus.sonitus.data.AbstractFilter;
import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
import net.pterodactylus.sonitus.io.IdentifyingInputStream;
}
@Override
- public byte[] get(int bufferSize) throws IOException {
+ public DataPacket get(int bufferSize) throws IOException {
byte[] buffer = new byte[bufferSize];
int read = fileInputStream.read(buffer);
if (read == -1) {
throw new EOFException();
}
- return Arrays.copyOf(buffer, read);
+ return new DataPacket(metadata(), Arrays.copyOf(buffer, read));
}
//
import net.pterodactylus.sonitus.data.AbstractFilter;
import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
}
@Override
- public byte[] get(int bufferSize) throws EOFException, IOException {
+ public DataPacket get(int bufferSize) throws EOFException, IOException {
while (true) {
try {
return source.get().get(bufferSize);
import net.pterodactylus.sonitus.data.AbstractFilter;
import net.pterodactylus.sonitus.data.ContentMetadata;
import net.pterodactylus.sonitus.data.Controller;
+import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.FormatMetadata;
import net.pterodactylus.sonitus.data.Metadata;
import net.pterodactylus.sonitus.io.MetadataStream;
}
@Override
- public byte[] get(int bufferSize) throws IOException {
+ public DataPacket get(int bufferSize) throws IOException {
byte[] buffer = new byte[bufferSize];
metadataStream.read(buffer);
- return buffer;
+ return new DataPacket(metadata(), buffer);
}
//