package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Arrays;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kafka-streams-2.6.0.jar:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.class */
public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>> {
    private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
    private final Function<V, KO> foreignKeyExtractor;
    private final Supplier<String> foreignKeySerdeTopicSupplier;
    private final Supplier<String> valueSerdeTopicSupplier;
    private final boolean leftJoin;
    private Serializer<KO> foreignKeySerializer;
    private Serializer<V> valueSerializer;

    /* loaded from: input_file:WEB-INF/lib/kafka-streams-2.6.0.jar:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.class */
    private class UnbindChangeProcessor extends AbstractProcessor<K, Change<V>> {
        private Sensor droppedRecordsSensor;
        private String foreignKeySerdeTopic;
        private String valueSerdeTopic;

        private UnbindChangeProcessor() {
        }

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.foreignKeySerdeTopic = (String) ForeignJoinSubscriptionSendProcessorSupplier.this.foreignKeySerdeTopicSupplier.get();
            this.valueSerdeTopic = (String) ForeignJoinSubscriptionSendProcessorSupplier.this.valueSerdeTopicSupplier.get();
            if (ForeignJoinSubscriptionSendProcessorSupplier.this.foreignKeySerializer == null) {
                ForeignJoinSubscriptionSendProcessorSupplier.this.foreignKeySerializer = processorContext.keySerde().serializer();
            }
            if (ForeignJoinSubscriptionSendProcessorSupplier.this.valueSerializer == null) {
                ForeignJoinSubscriptionSendProcessorSupplier.this.valueSerializer = processorContext.valueSerde().serializer();
            }
            this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), (StreamsMetricsImpl) processorContext.metrics());
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void process(K k, Change<V> change) {
            long[] hash128 = change.newValue == null ? null : Murmur3.hash128(ForeignJoinSubscriptionSendProcessorSupplier.this.valueSerializer.serialize(this.valueSerdeTopic, change.newValue));
            if (change.oldValue == null) {
                if (change.newValue != null) {
                    SubscriptionWrapper.Instruction instruction = ForeignJoinSubscriptionSendProcessorSupplier.this.leftJoin ? SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE : SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
                    Object apply = ForeignJoinSubscriptionSendProcessorSupplier.this.foreignKeyExtractor.apply(change.newValue);
                    if (apply != null) {
                        context().forward(apply, new SubscriptionWrapper(hash128, instruction, k));
                        return;
                    } else {
                        ForeignJoinSubscriptionSendProcessorSupplier.LOG.warn("Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", change.newValue, context().topic(), Integer.valueOf(context().partition()), Long.valueOf(context().offset()));
                        this.droppedRecordsSensor.record();
                        return;
                    }
                }
                return;
            }
            Object apply2 = ForeignJoinSubscriptionSendProcessorSupplier.this.foreignKeyExtractor.apply(change.oldValue);
            if (apply2 == null) {
                ForeignJoinSubscriptionSendProcessorSupplier.LOG.warn("Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", change.oldValue, context().topic(), Integer.valueOf(context().partition()), Long.valueOf(context().offset()));
                this.droppedRecordsSensor.record();
                return;
            }
            if (change.newValue == null) {
                context().forward(apply2, new SubscriptionWrapper(hash128, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, k));
                return;
            }
            Object apply3 = ForeignJoinSubscriptionSendProcessorSupplier.this.foreignKeyExtractor.apply(change.newValue);
            if (apply3 == null) {
                ForeignJoinSubscriptionSendProcessorSupplier.LOG.warn("Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", change.newValue, context().topic(), Integer.valueOf(context().partition()), Long.valueOf(context().offset()));
                this.droppedRecordsSensor.record();
            } else {
                if (!Arrays.equals(ForeignJoinSubscriptionSendProcessorSupplier.this.foreignKeySerializer.serialize(this.foreignKeySerdeTopic, apply3), ForeignJoinSubscriptionSendProcessorSupplier.this.foreignKeySerializer.serialize(this.foreignKeySerdeTopic, apply2))) {
                    context().forward(apply2, new SubscriptionWrapper(hash128, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE, k));
                }
                context().forward(apply3, new SubscriptionWrapper(hash128, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, k));
            }
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public /* bridge */ /* synthetic */ void process(Object obj, Object obj2) {
            process((UnbindChangeProcessor) obj, (Change) obj2);
        }
    }

    public ForeignJoinSubscriptionSendProcessorSupplier(Function<V, KO> function, Supplier<String> supplier, Supplier<String> supplier2, Serde<KO> serde, Serializer<V> serializer, boolean z) {
        this.foreignKeyExtractor = function;
        this.foreignKeySerdeTopicSupplier = supplier;
        this.valueSerdeTopicSupplier = supplier2;
        this.valueSerializer = serializer;
        this.leftJoin = z;
        this.foreignKeySerializer = serde == null ? null : serde.serializer();
    }

    @Override // org.apache.kafka.streams.processor.ProcessorSupplier
    public Processor<K, Change<V>> get() {
        return new UnbindChangeProcessor();
    }
}
