Skip to content

Commit 7983675

Browse files
committed
Add BlobCompressor to compress blob and text fields on fly
1 parent 6af144e commit 7983675

File tree

5 files changed

+172
-8
lines changed

5 files changed

+172
-8
lines changed

compressor_test.go

+64
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package gocql
22

33
import (
44
"bytes"
5+
"strings"
56
"testing"
67

78
"github.com/golang/snappy"
9+
"github.com/google/go-cmp/cmp"
10+
11+
"github.com/gocql/gocql/lz4"
812
)
913

1014
func TestSnappyCompressor(t *testing.T) {
@@ -36,3 +40,63 @@ func TestSnappyCompressor(t *testing.T) {
3640
t.Fatal("failed to match the expected decoded value with the result decoded value.")
3741
}
3842
}
43+
44+
func TestBlobCompressor(t *testing.T) {
45+
session := createSession(t)
46+
defer session.Close()
47+
// TypeVarchar, TypeAscii, TypeBlob, TypeText
48+
if err := createTable(session, `CREATE TABLE gocql_test.test_blob_compressor (
49+
testuuid timeuuid PRIMARY KEY,
50+
testvarchar varchar,
51+
testblob blob,
52+
testascii ascii,
53+
testtext text,
54+
)`); err != nil {
55+
t.Fatal("create table:", err)
56+
}
57+
m := make(map[string]interface{})
58+
59+
BlobCompressor = lz4.NewBlobCompressor(100)
60+
61+
originalBlob := strings.Repeat("1234567890", 20)
62+
63+
m["testuuid"] = TimeUUID()
64+
m["testvarchar"] = originalBlob
65+
m["testblob"] = []byte(originalBlob)
66+
m["testascii"] = originalBlob
67+
m["testtext"] = originalBlob
68+
sliceMap := []map[string]interface{}{m}
69+
if err := session.Query(`INSERT INTO gocql_test.test_blob_compressor (testuuid, testvarchar, testblob, testascii, testtext) VALUES (?, ?, ?, ?, ?)`,
70+
m["testuuid"], m["testvarchar"], m["testblob"], m["testascii"], m["testtext"]).Exec(); err != nil {
71+
t.Fatal("insert:", err)
72+
}
73+
if returned, retErr := session.Query(`SELECT * FROM test_blob_compressor`).Iter().SliceMap(); retErr != nil {
74+
t.Fatal("select:", retErr)
75+
} else {
76+
if diff := cmp.Diff(sliceMap, returned); diff != "" {
77+
t.Fatal("mismatch in returned map", diff)
78+
}
79+
}
80+
81+
// Test for Iter.MapScan()
82+
{
83+
testMap := make(map[string]interface{})
84+
if !session.Query(`SELECT * FROM test_blob_compressor`).Iter().MapScan(testMap) {
85+
t.Fatal("MapScan failed to work with one row")
86+
}
87+
if diff := cmp.Diff(sliceMap[0], testMap); diff != "" {
88+
t.Fatal("mismatch in returned map", diff)
89+
}
90+
}
91+
92+
// Test for Query.MapScan()
93+
{
94+
testMap := make(map[string]interface{})
95+
if session.Query(`SELECT * FROM test_blob_compressor`).MapScan(testMap) != nil {
96+
t.Fatal("MapScan failed to work with one row")
97+
}
98+
if diff := cmp.Diff(sliceMap[0], testMap); diff != "" {
99+
t.Fatal("mismatch in returned map", diff)
100+
}
101+
}
102+
}

go.mod

+7-5
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@ module github.com/gocql/gocql
33
require (
44
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect
55
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
6+
github.com/gocql/gocql/lz4 v0.0.0-20240625120741-974fa1211cce // indirect
67
github.com/golang/snappy v0.0.3
78
github.com/google/go-cmp v0.4.0
89
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
910
github.com/kr/pretty v0.1.0 // indirect
10-
github.com/stretchr/testify v1.3.0 // indirect
1111
golang.org/x/net v0.0.0-20220526153639-5463443f8c37
1212
gopkg.in/inf.v0 v0.9.1
1313
sigs.k8s.io/yaml v1.3.0
1414
)
1515

1616
retract (
17-
v1.8.0 // tag from kiwicom/gocql added by mistake to scylladb/gocql
18-
v1.8.1 // tag from kiwicom/gocql added by mistake to scylladb/gocql
19-
v1.9.0 // tag from kiwicom/gocql added by mistake to scylladb/gocql
20-
v1.10.0 // tag from kiwicom/gocql added by mistake to scylladb/gocql
17+
v1.10.0 // tag from kiwicom/gocql added by mistake to scylladb/gocql
18+
v1.9.0 // tag from kiwicom/gocql added by mistake to scylladb/gocql
19+
v1.8.1 // tag from kiwicom/gocql added by mistake to scylladb/gocql
20+
v1.8.0 // tag from kiwicom/gocql added by mistake to scylladb/gocql
2121
)
2222

23+
replace github.com/gocql/gocql/lz4 => ./lz4/
24+
2325
go 1.13

go.sum

+4
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
1616
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
1717
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
1818
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
19+
github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4=
20+
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
1921
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2022
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2123
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2224
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
2325
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
26+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2427
golang.org/x/net v0.0.0-20220526153639-5463443f8c37 h1:lUkvobShwKsOesNfWWlCS5q7fnbG1MEliIzwu886fn8=
2528
golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
2629
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -36,5 +39,6 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
3639
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
3740
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
3841
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
42+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
3943
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
4044
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=

