Skip to content

Commit

Permalink
Merge pull request #1297 from nono/replicator-last-seq
Browse files Browse the repository at this point in the history
Use the last sequence number in the replicator
  • Loading branch information
nono authored Mar 23, 2018
2 parents 261e29f + 3aa98fa commit 01bfe35
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 13 deletions.
15 changes: 15 additions & 0 deletions pkg/couchdb/couchdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,21 @@ func TestJSONDocClone(t *testing.T) {
assert.NotEqual(t, hdr1.Len, hdr4.Len)
}

func TestLocalDocuments(t *testing.T) {
id := "foo"
_, err := GetLocal(TestPrefix, TestDoctype, id)
assert.True(t, IsNotFoundError(err))

doc := map[string]interface{}{"bar": "baz"}
err = PutLocal(TestPrefix, TestDoctype, id, doc)
assert.NoError(t, err)
assert.NotEmpty(t, doc["_rev"])

out, err := GetLocal(TestPrefix, TestDoctype, id)
assert.NoError(t, err)
assert.Equal(t, "baz", out["bar"])
}

func assertGotEvent(t *testing.T, eventType, id string) bool {
receivedEventsMutex.Lock()
_, ok := receivedEvents[eventType+id]
Expand Down
29 changes: 29 additions & 0 deletions pkg/couchdb/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package couchdb

import (
"net/http"
"net/url"
)

// GetLocal fetch a local document from CouchDB
// http://docs.couchdb.org/en/2.1.1/api/local.html#get--db-_local-docid
func GetLocal(db Database, doctype, id string) (map[string]interface{}, error) {
var out map[string]interface{}
u := "_local/" + url.PathEscape(id)
if err := makeRequest(db, doctype, http.MethodGet, u, nil, &out); err != nil {
return nil, err
}
return out, nil
}

// PutLocal will put a local document in CouchDB.
// Note that you should put the last revision in `doc` to avoid conflicts.
func PutLocal(db Database, doctype, id string, doc map[string]interface{}) error {
u := "_local/" + url.PathEscape(id)
var out UpdateResponse
if err := makeRequest(db, doctype, http.MethodPut, u, doc, &out); err != nil {
return err
}
doc["_rev"] = out.Rev
return nil
}
88 changes: 75 additions & 13 deletions pkg/sharing/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,19 @@ func (s *Sharing) Replicate(inst *instance.Instance) error {
// ReplicateTo starts a replicator on this sharing to the given member.
// http://docs.couchdb.org/en/2.1.1/replication/protocol.html
// https://github.com/pouchdb/pouchdb/blob/master/packages/node_modules/pouchdb-replication/src/replicate.js
// TODO check for errors
func (s *Sharing) ReplicateTo(inst *instance.Instance, m *Member) error {
if m.Instance == "" {
return ErrInvalidURL
}

// TODO get the last sequence number
lastSeq, err := s.getLastSeqNumber(inst, m)
if err != nil {
return err
}
fmt.Printf("lastSeq = %s\n", lastSeq)

changes, err := s.callChangesFeed(inst)
changes, seq, err := s.callChangesFeed(inst, lastSeq)
if err != nil {
return err
}
Expand All @@ -70,10 +75,64 @@ func (s *Sharing) ReplicateTo(inst *instance.Instance, m *Member) error {
fmt.Printf("docs = %#v\n", docs)

err = s.sendBulkDocs(m, docs)
return err
if err != nil {
return err
}

// TODO check for errors
// TODO save the sequence number
return s.UpdateLastSequenceNumber(inst, m, seq)
}

// getLastSeqNumber returns the last sequence number of the previous
// replication to this member
func (s *Sharing) getLastSeqNumber(inst *instance.Instance, m *Member) (string, error) {
id, err := s.replicationID(m)
if err != nil {
return "", err
}
result, err := couchdb.GetLocal(inst, consts.Shared, id)
if couchdb.IsNotFoundError(err) {
return "", nil
}
if err != nil {
return "", err
}
seq, _ := result["last_seq"].(string)
return seq, nil
}

// UpdateLastSequenceNumber updates the last sequence number for this
// replication if it's superior to the number in CouchDB
func (s *Sharing) UpdateLastSequenceNumber(inst *instance.Instance, m *Member, seq string) error {
id, err := s.replicationID(m)
if err != nil {
return err
}
result, err := couchdb.GetLocal(inst, consts.Shared, id)
if err != nil {
if !couchdb.IsNotFoundError(err) {
return err
}
result = make(map[string]interface{})
} else {
if prev, ok := result["last_seq"].(string); ok {
if RevGeneration(seq) <= RevGeneration(prev) {
return nil
}
}
}
result["last_seq"] = seq
return couchdb.PutLocal(inst, consts.Shared, id, result)
}

// replicationID gives an identifier for this replicator
func (s *Sharing) replicationID(m *Member) (string, error) {
for i := range s.Members {
if &s.Members[i] == m {
id := fmt.Sprintf("sharing-%s-%d", s.SID, i)
return id, nil
}
}
return "", ErrMemberNotFound
}

// Changes is a map of "doctype-docid" -> [revisions]
Expand All @@ -82,14 +141,16 @@ type Changes map[string][]string

// callChangesFeed fetches the last changes from the changes feed
// http://docs.couchdb.org/en/2.1.1/api/database/changes.html
// TODO add Limit, add Since, add a filter on the sharing
func (s *Sharing) callChangesFeed(inst *instance.Instance) (*Changes, error) {
// TODO add a filter on the sharing
func (s *Sharing) callChangesFeed(inst *instance.Instance, since string) (*Changes, string, error) {
response, err := couchdb.GetChanges(inst, &couchdb.ChangesRequest{
DocType: consts.Shared,
IncludeDocs: true,
Since: since,
Limit: 100,
})
if err != nil {
return nil, err
return nil, "", err
}
changes := make(Changes)
for _, r := range response.Results {
Expand All @@ -98,7 +159,7 @@ func (s *Sharing) callChangesFeed(inst *instance.Instance) (*Changes, error) {
changes[r.DocID][i] = c.Rev
}
}
return &changes, nil
return &changes, response.LastSeq, nil
}

// Missings is a struct for the response of _revs_diff
Expand All @@ -117,7 +178,8 @@ func (s *Sharing) callRevsDiff(m *Member, changes *Changes) (*Missings, error) {
if err != nil {
return nil, err
}
leafRevs := make(map[string][]string) // "doctype-docid" -> [leaf revisions]
// "doctype-docid" -> [leaf revisions]
leafRevs := make(map[string][]string, len(*changes))
for key, revs := range *changes {
leafRevs[key] = revs[len(revs)-1:]
}
Expand Down Expand Up @@ -158,13 +220,13 @@ func (s *Sharing) callRevsDiff(m *Member, changes *Changes) (*Missings, error) {
func computePossibleAncestors(wants []string, haves []string) []string {
// Build a sorted array of unique generation number for revisions of wants
var wgs []int
seen := make(map[int]bool)
seen := make(map[int]struct{})
for _, rev := range wants {
g := RevGeneration(rev)
if !seen[g] {
if _, ok := seen[g]; !ok {
wgs = append(wgs, g)
}
seen[g] = true
seen[g] = struct{}{}
}
sort.Ints(wgs)

Expand Down
67 changes: 67 additions & 0 deletions pkg/sharing/replicator_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
package sharing

import (
"os"
"testing"

"github.com/cozy/cozy-stack/pkg/config"
"github.com/cozy/cozy-stack/pkg/couchdb"
"github.com/cozy/cozy-stack/pkg/instance"
"github.com/cozy/cozy-stack/tests/testutils"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
)

var inst *instance.Instance

const testDoctype = "io.cozy.sharing.tests"

func TestRevGeneration(t *testing.T) {
assert.Equal(t, 1, RevGeneration("1-aaa"))
assert.Equal(t, 3, RevGeneration("3-123"))
Expand All @@ -31,3 +41,60 @@ func TestComputePossibleAncestors(t *testing.T) {
expected = []string{"3-a"}
assert.Equal(t, expected, pas)
}

func uuidv4() string {
id, _ := uuid.NewV4()
return id.String()
}

func createSharedRef(t *testing.T) {
ref := SharedRef{
SID: testDoctype + "/" + uuidv4(),
Revisions: []string{"1-aaa"},
}
err := couchdb.CreateNamedDocWithDB(inst, &ref)
assert.NoError(t, err)
}

func TestSequenceNumber(t *testing.T) {
nb := 5
for i := 0; i < nb; i++ {
createSharedRef(t)
}
s := &Sharing{SID: uuidv4(), Members: []Member{
{Status: MemberStatusOwner, Name: "Alice"},
{Status: MemberStatusReady, Name: "Bob"},
}}
m := &s.Members[1]

rid, err := s.replicationID(m)
assert.NoError(t, err)
assert.Equal(t, "sharing-"+s.SID+"-1", rid)

seq, err := s.getLastSeqNumber(inst, m)
assert.NoError(t, err)
assert.Empty(t, seq)
_, seq, err = s.callChangesFeed(inst, seq)
assert.NoError(t, err)
assert.NotEmpty(t, seq)
assert.Equal(t, nb, RevGeneration(seq))
err = s.UpdateLastSequenceNumber(inst, m, seq)
assert.NoError(t, err)
seq2, err := s.getLastSeqNumber(inst, m)
assert.NoError(t, err)
assert.Equal(t, seq, seq2)

err = s.UpdateLastSequenceNumber(inst, m, "2-abc")
assert.NoError(t, err)
seq3, err := s.getLastSeqNumber(inst, m)
assert.NoError(t, err)
assert.Equal(t, seq, seq3)
}

func TestMain(m *testing.M) {
config.UseTestFile()
testutils.NeedCouchdb()
setup := testutils.NewSetup(m, "sharing_test_repl")
inst = setup.GetTestInstance()
os.Exit(setup.Run())
}

0 comments on commit 01bfe35

Please sign in to comment.