Skip to content

Commit d4b5d04

Browse files
dmygeroleg-jukovec
authored andcommitted
api: add support of a batch insert request
Add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW. Closes #399
1 parent cbc8150 commit d4b5d04

14 files changed

+802
-20
lines changed

CHANGELOG.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
99
## [Unreleased]
1010

1111
### Added
12-
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13-
connection and ctx is not canceled;
14-
also added logs for error case of `ConnectionPool.tryConnect()` calls in
12+
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13+
connection and ctx is not canceled;
14+
also added logs for error case of `ConnectionPool.tryConnect()` calls in
1515
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
1616
- Methods that are implemented but not included in the pooler interface (#395).
1717
- Implemented stringer methods for pool.Role (#405).
18+
- Support the IPROTO_INSERT_ARROW request (#399).
1819

1920
### Changed
2021

arrow/arrow.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package arrow
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
7+
"github.com/vmihailenco/msgpack/v5"
8+
)
9+
10+
// Arrow MessagePack extension type.
11+
const arrowExtId = 8
12+
13+
// Arrow struct wraps a raw arrow data buffer.
14+
type Arrow struct {
15+
data []byte
16+
}
17+
18+
// MakeArrow returns a new arrow.Arrow object that contains
19+
// wrapped a raw arrow data buffer.
20+
func MakeArrow(arrow []byte) (Arrow, error) {
21+
return Arrow{arrow}, nil
22+
}
23+
24+
// Raw returns a []byte that contains Arrow raw data.
25+
func (a Arrow) Raw() []byte {
26+
return a.data
27+
}
28+
29+
func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
30+
arrow := Arrow{
31+
data: make([]byte, extLen),
32+
}
33+
n, err := d.Buffered().Read(arrow.data)
34+
if err != nil {
35+
return fmt.Errorf("arrowDecoder: can't read bytes on Arrow decode: %w", err)
36+
}
37+
if n < extLen || n != len(arrow.data) {
38+
return fmt.Errorf("arrowDecoder: unexpected end of stream after %d Arrow bytes", n)
39+
}
40+
41+
v.Set(reflect.ValueOf(arrow))
42+
return nil
43+
}
44+
45+
func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
46+
arr, ok := v.Interface().(Arrow)
47+
if !ok {
48+
return []byte{}, fmt.Errorf("arrowEncoder: not an Arrow type")
49+
}
50+
return arr.data, nil
51+
}
52+
53+
func init() {
54+
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
55+
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
56+
}

arrow/arrow_test.go

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package arrow_test
2+
3+
import (
4+
"bytes"
5+
"encoding/hex"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
"github.com/tarantool/go-tarantool/v2/arrow"
10+
"github.com/vmihailenco/msgpack/v5"
11+
)
12+
13+
var longArrow, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
14+
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
15+
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
16+
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
17+
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
18+
"01000000000000003400000008000000000000000200000000000000000000000000" +
19+
"00000000000000000000000000000800000000000000000000000100000001000000" +
20+
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
21+
"00000000000000000000")
22+
23+
var tests = []struct {
24+
name string
25+
arr []byte
26+
enc []byte
27+
}{
28+
{
29+
"abc",
30+
[]byte{'a', 'b', 'c'},
31+
[]byte{0xc7, 0x3, 0x8, 'a', 'b', 'c'},
32+
},
33+
{
34+
"empty",
35+
[]byte{},
36+
[]byte{0xc7, 0x0, 0x8},
37+
},
38+
{
39+
"one",
40+
[]byte{1},
41+
[]byte{0xd4, 0x8, 0x1},
42+
},
43+
{
44+
"long",
45+
longArrow,
46+
[]byte{
47+
0xc8, 0x1, 0x10, 0x8, 0xff, 0xff, 0xff, 0xff, 0x70, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0,
48+
0x0, 0x9e, 0xff, 0xff, 0xff, 0x4, 0x0, 0x1, 0x0, 0x4, 0x0, 0x0, 0x0, 0xb6, 0xff, 0xff,
49+
0xff, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
50+
0x4, 0x0, 0x0, 0x0, 0xda, 0xff, 0xff, 0xff, 0x14, 0x0, 0x0, 0x0, 0x2, 0x2, 0x0, 0x0,
51+
0x4, 0x0, 0x0, 0x0, 0xf0, 0xff, 0xff, 0xff, 0x40, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
52+
0x61, 0x0, 0x0, 0x0, 0x6, 0x0, 0x8, 0x0, 0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0x4, 0x0, 0x8,
53+
0x0, 0x9, 0x0, 0xc, 0x0, 0xc, 0x0, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8, 0x0,
54+
0xa, 0x0, 0xc, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xff, 0xff, 0xff, 0xff, 0x88, 0x0,
55+
0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8a, 0xff, 0xff, 0xff, 0x4, 0x0, 0x3, 0x0, 0x10, 0x0,
56+
0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xac, 0xff, 0xff,
57+
0xff, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x34, 0x0, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0,
58+
0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
59+
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x8, 0x0,
60+
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0,
61+
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x14, 0x0,
62+
0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0xc, 0x0, 0x14, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xc,
63+
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
64+
},
65+
},
66+
}
67+
68+
func TestEncodeArrow(t *testing.T) {
69+
for _, tt := range tests {
70+
t.Run(tt.name, func(t *testing.T) {
71+
buf := bytes.NewBuffer([]byte{})
72+
enc := msgpack.NewEncoder(buf)
73+
74+
arr, err := arrow.MakeArrow(tt.arr)
75+
require.NoError(t, err)
76+
77+
err = enc.Encode(arr)
78+
require.NoError(t, err)
79+
80+
require.Equal(t, tt.enc, buf.Bytes())
81+
})
82+
83+
}
84+
}
85+
86+
func TestDecodeArrow(t *testing.T) {
87+
for _, tt := range tests {
88+
t.Run(tt.name, func(t *testing.T) {
89+
90+
buf := bytes.NewBuffer(tt.enc)
91+
dec := msgpack.NewDecoder(buf)
92+
93+
var arr arrow.Arrow
94+
err := dec.Decode(&arr)
95+
require.NoError(t, err)
96+
97+
require.Equal(t, tt.arr, arr.Raw())
98+
})
99+
}
100+
}