lz4/blob_compressor.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package lz4
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"fmt"
7+
8+
"github.com/pierrec/lz4/v4"
9+
)
10+
11+
type BlobCompressor struct {
12+
prefix []byte
13+
prefixPlusLen int
14+
limit int
15+
}
16+
17+
var defaultPrefix = []byte{0x01, 0x11, 0x22, 0x33}
18+
19+
func NewBlobCompressor(limit int) BlobCompressor {
20+
return BlobCompressor{
21+
prefix: defaultPrefix,
22+
prefixPlusLen: len(defaultPrefix) + 4,
23+
limit: limit,
24+
}
25+
}
26+
27+
func (c BlobCompressor) Compress(data []byte) ([]byte, error) {
28+
if len(data) < c.limit {
29+
return data, nil
30+
}
31+
buf := make([]byte, len(c.prefix)+lz4.CompressBlockBound(len(data)+4))
32+
copy(buf, c.prefix)
33+
34+
var compressor lz4.Compressor
35+
36+
n, err := compressor.CompressBlock(data, buf[c.prefixPlusLen:])
37+
// According to lz4.CompressBlock doc, it doesn't fail as long as the dst
38+
// buffer length is at least lz4.CompressBlockBound(len(data))) bytes, but
39+
// we check for error anyway just to be thorough.
40+
if err != nil {
41+
return nil, err
42+
}
43+
binary.BigEndian.PutUint32(buf[len(c.prefix):], uint32(len(data)))
44+
45+
return buf[:c.prefixPlusLen+n], nil
46+
}
47+
48+
func (c BlobCompressor) Decompress(data []byte) ([]byte, error) {
49+
if !bytes.HasPrefix(data, c.prefix) {
50+
return data, nil
51+
}
52+
if len(data) < 4+len(c.prefix) {
53+
return nil, fmt.Errorf("compressed data should be >4, got=%d", len(data))
54+
}
55+
uncompressedLength := binary.BigEndian.Uint32(data[len(c.prefix):])
56+
if uncompressedLength == 0 {
57+
return nil, nil
58+
}
59+
buf := make([]byte, uncompressedLength)
60+
n, err := lz4.UncompressBlock(data[c.prefixPlusLen:], buf)
61+
return buf[:n], err
62+
}

marshal.go

+35-3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ type Unmarshaler interface {
4242
UnmarshalCQL(info TypeInfo, data []byte) error
4343
}
4444

45+
type blobCompressor interface {
46+
Compress([]byte) ([]byte, error)
47+
Decompress([]byte) ([]byte, error)
48+
}
49+
50+
var BlobCompressor blobCompressor
51+
4552
// Marshal returns the CQL encoding of the value for the Cassandra
4653
// internal type described by the info parameter.
4754
//
@@ -110,8 +117,10 @@ func Marshal(info TypeInfo, value interface{}) ([]byte, error) {
110117
}
111118

112119
switch info.Type() {
113-
case TypeVarchar, TypeAscii, TypeBlob, TypeText:
120+
case TypeBlob, TypeText:
114121
return marshalVarchar(info, value)
122+
case TypeAscii, TypeVarchar:
123+
return marshalVarcharRaw(info, value)
115124
case TypeBoolean:
116125
return marshalBool(info, value)
117126
case TypeTinyInt:
@@ -212,8 +221,10 @@ func Unmarshal(info TypeInfo, data []byte, value interface{}) error {
212221
}
213222

214223
switch info.Type() {
215-
case TypeVarchar, TypeAscii, TypeBlob, TypeText:
224+
case TypeBlob, TypeText:
216225
return unmarshalVarchar(info, data, value)
226+
case TypeVarchar, TypeAscii:
227+
return unmarshalVarcharRaw(info, data, value)
217228
case TypeBoolean:
218229
return unmarshalBool(info, data, value)
219230
case TypeInt:
@@ -289,6 +300,17 @@ func unmarshalNullable(info TypeInfo, data []byte, value interface{}) error {
289300
}
290301

291302
func marshalVarchar(info TypeInfo, value interface{}) ([]byte, error) {
303+
v, err := marshalVarcharRaw(info, value)
304+
if err != nil {
305+
return nil, err
306+
}
307+
if BlobCompressor == nil {
308+
return v, nil
309+
}
310+
return BlobCompressor.Compress(v)
311+
}
312+
313+
func marshalVarcharRaw(info TypeInfo, value interface{}) ([]byte, error) {
292314
switch v := value.(type) {
293315
case Marshaler:
294316
return v.MarshalCQL(info)
@@ -316,7 +338,17 @@ func marshalVarchar(info TypeInfo, value interface{}) ([]byte, error) {
316338
return nil, marshalErrorf("can not marshal %T into %s", value, info)
317339
}
318340

319-
func unmarshalVarchar(info TypeInfo, data []byte, value interface{}) error {
341+
func unmarshalVarchar(info TypeInfo, data []byte, value interface{}) (err error) {
342+
if BlobCompressor != nil {
343+
data, err = BlobCompressor.Decompress(data)
344+
if err != nil {
345+
return err
346+
}
347+
}
348+
return unmarshalVarcharRaw(info, data, value)
349+
}
350+
351+
func unmarshalVarcharRaw(info TypeInfo, data []byte, value interface{}) error {
320352
switch v := value.(type) {
321353
case Unmarshaler:
322354
return v.UnmarshalCQL(info, data)

0 commit comments

Comments
 (0)