package org.eclipse.dirigible.api.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.dirigible.commons.api.helpers.GsonHelper;
import org.eclipse.dirigible.commons.api.scripting.IScriptingFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/dirigible-api-facade-kafka-5.5.0.jar:org/eclipse/dirigible/api/kafka/KafkaFacade.class */
public class KafkaFacade implements IScriptingFacade {
    private static final String DEFAULT_BOOTSTRAP_SERVER = "localhost:9092";
    private static final Logger logger = LoggerFactory.getLogger(KafkaFacade.class);
    private static Map<String, Producer<String, String>> PRODUCERS = Collections.synchronizedMap(new HashMap());
    private static Map<String, KafkaConsumerRunner> CONSUMERS = Collections.synchronizedMap(new HashMap());

    public static final void send(String str, String str2, String str3, String str4) {
        if (str4 == null) {
            str4 = "{}";
        }
        Map map = (Map) GsonHelper.GSON.fromJson(str4, Map.class);
        String obj = map.get("bootstrap.servers") != null ? map.get("bootstrap.servers").toString() : DEFAULT_BOOTSTRAP_SERVER;
        Producer<String, String> producer = obj != null ? PRODUCERS.get(obj) : null;
        if (producer == null) {
            Properties properties = new Properties();
            for (Object obj2 : map.keySet()) {
                properties.put(obj2, map.get(obj2));
            }
            if (properties.get("bootstrap.servers") == null) {
                properties.put("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVER);
            }
            if (properties.get(ProducerConfig.ACKS_CONFIG) == null) {
                properties.put(ProducerConfig.ACKS_CONFIG, "all");
            }
            if (properties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
                properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            }
            if (properties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            }
            producer = new KafkaProducer(properties);
            PRODUCERS.put(obj, producer);
            logger.info("Kafka Producer [{}] created.", obj);
        }
        producer.send(new ProducerRecord<>(str, str2, str3));
    }

    public static final void closeProducer(String str) {
        if (str == null) {
            str = "{}";
        }
        Map map = (Map) GsonHelper.GSON.fromJson(str, Map.class);
        Producer<String, String> producer = null;
        String obj = map.get("bootstrap.servers") != null ? map.get("bootstrap.servers").toString() : DEFAULT_BOOTSTRAP_SERVER;
        if (obj != null) {
            producer = PRODUCERS.get(obj);
        }
        if (producer == null) {
            logger.warn("Kafka Producer [{}] has not been started yet.", obj);
        } else {
            producer.close();
            PRODUCERS.remove(obj);
        }
    }

    public static final void startListening(String str, String str2, int i, String str3) {
        if (str3 == null) {
            str3 = "{}";
        }
        Map map = (Map) GsonHelper.GSON.fromJson(str3, Map.class);
        String createLocation = createLocation(str, map.get("bootstrap.servers") != null ? map.get("bootstrap.servers").toString() : DEFAULT_BOOTSTRAP_SERVER);
        if (CONSUMERS.get(createLocation) != null) {
            logger.warn("Kafka Consumer [{}] has already been started.", createLocation);
            return;
        }
        Properties properties = new Properties();
        for (Object obj : map.keySet()) {
            properties.put(obj, map.get(obj));
        }
        if (properties.get("bootstrap.servers") == null) {
            properties.put("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVER);
        }
        if (properties.get("group.id") == null) {
            properties.put("group.id", str2 != null ? str2 : str);
        }
        if (properties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null) {
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        }
        if (properties.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG) == null) {
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        }
        if (properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        }
        if (properties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        }
        KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(new KafkaConsumer(properties), str, str2, i);
        Thread thread = new Thread(kafkaConsumerRunner);
        thread.setDaemon(false);
        thread.start();
        CONSUMERS.put(createLocation, kafkaConsumerRunner);
        logger.info("Kafka Consumer [{}] created.", createLocation);
    }

    public static final void stopListening(String str, String str2) {
        if (str2 == null) {
            str2 = "{}";
        }
        Map map = (Map) GsonHelper.GSON.fromJson(str2, Map.class);
        String createLocation = createLocation(str, map.get("bootstrap.servers") != null ? map.get("bootstrap.servers").toString() : DEFAULT_BOOTSTRAP_SERVER);
        KafkaConsumerRunner kafkaConsumerRunner = CONSUMERS.get(createLocation);
        if (kafkaConsumerRunner == null) {
            logger.warn("Kafka Consumer [" + createLocation + "] has not been started yet.");
        } else {
            kafkaConsumerRunner.stop();
            CONSUMERS.remove(createLocation);
        }
    }

    private static String createLocation(String str, String str2) {
        return "[" + str2 + "]:[" + str + "]";
    }
}