arrow/example_test.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Run Tarantool Enterprise Edition instance before example execution:
2+
//
3+
// Terminal 1:
4+
// $ cd arrow
5+
// $ TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool testdata/config-memcs.lua
6+
//
7+
// Terminal 2:
8+
// $ go test -v example_test.go
9+
package arrow_test
10+
11+
import (
12+
"context"
13+
"encoding/hex"
14+
"fmt"
15+
"log"
16+
"time"
17+
18+
"github.com/tarantool/go-tarantool/v2"
19+
"github.com/tarantool/go-tarantool/v2/arrow"
20+
)
21+
22+
var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
23+
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
24+
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
25+
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
26+
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
27+
"01000000000000003400000008000000000000000200000000000000000000000000" +
28+
"00000000000000000000000000000800000000000000000000000100000001000000" +
29+
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
30+
"00000000000000000000")
31+
32+
func Example() {
33+
dialer := tarantool.NetDialer{
34+
Address: "127.0.0.1:3013",
35+
User: "test",
36+
Password: "test",
37+
}
38+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
39+
client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{})
40+
cancel()
41+
if err != nil {
42+
log.Fatalf("Failed to connect: %s", err)
43+
}
44+
45+
arr, err := arrow.MakeArrow(arrowBinData)
46+
if err != nil {
47+
log.Fatalf("Failed prepare Arrow data: %s", err)
48+
}
49+
50+
req := arrow.NewInsertRequest("testArrow", arr)
51+
52+
resp, err := client.Do(req).Get()
53+
if err != nil {
54+
log.Fatalf("Failed insert Arrow: %s", err)
55+
}
56+
if len(resp) > 0 {
57+
log.Fatalf("Unexpected response")
58+
} else {
59+
fmt.Printf("Batch arrow inserted")
60+
}
61+
}

arrow/request.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package arrow
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/tarantool/go-iproto"
8+
"github.com/tarantool/go-tarantool/v2"
9+
"github.com/vmihailenco/msgpack/v5"
10+
)
11+
12+
// INSERT Arrow request.
13+
//
14+
// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released.
15+
// https://github.com/tarantool/go-tarantool/issues/412
16+
const iprotoInsertArrowType = iproto.Type(17)
17+
18+
// The data in Arrow format.
19+
//
20+
// FIXME: replace with iproto.IPROTO_ARROW when iproto will released.
21+
// https://github.com/tarantool/go-tarantool/issues/412
22+
const iprotoArrowKey = iproto.Key(0x36)
23+
24+
// InsertRequest helps you to create an insert request object for execution
25+
// by a Connection.
26+
type InsertRequest struct {
27+
arrow Arrow
28+
space interface{}
29+
ctx context.Context
30+
}
31+
32+
// NewInsertRequest returns a new InsertRequest.
33+
func NewInsertRequest(space interface{}, arrow Arrow) *InsertRequest {
34+
return &InsertRequest{
35+
space: space,
36+
arrow: arrow,
37+
}
38+
}
39+
40+
// Type returns a IPROTO_INSERT_ARROW type for the request.
41+
func (r *InsertRequest) Type() iproto.Type {
42+
return iprotoInsertArrowType
43+
}
44+
45+
// Async returns false to the request return a response.
46+
func (r *InsertRequest) Async() bool {
47+
return false
48+
}
49+
50+
// Ctx returns a context of the request.
51+
func (r *InsertRequest) Ctx() context.Context {
52+
return r.ctx
53+
}
54+
55+
// Context sets a passed context to the request.
56+
//
57+
// Pay attention that when using context with request objects,
58+
// the timeout option for Connection does not affect the lifetime
59+
// of the request. For those purposes use context.WithTimeout() as
60+
// the root context.
61+
func (r *InsertRequest) Context(ctx context.Context) *InsertRequest {
62+
r.ctx = ctx
63+
return r
64+
}
65+
66+
// Arrow sets the arrow for insertion the insert arrow request.
67+
// Note: default value is nil.
68+
func (r *InsertRequest) Arrow(arrow Arrow) *InsertRequest {
69+
r.arrow = arrow
70+
return r
71+
}
72+
73+
// Body fills an msgpack.Encoder with the insert arrow request body.
74+
func (r *InsertRequest) Body(res tarantool.SchemaResolver, enc *msgpack.Encoder) error {
75+
if err := enc.EncodeMapLen(2); err != nil {
76+
return err
77+
}
78+
if err := tarantool.EncodeSpace(res, enc, r.space); err != nil {
79+
return err
80+
}
81+
if err := enc.EncodeUint(uint64(iprotoArrowKey)); err != nil {
82+
return err
83+
}
84+
return enc.Encode(r.arrow)
85+
}
86+
87+
// Response creates a response for the InsertRequest.
88+
func (r *InsertRequest) Response(
89+
header tarantool.Header,
90+
body io.Reader,
91+
) (tarantool.Response, error) {
92+
return tarantool.DecodeBaseResponse(header, body)
93+
}

0 commit comments

Comments
 (0)