package org.apache.hadoop.hdfs;

import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.archive.format.warc.WARCConstants;

/**
 * {@link BlockReader} that receives the packets of a single block from a
 * datanode through a {@link Peer} and hands the packet payload to the caller.
 *
 * <p>Packets are pulled from the peer's input channel with a
 * {@link PacketReceiver}. When checksum verification is enabled every data
 * packet is verified against the checksums that travel with it. Once all
 * requested bytes have been received, the trailing empty packet is consumed
 * and a read status is written back to the peer. On {@link #close()} the peer
 * is handed to the {@link PeerCache} only if that status was sent
 * successfully; otherwise the peer is closed.
 *
 * <p>{@code read}, {@code skip} and {@code close} are synchronized on this
 * instance because they share the current data slice and the remaining-byte
 * counter.
 */
@InterfaceAudience.Private
/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-2.2.0.jar:org/apache/hadoop/hdfs/RemoteBlockReader2.class */
public class RemoteBlockReader2 implements BlockReader {
    static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);

    /**
     * Value reported by {@link #available()} (128 KiB).
     * NOTE(review): presumably mirrors the client's TCP window size - confirm.
     */
    private static final int AVAILABLE_BYTES_ESTIMATE = 128 * 1024;

    /** Connection to the remote side; cached or closed in {@link #close()}. */
    private final Peer peer;
    /** Key under which {@link #peer} is stored in {@link #peerCache}. */
    private final DatanodeID datanodeID;
    /** Optional cache for reusable peers; may be {@code null}. */
    private final PeerCache peerCache;
    /** Channel the packets are read from (obtained from {@link #peer}). */
    private final ReadableByteChannel in;
    /** Checksum implementation; set to {@code null} by {@link #close()}. */
    private DataChecksum checksum;
    /** First block offset the caller wants; set to -1 by {@link #close()}. */
    private long startOffset;
    /** Name passed to checksum verification for error reporting. */
    private final String filename;
    private final int bytesPerChecksum;
    private final int checksumSize;
    /** Bytes of packet data still expected before the read is complete. */
    private long bytesNeededToFinish;
    private final boolean isLocal;
    private final boolean verifyChecksum;
    private PacketReceiver packetReceiver = new PacketReceiver(true);
    /** Payload of the most recently received packet, or {@code null} before the first packet. */
    private ByteBuffer curDataSlice = null;
    /** Sequence number of the last data packet; fed to the header sanity check. */
    private long lastSeqNo = -1;
    /** Whether the final read status reached the peer; gates peer reuse. */
    private boolean sentStatusCode = false;
    /** Scratch buffer used by {@link #skip(long)}; allocated lazily. */
    byte[] skipBuf = null;
    // The two fields below are not used anywhere in this class; they are kept
    // because they are package-visible and may be referenced elsewhere.
    ByteBuffer checksumBytes = null;
    int dataLeft = 0;

    /**
     * @return the peer this reader receives its packets from
     */
    @VisibleForTesting
    public Peer getPeer() {
        return this.peer;
    }

    /**
     * Copies up to {@code i2} bytes of block data into {@code bArr}.
     *
     * @param bArr destination array
     * @param i    offset in {@code bArr} at which to start writing
     * @param i2   maximum number of bytes to copy
     * @return the number of bytes copied, or -1 when the current packet is
     *         exhausted and no further data is expected
     * @throws IOException if receiving or verifying the next packet fails
     */
    @Override // org.apache.hadoop.hdfs.BlockReader
    public synchronized int read(byte[] bArr, int i, int i2) throws IOException {
        if (needsNextPacket()) {
            readNextPacket();
        }
        if (!this.curDataSlice.hasRemaining()) {
            return -1;
        }
        int count = Math.min(this.curDataSlice.remaining(), i2);
        this.curDataSlice.get(bArr, i, count);
        return count;
    }

    /**
     * Copies block data into {@code byteBuffer}, limited by the space left in
     * it and by the data left in the current packet.
     *
     * <p>Synchronized like the array-based overload, since both mutate the
     * same packet state.
     *
     * @param byteBuffer destination buffer
     * @return the number of bytes copied, or -1 when the current packet is
     *         exhausted and no further data is expected
     * @throws IOException if receiving or verifying the next packet fails
     */
    @Override // org.apache.hadoop.fs.ByteBufferReadable
    public synchronized int read(ByteBuffer byteBuffer) throws IOException {
        if (needsNextPacket()) {
            readNextPacket();
        }
        if (!this.curDataSlice.hasRemaining()) {
            return -1;
        }
        int count = Math.min(this.curDataSlice.remaining(), byteBuffer.remaining());
        // Copy through a bounded duplicate so the source slice's limit is untouched.
        ByteBuffer window = this.curDataSlice.duplicate();
        window.limit(window.position() + count);
        byteBuffer.put(window);
        this.curDataSlice.position(window.position());
        return count;
    }

    /**
     * @return {@code true} if no packet has been received yet, or the current
     *         packet is fully consumed while more data is still expected
     */
    private boolean needsNextPacket() {
        if (this.curDataSlice == null) {
            return true;
        }
        return !this.curDataSlice.hasRemaining() && this.bytesNeededToFinish > 0;
    }

    /**
     * Receives the next packet, validates its header, optionally verifies its
     * checksums and makes its payload the current data slice. When no more
     * bytes are needed afterwards, the trailing empty packet is consumed and
     * the read status is sent to the peer.
     *
     * @throws IOException if the packet cannot be received, its header fails
     *                     the sanity check, or checksum verification fails
     */
    private void readNextPacket() throws IOException {
        this.packetReceiver.receiveNextPacket(this.in);
        PacketHeader header = this.packetReceiver.getHeader();
        this.curDataSlice = this.packetReceiver.getDataSlice();
        assert this.curDataSlice.capacity() == header.getDataLen();

        if (LOG.isTraceEnabled()) {
            LOG.trace("DFSClient readNextPacket got header " + header);
        }
        if (!header.sanityCheck(this.lastSeqNo)) {
            throw new IOException("BlockReader: error in packet header " + header);
        }

        if (header.getDataLen() > 0) {
            // One checksum per (possibly partial) chunk of bytesPerChecksum bytes.
            int chunks = 1 + ((header.getDataLen() - 1) / this.bytesPerChecksum);
            int checksumsLen = chunks * this.checksumSize;
            assert this.packetReceiver.getChecksumSlice().capacity() == checksumsLen
                : "checksum slice capacity=" + this.packetReceiver.getChecksumSlice().capacity() + " checksumsLen=" + checksumsLen;

            this.lastSeqNo = header.getSeqno();
            if (this.verifyChecksum && this.curDataSlice.remaining() > 0) {
                this.checksum.verifyChunkedSums(this.curDataSlice, this.packetReceiver.getChecksumSlice(), this.filename, header.getOffsetInBlock());
            }
            this.bytesNeededToFinish -= header.getDataLen();
        }

        // A packet may begin before the offset the caller asked for; skip
        // over the leading bytes that were not requested.
        if (header.getOffsetInBlock() < this.startOffset) {
            this.curDataSlice.position((int) (this.startOffset - header.getOffsetInBlock()));
        }

        if (this.bytesNeededToFinish <= 0) {
            readTrailingEmptyPacket();
            sendReadResult(this.verifyChecksum
                ? DataTransferProtos.Status.CHECKSUM_OK
                : DataTransferProtos.Status.SUCCESS);
        }
    }

    /**
     * Skips up to {@code j} bytes by reading them into a scratch buffer.
     *
     * @param j number of bytes to skip
     * @return the number of bytes actually skipped; less than {@code j} if
     *         {@code read} stopped returning data first
     * @throws IOException if an underlying read fails
     */
    @Override // org.apache.hadoop.hdfs.BlockReader
    public synchronized long skip(long j) throws IOException {
        if (this.skipBuf == null) {
            this.skipBuf = new byte[this.bytesPerChecksum];
        }
        long skipped = 0;
        while (skipped < j) {
            int toRead = (int) Math.min(j - skipped, this.skipBuf.length);
            int got = read(this.skipBuf, 0, toRead);
            if (got <= 0) {
                break;
            }
            skipped += got;
        }
        return skipped;
    }

    /**
     * Receives the packet that follows the last data packet and checks that
     * it is flagged as the last packet in the block and carries no data.
     *
     * @throws IOException if the packet cannot be received or is not an
     *                     empty last-packet marker
     */
    private void readTrailingEmptyPacket() throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Reading empty packet at end of read");
        }
        this.packetReceiver.receiveNextPacket(this.in);
        PacketHeader trailer = this.packetReceiver.getHeader();
        boolean isEmptyLastPacket = trailer.isLastPacketInBlock() && trailer.getDataLen() == 0;
        if (!isEmptyLastPacket) {
            throw new IOException("Expected empty end-of-read packet! Header: " + trailer);
        }
    }

    /**
     * Creates a reader on an already-negotiated peer.
     *
     * @param str          file name used when reporting checksum errors
     * @param str2         not used by this constructor (the factory passes the block pool id)
     * @param j            not used by this constructor (the factory passes the block id)
     * @param dataChecksum checksum implementation for the incoming data
     * @param z            whether received data is verified against its checksums
     * @param j2           offset of the first byte the caller wants; negative values are clamped to 0
     * @param j3           offset at which the first received chunk starts
     * @param j4           number of bytes requested from {@code j2} onwards
     * @param peer         connection to read from
     * @param datanodeID   identity of the remote node; also the peer-cache key
     * @param peerCache    cache to return the peer to on close, or {@code null}
     */
    protected RemoteBlockReader2(String str, String str2, long j, DataChecksum dataChecksum, boolean z, long j2, long j3, long j4, Peer peer, DatanodeID datanodeID, PeerCache peerCache) {
        this.isLocal = DFSClient.isLocalAddress(NetUtils.createSocketAddr(datanodeID.getXferAddr()));
        this.peer = peer;
        this.datanodeID = datanodeID;
        this.in = peer.getInputStreamChannel();
        this.checksum = dataChecksum;
        this.verifyChecksum = z;
        this.startOffset = Math.max(j2, 0L);
        this.filename = str;
        this.peerCache = peerCache;
        // The stream starts at the chunk boundary j3, so the bytes between
        // j3 and j2 are received in addition to the j4 requested bytes.
        this.bytesNeededToFinish = j4 + (j2 - j3);
        this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
        this.checksumSize = this.checksum.getChecksumSize();
    }

    /**
     * Releases the packet receiver and disposes of the peer: it is put into
     * the peer cache when a cache exists and the read status was sent,
     * otherwise it is closed. The peer is released even if closing the
     * packet receiver fails.
     *
     * @throws IOException if closing the peer fails
     */
    @Override // org.apache.hadoop.hdfs.BlockReader
    public synchronized void close() throws IOException {
        try {
            this.packetReceiver.close();
        } finally {
            this.startOffset = -1L;
            this.checksum = null;
            if (this.peerCache != null && this.sentStatusCode) {
                this.peerCache.put(this.datanodeID, this.peer);
            } else {
                this.peer.close();
            }
        }
    }

    /**
     * Writes the final read status to the peer. Failure to do so is logged
     * and otherwise ignored; in that case the peer is not marked reusable.
     *
     * @param status status to report
     */
    void sendReadResult(DataTransferProtos.Status status) {
        assert !this.sentStatusCode : "already sent status code to " + this.peer;
        try {
            writeReadResult(this.peer.getOutputStream(), status);
            this.sentStatusCode = true;
        } catch (IOException e) {
            // Best effort: the data has already been delivered to the caller.
            LOG.info("Could not send read status (" + status + ") to datanode " + this.peer.getRemoteAddressString() + ": " + e.getMessage());
        }
    }

    /**
     * Serializes a {@code ClientReadStatusProto} carrying {@code status} in
     * delimited form to {@code outputStream} and flushes the stream.
     *
     * @param outputStream stream to write to
     * @param status       status to send
     * @throws IOException if writing or flushing fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static void writeReadResult(OutputStream outputStream, DataTransferProtos.Status status) throws IOException {
        DataTransferProtos.ClientReadStatusProto.newBuilder()
            .setStatus(status)
            .build()
            .writeDelimitedTo(outputStream);
        outputStream.flush();
    }

    /**
     * Joins the address, {@code str} and {@code j} with {@code ':'}.
     *
     * @param inetSocketAddress address whose string form is the first component
     * @param str               second component (presumably a block pool id - confirm with callers)
     * @param j                 third component (presumably a block id - confirm with callers)
     * @return {@code address:str:j}
     */
    public static String getFileName(InetSocketAddress inetSocketAddress, String str, long j) {
        return inetSocketAddress.toString() + ":" + str + ":" + j;
    }

    /**
     * Delegates to {@code BlockReaderUtil.readAll} for this reader.
     *
     * @param bArr destination array
     * @param i    offset in {@code bArr}
     * @param i2   number of bytes requested
     * @return the value returned by {@code BlockReaderUtil.readAll}
     * @throws IOException if the underlying read fails
     */
    @Override // org.apache.hadoop.hdfs.BlockReader
    public int readAll(byte[] bArr, int i, int i2) throws IOException {
        return BlockReaderUtil.readAll(this, bArr, i, i2);
    }

    /**
     * Delegates to {@code BlockReaderUtil.readFully} for this reader.
     *
     * @param bArr destination array
     * @param i    offset in {@code bArr}
     * @param i2   number of bytes requested
     * @throws IOException if the underlying read fails
     */
    @Override // org.apache.hadoop.hdfs.BlockReader
    public void readFully(byte[] bArr, int i, int i2) throws IOException {
        BlockReaderUtil.readFully(this, bArr, i, i2);
    }

    /**
     * Sends a read-block request over {@code peer}, validates the response
     * and returns a reader positioned for the requested range.
     *
     * @param str             file name used in error messages
     * @param extendedBlock   block to read
     * @param token           block access token sent with the request
     * @param j               offset in the block at which reading should start
     * @param j2              passed to the request after the start offset and used as the number of bytes to read
     * @param z               forwarded to the request and used as the reader's verify-checksum flag
     * @param str2            forwarded to the request (presumably the client name - confirm)
     * @param peer            connection to issue the request on
     * @param datanodeID      identity of the remote node
     * @param peerCache       cache the peer may be returned to on close, or {@code null}
     * @param cachingStrategy forwarded to the request
     * @return a reader for the requested range
     * @throws IOException if the request fails, the response is not a
     *                     success, or the first chunk offset in the response
     *                     is inconsistent with {@code j}
     */
    public static BlockReader newBlockReader(String str, ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, long j, long j2, boolean z, String str2, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy) throws IOException {
        DataOutputStream requestOut = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
        new Sender(requestOut).readBlock(extendedBlock, token, str2, j, j2, z, cachingStrategy);

        DataInputStream responseIn = new DataInputStream(peer.getInputStream());
        DataTransferProtos.BlockOpResponseProto response =
            DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(responseIn));
        checkSuccess(response, peer, extendedBlock, str);

        DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = response.getReadOpChecksumInfo();
        DataChecksum blockChecksum = DataTransferProtoUtil.fromProto(checksumInfo.getChecksum());

        // The first chunk must start at or before the requested offset and
        // less than one checksum chunk before it.
        long firstChunkOffset = checksumInfo.getChunkOffset();
        if (firstChunkOffset < 0 || firstChunkOffset > j || firstChunkOffset <= j - blockChecksum.getBytesPerChecksum()) {
            throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + j + " for file " + str);
        }
        return new RemoteBlockReader2(str, extendedBlock.getBlockPoolId(), extendedBlock.getBlockId(), blockChecksum, z, j, firstChunkOffset, j2, peer, datanodeID, peerCache);
    }

    /**
     * Throws if the response to a read-block request does not report success.
     *
     * @param blockOpResponseProto response to inspect
     * @param peer                 connection the request was made on (for the message)
     * @param extendedBlock        block that was requested (for the message)
     * @param str                  file name (for the message)
     * @throws InvalidBlockTokenException if the status is {@code ERROR_ACCESS_TOKEN}
     * @throws IOException                for any other non-success status
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static void checkSuccess(DataTransferProtos.BlockOpResponseProto blockOpResponseProto, Peer peer, ExtendedBlock extendedBlock, String str) throws IOException {
        DataTransferProtos.Status status = blockOpResponseProto.getStatus();
        if (status == DataTransferProtos.Status.SUCCESS) {
            return;
        }
        if (status == DataTransferProtos.Status.ERROR_ACCESS_TOKEN) {
            throw new InvalidBlockTokenException("Got access token error for OP_READ_BLOCK, self=" + peer.getLocalAddressString() + ", remote=" + peer.getRemoteAddressString() + ", for file " + str + ", for pool " + extendedBlock.getBlockPoolId() + " block " + extendedBlock.getBlockId() + "_" + extendedBlock.getGenerationStamp());
        }
        throw new IOException("Got error for OP_READ_BLOCK, self=" + peer.getLocalAddressString() + ", remote=" + peer.getRemoteAddressString() + ", for file " + str + ", for pool " + extendedBlock.getBlockPoolId() + " block " + extendedBlock.getBlockId() + "_" + extendedBlock.getGenerationStamp());
    }

    /**
     * @return a fixed estimate (128 KiB); the value does not depend on the
     *         reader's state
     */
    @Override // org.apache.hadoop.hdfs.BlockReader
    public int available() throws IOException {
        return AVAILABLE_BYTES_ESTIMATE;
    }

    /**
     * @return whether the remote transfer address was classified as local
     *         when this reader was constructed
     */
    @Override // org.apache.hadoop.hdfs.BlockReader
    public boolean isLocal() {
        return this.isLocal;
    }

    /**
     * @return always {@code false}
     */
    @Override // org.apache.hadoop.hdfs.BlockReader
    public boolean isShortCircuit() {
        return false;
    }
}
