Skip to content

Commit

Permalink
HADOOP-19389: Optimize shell -text command I/O with multi-byte read.
Browse files Browse the repository at this point in the history
  • Loading branch information
cnauroth committed Jan 17, 2025
1 parent d2095fa commit 46b95de
Show file tree
Hide file tree
Showing 2 changed files with 561 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ protected void processPath(PathData item) throws IOException {
}

protected class TextRecordInputStream extends InputStream {
SequenceFile.Reader r;
final SequenceFile.Reader r;
Object key;
Object val;

DataInputBuffer inbuf;
DataOutputBuffer outbuf;
final DataInputBuffer inbuf;
final DataOutputBuffer outbuf;

public TextRecordInputStream(FileStatus f) throws IOException {
final Path fpath = f.getPath();
Expand All @@ -237,30 +237,67 @@ public TextRecordInputStream(FileStatus f) throws IOException {
public int read() throws IOException {
int ret;
if (null == inbuf || -1 == (ret = inbuf.read())) {
key = r.next(key);
if (key == null) {
return -1;
if (!readNextFromSequenceFile()) {
ret = -1;
} else {
val = r.getCurrentValue(val);
ret = inbuf.read();
}
byte[] tmp = key.toString().getBytes(StandardCharsets.UTF_8);
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\t');
tmp = val.toString().getBytes(StandardCharsets.UTF_8);
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\n');
inbuf.reset(outbuf.getData(), outbuf.getLength());
outbuf.reset();
ret = inbuf.read();
}
return ret;
}

/**
 * Bulk read of the rendered record text.
 *
 * <p>Copies buffered bytes into {@code dest} and keeps pulling further records from the sequence
 * file until either the requested length has been satisfied or no record remains.
 *
 * @param dest destination array
 * @param destPos offset in {@code dest} at which the first byte is stored
 * @param destLen maximum number of bytes to copy
 * @return 0 when {@code destLen} is 0; otherwise the number of bytes copied, or -1 if none
 *         could be copied
 * @throws IOException if reading the next record fails
 */
@Override
public int read(byte[] dest, int destPos, int destLen) throws IOException {
  validateInputStreamReadArguments(dest, destPos, destLen);
  if (destLen == 0) {
    return 0;
  }

  int total = 0;
  int offset = destPos;
  int remaining = destLen;
  while (remaining > 0) {
    // First try to drain whatever is still buffered from the current record.
    int copied = inbuf.read(dest, offset, remaining);
    if (copied == -1) {
      // Buffer is drained; stop if the file is drained as well.
      if (!readNextFromSequenceFile()) {
        break;
      }
      // A fresh record is buffered now, so copy from it.
      copied = inbuf.read(dest, offset, remaining);
    }
    total += copied;
    offset += copied;
    remaining -= copied;
  }

  if (total > 0) {
    return total;
  }
  return -1;
}

/**
 * Closes the sequence file reader {@code r}, then delegates to the superclass {@code close()}.
 *
 * @throws IOException propagated from either close call
 */
@Override
public void close() throws IOException {
r.close();
super.close();
}

/**
 * Advances the reader to the next record and stages its text form in {@code inbuf}.
 *
 * <p>The staged bytes are the key's {@code toString()}, a tab, the value's {@code toString()}
 * and a newline, with both strings encoded as UTF-8.
 *
 * @return {@code true} if a record was staged, {@code false} if {@code r.next} returned null
 * @throws IOException if reading from the sequence file or writing to the buffer fails
 */
private boolean readNextFromSequenceFile() throws IOException {
  key = r.next(key);
  if (key == null) {
    // No further record is available.
    return false;
  }
  val = r.getCurrentValue(val);

  final byte[] keyBytes = key.toString().getBytes(StandardCharsets.UTF_8);
  outbuf.write(keyBytes, 0, keyBytes.length);
  outbuf.write('\t');

  final byte[] valueBytes = val.toString().getBytes(StandardCharsets.UTF_8);
  outbuf.write(valueBytes, 0, valueBytes.length);
  outbuf.write('\n');

  // Hand the rendered bytes to the input buffer, then clear the output buffer for the next record.
  inbuf.reset(outbuf.getData(), outbuf.getLength());
  outbuf.reset();
  return true;
}
}

/**
Expand All @@ -270,10 +307,11 @@ public void close() throws IOException {
protected static class AvroFileInputStream extends InputStream {
private int pos;
private byte[] buffer;
private ByteArrayOutputStream output;
private FileReader<?> fileReader;
private DatumWriter<Object> writer;
private JsonEncoder encoder;
private final ByteArrayOutputStream output;
private final FileReader<?> fileReader;
private final DatumWriter<Object> writer;
private final JsonEncoder encoder;
private final byte[] finalSeparator;

public AvroFileInputStream(FileStatus status) throws IOException {
pos = 0;
Expand All @@ -286,31 +324,96 @@ public AvroFileInputStream(FileStatus status) throws IOException {
writer = new GenericDatumWriter<Object>(schema);
output = new ByteArrayOutputStream();
encoder = EncoderFactory.get().jsonEncoder(schema, output);
finalSeparator = System.getProperty("line.separator").getBytes(StandardCharsets.UTF_8);
}

/**
 * Read a single byte from the stream.
 *
 * @return the next byte as an unsigned value in the range 0-255, or -1 at end of stream
 * @throws IOException if encoding the next record or writing to the output buffer fails
 */
@Override
public int read() throws IOException {
  if (buffer == null) {
    // EOF was already signalled by an earlier call.
    return -1;
  }

  if (pos < buffer.length) {
    // Mask to an unsigned value. A plain byte-to-int conversion sign-extends, so any byte
    // >= 0x80 would be returned as a negative number, which InputStream.read() must never do
    // except for the -1 EOF marker.
    return buffer[pos++] & 0xff;
  }

  if (!fileReader.hasNext()) {
    // Unset buffer to signal EOF on future calls.
    buffer = null;
    return -1;
  }

  writer.write(fileReader.next(), encoder);
  encoder.flush();

  if (!fileReader.hasNext()) {
    // NOTE(review): buffer still refers to the previously rendered record at this point
    // (swapBuffer() runs below), so the separator is skipped when no earlier record was
    // buffered. Confirm this is intended for files holding exactly one record.
    if (buffer.length > 0) {
      // Write a new line after the last Avro record.
      output.write(finalSeparator);
      output.flush();
    }
  }

  swapBuffer();
  return read();
}

/**
 * Bulk read of the JSON rendering of the Avro file, followed by one final line separator.
 *
 * @param dest destination array
 * @param destPos offset in {@code dest} at which the first byte is stored
 * @param destLen maximum number of bytes to copy
 * @return 0 when {@code destLen} is 0; otherwise the number of bytes copied (always at least 1),
 *         or -1 when the stream is at EOF
 * @throws IOException if encoding the next record fails
 */
@Override
public int read(byte[] dest, int destPos, int destLen) throws IOException {
  validateInputStreamReadArguments(dest, destPos, destLen);

  if (destLen == 0) {
    return 0;
  }

  if (buffer == null) {
    // EOF was already signalled by an earlier call.
    return -1;
  }

  int bytesRead = 0;
  while (destLen > 0 && buffer != null) {
    if (pos < buffer.length) {
      // We have buffered data available, either from the Avro file or the final separator.
      int copyLen = Math.min(buffer.length - pos, destLen);
      System.arraycopy(buffer, pos, dest, destPos, copyLen);
      pos += copyLen;
      bytesRead += copyLen;
      destPos += copyLen;
      destLen -= copyLen;
    } else if (buffer == finalSeparator) {
      // There is no buffered data, and the last buffer processed was the final separator.
      // Unset buffer to signal EOF on future calls.
      buffer = null;
    } else if (!fileReader.hasNext()) {
      if (buffer.length > 0) {
        // There is no data remaining in the file. Get ready to write the final separator on
        // the next iteration.
        buffer = finalSeparator;
        pos = 0;
      } else {
        // We never read data into the buffer. This must be an empty file.
        // Immediate EOF, no separator needed.
        buffer = null;
        return -1;
      }
    } else {
      // Read the next data from the file into the buffer.
      writer.write(fileReader.next(), encoder);
      encoder.flush();
      swapBuffer();
    }
  }

  // The loop can end with nothing copied when the previous call consumed exactly through the
  // final separator: this call then only discovers EOF. A non-zero-length read must report that
  // as -1 rather than 0.
  return bytesRead > 0 ? bytesRead : -1;
}

/**
 * Makes the bytes accumulated in {@code output} the current read buffer, rewinds the read
 * position to its start, and clears {@code output}.
 */
private void swapBuffer() {
  pos = 0;
  buffer = output.toByteArray();
  output.reset();
}

/**
Expand All @@ -323,4 +426,14 @@ public void close() throws IOException {
super.close();
}
}

/**
 * Checks the arguments of a bulk {@code read(byte[], int, int)} call.
 *
 * @param dest destination array; must not be null
 * @param destPos start offset in {@code dest}; must not be negative
 * @param destLen number of bytes requested; must not be negative and must fit in
 *        {@code dest} starting at {@code destPos}
 * @throws NullPointerException if {@code dest} is null
 * @throws IndexOutOfBoundsException if the offset/length pair does not describe a range
 *         inside {@code dest}
 */
private static void validateInputStreamReadArguments(byte[] dest, int destPos, int destLen)
    throws IOException {
  if (dest == null) {
    throw new NullPointerException("null destination buffer");
  }

  final boolean negativeArgument = destPos < 0 || destLen < 0;
  // Compare against the remaining capacity instead of summing destPos + destLen, which could
  // overflow.
  if (negativeArgument || destLen > dest.length - destPos) {
    final String message = String.format(
        "invalid destination buffer range: destPos = %d, destLen = %d", destPos, destLen);
    throw new IndexOutOfBoundsException(message);
  }
}
}
Loading

0 comments on commit 46b95de

Please sign in to comment.