diff --git a/envelopes.go b/envelopes.go index 7bcacac..8b2d442 100644 --- a/envelopes.go +++ b/envelopes.go @@ -58,11 +58,6 @@ type Envelope interface { String() string } -type EventEnvelope struct { - SubscriptionID *string - Event -} - var ( _ Envelope = (*EventEnvelope)(nil) _ Envelope = (*ReqEnvelope)(nil) @@ -74,6 +69,11 @@ var ( _ Envelope = (*AuthEnvelope)(nil) ) +type EventEnvelope struct { + SubscriptionID *string + Event +} + func (_ EventEnvelope) Label() string { return "EVENT" } func (v *EventEnvelope) UnmarshalJSON(data []byte) error { @@ -96,7 +96,7 @@ func (v EventEnvelope) MarshalJSON() ([]byte, error) { if v.SubscriptionID != nil { w.RawString(`"` + *v.SubscriptionID + `",`) } - v.MarshalEasyJSON(&w) + v.Event.MarshalEasyJSON(&w) w.RawString(`]`) return w.BuildBytes() } diff --git a/go.mod b/go.mod index 67ff905..40f1b19 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/btcsuite/btcd/btcutil v1.1.3 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 - github.com/fiatjaf/eventstore v0.8.1 + github.com/fiatjaf/eventstore v0.9.0 github.com/fiatjaf/generic-ristretto v0.0.1 github.com/gobwas/httphead v0.1.0 github.com/gobwas/ws v1.3.1 @@ -28,12 +28,14 @@ require ( github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e // indirect github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect + github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/golang/glog v1.1.2 // indirect + github.com/greatroar/blobloom v0.8.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -44,3 +46,5 @@ require ( golang.org/x/sys v0.20.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/fiatjaf/eventstore => ../eventstore diff --git a/go.sum b/go.sum index 2bad03a..8e1290d 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e h1:ahyvB3q25Yn github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e/go.mod h1:kGUqhHd//musdITWjFvNTHn90WG9bMLBEPQZ17Cmlpw= github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec h1:1Qb69mGp/UtRPn422BH4/Y4Q3SLUrD9KHuDkm8iodFc= github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec/go.mod h1:CD8UlnlLDiqb36L110uqiP2iSflVjx9g/3U9hCI4q2U= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/bluekeyes/go-gitdiff v0.7.1 h1:graP4ElLRshr8ecu0UtqfNTCHrtSyZd3DABQm/DWesQ= github.com/bluekeyes/go-gitdiff v0.7.1/go.mod h1:QpfYYO1E0fTVHVZAZKiRjtSGY9823iCdvGXBcEzHGbM= @@ -29,6 +30,8 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cmars/basen v0.0.0-20150613233007-fe3947df716e h1:0XBUw73chJ1VYSsfvcPvVT7auykAJce9FpRr10L6Qhw= @@ -76,6 +79,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/graph-gophers/dataloader/v7 v7.1.0 h1:Wn8HGF/q7MNXcvfaBnLEPEFJttVHR8zuEqP1obys/oc= github.com/graph-gophers/dataloader/v7 v7.1.0/go.mod h1:1bKE0Dm6OUcTB/OAuYVOZctgIz7Q3d0XrYtlIzTgg6Q= +github.com/greatroar/blobloom v0.8.0 h1:I9RlEkfqK9/6f1v9mFmDYegDQ/x0mISCpiNpAm23Pt4= +github.com/greatroar/blobloom v0.8.0/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -108,9 +113,13 @@ github.com/puzpuzpuz/xsync/v3 v3.0.2/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPK github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.1.5-0.20170601210322-f6abca593680/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= diff --git a/interface.go b/interface.go index fec2b99..7149d2b 100644 --- a/interface.go +++ b/interface.go @@ -7,8 +7,9 @@ import ( ) type RelayStore interface { - Publish(ctx context.Context, event Event) error - QuerySync(ctx context.Context, filter Filter, opts ...SubscriptionOption) ([]*Event, error) + Publish(context.Context, Event) error + QueryEvents(context.Context, Filter) (chan *Event, error) + QuerySync(context.Context, Filter) ([]*Event, error) } var ( @@ -26,11 +27,36 @@ func (multi MultiStore) Publish(ctx context.Context, event Event) error { return errors.Join(errs...) } -func (multi MultiStore) QuerySync(ctx context.Context, filter Filter, opts ...SubscriptionOption) ([]*Event, error) { +func (multi MultiStore) QueryEvents(ctx context.Context, filter Filter) (chan *Event, error) { + multich := make(chan *Event) + + errs := make([]error, len(multi)) + var good bool + for i, s := range multi { + ch, err := s.QueryEvents(ctx, filter) + errs[i] = err + if err == nil { + good = true + go func(ch chan *Event) { + for evt := range ch { + multich <- evt + } + }(ch) + } + } + + if good { + return multich, nil + } else { + return nil, errors.Join(errs...) + } +} + +func (multi MultiStore) QuerySync(ctx context.Context, filter Filter) ([]*Event, error) { errs := make([]error, len(multi)) - events := make([]*Event, 0, max(filter.Limit, 10)) + events := make([]*Event, 0, max(filter.Limit, 250)) for i, s := range multi { - res, err := s.QuerySync(ctx, filter, opts...) + res, err := s.QuerySync(ctx, filter) errs[i] = err events = append(events, res...) } diff --git a/nip77/envelopes.go b/nip77/envelopes.go new file mode 100644 index 0000000..4f48c29 --- /dev/null +++ b/nip77/envelopes.go @@ -0,0 +1,181 @@ +package nip77 + +import ( + "bytes" + "fmt" + + "github.com/mailru/easyjson" + jwriter "github.com/mailru/easyjson/jwriter" + "github.com/nbd-wtf/go-nostr" + "github.com/tidwall/gjson" +) + +func ParseNegMessage(message []byte) nostr.Envelope { + firstComma := bytes.Index(message, []byte{','}) + if firstComma == -1 { + return nil + } + label := message[0:firstComma] + + var v nostr.Envelope + switch { + case bytes.Contains(label, []byte("NEG-MSG")): + v = &MessageEnvelope{} + case bytes.Contains(label, []byte("NEG-OPEN")): + v = &OpenEnvelope{} + case bytes.Contains(label, []byte("NEG-ERR")): + v = &ErrorEnvelope{} + case bytes.Contains(label, []byte("NEG-CLOSE")): + v = &CloseEnvelope{} + default: + return nil + } + + if err := v.UnmarshalJSON(message); err != nil { + return nil + } + return v +} + +var ( + _ nostr.Envelope = (*OpenEnvelope)(nil) + _ nostr.Envelope = (*MessageEnvelope)(nil) + _ nostr.Envelope = (*CloseEnvelope)(nil) + _ nostr.Envelope = (*ErrorEnvelope)(nil) +) + +type OpenEnvelope struct { + SubscriptionID string + Filter nostr.Filter + Message string +} + +func (_ OpenEnvelope) Label() string { return "NEG-OPEN" } +func (v OpenEnvelope) String() string { + b, _ := v.MarshalJSON() + return string(b) +} + +func (v *OpenEnvelope) UnmarshalJSON(data []byte) error { + r := gjson.ParseBytes(data) + arr := r.Array() + if len(arr) != 4 { + return fmt.Errorf("failed to decode NEG-OPEN envelope") + } + + v.SubscriptionID = arr[1].Str + v.Message = arr[3].Str + return easyjson.Unmarshal([]byte(arr[2].Raw), &v.Filter) +} + +func (v OpenEnvelope) MarshalJSON() ([]byte, error) { + res := bytes.NewBuffer(make([]byte, 0, 17+len(v.SubscriptionID)+len(v.Message)+500)) + + res.WriteString(`["NEG-OPEN","`) + res.WriteString(v.SubscriptionID) + res.WriteString(`",`) + + w := jwriter.Writer{} + v.Filter.MarshalEasyJSON(&w) + w.Buffer.DumpTo(res) + + res.WriteString(`,"`) + res.WriteString(v.Message) + res.WriteString(`"]`) + + return res.Bytes(), nil +} + +type MessageEnvelope struct { + SubscriptionID string + Message string +} + +func (_ MessageEnvelope) Label() string { return "NEG-MSG" } +func (v MessageEnvelope) String() string { + b, _ := v.MarshalJSON() + return string(b) +} + +func (v *MessageEnvelope) UnmarshalJSON(data []byte) error { + r := gjson.ParseBytes(data) + arr := r.Array() + if len(arr) < 3 { + return fmt.Errorf("failed to decode NEG-MSG envelope") + } + v.SubscriptionID = arr[1].Str + v.Message = arr[2].Str + return nil +} + +func (v MessageEnvelope) MarshalJSON() ([]byte, error) { + res := bytes.NewBuffer(make([]byte, 0, 17+len(v.SubscriptionID)+len(v.Message))) + + res.WriteString(`["NEG-MSG","`) + res.WriteString(v.SubscriptionID) + res.WriteString(`","`) + res.WriteString(v.Message) + res.WriteString(`"]`) + + return res.Bytes(), nil +} + +type CloseEnvelope struct { + SubscriptionID string +} + +func (_ CloseEnvelope) Label() string { return "NEG-CLOSE" } +func (v CloseEnvelope) String() string { + b, _ := v.MarshalJSON() + return string(b) +} + +func (v *CloseEnvelope) UnmarshalJSON(data []byte) error { + r := gjson.ParseBytes(data) + arr := r.Array() + if len(arr) < 2 { + return fmt.Errorf("failed to decode NEG-CLOSE envelope") + } + v.SubscriptionID = arr[1].Str + return nil +} + +func (v CloseEnvelope) MarshalJSON() ([]byte, error) { + res := bytes.NewBuffer(make([]byte, 0, 14+len(v.SubscriptionID))) + res.WriteString(`["NEG-CLOSE","`) + res.WriteString(v.SubscriptionID) + res.WriteString(`"]`) + return res.Bytes(), nil +} + +type ErrorEnvelope struct { + SubscriptionID string + Reason string +} + +func (_ ErrorEnvelope) Label() string { return "NEG-ERROR" } +func (v ErrorEnvelope) String() string { + b, _ := v.MarshalJSON() + return string(b) +} + +func (v *ErrorEnvelope) UnmarshalJSON(data []byte) error { + r := gjson.ParseBytes(data) + arr := r.Array() + if len(arr) < 3 { + return fmt.Errorf("failed to decode NEG-ERROR envelope") + } + v.SubscriptionID = arr[1].Str + v.Reason = arr[2].Str + return nil +} + +func (v ErrorEnvelope) MarshalJSON() ([]byte, error) { + res := bytes.NewBuffer(make([]byte, 0, 19+len(v.SubscriptionID)+len(v.Reason))) + res.WriteString(`["NEG-ERROR","`) + res.WriteString(v.SubscriptionID) + res.WriteString(`","`) + res.WriteString(v.Reason) + res.WriteString(`"]`) + return res.Bytes(), nil +} diff --git a/nip77/example/example.go b/nip77/example/example.go new file mode 100644 index 0000000..db25ebb --- /dev/null +++ b/nip77/example/example.go @@ -0,0 +1,61 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/fiatjaf/eventstore" + "github.com/fiatjaf/eventstore/slicestore" + "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip77" +) + +func main() { + ctx := context.Background() + db := &slicestore.SliceStore{} + db.Init() + + sk := nostr.GeneratePrivateKey() + local := eventstore.RelayWrapper{Store: db} + + for { + for i := 0; i < 20; i++ { + { + evt := nostr.Event{ + Kind: 1, + Content: fmt.Sprintf("same old hello %d", i), + CreatedAt: nostr.Timestamp(i), + Tags: nostr.Tags{}, + } + evt.Sign(sk) + db.SaveEvent(ctx, &evt) + } + + { + evt := nostr.Event{ + Kind: 1, + Content: fmt.Sprintf("custom hello %d", i), + CreatedAt: nostr.Now(), + Tags: nostr.Tags{}, + } + evt.Sign(sk) + db.SaveEvent(ctx, &evt) + } + } + + err := nip77.NegentropySync(ctx, + local, "ws://localhost:7777", nostr.Filter{}) + if err != nil { + panic(err) + } + + data, err := local.QuerySync(ctx, nostr.Filter{}) + if err != nil { + panic(err) + } + + fmt.Println("total local events:", len(data)) + time.Sleep(time.Second * 10) + } +} diff --git a/nip77/idlistpool.go b/nip77/idlistpool.go new file mode 100644 index 0000000..6f3948c --- /dev/null +++ b/nip77/idlistpool.go @@ -0,0 +1,41 @@ +package nip77 + +import ( + "sync" +) + +type idlistpool struct { + initialsize int + pool [][]string + sync.Mutex +} + +func newidlistpool(initialsize int) *idlistpool { + ilp := idlistpool{ + initialsize: initialsize, + pool: make([][]string, 1, 2), + } + + ilp.pool[0] = make([]string, 0, initialsize) + + return &ilp +} + +func (ilp *idlistpool) grab() []string { + ilp.Lock() + defer ilp.Unlock() + + l := len(ilp.pool) + if l > 0 { + idlist := ilp.pool[l-1] + ilp.pool = ilp.pool[0 : l-1] + return idlist + } + idlist := make([]string, 0, ilp.initialsize) + return idlist +} + +func (ilp *idlistpool) giveback(idlist []string) { + idlist = idlist[:0] + ilp.pool = append(ilp.pool, idlist) +} diff --git a/nip77/negentropy/encoding.go b/nip77/negentropy/encoding.go new file mode 100644 index 0000000..08da9f9 --- /dev/null +++ b/nip77/negentropy/encoding.go @@ -0,0 +1,126 @@ +package negentropy + +import ( + "bytes" + "encoding/hex" + + "github.com/nbd-wtf/go-nostr" +) + +func (n *Negentropy) DecodeTimestampIn(reader *bytes.Reader) (nostr.Timestamp, error) { + t, err := decodeVarInt(reader) + if err != nil { + return 0, err + } + + timestamp := nostr.Timestamp(t) + if timestamp == 0 { + timestamp = maxTimestamp + } else { + timestamp-- + } + + timestamp += n.lastTimestampIn + if timestamp < n.lastTimestampIn { // Check for overflow + timestamp = maxTimestamp + } + n.lastTimestampIn = timestamp + return timestamp, nil +} + +func (n *Negentropy) DecodeBound(reader *bytes.Reader) (Bound, error) { + timestamp, err := n.DecodeTimestampIn(reader) + if err != nil { + return Bound{}, err + } + + length, err := decodeVarInt(reader) + if err != nil { + return Bound{}, err + } + + id := make([]byte, length) + if _, err = reader.Read(id); err != nil { + return Bound{}, err + } + + return Bound{Item{timestamp, hex.EncodeToString(id)}}, nil +} + +func (n *Negentropy) encodeTimestampOut(timestamp nostr.Timestamp) []byte { + if timestamp == maxTimestamp { + n.lastTimestampOut = maxTimestamp + return encodeVarInt(0) + } + temp := timestamp + timestamp -= n.lastTimestampOut + n.lastTimestampOut = temp + return encodeVarInt(int(timestamp + 1)) +} + +func (n *Negentropy) encodeBound(bound Bound) []byte { + var output []byte + + t := n.encodeTimestampOut(bound.Timestamp) + idlen := encodeVarInt(len(bound.ID) / 2) + output = append(output, t...) + output = append(output, idlen...) + id, _ := hex.DecodeString(bound.Item.ID) + + output = append(output, id...) + return output +} + +func getMinimalBound(prev, curr Item) Bound { + if curr.Timestamp != prev.Timestamp { + return Bound{Item{curr.Timestamp, ""}} + } + + sharedPrefixBytes := 0 + + for i := 0; i < 32; i++ { + if curr.ID[i:i+2] != prev.ID[i:i+2] { + break + } + sharedPrefixBytes++ + } + + // sharedPrefixBytes + 1 to include the first differing byte, or the entire ID if identical. + return Bound{Item{curr.Timestamp, curr.ID[:(sharedPrefixBytes+1)*2]}} +} + +func decodeVarInt(reader *bytes.Reader) (int, error) { + var res int = 0 + + for { + b, err := reader.ReadByte() + if err != nil { + return 0, err + } + + res = (res << 7) | (int(b) & 127) + if (b & 128) == 0 { + break + } + } + + return res, nil +} + +func encodeVarInt(n int) []byte { + if n == 0 { + return []byte{0} + } + + var o []byte + for n != 0 { + o = append([]byte{byte(n & 0x7F)}, o...) + n >>= 7 + } + + for i := 0; i < len(o)-1; i++ { + o[i] |= 0x80 + } + + return o +} diff --git a/nip77/negentropy/negentropy.go b/nip77/negentropy/negentropy.go new file mode 100644 index 0000000..e382d3f --- /dev/null +++ b/nip77/negentropy/negentropy.go @@ -0,0 +1,315 @@ +package negentropy + +import ( + "bytes" + "encoding/hex" + "fmt" + "math" + "os" + "unsafe" + + "github.com/nbd-wtf/go-nostr" +) + +const ( + protocolVersion byte = 0x61 // version 1 + maxTimestamp = nostr.Timestamp(math.MaxInt64) +) + +var infiniteBound = Bound{Item: Item{Timestamp: maxTimestamp}} + +type Negentropy struct { + storage Storage + sealed bool + frameSizeLimit int + isInitiator bool + lastTimestampIn nostr.Timestamp + lastTimestampOut nostr.Timestamp + + Haves chan string + HaveNots chan string +} + +func NewNegentropy(storage Storage, frameSizeLimit int) *Negentropy { + return &Negentropy{ + storage: storage, + frameSizeLimit: frameSizeLimit, + } +} + +func (n *Negentropy) Insert(evt *nostr.Event) { + err := n.storage.Insert(evt.CreatedAt, evt.ID) + if err != nil { + panic(err) + } +} + +func (n *Negentropy) seal() { + if !n.sealed { + n.storage.Seal() + } + n.sealed = true +} + +func (n *Negentropy) Initiate() []byte { + n.seal() + n.isInitiator = true + + n.Haves = make(chan string, n.storage.Size()/2) + n.HaveNots = make(chan string, n.storage.Size()/2) + + output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*32)) + output.WriteByte(protocolVersion) + n.SplitRange(0, n.storage.Size(), infiniteBound, output) + + return output.Bytes() +} + +func (n *Negentropy) Reconcile(msg []byte) (output []byte, err error) { + n.seal() + reader := bytes.NewReader(msg) + + output, err = n.reconcileAux(reader) + if err != nil { + return nil, err + } + + if len(output) == 1 && n.isInitiator { + close(n.Haves) + close(n.HaveNots) + return nil, nil + } + + return output, nil +} + +func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { + n.lastTimestampIn, n.lastTimestampOut = 0, 0 // reset for each message + + fullOutput := bytes.NewBuffer(make([]byte, 0, 5000)) + fullOutput.WriteByte(protocolVersion) + + pv, err := reader.ReadByte() + if err != nil { + return nil, err + } + + if pv < 0x60 || pv > 0x6f { + return nil, fmt.Errorf("invalid protocol version byte") + } + if pv != protocolVersion { + if n.isInitiator { + return nil, fmt.Errorf("unsupported negentropy protocol version requested") + } + return fullOutput.Bytes(), nil + } + + var prevBound Bound + prevIndex := 0 + skip := false + + partialOutput := bytes.NewBuffer(make([]byte, 0, 100)) + for reader.Len() > 0 { + partialOutput.Reset() + + doSkip := func() { + if skip { + skip = false + encodedBound := n.encodeBound(prevBound) + partialOutput.Write(encodedBound) + partialOutput.WriteByte(SkipMode) + } + } + + currBound, err := n.DecodeBound(reader) + if err != nil { + return nil, err + } + modeVal, err := decodeVarInt(reader) + if err != nil { + return nil, err + } + mode := Mode(modeVal) + + lower := prevIndex + upper := n.storage.FindLowerBound(prevIndex, n.storage.Size(), currBound) + + switch mode { + case SkipMode: + skip = true + + case FingerprintMode: + var theirFingerprint [FingerprintSize]byte + _, err := reader.Read(theirFingerprint[:]) + if err != nil { + return nil, err + } + ourFingerprint, err := n.storage.Fingerprint(lower, upper) + if err != nil { + return nil, err + } + + if theirFingerprint == ourFingerprint { + skip = true + } else { + doSkip() + n.SplitRange(lower, upper, currBound, partialOutput) + } + + case IdListMode: + numIds, err := decodeVarInt(reader) + if err != nil { + return nil, err + } + + theirElems := make(map[string]struct{}) + var idb [32]byte + + for i := 0; i < numIds; i++ { + _, err := reader.Read(idb[:]) + if err != nil { + return nil, err + } + id := hex.EncodeToString(idb[:]) + theirElems[id] = struct{}{} + } + + n.storage.Iterate(lower, upper, func(item Item, _ int) bool { + id := item.ID + if _, exists := theirElems[id]; !exists { + if n.isInitiator { + n.Haves <- id + } + } else { + delete(theirElems, id) + } + return true + }) + + if n.isInitiator { + skip = true + for id := range theirElems { + n.HaveNots <- id + } + } else { + doSkip() + + responseIds := make([]byte, 0, 32*n.storage.Size()) + endBound := currBound + + n.storage.Iterate(lower, upper, func(item Item, index int) bool { + if n.frameSizeLimit-200 < fullOutput.Len()+len(responseIds) { + endBound = Bound{item} + upper = index + return false + } + + id, _ := hex.DecodeString(item.ID) + responseIds = append(responseIds, id...) + return true + }) + + encodedBound := n.encodeBound(endBound) + + partialOutput.Write(encodedBound) + partialOutput.WriteByte(IdListMode) + partialOutput.Write(encodeVarInt(len(responseIds) / 32)) + partialOutput.Write(responseIds) + + partialOutput.WriteTo(fullOutput) + partialOutput.Reset() + } + + default: + return nil, fmt.Errorf("unexpected mode %d", mode) + } + + if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() { + // frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range + remainingFingerprint, err := n.storage.Fingerprint(upper, n.storage.Size()) + if err != nil { + panic(err) + } + + fullOutput.Write(n.encodeBound(infiniteBound)) + fullOutput.WriteByte(FingerprintMode) + fullOutput.Write(remainingFingerprint[:]) + + break // stop processing further + } else { + // append the constructed output for this iteration + partialOutput.WriteTo(fullOutput) + } + + prevIndex = upper + prevBound = currBound + } + + return fullOutput.Bytes(), nil +} + +func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *bytes.Buffer) { + numElems := upper - lower + const buckets = 16 + + if numElems < buckets*2 { + // we just send the full ids here + boundEncoded := n.encodeBound(upperBound) + output.Write(boundEncoded) + output.WriteByte(IdListMode) + output.Write(encodeVarInt(numElems)) + + n.storage.Iterate(lower, upper, func(item Item, _ int) bool { + id, _ := hex.DecodeString(item.ID) + output.Write(id) + return true + }) + } else { + itemsPerBucket := numElems / buckets + bucketsWithExtra := numElems % buckets + curr := lower + + for i := 0; i < buckets; i++ { + bucketSize := itemsPerBucket + if i < bucketsWithExtra { + bucketSize++ + } + ourFingerprint, err := n.storage.Fingerprint(curr, curr+bucketSize) + if err != nil { + fmt.Fprintln(os.Stderr, err) + panic(err) + } + + curr += bucketSize + + var nextBound Bound + if curr == upper { + nextBound = upperBound + } else { + var prevItem, currItem Item + + n.storage.Iterate(curr-1, curr+1, func(item Item, index int) bool { + if index == curr-1 { + prevItem = item + } else { + currItem = item + } + return true + }) + + minBound := getMinimalBound(prevItem, currItem) + nextBound = minBound + } + + boundEncoded := n.encodeBound(nextBound) + output.Write(boundEncoded) + output.WriteByte(FingerprintMode) + output.Write(ourFingerprint[:]) + } + } +} + +func (n *Negentropy) Name() string { + p := unsafe.Pointer(n) + return fmt.Sprintf("%d", uintptr(p)&127) +} diff --git a/nip77/negentropy/types.go b/nip77/negentropy/types.go new file mode 100644 index 0000000..a552677 --- /dev/null +++ b/nip77/negentropy/types.go @@ -0,0 +1,111 @@ +package negentropy + +import ( + "crypto/sha256" + "encoding/binary" + "encoding/hex" + "fmt" + "strings" + + "github.com/nbd-wtf/go-nostr" +) + +const FingerprintSize = 16 + +type Mode int + +const ( + SkipMode = 0 + FingerprintMode = 1 + IdListMode = 2 +) + +type Storage interface { + Insert(nostr.Timestamp, string) error + Seal() + Size() int + Iterate(begin, end int, cb func(item Item, i int) bool) error + FindLowerBound(begin, end int, value Bound) int + GetBound(idx int) Bound + Fingerprint(begin, end int) ([FingerprintSize]byte, error) +} + +type Item struct { + Timestamp nostr.Timestamp + ID string +} + +func itemCompare(a, b Item) int { + if a.Timestamp != b.Timestamp { + return int(a.Timestamp - b.Timestamp) + } + return strings.Compare(a.ID, b.ID) +} + +func (i Item) String() string { return fmt.Sprintf("Item<%d:%s>", i.Timestamp, i.ID) } + +type Bound struct{ Item } + +func (b Bound) String() string { + if b.Timestamp == infiniteBound.Timestamp { + return "Bound" + } + return fmt.Sprintf("Bound<%d:%s>", b.Timestamp, b.ID) +} + +type Accumulator struct { + Buf []byte +} + +func (acc *Accumulator) SetToZero() { + acc.Buf = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} +} + +func (acc *Accumulator) Add(id string) { + b, _ := hex.DecodeString(id) + acc.AddBytes(b) +} + +func (acc *Accumulator) AddAccumulator(other Accumulator) { + acc.AddBytes(other.Buf) +} + +func (acc *Accumulator) AddBytes(other []byte) { + var currCarry, nextCarry uint32 + + if len(acc.Buf) < 32 { + newBuf := make([]byte, 32) + copy(newBuf, acc.Buf) + acc.Buf = newBuf + } + + for i := 0; i < 8; i++ { + offset := i * 4 + orig := binary.LittleEndian.Uint32(acc.Buf[offset:]) + otherV := binary.LittleEndian.Uint32(other[offset:]) + + next := orig + currCarry + otherV + if next < orig || next < otherV { + nextCarry = 1 + } + + binary.LittleEndian.PutUint32(acc.Buf[offset:], next&0xFFFFFFFF) + currCarry = nextCarry + nextCarry = 0 + } +} + +func (acc *Accumulator) SV() []byte { + return acc.Buf[:] +} + +func (acc *Accumulator) GetFingerprint(n int) [FingerprintSize]byte { + input := acc.SV() + input = append(input, encodeVarInt(n)...) + + hash := sha256.Sum256(input) + + var fingerprint [FingerprintSize]byte + copy(fingerprint[:], hash[:FingerprintSize]) + return fingerprint +} diff --git a/nip77/negentropy/vector.go b/nip77/negentropy/vector.go new file mode 100644 index 0000000..46aaff5 --- /dev/null +++ b/nip77/negentropy/vector.go @@ -0,0 +1,74 @@ +package negentropy + +import ( + "fmt" + "slices" + + "github.com/nbd-wtf/go-nostr" +) + +type Vector struct { + items []Item + sealed bool +} + +func NewVector() *Vector { + return &Vector{ + items: make([]Item, 0, 30), + } +} + +func (v *Vector) Insert(createdAt nostr.Timestamp, id string) error { + if len(id)/2 != 32 { + return fmt.Errorf("bad id size for added item: expected %d, got %d", 32, len(id)/2) + } + + item := Item{createdAt, id} + v.items = append(v.items, item) + return nil +} + +func (v *Vector) Size() int { return len(v.items) } + +func (v *Vector) Seal() { + if v.sealed { + panic("trying to seal an already sealed vector") + } + v.sealed = true + slices.SortFunc(v.items, itemCompare) +} + +func (v *Vector) GetBound(idx int) Bound { + if idx < len(v.items) { + return Bound{v.items[idx]} + } + return infiniteBound +} + +func (v *Vector) Iterate(begin, end int, cb func(Item, int) bool) error { + for i := begin; i < end; i++ { + if !cb(v.items[i], i) { + break + } + } + return nil +} + +func (v *Vector) FindLowerBound(begin, end int, bound Bound) int { + idx, _ := slices.BinarySearchFunc(v.items[begin:end], bound.Item, itemCompare) + return begin + idx +} + +func (v *Vector) Fingerprint(begin, end int) ([FingerprintSize]byte, error) { + var out Accumulator + out.SetToZero() + + if err := v.Iterate(begin, end, func(item Item, _ int) bool { + out.Add(item.ID) + return true + }); err != nil { + return [FingerprintSize]byte{}, err + } + + return out.GetFingerprint(end - begin), nil +} diff --git a/nip77/negentropy/whatever_test.go b/nip77/negentropy/whatever_test.go new file mode 100644 index 0000000..0a6efb7 --- /dev/null +++ b/nip77/negentropy/whatever_test.go @@ -0,0 +1,180 @@ +package negentropy + +import ( + "encoding/hex" + "fmt" + "slices" + "strings" + "sync" + "testing" + + "github.com/nbd-wtf/go-nostr" + "github.com/stretchr/testify/require" +) + +func TestSuperSmall(t *testing.T) { + runTestWith(t, + 4, + [][]int{{0, 3}}, [][]int{{2, 4}}, + [][]int{{3, 4}}, [][]int{{0, 2}}, + ) +} + +func TestNoNeedToSync(t *testing.T) { + runTestWith(t, + 50, + [][]int{{0, 50}}, [][]int{{0, 50}}, + [][]int{}, [][]int{}, + ) +} + +func TestSmallNumbers(t *testing.T) { + runTestWith(t, + 20, + [][]int{{2, 15}}, [][]int{{0, 7}, {10, 20}}, + [][]int{{0, 2}, {15, 20}}, [][]int{{7, 10}}, + ) +} + +func TestBigNumbers(t *testing.T) { + runTestWith(t, + 200, + [][]int{{20, 150}}, [][]int{{0, 70}, {100, 200}}, + [][]int{{0, 20}, {150, 200}}, [][]int{{70, 100}}, + ) +} + +func TestMuchBiggerNumbersAndConfusion(t *testing.T) { + runTestWith(t, + 20000, + [][]int{{20, 150}, {1700, 3400}, {7000, 8100}, {13800, 13816}, {13817, 14950}, {19800, 20000}}, // n1 + [][]int{{0, 2000}, {3000, 3600}, {10000, 12200}, {13799, 13801}, {14800, 19900}}, // n2 + [][]int{{0, 20}, {150, 1700}, {3400, 3600}, {10000, 12200}, {13799, 13800}, {14950, 19800}}, // n1 need + [][]int{{2000, 3000}, {7000, 8100}, {13801, 13816}, {13817, 14800}, {19900, 20000}}, // n1 have + ) +} + +func runTestWith(t *testing.T, + totalEvents int, + n1Ranges [][]int, n2Ranges [][]int, + expectedN1NeedRanges [][]int, expectedN1HaveRanges [][]int, +) { + var err error + var q []byte + var n1 *Negentropy + var n2 *Negentropy + + events := make([]*nostr.Event, totalEvents) + for i := range events { + evt := nostr.Event{} + evt.Content = fmt.Sprintf("event %d", i) + evt.Kind = 1 + evt.CreatedAt = nostr.Timestamp(i) + evt.ID = fmt.Sprintf("%064d", i) + events[i] = &evt + } + + { + n1 = NewNegentropy(NewVector(), 1<<16) + for _, r := range n1Ranges { + for i := r[0]; i < r[1]; i++ { + n1.Insert(events[i]) + } + } + + q = n1.Initiate() + } + + { + n2 = NewNegentropy(NewVector(), 1<<16) + for _, r := range n2Ranges { + for i := r[0]; i < r[1]; i++ { + n2.Insert(events[i]) + } + } + + q, err = n2.Reconcile(q) + if err != nil { + t.Fatal(err) + return + } + } + + invert := map[*Negentropy]*Negentropy{ + n1: n2, + n2: n1, + } + i := 1 + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + wg.Done() + for n := n1; q != nil; n = invert[n] { + i++ + + q, err = n.Reconcile(q) + if err != nil { + t.Fatal(err) + return + } + + if q == nil { + return + } + } + }() + + go func() { + defer wg.Done() + expectedHave := make([]string, 0, 100) + for _, r := range expectedN1HaveRanges { + for i := r[0]; i < r[1]; i++ { + expectedHave = append(expectedHave, events[i].ID) + } + } + haves := make([]string, 0, 100) + for item := range n1.Haves { + if slices.Contains(haves, item) { + continue + } + haves = append(haves, item) + } + require.ElementsMatch(t, expectedHave, haves, "wrong have") + }() + + go func() { + defer wg.Done() + expectedNeed := make([]string, 0, 100) + for _, r := range expectedN1NeedRanges { + for i := r[0]; i < r[1]; i++ { + expectedNeed = append(expectedNeed, events[i].ID) + } + } + havenots := make([]string, 0, 100) + for item := range n1.HaveNots { + if slices.Contains(havenots, item) { + continue + } + havenots = append(havenots, item) + } + require.ElementsMatch(t, expectedNeed, havenots, "wrong need") + }() + + wg.Wait() +} + +func hexedBytes(o []byte) string { + s := strings.Builder{} + s.Grow(2 + 1 + len(o)*5) + s.WriteString("[ ") + for _, b := range o { + x := hex.EncodeToString([]byte{b}) + s.WriteString("0x") + s.WriteString(x) + s.WriteString(" ") + } + s.WriteString("]") + return s.String() +} diff --git a/nip77/nip77.go b/nip77/nip77.go new file mode 100644 index 0000000..7f9dff7 --- /dev/null +++ b/nip77/nip77.go @@ -0,0 +1,148 @@ +package nip77 + +import ( + "context" + "encoding/hex" + "fmt" + "sync" + + "github.com/cespare/xxhash" + "github.com/greatroar/blobloom" + "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip77/negentropy" +) + +func NegentropySync(ctx context.Context, store nostr.RelayStore, url string, filter nostr.Filter) error { + id := "go-nostr-tmp" // for now we can't have more than one subscription in the same connection + + data, err := store.QuerySync(ctx, filter) + if err != nil { + return fmt.Errorf("failed to query our local store: %w", err) + } + + neg := negentropy.NewNegentropy(negentropy.NewVector(), 1024*1024) + for _, evt := range data { + neg.Insert(evt) + } + + result := make(chan error) + + var r *nostr.Relay + r, err = nostr.RelayConnect(ctx, url, nostr.WithCustomHandler(func(data []byte) { + envelope := ParseNegMessage(data) + if envelope == nil { + return + } + switch env := envelope.(type) { + case *OpenEnvelope, *CloseEnvelope: + result <- fmt.Errorf("unexpected %s received from relay", env.Label()) + return + case *ErrorEnvelope: + result <- fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason) + return + case *MessageEnvelope: + msg, err := hex.DecodeString(env.Message) + if err != nil { + result <- fmt.Errorf("relay sent invalid message: %w", err) + return + } + + nextmsg, err := neg.Reconcile(msg) + if err != nil { + result <- fmt.Errorf("failed to reconcile: %w", err) + return + } + + if len(nextmsg) != 0 { + msgb, _ := MessageEnvelope{id, hex.EncodeToString(nextmsg)}.MarshalJSON() + r.Write(msgb) + } + } + })) + if err != nil { + return err + } + + msg := neg.Initiate() + open, _ := OpenEnvelope{id, filter, hex.EncodeToString(msg)}.MarshalJSON() + err = <-r.Write(open) + if err != nil { + return fmt.Errorf("failed to write to relay: %w", err) + } + + defer func() { + clse, _ := CloseEnvelope{id}.MarshalJSON() + r.Write(clse) + }() + + type direction struct { + label string + items chan string + source nostr.RelayStore + target nostr.RelayStore + } + + wg := sync.WaitGroup{} + pool := newidlistpool(50) + for _, dir := range []direction{ + {"up", neg.Haves, store, r}, + {"down", neg.HaveNots, r, store}, + } { + wg.Add(1) + go func(dir direction) { + defer wg.Done() + + seen := blobloom.NewOptimized(blobloom.Config{ + Capacity: 10000, + FPRate: 0.01, + }) + + doSync := func(ids []string) { + defer wg.Done() + defer pool.giveback(ids) + + if len(ids) == 0 { + return + } + evtch, err := dir.source.QueryEvents(ctx, nostr.Filter{IDs: ids}) + if err != nil { + result <- fmt.Errorf("error querying source on %s: %w", dir.label, err) + return + } + for evt := range evtch { + dir.target.Publish(ctx, *evt) + } + } + + ids := pool.grab() + for item := range dir.items { + h := xxhash.Sum64([]byte(item)) + if seen.Has(h) { + continue + } + + seen.Add(h) + ids = append(ids, item) + if len(ids) == 50 { + wg.Add(1) + go doSync(ids) + ids = pool.grab() + } + } + wg.Add(1) + doSync(ids) + }(dir) + } + + go func() { + wg.Wait() + result <- nil + }() + + err = <-result + if err != nil { + return err + } + + return nil +} diff --git a/relay.go b/relay.go index 7d37c2b..ce44268 100644 --- a/relay.go +++ b/relay.go @@ -35,6 +35,7 @@ type Relay struct { challenge string // NIP-42 challenge, we only keep the last noticeHandler func(string) // NIP-01 NOTICEs + customHandler func([]byte) // nonstandard unparseable messages okCallbacks *xsync.MapOf[string, func(bool, string)] writeQueue chan writeRequest subscriptionChannelCloseQueue chan *Subscription @@ -92,6 +93,7 @@ type RelayOption interface { var ( _ RelayOption = (WithNoticeHandler)(nil) _ RelayOption = (WithSignatureChecker)(nil) + _ RelayOption = (WithCustomHandler)(nil) ) // WithNoticeHandler just takes notices and is expected to do something with them. @@ -110,6 +112,14 @@ func (sc WithSignatureChecker) ApplyRelayOption(r *Relay) { r.signatureChecker = sc } +// WithCustomHandler must be a function that handles any relay message that couldn't be +// parsed as a standard envelope. +type WithCustomHandler func(data []byte) + +func (ch WithCustomHandler) ApplyRelayOption(r *Relay) { + r.customHandler = ch +} + // String just returns the relay URL. func (r *Relay) String() string { return r.URL @@ -185,6 +195,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error } case writeRequest := <-r.writeQueue: // all write requests will go through this to prevent races + debugLogf("{%s} sending %v\n", r.URL, string(writeRequest.msg)) if err := r.Connection.WriteMessage(r.connectionContext, writeRequest.msg); err != nil { writeRequest.answer <- err } @@ -212,6 +223,9 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error debugLogf("{%s} %v\n", r.URL, message) envelope := ParseMessage(message) if envelope == nil { + if r.customHandler != nil { + r.customHandler(message) + } continue } @@ -340,7 +354,6 @@ func (r *Relay) publish(ctx context.Context, id string, env Envelope) error { // publish event envb, _ := env.MarshalJSON() - debugLogf("{%s} sending %v\n", r.URL, envb) if err := <-r.Write(envb); err != nil { return err } @@ -416,14 +429,28 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . return sub } -func (r *Relay) QuerySync(ctx context.Context, filter Filter, opts ...SubscriptionOption) ([]*Event, error) { - sub, err := r.Subscribe(ctx, Filters{filter}, opts...) +func (r *Relay) QueryEvents(ctx context.Context, filter Filter) (chan *Event, error) { + sub, err := r.Subscribe(ctx, Filters{filter}) if err != nil { return nil, err } - defer sub.Unsub() + go func() { + for { + select { + case <-sub.ClosedReason: + case <-sub.EndOfStoredEvents: + case <-ctx.Done(): + case <-r.Context().Done(): + } + sub.Unsub() + } + }() + return sub.Events, nil +} + +func (r *Relay) QuerySync(ctx context.Context, filter Filter) ([]*Event, error) { if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds var cancel context.CancelFunc @@ -431,21 +458,17 @@ func (r *Relay) QuerySync(ctx context.Context, filter Filter, opts ...Subscripti defer cancel() } - var events []*Event - for { - select { - case evt := <-sub.Events: - if evt == nil { - // channel is closed - return events, nil - } - events = append(events, evt) - case <-sub.EndOfStoredEvents: - return events, nil - case <-ctx.Done(): - return events, nil - } + events := make([]*Event, 0, max(filter.Limit, 250)) + ch, err := r.QueryEvents(ctx, filter) + if err != nil { + return nil, err } + + for evt := range ch { + events = append(events, evt) + } + + return events, nil } func (r *Relay) Count(ctx context.Context, filters Filters, opts ...SubscriptionOption) (int64, error) {