Skip to content

Commit

Permalink
[feat]: add test case for write api
Browse files Browse the repository at this point in the history
  • Loading branch information
stone1100 committed Jul 31, 2022
1 parent d974430 commit df52202
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 16 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,12 @@ func main() {

// close write client
w.Close()
// sleep just for testing
time.Sleep(time.Minute)
}
```

### Reading background process errors

Write client doesn't log any error. Can use [Errors()](https://pkg.go.dev/github.com/lindb/client_go/api#Write.Errors) method, which returns the channel for reading errors occurring
Write client doesn't log any error. Can use [Errors()](https://pkg.go.dev/github.com/lindb/client_go/api#Write) method, which returns the channel for reading errors occurring
during async writes.

```go
Expand Down Expand Up @@ -130,8 +128,6 @@ func main() {

// close write client
w.Close()
// sleep just for testing
time.Sleep(time.Minute)
}
```

Expand Down
10 changes: 5 additions & 5 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ func (w *write) Errors() <-chan error {

// Close closes write client, before close try to send pending points.
func (w *write) Close() {
if w.closed {
return
}

w.mutex.Lock()
defer w.mutex.Unlock()

Expand Down Expand Up @@ -197,8 +193,12 @@ func (w *write) flushBuffer() {
w.buf.Reset() // reset batch buf
w.batchedSize = 0

// copy data
dst := make([]byte, len(data))
copy(dst, data)

// put data into send chan
w.sendCh <- data
w.sendCh <- dst
}

// batchPoint marshals point, if success put data into buffer.
Expand Down
119 changes: 119 additions & 0 deletions api/write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package api

import (
"context"
"fmt"
"math"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"

httppkg "github.com/lindb/client_go/internal/http"
)

func TestNewWrite(t *testing.T) {
w := NewWrite("http://localhost:9000", "test",
DefaultWriteOptions(), httppkg.DefaultOptions())
assert.NotNil(t, w)
errCh := w.Errors()
assert.NotNil(t, errCh)
w.Close()
w.Close() // ignore it
}

func TestWriteData(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`ok`))
}))
defer svr.Close()

w := NewWrite(svr.URL, "test",
DefaultWriteOptions().AddDefaultTag("key", "value"), httppkg.DefaultOptions())
for i := 0; i < 10; i++ {
w.AddPoint(context.TODO(), NewPoint("cpu").
AddTag("key1", "value1").AddField(NewLast("load", 10.0)))
}
w.Close()
}

func TestWriteData_Failure(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`error`))
}))
defer svr.Close()

w := NewWrite(svr.URL, "test",
DefaultWriteOptions().SetMaxRetries(2).SetBatchSize(1).
SetRetryBufferLimit(50), httppkg.DefaultOptions())
for i := 0; i < 100; i++ {
w.AddPoint(context.TODO(), NewPoint("cpu").
AddTag("key1", "value1").AddField(NewLast("load", 10.0)))
}
w.Close()
}

func TestAddPoint(t *testing.T) {
t.Run("invalid point", func(t *testing.T) {
w := write{}
w.AddPoint(context.TODO(), NewPoint("cpu"))
})
t.Run("add point timeout", func(t *testing.T) {
w := write{bufferCh: make(chan *Point)}
ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*10)
defer cancel()
w.AddPoint(ctx, NewPoint("cpu").AddField(NewLast("load", 10.0)))
})
}

func TestAddWrongPoint(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`ok`))
}))
defer svr.Close()

t.Run("wrong common tags", func(t *testing.T) {
w := NewWrite(svr.URL, "test",
DefaultWriteOptions().AddDefaultTag("key", ""), httppkg.DefaultOptions())
w.AddPoint(context.TODO(), NewPoint("cpu").AddField(NewLast("load", 10.0)))
w.Close()
})
t.Run("wrong field data", func(t *testing.T) {
w := NewWrite(svr.URL, "test",
DefaultWriteOptions(), httppkg.DefaultOptions())
w.AddPoint(context.TODO(), NewPoint("cpu").AddField(NewSum("load", math.Inf(0))))
w.Close()
})
t.Run("wrong point tags", func(t *testing.T) {
w := NewWrite(svr.URL, "test",
DefaultWriteOptions(), httppkg.DefaultOptions())
w.AddPoint(context.TODO(), NewPoint("cpu").AddTag("key", "").AddField(NewLast("load", 10.0)))
errCh := w.Errors()
go func() {
for err := range errCh {
fmt.Println(err)
}
}()
w.Close()
})
}
3 changes: 0 additions & 3 deletions example/read_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package main
import (
"context"
"fmt"
"time"

lindb "github.com/lindb/client_go"
"github.com/lindb/client_go/api"
Expand Down Expand Up @@ -49,6 +48,4 @@ func readErrors() {

// close write client
w.Close()
// sleep just for testing
time.Sleep(time.Minute)
}
3 changes: 0 additions & 3 deletions example/write_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package main
import (
"context"
"fmt"
"time"

lindb "github.com/lindb/client_go"
"github.com/lindb/client_go/api"
Expand Down Expand Up @@ -61,6 +60,4 @@ func writeData() {

// close write client
w.Close()
// sleep just for testing
time.Sleep(time.Minute)
}

0 comments on commit df52202

Please sign in to comment.