/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.coordinator;

import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.coordinator.TaskManager;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.rest.WorkerStopping;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class NodeManager {
    private static final Logger log = LoggerFactory.getLogger(NodeManager.class);
    private static final long HEARTBEAT_DELAY_MS = 1000L;
    private final Node node;
    private final TaskManager taskManager;
    private final AgentClient client;
    private final Map<Long, ManagedWorker> workers;
    private final ScheduledExecutorService executor;
    private final NodeHeartbeat heartbeat;
    private ScheduledFuture<?> heartbeatFuture;

    NodeManager(Node node, TaskManager taskManager) {
        this.node = node;
        this.taskManager = taskManager;
        this.client = new AgentClient.Builder().maxTries(1).target(node.hostname(), Node.Util.getTrogdorAgentPort(node)).build();
        this.workers = new HashMap<Long, ManagedWorker>();
        this.executor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory((String)("NodeManager(" + node.name() + ")"), (boolean)false));
        this.heartbeat = new NodeHeartbeat();
        this.rescheduleNextHeartbeat(1000L);
    }

    void rescheduleNextHeartbeat(long initialDelayMs) {
        if (this.heartbeatFuture != null) {
            this.heartbeatFuture.cancel(false);
        }
        this.heartbeatFuture = this.executor.scheduleAtFixedRate(this.heartbeat, initialDelayMs, 1000L, TimeUnit.MILLISECONDS);
    }

    public void createWorker(long workerId, String taskId, TaskSpec spec) {
        this.executor.submit(new CreateWorker(workerId, taskId, spec));
    }

    public void stopWorker(long workerId) {
        this.executor.submit(new StopWorker(workerId));
    }

    public void destroyWorker(long workerId) {
        this.executor.submit(new DestroyWorker(workerId));
    }

    public void beginShutdown(boolean stopNode) {
        this.executor.shutdownNow();
        if (stopNode) {
            try {
                this.client.invokeShutdown();
            }
            catch (Exception e) {
                log.error("{}: Failed to send shutdown request", (Object)this.node.name(), (Object)e);
            }
        }
    }

    public void waitForShutdown() throws InterruptedException {
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
    }

    class NodeHeartbeat
    implements Runnable {
        NodeHeartbeat() {
        }

        @Override
        public void run() {
            NodeManager.this.rescheduleNextHeartbeat(1000L);
            try {
                AgentStatusResponse agentStatus;
                try {
                    agentStatus = NodeManager.this.client.status();
                }
                catch (ConnectException e) {
                    log.error("{}: failed to get agent status: ConnectException {}", (Object)NodeManager.this.node.name(), (Object)e.getMessage());
                    return;
                }
                catch (Exception e) {
                    log.error("{}: failed to get agent status", (Object)NodeManager.this.node.name(), (Object)e);
                    return;
                }
                if (log.isTraceEnabled()) {
                    log.trace("{}: got heartbeat status {}", (Object)NodeManager.this.node.name(), (Object)agentStatus);
                }
                this.handleMissingWorkers(agentStatus);
                this.handlePresentWorkers(agentStatus);
            }
            catch (Throwable e) {
                log.error("{}: Unhandled exception in NodeHeartbeatRunnable", (Object)NodeManager.this.node.name(), (Object)e);
            }
        }

        private void handleMissingWorkers(AgentStatusResponse agentStatus) {
            for (Map.Entry entry : NodeManager.this.workers.entrySet()) {
                ManagedWorker worker;
                Long workerId = (Long)entry.getKey();
                if (agentStatus.workers().containsKey(workerId) || !(worker = (ManagedWorker)entry.getValue()).shouldRun) continue;
                worker.tryCreate();
            }
        }

        private void handlePresentWorkers(AgentStatusResponse agentStatus) {
            for (Map.Entry<Long, WorkerState> entry : agentStatus.workers().entrySet()) {
                long workerId = entry.getKey();
                WorkerState state = entry.getValue();
                ManagedWorker worker = (ManagedWorker)NodeManager.this.workers.get(workerId);
                if (worker == null) {
                    log.warn("{}: scheduling unknown worker with ID {} for stopping.", (Object)NodeManager.this.node.name(), (Object)workerId);
                    NodeManager.this.workers.put(workerId, new ManagedWorker(workerId, state.taskId(), state.spec(), false, state));
                    continue;
                }
                if ((state instanceof WorkerStarting || state instanceof WorkerRunning) && !worker.shouldRun) {
                    worker.tryStop();
                }
                if (worker.state.equals(state)) {
                    log.debug("{}: worker state is still {}", (Object)NodeManager.this.node.name(), (Object)worker.state);
                    continue;
                }
                log.info("{}: worker state changed from {} to {}", new Object[]{NodeManager.this.node.name(), worker.state, state});
                if (state instanceof WorkerDone || state instanceof WorkerStopping) {
                    worker.shouldRun = false;
                }
                worker.state = state;
                NodeManager.this.taskManager.updateWorkerState(NodeManager.this.node.name(), worker.workerId, state);
            }
        }
    }

    class CreateWorker
    implements Callable<Void> {
        private final long workerId;
        private final String taskId;
        private final TaskSpec spec;

        CreateWorker(long workerId, String taskId, TaskSpec spec) {
            this.workerId = workerId;
            this.taskId = taskId;
            this.spec = spec;
        }

        @Override
        public Void call() throws Exception {
            ManagedWorker worker = (ManagedWorker)NodeManager.this.workers.get(this.workerId);
            if (worker != null) {
                log.error("{}: there is already a worker {} with ID {}.", new Object[]{NodeManager.this.node.name(), worker, this.workerId});
                return null;
            }
            worker = new ManagedWorker(this.workerId, this.taskId, this.spec, true, new WorkerReceiving(this.taskId, this.spec));
            log.info("{}: scheduling worker {} to start.", (Object)NodeManager.this.node.name(), (Object)worker);
            NodeManager.this.workers.put(this.workerId, worker);
            NodeManager.this.rescheduleNextHeartbeat(0L);
            return null;
        }
    }

    class StopWorker
    implements Callable<Void> {
        private final long workerId;

        StopWorker(long workerId) {
            this.workerId = workerId;
        }

        @Override
        public Void call() throws Exception {
            ManagedWorker worker = (ManagedWorker)NodeManager.this.workers.get(this.workerId);
            if (worker == null) {
                log.error("{}: unable to locate worker to stop with ID {}.", (Object)NodeManager.this.node.name(), (Object)this.workerId);
                return null;
            }
            if (!worker.shouldRun) {
                log.error("{}: Worker {} is already scheduled to stop.", (Object)NodeManager.this.node.name(), (Object)worker);
                return null;
            }
            log.info("{}: scheduling worker {} to stop.", (Object)NodeManager.this.node.name(), (Object)worker);
            worker.shouldRun = false;
            NodeManager.this.rescheduleNextHeartbeat(0L);
            return null;
        }
    }

    class DestroyWorker
    implements Callable<Void> {
        private final long workerId;

        DestroyWorker(long workerId) {
            this.workerId = workerId;
        }

        @Override
        public Void call() throws Exception {
            ManagedWorker worker = (ManagedWorker)NodeManager.this.workers.remove(this.workerId);
            if (worker == null) {
                log.error("{}: unable to locate worker to destroy with ID {}.", (Object)NodeManager.this.node.name(), (Object)this.workerId);
                return null;
            }
            NodeManager.this.rescheduleNextHeartbeat(0L);
            return null;
        }
    }

    class ManagedWorker {
        private final long workerId;
        private final String taskId;
        private final TaskSpec spec;
        private boolean shouldRun;
        private WorkerState state;

        ManagedWorker(long workerId, String taskId, TaskSpec spec, boolean shouldRun, WorkerState state) {
            this.workerId = workerId;
            this.taskId = taskId;
            this.spec = spec;
            this.shouldRun = shouldRun;
            this.state = state;
        }

        void tryCreate() {
            try {
                NodeManager.this.client.createWorker(new CreateWorkerRequest(this.workerId, this.taskId, this.spec));
            }
            catch (Throwable e) {
                log.error("{}: error creating worker {}.", new Object[]{NodeManager.this.node.name(), this, e});
            }
        }

        void tryStop() {
            try {
                NodeManager.this.client.stopWorker(new StopWorkerRequest(this.workerId));
            }
            catch (Throwable e) {
                log.error("{}: error stopping worker {}.", new Object[]{NodeManager.this.node.name(), this, e});
            }
        }

        public String toString() {
            return String.format("%s_%d", this.taskId, this.workerId);
        }
    }
}

