package org.eclipse.dirigible.api.kafka;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.eclipse.dirigible.commons.api.helpers.GsonHelper;
import org.eclipse.dirigible.commons.api.scripting.ScriptingException;
import org.eclipse.dirigible.engine.api.script.ScriptEngineExecutorsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/dirigible-api-facade-kafka-5.5.0.jar:org/eclipse/dirigible/api/kafka/KafkaConsumerRunner.class */
/**
 * Runnable that subscribes a Kafka {@link Consumer} to a single topic and, until {@link #stop()}
 * is called, polls it and passes every received record to a JavaScript handler module.
 *
 * <p>When no handler is configured the record is only written to the log. The consumer is closed
 * when {@link #run()} finishes, whether normally or with an exception.
 */
public class KafkaConsumerRunner implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunner.class);
    private static final String DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_MESSAGE = "messaging/wrappers/onMessage";
    private static final String DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_ERROR = "messaging/wrappers/onError";

    /** Set by {@link #stop()}; checked by the polling loop and when a wakeup is received. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final Consumer<?, ?> consumer;
    private final String name;
    private final String handler;
    private final int timeout;

    /**
     * Creates a runner for the given consumer.
     *
     * @param consumer the Kafka consumer to subscribe, poll and eventually close
     * @param str the name of the topic to subscribe to
     * @param str2 the handler passed to the script wrappers under the "handler" key; when
     *        {@code null}, records are only logged
     * @param i the poll timeout in milliseconds
     */
    public KafkaConsumerRunner(Consumer<?, ?> consumer, String str, String str2, int i) {
        this.consumer = consumer;
        this.name = str;
        this.handler = str2;
        this.timeout = i;
    }

    /**
     * Subscribes to the topic and processes polled records until the runner is stopped.
     *
     * <p>A {@link WakeupException} is treated as a normal shutdown signal only when
     * {@link #stop()} has been called; otherwise it is rethrown. The consumer is closed exactly
     * once on every exit path.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            logger.info("Starting a Kafka listener for {} ...", this.name);
            this.consumer.subscribe(Arrays.asList(this.name));
            while (!this.stopped.get()) {
                for (ConsumerRecord<?, ?> consumerRecord : this.consumer.poll(Duration.ofMillis(this.timeout))) {
                    processRecord(consumerRecord);
                }
            }
        } catch (WakeupException e) {
            // A wakeup that was not requested through stop() is unexpected: propagate it.
            if (!this.stopped.get()) {
                throw e;
            }
        } finally {
            // Single close for the normal, wakeup and failure paths alike.
            this.consumer.close();
        }
    }

    /**
     * Requests the polling loop to end: raises the stopped flag, then wakes the consumer up so
     * that a poll in progress does not have to run into its timeout first.
     */
    public void stop() {
        this.stopped.set(true);
        this.consumer.wakeup();
    }

    /** Handles one record: runs the script handler if one is configured, otherwise logs it. */
    private void processRecord(ConsumerRecord<?, ?> consumerRecord) {
        logger.trace("Start processing a received record in [{}] by [{}] ...", this.name, this.handler);
        if (this.handler != null) {
            invokeHandler(consumerRecord);
        } else {
            logger.info(String.format("[Kafka Consumer] %s -  offset = %d, key = %s, value = %s%n", this.name, consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()));
        }
        logger.trace("Done processing the received record in [{}] by [{}]", this.name, this.handler);
    }

    /**
     * Executes the on-message wrapper with the record serialized to JSON; if that fails with a
     * {@link ScriptingException}, executes the on-error wrapper with the failure message.
     */
    private void invokeHandler(ConsumerRecord<?, ?> consumerRecord) {
        Map<Object, Object> context = createMessagingContext();
        context.put("message", escapeCodeString(GsonHelper.GSON.toJson(consumerRecord)));
        try {
            ScriptEngineExecutorsManager.executeServiceModule("javascript", DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_MESSAGE, context);
        } catch (ScriptingException e) {
            logger.error(e.getMessage(), e);
            try {
                context.put("error", escapeCodeString(e.getMessage()));
                ScriptEngineExecutorsManager.executeServiceModule("javascript", DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_ERROR, context);
            } catch (ScriptingException e2) {
                // The error handler itself failed: log and carry on with the next record.
                logger.error(e2.getMessage(), e2);
            }
        }
    }

    /** Builds a fresh, mutable execution context holding the configured handler. */
    private Map<Object, Object> createMessagingContext() {
        Map<Object, Object> context = new HashMap<>();
        context.put("handler", this.handler);
        return context;
    }

    /**
     * Replaces every single quote in the given text with the literal {@code &amp;}.
     *
     * <p>NOTE(review): the replacement text is kept exactly as it was; whether the script
     * wrappers rely on this particular token is not visible from this class - confirm before
     * changing it.
     *
     * @param str the text to escape, may be {@code null} (an exception without a message)
     * @return the escaped text, or {@code null} when {@code str} is {@code null}
     */
    private String escapeCodeString(String str) {
        if (str == null) {
            // Exception messages may be absent; failing here would abort the whole polling loop.
            return null;
        }
        return str.replace("'", "&amp;");
    }
}
