/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.ShutdownableThread;
import org.apache.kafka.server.util.TopicFilter;
import org.apache.kafka.tools.ToolsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicaVerificationTool {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicaVerificationTool.class);
    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

    public static void main(String[] args) {
        try {
            LOG.warn("This tool is deprecated and may be removed in a future major release.");
            ReplicaVerificationToolOptions options = new ReplicaVerificationToolOptions(args);
            LOG.info("Getting topic metadata...");
            String brokerList = options.brokerHostsAndPorts();
            try (Admin adminClient = ReplicaVerificationTool.createAdminClient(brokerList);){
                Collection<TopicDescription> topicsMetadata = ReplicaVerificationTool.listTopicsMetadata(adminClient);
                Map<Integer, Node> brokerInfo = ReplicaVerificationTool.brokerDetails(adminClient);
                Map<String, Uuid> topicIds = topicsMetadata.stream().collect(Collectors.toMap(TopicDescription::name, TopicDescription::topicId));
                List filteredTopicMetadata = topicsMetadata.stream().filter(topicMetadata -> options.topicsIncludeFilter().isTopicAllowed(topicMetadata.name(), false)).collect(Collectors.toList());
                if (filteredTopicMetadata.isEmpty()) {
                    LOG.error("No topics found. {} if specified, is either filtering out all topics or there is no topic.", (Object)options.topicsIncludeOpt);
                    Exit.exit((int)1);
                }
                List topicPartitionReplicas = filteredTopicMetadata.stream().flatMap(topicMetadata -> topicMetadata.partitions().stream().flatMap(partitionMetadata -> partitionMetadata.replicas().stream().map(node -> new TopicPartitionReplica(topicMetadata.name(), partitionMetadata.partition(), node.id())))).collect(Collectors.toList());
                LOG.debug("Selected topic partitions: {}", topicPartitionReplicas);
                Map brokerToTopicPartitions = topicPartitionReplicas.stream().collect(Collectors.groupingBy(TopicPartitionReplica::brokerId, Collectors.mapping(replica -> new TopicPartition(replica.topic(), replica.partition()), Collectors.toList())));
                LOG.debug("Topic partitions per broker: {}", brokerToTopicPartitions);
                Map<TopicPartition, Integer> expectedReplicasPerTopicPartition = topicPartitionReplicas.stream().collect(Collectors.groupingBy(replica -> new TopicPartition(replica.topic(), replica.partition()), Collectors.collectingAndThen(Collectors.toList(), List::size)));
                LOG.debug("Expected replicas per topic partition: {}", expectedReplicasPerTopicPartition);
                List<TopicPartition> topicPartitions = filteredTopicMetadata.stream().flatMap(topicMetadata -> topicMetadata.partitions().stream().map(partitionMetadata -> new TopicPartition(topicMetadata.name(), partitionMetadata.partition()))).collect(Collectors.toList());
                Properties consumerProps = ReplicaVerificationTool.consumerConfig(brokerList);
                ReplicaBuffer replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition, ReplicaVerificationTool.initialOffsets(topicPartitions, consumerProps, options.initialOffsetTime()), brokerToTopicPartitions.size(), options.reportInterval());
                int verificationBrokerId = brokerToTopicPartitions.entrySet().iterator().next().getKey();
                AtomicInteger counter = new AtomicInteger(0);
                List<ReplicaFetcher> fetcherThreads = brokerToTopicPartitions.entrySet().stream().map(entry -> {
                    int brokerId = (Integer)entry.getKey();
                    Iterable partitions = (Iterable)entry.getValue();
                    return new ReplicaFetcher("ReplicaFetcher-" + brokerId, (Node)brokerInfo.get(brokerId), partitions, topicIds, replicaBuffer, options.fetchSize(), options.maxWaitMs(), 1, brokerId == verificationBrokerId, consumerProps, counter.incrementAndGet());
                }).collect(Collectors.toList());
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    LOG.info("Stopping all fetchers");
                    fetcherThreads.forEach(replicaFetcher -> {
                        try {
                            replicaFetcher.shutdown();
                        }
                        catch (InterruptedException interruptedException) {
                            // empty catch block
                        }
                    });
                }, "ReplicaVerificationToolShutdownHook"));
                fetcherThreads.forEach(Thread::start);
                System.out.printf("%s: verification process is started%n", DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())));
            }
        }
        catch (Throwable e) {
            System.err.println(e.getMessage());
            System.err.println(Utils.stackTrace((Throwable)e));
            Exit.exit((int)1);
        }
    }

    private static Map<TopicPartition, Long> initialOffsets(List<TopicPartition> topicPartitions, Properties consumerConfig, long initialOffsetTime) {
        try (KafkaConsumer consumer = new KafkaConsumer(consumerConfig);){
            if (-1L == initialOffsetTime) {
                Map<TopicPartition, Long> map = consumer.endOffsets(topicPartitions).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                return map;
            }
            if (-2L == initialOffsetTime) {
                Map<TopicPartition, Long> map = consumer.beginningOffsets(topicPartitions).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                return map;
            }
            Map timestampsToSearch = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), tp -> initialOffsetTime));
            Map<TopicPartition, Long> map = consumer.offsetsForTimes(timestampsToSearch).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((OffsetAndTimestamp)e.getValue()).offset()));
            return map;
        }
    }

    private static Properties consumerConfig(String brokerUrl) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerUrl);
        properties.put("group.id", "ReplicaVerification");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        return properties;
    }

    private static Map<Integer, Node> brokerDetails(Admin adminClient) throws ExecutionException, InterruptedException {
        return ((Collection)adminClient.describeCluster().nodes().get()).stream().collect(Collectors.toMap(Node::id, Function.identity()));
    }

    private static Collection<TopicDescription> listTopicsMetadata(Admin adminClient) throws ExecutionException, InterruptedException {
        Set topics = (Set)adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
        return ((Map)adminClient.describeTopics((Collection)topics).allTopicNames().get()).values();
    }

    private static Admin createAdminClient(String brokerList) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        return Admin.create((Properties)props);
    }

    private static class ReplicaVerificationToolOptions
    extends CommandDefaultOptions {
        private final OptionSpec<String> brokerListOpt;
        private final OptionSpec<Integer> fetchSizeOpt;
        private final OptionSpec<Integer> maxWaitMsOpt;
        private final OptionSpec<String> topicWhiteListOpt;
        private final OptionSpec<String> topicsIncludeOpt;
        private final OptionSpec<Long> initialOffsetTimeOpt;
        private final OptionSpec<Long> reportIntervalOpt;

        ReplicaVerificationToolOptions(String[] args) {
            super(args);
            this.brokerListOpt = this.parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.").withRequiredArg().describedAs("hostname:port,...,hostname:port").ofType(String.class);
            this.fetchSizeOpt = this.parser.accepts("fetch-size", "The fetch size of each request.").withRequiredArg().describedAs("bytes").ofType(Integer.class).defaultsTo((Object)0x100000, (Object[])new Integer[0]);
            this.maxWaitMsOpt = this.parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)1000, (Object[])new Integer[0]);
            this.topicWhiteListOpt = this.parser.accepts("topic-white-list", "DEPRECATED use --topics-include instead; ignored if --topics-include specified. List of topics to verify replica consistency.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class).defaultsTo((Object)".*", (Object[])new String[0]);
            this.topicsIncludeOpt = this.parser.accepts("topics-include", "List of topics to verify replica consistency.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class).defaultsTo((Object)".*", (Object[])new String[0]);
            this.initialOffsetTimeOpt = this.parser.accepts("time", "Timestamp for getting the initial offsets.").withRequiredArg().describedAs("timestamp/-1(latest)/-2(earliest)").ofType(Long.class).defaultsTo((Object)-1L, (Object[])new Long[0]);
            this.reportIntervalOpt = this.parser.accepts("report-interval-ms", "The reporting interval.").withRequiredArg().describedAs("ms").ofType(Long.class).defaultsTo((Object)30000L, (Object[])new Long[0]);
            this.options = this.parser.parse(args);
            if (args.length == 0 || this.options.has((OptionSpec)this.helpOpt)) {
                CommandLineUtils.printUsageAndExit((OptionParser)this.parser, (String)"Validate that all replicas for a set of topics have the same data.");
            }
            if (this.options.has((OptionSpec)this.versionOpt)) {
                CommandLineUtils.printVersionAndExit();
            }
            CommandLineUtils.checkRequiredArgs((OptionParser)this.parser, (OptionSet)this.options, (OptionSpec[])new OptionSpec[]{this.brokerListOpt});
            CommandLineUtils.checkInvalidArgs((OptionParser)this.parser, (OptionSet)this.options, this.topicsIncludeOpt, (OptionSpec[])new OptionSpec[]{this.topicWhiteListOpt});
        }

        String brokerHostsAndPorts() {
            String brokerList = (String)this.options.valueOf(this.brokerListOpt);
            try {
                ToolsUtils.validateBootstrapServer(brokerList);
            }
            catch (IllegalArgumentException e) {
                CommandLineUtils.printUsageAndExit((OptionParser)this.parser, (String)e.getMessage());
            }
            return brokerList;
        }

        TopicFilter.IncludeList topicsIncludeFilter() {
            String regex = (String)this.options.valueOf(this.options.has(this.topicsIncludeOpt) ? this.topicsIncludeOpt : this.topicWhiteListOpt);
            try {
                Pattern.compile(regex);
            }
            catch (PatternSyntaxException e) {
                throw new RuntimeException(String.format("%s is an invalid regex", regex));
            }
            return new TopicFilter.IncludeList(regex);
        }

        int fetchSize() {
            return (Integer)this.options.valueOf(this.fetchSizeOpt);
        }

        int maxWaitMs() {
            return (Integer)this.options.valueOf(this.maxWaitMsOpt);
        }

        long initialOffsetTime() {
            return (Long)this.options.valueOf(this.initialOffsetTimeOpt);
        }

        long reportInterval() {
            return (Long)this.options.valueOf(this.reportIntervalOpt);
        }
    }

    protected static class ReplicaBuffer {
        private final Map<TopicPartition, Integer> expectedReplicasPerTopicPartition;
        private final int expectedNumFetchers;
        private final long reportInterval;
        private final Map<TopicPartition, Long> fetchOffsetMap;
        private final Map<TopicPartition, Map<Integer, FetchResponseData.PartitionData>> recordsCache;
        private final AtomicReference<CountDownLatch> fetcherBarrier;
        private final AtomicReference<CountDownLatch> verificationBarrier;
        private volatile long lastReportTime;
        private long maxLag;
        private long offsetWithMaxLag;
        private TopicPartition maxLagTopicAndPartition;

        ReplicaBuffer(Map<TopicPartition, Integer> expectedReplicasPerTopicPartition, Map<TopicPartition, Long> initialOffsets, int expectedNumFetchers, long reportInterval) {
            this.expectedReplicasPerTopicPartition = expectedReplicasPerTopicPartition;
            this.expectedNumFetchers = expectedNumFetchers;
            this.reportInterval = reportInterval;
            this.fetchOffsetMap = new HashMap<TopicPartition, Long>();
            this.recordsCache = new HashMap<TopicPartition, Map<Integer, FetchResponseData.PartitionData>>();
            this.fetcherBarrier = new AtomicReference<CountDownLatch>(new CountDownLatch(expectedNumFetchers));
            this.verificationBarrier = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
            this.lastReportTime = Time.SYSTEM.milliseconds();
            this.maxLag = -1L;
            this.offsetWithMaxLag = -1L;
            for (TopicPartition topicPartition : expectedReplicasPerTopicPartition.keySet()) {
                this.recordsCache.put(topicPartition, new HashMap());
            }
            for (Map.Entry entry : initialOffsets.entrySet()) {
                TopicPartition tp = (TopicPartition)entry.getKey();
                Long offset = (Long)entry.getValue();
                this.fetchOffsetMap.put(tp, offset);
            }
        }

        void createNewFetcherBarrier() {
            this.fetcherBarrier.set(new CountDownLatch(this.expectedNumFetchers));
        }

        CountDownLatch getFetcherBarrier() {
            return this.fetcherBarrier.get();
        }

        void createNewVerificationBarrier() {
            this.verificationBarrier.set(new CountDownLatch(1));
        }

        CountDownLatch getVerificationBarrier() {
            return this.verificationBarrier.get();
        }

        void addFetchedData(TopicPartition topicPartition, int replicaId, FetchResponseData.PartitionData partitionData) {
            this.recordsCache.get(topicPartition).put(replicaId, partitionData);
        }

        long getOffset(TopicPartition topicPartition) {
            return this.fetchOffsetMap.get(topicPartition);
        }

        void verifyCheckSum(Consumer<String> println) {
            LOG.debug("Begin verification");
            this.maxLag = -1L;
            for (Map.Entry<TopicPartition, Map<Integer, FetchResponseData.PartitionData>> cacheEntry : this.recordsCache.entrySet()) {
                TopicPartition topicPartition = cacheEntry.getKey();
                Map<Integer, FetchResponseData.PartitionData> fetchResponsePerReplica = cacheEntry.getValue();
                LOG.debug("Verifying {}", (Object)topicPartition);
                assert (fetchResponsePerReplica.size() == this.expectedReplicasPerTopicPartition.get(topicPartition).intValue()) : "fetched " + fetchResponsePerReplica.size() + " replicas for " + topicPartition + ", but expected " + this.expectedReplicasPerTopicPartition.get(topicPartition) + " replicas";
                HashMap recordBatchIteratorMap = new HashMap();
                for (Map.Entry<Integer, FetchResponseData.PartitionData> fetchResEntry : fetchResponsePerReplica.entrySet()) {
                    int replicaId = fetchResEntry.getKey();
                    FetchResponseData.PartitionData fetchResponse = fetchResEntry.getValue();
                    Iterator recordIterator = FetchResponse.recordsOrFail((FetchResponseData.PartitionData)fetchResponse).batches().iterator();
                    recordBatchIteratorMap.put(replicaId, recordIterator);
                }
                long maxHw = fetchResponsePerReplica.values().stream().mapToLong(FetchResponseData.PartitionData::highWatermark).max().orElse(-1L);
                boolean isMessageInAllReplicas = true;
                while (isMessageInAllReplicas) {
                    Optional<MessageInfo> messageInfoFromFirstReplicaOpt = Optional.empty();
                    for (Map.Entry batchEntry : recordBatchIteratorMap.entrySet()) {
                        int replicaId = (Integer)batchEntry.getKey();
                        Iterator recordBatchIterator = (Iterator)batchEntry.getValue();
                        try {
                            if (recordBatchIterator.hasNext()) {
                                RecordBatch batch = (RecordBatch)recordBatchIterator.next();
                                if (batch.lastOffset() >= fetchResponsePerReplica.get(replicaId).highWatermark()) {
                                    isMessageInAllReplicas = false;
                                    continue;
                                }
                                if (!messageInfoFromFirstReplicaOpt.isPresent()) {
                                    messageInfoFromFirstReplicaOpt = Optional.of(new MessageInfo(replicaId, batch.lastOffset(), batch.nextOffset(), batch.checksum()));
                                    continue;
                                }
                                MessageInfo messageInfoFromFirstReplica = (MessageInfo)messageInfoFromFirstReplicaOpt.get();
                                if (messageInfoFromFirstReplica.offset != batch.lastOffset()) {
                                    println.accept(DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())) + ": partition " + topicPartition + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset " + messageInfoFromFirstReplica.offset + " doesn't match replica " + replicaId + "'s offset " + batch.lastOffset());
                                    Exit.exit((int)1);
                                }
                                if (messageInfoFromFirstReplica.checksum == batch.checksum()) continue;
                                println.accept(DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())) + ": partition " + topicPartition + " has unmatched checksum at offset " + batch.lastOffset() + "; replica " + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum + "; replica " + replicaId + "'s checksum " + batch.checksum());
                                continue;
                            }
                            isMessageInAllReplicas = false;
                        }
                        catch (Throwable t) {
                            throw new RuntimeException("Error in processing replica " + replicaId + " in partition " + topicPartition + " at offset " + this.fetchOffsetMap.get(topicPartition), t);
                        }
                    }
                    if (!isMessageInAllReplicas) continue;
                    long nextOffset = messageInfoFromFirstReplicaOpt.map(messageInfo -> messageInfo.nextOffset).orElse(-1L);
                    this.fetchOffsetMap.put(topicPartition, nextOffset);
                    LOG.debug("{} replicas match at offset {} for {}", new Object[]{this.expectedReplicasPerTopicPartition.get(topicPartition), nextOffset, topicPartition});
                }
                if (maxHw - this.fetchOffsetMap.get(topicPartition) > this.maxLag) {
                    this.offsetWithMaxLag = this.fetchOffsetMap.get(topicPartition);
                    this.maxLag = maxHw - this.offsetWithMaxLag;
                    this.maxLagTopicAndPartition = topicPartition;
                }
                fetchResponsePerReplica.clear();
            }
            long currentTimeMs = Time.SYSTEM.milliseconds();
            if (currentTimeMs - this.lastReportTime > this.reportInterval) {
                println.accept(DATE_FORMAT.format(new Date(currentTimeMs)) + ": max lag is " + this.maxLag + " for partition " + this.maxLagTopicAndPartition + " at offset " + this.offsetWithMaxLag + " among " + this.recordsCache.size() + " partitions");
                this.lastReportTime = currentTimeMs;
            }
        }
    }

    private static class ReplicaFetcher
    extends ShutdownableThread {
        private final Node sourceBroker;
        private final Iterable<TopicPartition> topicPartitions;
        private final Map<String, Uuid> topicIds;
        private final ReplicaBuffer replicaBuffer;
        private final int fetchSize;
        private final int maxWait;
        private final int minBytes;
        private final boolean doVerification;
        private final ReplicaFetcherBlockingSend fetchEndpoint;
        private final Map<Uuid, String> topicNames;

        public ReplicaFetcher(String name, Node sourceBroker, Iterable<TopicPartition> topicPartitions, Map<String, Uuid> topicIds, ReplicaBuffer replicaBuffer, int fetchSize, int maxWait, int minBytes, boolean doVerification, Properties consumerConfig, int fetcherId) {
            super(name);
            this.sourceBroker = sourceBroker;
            this.topicPartitions = topicPartitions;
            this.topicIds = topicIds;
            this.replicaBuffer = replicaBuffer;
            this.fetchSize = fetchSize;
            this.maxWait = maxWait;
            this.minBytes = minBytes;
            this.doVerification = doVerification;
            this.fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId, "broker--2-fetcher-" + fetcherId);
            this.topicNames = topicIds.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
        }

        public void doWork() {
            FetchResponse fetchResponse;
            CountDownLatch verificationBarrier;
            CountDownLatch fetcherBarrier;
            block11: {
                fetcherBarrier = this.replicaBuffer.getFetcherBarrier();
                verificationBarrier = this.replicaBuffer.getVerificationBarrier();
                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> requestMap = new LinkedHashMap<TopicPartition, FetchRequest.PartitionData>();
                for (TopicPartition topicPartition : this.topicPartitions) {
                    requestMap.put(topicPartition, new FetchRequest.PartitionData(this.topicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID), this.replicaBuffer.getOffset(topicPartition), 0L, this.fetchSize, Optional.empty()));
                }
                FetchRequest.Builder fetchRequestBuilder = FetchRequest.Builder.forReplica((short)ApiKeys.FETCH.latestVersion(), (int)-2, (long)-1L, (int)this.maxWait, (int)this.minBytes, requestMap);
                LOG.debug("Issuing fetch request");
                fetchResponse = null;
                try {
                    ClientResponse clientResponse = this.fetchEndpoint.sendRequest((AbstractRequest.Builder<? extends AbstractRequest>)fetchRequestBuilder);
                    fetchResponse = (FetchResponse)clientResponse.responseBody();
                }
                catch (Throwable t) {
                    if (this.isRunning()) break block11;
                    throw new RuntimeException(t);
                }
            }
            if (fetchResponse != null) {
                fetchResponse.responseData(this.topicNames, ApiKeys.FETCH.latestVersion()).forEach((tp, partitionData) -> this.replicaBuffer.addFetchedData((TopicPartition)tp, this.sourceBroker.id(), (FetchResponseData.PartitionData)partitionData));
            } else {
                for (TopicPartition topicAndPartition : this.topicPartitions) {
                    this.replicaBuffer.addFetchedData(topicAndPartition, this.sourceBroker.id(), FetchResponse.partitionResponse((int)topicAndPartition.partition(), (Errors)Errors.NONE));
                }
            }
            fetcherBarrier.countDown();
            LOG.debug("Done fetching");
            try {
                fetcherBarrier.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            LOG.debug("Ready for verification");
            if (this.doVerification) {
                LOG.debug("Do verification");
                this.replicaBuffer.verifyCheckSum(System.out::println);
                this.replicaBuffer.createNewFetcherBarrier();
                this.replicaBuffer.createNewVerificationBarrier();
                LOG.debug("Created new barrier");
                verificationBarrier.countDown();
            }
            try {
                verificationBarrier.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            LOG.debug("Done verification");
        }
    }

    private static class ReplicaFetcherBlockingSend {
        private final Node sourceNode;
        private final Time time;
        private final int socketTimeout;
        private final NetworkClient networkClient;

        ReplicaFetcherBlockingSend(final Node sourceNode, ConsumerConfig consumerConfig, Metrics metrics, Time time, final int fetcherId, String clientId) {
            this.sourceNode = sourceNode;
            this.time = time;
            this.socketTimeout = consumerConfig.getInt("request.timeout.ms");
            LogContext logContext = new LogContext();
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder((AbstractConfig)consumerConfig, (Time)time, (LogContext)logContext);
            Selector selector = new Selector(-1, consumerConfig.getLong("connections.max.idle.ms").longValue(), metrics, time, "replica-fetcher", (Map)new HashMap<String, String>(){
                {
                    this.put("broker-id", sourceNode.idString());
                    this.put("fetcher-id", String.valueOf(fetcherId));
                }
            }, false, channelBuilder, logContext);
            this.networkClient = new NetworkClient((Selectable)selector, (MetadataUpdater)new ManualMetadataUpdater(), clientId, 1, 0L, 0L, -1, consumerConfig.getInt("receive.buffer.bytes").intValue(), consumerConfig.getInt("request.timeout.ms").intValue(), consumerConfig.getLong("socket.connection.setup.timeout.ms").longValue(), consumerConfig.getLong("socket.connection.setup.timeout.max.ms").longValue(), time, false, new ApiVersions(), logContext, MetadataRecoveryStrategy.forName((String)consumerConfig.getString("metadata.recovery.strategy")));
        }

        ClientResponse sendRequest(AbstractRequest.Builder<? extends AbstractRequest> requestBuilder) {
            try {
                if (!NetworkClientUtils.awaitReady((KafkaClient)this.networkClient, (Node)this.sourceNode, (Time)this.time, (long)this.socketTimeout)) {
                    throw new SocketTimeoutException("Failed to connect within " + this.socketTimeout + " ms");
                }
                ClientRequest clientRequest = this.networkClient.newClientRequest(this.sourceNode.idString(), requestBuilder, this.time.milliseconds(), true);
                return NetworkClientUtils.sendAndReceive((KafkaClient)this.networkClient, (ClientRequest)clientRequest, (Time)this.time);
            }
            catch (Throwable e) {
                this.networkClient.close(this.sourceNode.idString());
                throw new RuntimeException(e);
            }
        }
    }

    private static class MessageInfo {
        final int replicaId;
        final long offset;
        final long nextOffset;
        final long checksum;

        MessageInfo(int replicaId, long offset, long nextOffset, long checksum) {
            this.replicaId = replicaId;
            this.offset = offset;
            this.nextOffset = nextOffset;
            this.checksum = checksum;
        }
    }
}

