package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/kafka-streams-2.6.0.jar:org/apache/kafka/streams/processor/internals/StandbyTaskCreator.class */
/**
 * Builds {@link StandbyTask} instances, either from an assignment of task ids to
 * partitions or by converting an existing {@link StreamTask}.
 *
 * <p>Every task created here shares this creator's configuration, metrics, state
 * directory, changelog reader and {@code dummyCache}. Each successful creation is
 * recorded on the sensor obtained from {@link ThreadMetrics#createTaskSensor}.
 */
public class StandbyTaskCreator {
    private final InternalTopologyBuilder builder;
    private final StreamsConfig config;
    private final StreamsMetricsImpl streamsMetrics;
    private final StateDirectory stateDirectory;
    private final ChangelogReader storeChangelogReader;
    // Built with 0L as its second constructor argument; presumably a zero-capacity
    // placeholder cache for standby tasks -- confirm against ThreadCache.
    private final ThreadCache dummyCache;
    private final Logger log;
    private final Sensor createTaskSensor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a task creator bound to the given topology, configuration and metrics.
     *
     * <p>The log prefix of the internal cache is derived from the name of the thread
     * that runs this constructor.
     *
     * @param internalTopologyBuilder builder used to obtain the sub-topology of each task
     * @param streamsConfig           configuration handed to every created task
     * @param streamsMetricsImpl      metrics registry used for the sensor, the cache and the tasks
     * @param stateDirectory          state directory handed to state managers and tasks
     * @param changelogReader         changelog reader handed to newly built state managers
     * @param str                     identifier passed to {@link ThreadMetrics#createTaskSensor}
     *                                (presumably the stream thread id -- confirm with callers)
     * @param logger                  logger used for trace output
     */
    public StandbyTaskCreator(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ChangelogReader changelogReader, String str, Logger logger) {
        this.builder = internalTopologyBuilder;
        this.config = streamsConfig;
        this.streamsMetrics = streamsMetricsImpl;
        this.stateDirectory = stateDirectory;
        this.storeChangelogReader = changelogReader;
        this.log = logger;
        this.createTaskSensor = ThreadMetrics.createTaskSensor(str, streamsMetricsImpl);
        this.dummyCache = new ThreadCache(new LogContext(threadLogPrefix()), 0L, streamsMetricsImpl);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates one standby task per map entry whose sub-topology reports state with
     * changelogs; all other entries are skipped with a trace-level log message.
     *
     * @param map task ids mapped to the partitions assigned to each task
     * @return the created tasks, in the iteration order of {@code map}; empty when
     *         every entry was skipped or {@code map} is empty
     */
    public Collection<Task> createTasks(Map<TaskId, Set<TopicPartition>> map) {
        // Parameterised (was a raw ArrayList) so the element type is checked at compile time.
        final Collection<Task> createdTasks = new ArrayList<>();
        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : map.entrySet()) {
            final TaskId taskId = entry.getKey();
            final Set<TopicPartition> partitions = entry.getValue();
            final ProcessorTopology topology = this.builder.buildSubtopology(taskId.topicGroupId);

            if (!topology.hasStateWithChangelogs()) {
                this.log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", taskId, partitions);
                continue;
            }

            final ProcessorStateManager stateManager = new ProcessorStateManager(
                taskId,
                Task.TaskType.STANDBY,
                StreamThread.eosEnabled(this.config),
                getLogContext(taskId),
                this.stateDirectory,
                this.storeChangelogReader,
                topology.storeToChangelogTopic(),
                partitions);
            final InternalProcessorContext context =
                new ProcessorContextImpl(taskId, this.config, stateManager, this.streamsMetrics, this.dummyCache);

            createdTasks.add(createStandbyTask(taskId, partitions, topology, stateManager, context));
        }
        return createdTasks;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Converts the given stream task into a standby task that reuses the stream
     * task's processor context and state manager.
     *
     * <p>The stream task is closed via {@code closeCleanAndRecycleState()} and its
     * state manager is transitioned to {@link Task.TaskType#STANDBY} before the
     * standby task is built.
     *
     * @param streamTask the task to convert
     * @param set        partitions assigned to the resulting standby task
     * @return the new standby task, carrying the same task id as {@code streamTask}
     */
    public StandbyTask createStandbyTaskFromActive(StreamTask streamTask, Set<TopicPartition> set) {
        // Take hold of the reusable pieces before the stream task is closed.
        final InternalProcessorContext context = streamTask.processorContext();
        final ProcessorStateManager stateManager = streamTask.stateMgr;

        streamTask.closeCleanAndRecycleState();

        // Read the id once, through the accessor, and reuse it below.
        final TaskId taskId = streamTask.id();
        stateManager.transitionTaskType(Task.TaskType.STANDBY, getLogContext(taskId));

        return createStandbyTask(taskId, set, this.builder.buildSubtopology(taskId.topicGroupId), stateManager, context);
    }

    /**
     * Instantiates a {@link StandbyTask}, logs the creation at trace level and
     * records it on the create-task sensor.
     *
     * @param taskId                   id of the task to create
     * @param set                      partitions assigned to the task
     * @param processorTopology        topology the task runs
     * @param processorStateManager    state manager owned by the task
     * @param internalProcessorContext processor context owned by the task
     * @return the newly constructed standby task
     */
    StandbyTask createStandbyTask(TaskId taskId, Set<TopicPartition> set, ProcessorTopology processorTopology, ProcessorStateManager processorStateManager, InternalProcessorContext internalProcessorContext) {
        final StandbyTask standbyTask = new StandbyTask(taskId, set, processorTopology, this.config, this.streamsMetrics, processorStateManager, this.stateDirectory, this.dummyCache, internalProcessorContext);
        this.log.trace("Created task {} with assigned partitions {}", taskId, set);
        this.createTaskSensor.record();
        return standbyTask;
    }

    /**
     * @return the topology builder this creator was constructed with
     */
    public InternalTopologyBuilder builder() {
        return this.builder;
    }

    /**
     * @return the state directory this creator was constructed with
     */
    public StateDirectory stateDirectory() {
        return this.stateDirectory;
    }

    /**
     * Builds the log context for a standby task: the calling thread's prefix
     * followed by {@code "standby-task [<taskId>] "}.
     *
     * @param taskId id of the task the context is for
     * @return a new log context carrying the combined prefix
     */
    private LogContext getLogContext(TaskId taskId) {
        return new LogContext(threadLogPrefix() + String.format("%s [%s] ", "standby-task", taskId));
    }

    /**
     * Returns the {@code "stream-thread [<name>] "} prefix for the thread that is
     * currently executing; shared by the constructor and {@link #getLogContext}.
     */
    private static String threadLogPrefix() {
        return String.format("stream-thread [%s] ", Thread.currentThread().getName());
    }
}
