/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft.internals;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.server.common.serialization.RecordSerde;

public final class RecordsIterator<T>
implements Iterator<Batch<T>>,
AutoCloseable {
    private final Records records;
    private final RecordSerde<T> serde;
    private final BufferSupplier bufferSupplier;
    private final int batchSize;
    private final boolean doCrcValidation;
    private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator();
    private Optional<Batch<T>> nextBatch = Optional.empty();
    private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
    private int bytesRead = 0;
    private boolean isClosed = false;

    public RecordsIterator(Records records, RecordSerde<T> serde, BufferSupplier bufferSupplier, int batchSize, boolean doCrcValidation) {
        this.records = records;
        this.serde = serde;
        this.bufferSupplier = bufferSupplier;
        this.batchSize = Math.max(batchSize, 17);
        this.doCrcValidation = doCrcValidation;
    }

    @Override
    public boolean hasNext() {
        this.ensureOpen();
        if (!this.nextBatch.isPresent()) {
            this.nextBatch = this.nextBatch();
        }
        return this.nextBatch.isPresent();
    }

    @Override
    public Batch<T> next() {
        if (!this.hasNext()) {
            throw new NoSuchElementException("Batch iterator doesn't have any more elements");
        }
        Batch<T> batch = this.nextBatch.get();
        this.nextBatch = Optional.empty();
        return batch;
    }

    @Override
    public void close() {
        this.isClosed = true;
        this.allocatedBuffer.ifPresent(arg_0 -> ((BufferSupplier)this.bufferSupplier).release(arg_0));
        this.allocatedBuffer = Optional.empty();
    }

    private void ensureOpen() {
        if (this.isClosed) {
            throw new IllegalStateException("Serde record batch iterator was closed");
        }
    }

    private MemoryRecords readFileRecords(FileRecords fileRecords, ByteBuffer buffer) {
        int start = buffer.position();
        try {
            fileRecords.readInto(buffer, this.bytesRead);
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to read records into memory", e);
        }
        this.bytesRead += buffer.limit() - start;
        return MemoryRecords.readableRecords((ByteBuffer)buffer.slice());
    }

    private MemoryRecords createMemoryRecords(FileRecords fileRecords) {
        ByteBuffer buffer;
        if (this.allocatedBuffer.isPresent()) {
            buffer = this.allocatedBuffer.get();
            buffer.compact();
        } else {
            buffer = this.bufferSupplier.get(Math.min(this.batchSize, this.records.sizeInBytes()));
            this.allocatedBuffer = Optional.of(buffer);
        }
        MemoryRecords memoryRecords = this.readFileRecords(fileRecords, buffer);
        if (memoryRecords.firstBatchSize() <= buffer.remaining()) {
            return memoryRecords;
        }
        ByteBuffer newBuffer = this.bufferSupplier.get(memoryRecords.firstBatchSize().intValue());
        this.allocatedBuffer = Optional.of(newBuffer);
        newBuffer.put(buffer);
        this.bufferSupplier.release(buffer);
        return this.readFileRecords(fileRecords, newBuffer);
    }

    private Iterator<MutableRecordBatch> nextBatches() {
        int recordSize = this.records.sizeInBytes();
        if (this.bytesRead < recordSize) {
            MemoryRecords memoryRecords;
            if (this.records instanceof MemoryRecords) {
                this.bytesRead = recordSize;
                memoryRecords = (MemoryRecords)this.records;
            } else if (this.records instanceof FileRecords) {
                memoryRecords = this.createMemoryRecords((FileRecords)this.records);
            } else {
                throw new IllegalStateException(String.format("Unexpected Records type %s", this.records.getClass()));
            }
            return memoryRecords.batchIterator();
        }
        return Collections.emptyIterator();
    }

    private Optional<Batch<T>> nextBatch() {
        if (!this.nextBatches.hasNext()) {
            this.nextBatches = this.nextBatches();
        }
        if (this.nextBatches.hasNext()) {
            MutableRecordBatch nextBatch = this.nextBatches.next();
            this.allocatedBuffer.ifPresent(buffer -> buffer.position(buffer.position() + nextBatch.sizeInBytes()));
            if (!(nextBatch instanceof DefaultRecordBatch)) {
                throw new IllegalStateException(String.format("DefaultRecordBatch expected by record type was %s", nextBatch.getClass()));
            }
            return Optional.of(this.readBatch((DefaultRecordBatch)nextBatch));
        }
        return Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Batch<T> readBatch(DefaultRecordBatch batch) {
        Batch<Object> result;
        Integer numRecords;
        if (this.doCrcValidation) {
            batch.ensureValid();
        }
        if ((numRecords = batch.countOrNull()) == null) {
            throw new IllegalStateException("Expected a record count for the records batch");
        }
        InputStream input = batch.recordInputStream(this.bufferSupplier);
        try {
            if (batch.isControlBatch()) {
                ArrayList<ControlRecord> records = new ArrayList<ControlRecord>(numRecords);
                for (int i = 0; i < numRecords; ++i) {
                    ControlRecord record = this.readRecord(input, batch.sizeInBytes(), RecordsIterator::decodeControlRecord);
                    records.add(record);
                }
                result = Batch.control(batch.baseOffset(), batch.partitionLeaderEpoch(), batch.maxTimestamp(), batch.sizeInBytes(), records);
            } else {
                ArrayList<Object> records = new ArrayList<Object>(numRecords);
                for (int i = 0; i < numRecords; ++i) {
                    Object record = this.readRecord(input, batch.sizeInBytes(), this::decodeDataRecord);
                    records.add(record);
                }
                result = Batch.data(batch.baseOffset(), batch.partitionLeaderEpoch(), batch.maxTimestamp(), batch.sizeInBytes(), records);
            }
        }
        finally {
            Utils.closeQuietly((AutoCloseable)input, (String)"BytesStream for input containing records");
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <U> U readRecord(InputStream stream, int totalBatchSize, BiFunction<Optional<ByteBuffer>, Optional<ByteBuffer>, U> decoder) {
        int size;
        try {
            size = ByteUtils.readVarint((InputStream)stream);
        }
        catch (IOException e) {
            throw new UncheckedIOException("Unable to read record size", e);
        }
        if (size <= 0) {
            throw new RuntimeException("Invalid non-positive frame size: " + size);
        }
        if (size > totalBatchSize) {
            throw new RuntimeException("Specified frame size, " + size + ", is larger than the entire size of the batch, which is " + totalBatchSize);
        }
        ByteBuffer buf = this.bufferSupplier.get(size);
        buf.limit(size - 1);
        try {
            int bytesRead = stream.read(buf.array(), 0, size);
            if (bytesRead != size) {
                throw new RuntimeException("Unable to read " + size + " bytes, only read " + bytesRead);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to read record bytes", e);
        }
        try {
            ByteBufferAccessor input = new ByteBufferAccessor(buf);
            input.readByte();
            long timestampDelta = input.readVarlong();
            if (timestampDelta != 0L) {
                throw new IllegalArgumentException("Got timestamp delta of " + timestampDelta + ", but this is invalid because it is not 0 as expected.");
            }
            input.readVarint();
            int keySize = input.readVarint();
            Optional<Object> key = Optional.empty();
            if (keySize >= 0) {
                key = Optional.of(input.readByteBuffer(keySize));
            }
            int valueSize = input.readVarint();
            Optional<Object> value = Optional.empty();
            if (valueSize >= 0) {
                value = Optional.of(input.readByteBuffer(valueSize));
            }
            U record = decoder.apply(key, value);
            byte numHeaders = buf.array()[size - 1];
            if (numHeaders != 0) {
                throw new IllegalArgumentException("Got numHeaders of " + numHeaders + ", but this is invalid because it is not 0 as expected.");
            }
            U u = record;
            return u;
        }
        finally {
            this.bufferSupplier.release(buf);
        }
    }

    private T decodeDataRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value) {
        if (key.isPresent()) {
            throw new IllegalArgumentException("Got key in the record when no key was expected");
        }
        if (!value.isPresent()) {
            throw new IllegalArgumentException("Missing value in the record when a value was expected");
        }
        if (value.get().remaining() == 0) {
            throw new IllegalArgumentException("Got an unexpected empty value in the record");
        }
        ByteBuffer valueBuffer = value.get();
        return (T)this.serde.read((Readable)new ByteBufferAccessor(valueBuffer), valueBuffer.remaining());
    }

    private static ControlRecord decodeControlRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value) {
        LeaderChangeMessage message;
        if (!key.isPresent()) {
            throw new IllegalArgumentException("Missing key in the record when a key was expected");
        }
        if (key.get().remaining() == 0) {
            throw new IllegalArgumentException("Got an unexpected empty key in the record");
        }
        if (!value.isPresent()) {
            throw new IllegalArgumentException("Missing value in the record when a value was expected");
        }
        if (value.get().remaining() == 0) {
            throw new IllegalArgumentException("Got an unexpected empty value in the record");
        }
        ControlRecordType type = ControlRecordType.parse((ByteBuffer)key.get());
        switch (type) {
            case LEADER_CHANGE: {
                message = ControlRecordUtils.deserializeLeaderChangeMessage((ByteBuffer)value.get());
                break;
            }
            case SNAPSHOT_HEADER: {
                message = ControlRecordUtils.deserializeSnapshotHeaderRecord((ByteBuffer)value.get());
                break;
            }
            case SNAPSHOT_FOOTER: {
                message = ControlRecordUtils.deserializeSnapshotFooterRecord((ByteBuffer)value.get());
                break;
            }
            case KRAFT_VERSION: {
                message = ControlRecordUtils.deserializeKRaftVersionRecord((ByteBuffer)value.get());
                break;
            }
            case KRAFT_VOTERS: {
                message = ControlRecordUtils.deserializeVotersRecord((ByteBuffer)value.get());
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unknown control record type %s", type));
            }
        }
        return new ControlRecord(type, (ApiMessage)message);
    }
}

