Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task AB#1355841: [LevelDB] - Add ZlibRaw Compression Support #12

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ include(CheckLibraryExists)
check_library_exists(crc32c crc32c_value "" HAVE_CRC32C)
check_library_exists(snappy snappy_compress "" HAVE_SNAPPY)
check_library_exists(zstd zstd_compress "" HAVE_ZSTD)
check_library_exists(ZLIB::ZLIB deflate "" HAVE_ZLIB)
check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC)

include(CheckCXXSymbolExists)
Expand Down Expand Up @@ -277,6 +278,9 @@ endif(HAVE_SNAPPY)
if(HAVE_ZSTD)
target_link_libraries(leveldb zstd)
endif(HAVE_ZSTD)
if(HAVE_ZLIB)
target_link_libraries(leveldb ZLIB::ZLIB)
endif(HAVE_ZLIB)
if(HAVE_TCMALLOC)
target_link_libraries(leveldb tcmalloc)
endif(HAVE_TCMALLOC)
Expand Down
2 changes: 1 addition & 1 deletion include/leveldb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ LEVELDB_EXPORT void leveldb_options_set_max_file_size(leveldb_options_t*,
LEVELDB_EXPORT void leveldb_options_set_disable_seek_autocompaction(leveldb_options_t*,
uint8_t);

enum { leveldb_no_compression = 0, leveldb_snappy_compression = 1 };
enum { leveldb_no_compression = 0, leveldb_snappy_compression = 1, leveldb_zlib_raw_compression = 4};
LEVELDB_EXPORT void leveldb_options_set_compression(leveldb_options_t*, int);

/* Comparator */
Expand Down
5 changes: 5 additions & 0 deletions include/leveldb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ enum CompressionType {
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZstdCompression = 0x2,
kZlibRawCompression = 0x4,
};

// Options to control the behavior of a database (passed to DB::Open)
Expand Down Expand Up @@ -135,6 +136,10 @@ struct LEVELDB_EXPORT Options {
// Currently only the range [-5,22] is supported. Default is 1.
int zstd_compression_level = 1;

// Compression level for zlib.
// Currently only range [0,9] and -1. Default is -1 which is equivalent to 6.
int zlib_compression_level = -1;

// EXPERIMENTAL: If true, append to existing MANIFEST and log files
// when a database is opened. This can significantly speed up open.
//
Expand Down
5 changes: 5 additions & 0 deletions port/port_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@
#cmakedefine01 HAVE_ZSTD
#endif // !defined(HAVE_ZSTD)

// Define to 1 if you have Zlib.
#if !defined(HAVE_ZLIB)
#cmakedefine01 HAVE_ZLIB
#endif // !defined(HAVE_ZLIB)

#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_
18 changes: 18 additions & 0 deletions port/port_example.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@ bool Zstd_GetUncompressedLength(const char* input, size_t length,
// Zstd_GetUncompressedLength.
bool Zstd_Uncompress(const char* input_data, size_t input_length, char* output);

// Store the zlib compression of "input[0,input_length-1]" in *output.
// Returns false if zlib is not supported by this port.
// If raw is set this will perform raw compression with no header,
// trailer, and will not compute a check value.
bool Zlib_Compress(int level, const char* input, size_t input_length,
std::string* output, uint8_t raw = 0);

// Attempt to zlib uncompress input[0,input_length-1] into *output.
// Returns true if successful, false if the input is invalid zlib
// compressed data.
// If raw is set the process will not look for a header, trailer,
// and check value.
//
// REQUIRES: at least the first "n" bytes of output[] must be writable
// where "n" is the length of uncompressed input.
bool Zlib_Uncompress(const char* input_data, size_t input_length,
std::string* output, uint8_t raw = 0);

// ------------------ Miscellaneous -------------------

// If heap profiling is not supported, returns false.
Expand Down
110 changes: 110 additions & 0 deletions port/port_stdcxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#define ZSTD_STATIC_LINKING_ONLY // For ZSTD_compressionParameters.
#include <zstd.h>
#endif // HAVE_ZSTD
#if HAVE_ZLIB
#include <zlib.h>
#endif // HAVE_ZLIB

#include <cassert>
#include <condition_variable> // NOLINT
Expand Down Expand Up @@ -200,6 +203,113 @@ inline bool Zstd_Uncompress(const char* input, size_t length, char* output) {
#endif // HAVE_ZSTD
}

inline bool Zlib_Compress(int level, const char* input, size_t length,
std::string* output, uint8_t raw = 0) {
#if HAVE_ZLIB
const size_t BUFSIZE = 128 * 1024;
unsigned char temp_buffer[BUFSIZE];
// reserve enough memory to not reallocate on the fly
output->reserve(output->size() + compressBound(length));
z_stream strm;
strm.zalloc = 0;
strm.zfree = 0;
strm.next_in = (unsigned char*)(input);
strm.avail_in = (uint32_t)length;
strm.next_out = temp_buffer;
strm.avail_out = BUFSIZE;
auto res = deflateInit2(&strm, level, Z_DEFLATED, raw ? -15 : 15, 8,
Z_DEFAULT_STRATEGY);
if (res != Z_OK) {
return false;
}
int deflate_res = Z_OK;
while (strm.avail_in != 0) {
int res = deflate(&strm, Z_NO_FLUSH);
if (res != Z_OK) {
return false;
}
if (strm.avail_out == 0) {
output->append(temp_buffer, temp_buffer + BUFSIZE);
strm.next_out = temp_buffer;
strm.avail_out = BUFSIZE;
}
}
while (deflate_res == Z_OK) {
if (strm.avail_out == 0) {
output->append(temp_buffer, temp_buffer + BUFSIZE);
strm.next_out = temp_buffer;
strm.avail_out = BUFSIZE;
}
deflate_res = deflate(&strm, Z_FINISH);
}
if (deflate_res != Z_STREAM_END) {
return false;
}
output->append(temp_buffer, temp_buffer + BUFSIZE - strm.avail_out);
deflateEnd(&strm);
return true;
#else
// Silence compiler warnings about unused arguments.
(void)level;
(void)input;
(void)length;
(void)output;
(void)raw;
return false;
#endif // HAVE_ZLIB
}

inline bool Zlib_Uncompress(const char* input, size_t length,
std::string* output, uint8_t raw = 0)
{
#if HAVE_ZLIB
const int CHUNK = 64 * 1024;
int ret;
size_t have;
z_stream strm;
unsigned char out[CHUNK];
/* allocate inflate state */
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = (uint32_t)length;
strm.next_in = (Bytef*)input;
ret = inflateInit2(&strm, (raw ? -15 : 15));
if (ret != Z_OK) {
return false;
}
/* decompress until deflate stream ends or end of file */
do {
/* run inflate() on input until output buffer not full */
do {
strm.avail_out = CHUNK;
strm.next_out = out;
ret = ::inflate(&strm, Z_NO_FLUSH);
if (ret == Z_NEED_DICT) {
ret = Z_DATA_ERROR;
}
if (ret < 0) {
(void)inflateEnd(&strm);
return false;
}
have = CHUNK - strm.avail_out;
output->append((char*)out, have);
} while (strm.avail_out == 0);
/* done when inflate() says it's done */
} while (ret != Z_STREAM_END);
/* clean up and return */
(void)inflateEnd(&strm);
return ret == Z_STREAM_END ? true : false;
#else
// Silence compiler warnings about unused arguments.
(void)input;
(void)length;
(void)output;
(void)raw;
return false;
#endif // HAVE_ZLIB
}

inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
// Silence compiler warnings about unused arguments.
(void)func;
Expand Down
15 changes: 15 additions & 0 deletions table/format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
result->cachable = true;
break;
}
case kZlibRawCompression: {
std::string buffer;
if (!port::Zlib_Uncompress(data, n, &buffer, true)) {
delete[] buf;
return Status::Corruption("corrupted zlib compressed block contents");
}
auto ubuf = new char[buffer.size()];
memcpy(ubuf, buffer.data(), buffer.size());
delete[] buf;
result->data = Slice(ubuf,buffer.size());
result->heap_allocated = true;
result->cachable = true;
break;
}
default:
delete[] buf;
return Status::Corruption("bad block type");
Expand All @@ -161,4 +175,5 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
return Status::OK();
}


} // namespace leveldb
15 changes: 15 additions & 0 deletions table/table_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,21 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
}
break;
}

case kZlibRawCompression: {
std::string* compressed = &r->compressed_output;
if (port::Zlib_Compress(r->options.zlib_compression_level, raw.data(),
raw.size(), compressed, true) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Zlib not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
Expand Down