- private <M> void consume(Consumer<M> consumer, M message) {
- consumer.accept(message);
- synchronized (syncObject) {
- syncObject.notifyAll();
+ private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
+ consume(consumer, message, "Identifier");
+ }
+
+ private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
+ String identifier) {
+ if (Objects.equals(message.getField(identifier), this.identifier.get())) {
+ consumeAlways(consumer, message);