Skip to content

Commit

Permalink
negentropy.
Browse files Browse the repository at this point in the history
- a way to handle custom messages from Relay (NEG-whatever etc)
- negentropy implementation (adapted from that other one)
- nip77 nostr negentropy extension
- QueryEvents method for RelayStore that returns a channel (makes negentropy syncing work more seamlessly)
  • Loading branch information
fiatjaf committed Sep 14, 2024
1 parent b5633b9 commit 9e53225
Show file tree
Hide file tree
Showing 14 changed files with 1,329 additions and 30 deletions.
12 changes: 6 additions & 6 deletions envelopes.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ type Envelope interface {
String() string
}

type EventEnvelope struct {
SubscriptionID *string
Event
}

var (
_ Envelope = (*EventEnvelope)(nil)
_ Envelope = (*ReqEnvelope)(nil)
Expand All @@ -74,6 +69,11 @@ var (
_ Envelope = (*AuthEnvelope)(nil)
)

type EventEnvelope struct {
SubscriptionID *string
Event
}

func (_ EventEnvelope) Label() string { return "EVENT" }

func (v *EventEnvelope) UnmarshalJSON(data []byte) error {
Expand All @@ -96,7 +96,7 @@ func (v EventEnvelope) MarshalJSON() ([]byte, error) {
if v.SubscriptionID != nil {
w.RawString(`"` + *v.SubscriptionID + `",`)
}
v.MarshalEasyJSON(&w)
v.Event.MarshalEasyJSON(&w)
w.RawString(`]`)
return w.BuildBytes()
}
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
github.com/fiatjaf/eventstore v0.8.1
github.com/fiatjaf/eventstore v0.9.0
github.com/fiatjaf/generic-ristretto v0.0.1
github.com/gobwas/httphead v0.1.0
github.com/gobwas/ws v1.3.1
Expand All @@ -28,12 +28,14 @@ require (
github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e // indirect
github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/golang/glog v1.1.2 // indirect
github.com/greatroar/blobloom v0.8.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -44,3 +46,5 @@ require (
golang.org/x/sys v0.20.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/fiatjaf/eventstore => ../eventstore
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e h1:ahyvB3q25Yn
github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e/go.mod h1:kGUqhHd//musdITWjFvNTHn90WG9bMLBEPQZ17Cmlpw=
github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec h1:1Qb69mGp/UtRPn422BH4/Y4Q3SLUrD9KHuDkm8iodFc=
github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec/go.mod h1:CD8UlnlLDiqb36L110uqiP2iSflVjx9g/3U9hCI4q2U=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/bluekeyes/go-gitdiff v0.7.1 h1:graP4ElLRshr8ecu0UtqfNTCHrtSyZd3DABQm/DWesQ=
github.com/bluekeyes/go-gitdiff v0.7.1/go.mod h1:QpfYYO1E0fTVHVZAZKiRjtSGY9823iCdvGXBcEzHGbM=
Expand Down Expand Up @@ -29,6 +30,8 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku
github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cmars/basen v0.0.0-20150613233007-fe3947df716e h1:0XBUw73chJ1VYSsfvcPvVT7auykAJce9FpRr10L6Qhw=
Expand Down Expand Up @@ -76,6 +79,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/graph-gophers/dataloader/v7 v7.1.0 h1:Wn8HGF/q7MNXcvfaBnLEPEFJttVHR8zuEqP1obys/oc=
github.com/graph-gophers/dataloader/v7 v7.1.0/go.mod h1:1bKE0Dm6OUcTB/OAuYVOZctgIz7Q3d0XrYtlIzTgg6Q=
github.com/greatroar/blobloom v0.8.0 h1:I9RlEkfqK9/6f1v9mFmDYegDQ/x0mISCpiNpAm23Pt4=
github.com/greatroar/blobloom v0.8.0/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
Expand Down Expand Up @@ -108,9 +113,13 @@ github.com/puzpuzpuz/xsync/v3 v3.0.2/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPK
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.1.5-0.20170601210322-f6abca593680/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
Expand Down
36 changes: 31 additions & 5 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
)

type RelayStore interface {
Publish(ctx context.Context, event Event) error
QuerySync(ctx context.Context, filter Filter, opts ...SubscriptionOption) ([]*Event, error)
Publish(context.Context, Event) error
QueryEvents(context.Context, Filter) (chan *Event, error)
QuerySync(context.Context, Filter) ([]*Event, error)
}

var (
Expand All @@ -26,11 +27,36 @@ func (multi MultiStore) Publish(ctx context.Context, event Event) error {
return errors.Join(errs...)
}

func (multi MultiStore) QuerySync(ctx context.Context, filter Filter, opts ...SubscriptionOption) ([]*Event, error) {
func (multi MultiStore) QueryEvents(ctx context.Context, filter Filter) (chan *Event, error) {
multich := make(chan *Event)

errs := make([]error, len(multi))
var good bool
for i, s := range multi {
ch, err := s.QueryEvents(ctx, filter)
errs[i] = err
if err == nil {
good = true
go func(ch chan *Event) {
for evt := range ch {
multich <- evt
}
}(ch)
}
}

if good {
return multich, nil
} else {
return nil, errors.Join(errs...)
}
}

func (multi MultiStore) QuerySync(ctx context.Context, filter Filter) ([]*Event, error) {
errs := make([]error, len(multi))
events := make([]*Event, 0, max(filter.Limit, 10))
events := make([]*Event, 0, max(filter.Limit, 250))
for i, s := range multi {
res, err := s.QuerySync(ctx, filter, opts...)
res, err := s.QuerySync(ctx, filter)
errs[i] = err
events = append(events, res...)
}
Expand Down
181 changes: 181 additions & 0 deletions nip77/envelopes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package nip77

import (
"bytes"
"fmt"

"github.com/mailru/easyjson"
jwriter "github.com/mailru/easyjson/jwriter"
"github.com/nbd-wtf/go-nostr"
"github.com/tidwall/gjson"
)

func ParseNegMessage(message []byte) nostr.Envelope {
firstComma := bytes.Index(message, []byte{','})
if firstComma == -1 {
return nil
}
label := message[0:firstComma]

var v nostr.Envelope
switch {
case bytes.Contains(label, []byte("NEG-MSG")):
v = &MessageEnvelope{}
case bytes.Contains(label, []byte("NEG-OPEN")):
v = &OpenEnvelope{}
case bytes.Contains(label, []byte("NEG-ERR")):
v = &ErrorEnvelope{}
case bytes.Contains(label, []byte("NEG-CLOSE")):
v = &CloseEnvelope{}
default:
return nil
}

if err := v.UnmarshalJSON(message); err != nil {
return nil
}
return v
}

var (
_ nostr.Envelope = (*OpenEnvelope)(nil)
_ nostr.Envelope = (*MessageEnvelope)(nil)
_ nostr.Envelope = (*CloseEnvelope)(nil)
_ nostr.Envelope = (*ErrorEnvelope)(nil)
)

type OpenEnvelope struct {
SubscriptionID string
Filter nostr.Filter
Message string
}

func (_ OpenEnvelope) Label() string { return "NEG-OPEN" }
func (v OpenEnvelope) String() string {
b, _ := v.MarshalJSON()
return string(b)
}

func (v *OpenEnvelope) UnmarshalJSON(data []byte) error {
r := gjson.ParseBytes(data)
arr := r.Array()
if len(arr) != 4 {
return fmt.Errorf("failed to decode NEG-OPEN envelope")
}

v.SubscriptionID = arr[1].Str
v.Message = arr[3].Str
return easyjson.Unmarshal([]byte(arr[2].Raw), &v.Filter)
}

func (v OpenEnvelope) MarshalJSON() ([]byte, error) {
res := bytes.NewBuffer(make([]byte, 0, 17+len(v.SubscriptionID)+len(v.Message)+500))

res.WriteString(`["NEG-OPEN","`)
res.WriteString(v.SubscriptionID)
res.WriteString(`",`)

w := jwriter.Writer{}
v.Filter.MarshalEasyJSON(&w)
w.Buffer.DumpTo(res)

res.WriteString(`,"`)
res.WriteString(v.Message)
res.WriteString(`"]`)

return res.Bytes(), nil
}

type MessageEnvelope struct {
SubscriptionID string
Message string
}

func (_ MessageEnvelope) Label() string { return "NEG-MSG" }
func (v MessageEnvelope) String() string {
b, _ := v.MarshalJSON()
return string(b)
}

func (v *MessageEnvelope) UnmarshalJSON(data []byte) error {
r := gjson.ParseBytes(data)
arr := r.Array()
if len(arr) < 3 {
return fmt.Errorf("failed to decode NEG-MSG envelope")
}
v.SubscriptionID = arr[1].Str
v.Message = arr[2].Str
return nil
}

func (v MessageEnvelope) MarshalJSON() ([]byte, error) {
res := bytes.NewBuffer(make([]byte, 0, 17+len(v.SubscriptionID)+len(v.Message)))

res.WriteString(`["NEG-MSG","`)
res.WriteString(v.SubscriptionID)
res.WriteString(`","`)
res.WriteString(v.Message)
res.WriteString(`"]`)

return res.Bytes(), nil
}

type CloseEnvelope struct {
SubscriptionID string
}

func (_ CloseEnvelope) Label() string { return "NEG-CLOSE" }
func (v CloseEnvelope) String() string {
b, _ := v.MarshalJSON()
return string(b)
}

func (v *CloseEnvelope) UnmarshalJSON(data []byte) error {
r := gjson.ParseBytes(data)
arr := r.Array()
if len(arr) < 2 {
return fmt.Errorf("failed to decode NEG-CLOSE envelope")
}
v.SubscriptionID = arr[1].Str
return nil
}

func (v CloseEnvelope) MarshalJSON() ([]byte, error) {
res := bytes.NewBuffer(make([]byte, 0, 14+len(v.SubscriptionID)))
res.WriteString(`["NEG-CLOSE","`)
res.WriteString(v.SubscriptionID)
res.WriteString(`"]`)
return res.Bytes(), nil
}

type ErrorEnvelope struct {
SubscriptionID string
Reason string
}

func (_ ErrorEnvelope) Label() string { return "NEG-ERROR" }
func (v ErrorEnvelope) String() string {
b, _ := v.MarshalJSON()
return string(b)
}

func (v *ErrorEnvelope) UnmarshalJSON(data []byte) error {
r := gjson.ParseBytes(data)
arr := r.Array()
if len(arr) < 3 {
return fmt.Errorf("failed to decode NEG-ERROR envelope")
}
v.SubscriptionID = arr[1].Str
v.Reason = arr[2].Str
return nil
}

func (v ErrorEnvelope) MarshalJSON() ([]byte, error) {
res := bytes.NewBuffer(make([]byte, 0, 19+len(v.SubscriptionID)+len(v.Reason)))
res.WriteString(`["NEG-ERROR","`)
res.WriteString(v.SubscriptionID)
res.WriteString(`","`)
res.WriteString(v.Reason)
res.WriteString(`"]`)
return res.Bytes(), nil
}
61 changes: 61 additions & 0 deletions nip77/example/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"context"
"fmt"
"time"

"github.com/fiatjaf/eventstore"

Check failure on line 8 in nip77/example/example.go

View workflow job for this annotation

GitHub Actions / test

github.com/fiatjaf/[email protected]: replacement directory ../eventstore does not exist
"github.com/fiatjaf/eventstore/slicestore"

Check failure on line 9 in nip77/example/example.go

View workflow job for this annotation

GitHub Actions / test

github.com/fiatjaf/[email protected]: replacement directory ../eventstore does not exist
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip77"
)

func main() {
ctx := context.Background()
db := &slicestore.SliceStore{}
db.Init()

sk := nostr.GeneratePrivateKey()
local := eventstore.RelayWrapper{Store: db}

for {
for i := 0; i < 20; i++ {
{
evt := nostr.Event{
Kind: 1,
Content: fmt.Sprintf("same old hello %d", i),
CreatedAt: nostr.Timestamp(i),
Tags: nostr.Tags{},
}
evt.Sign(sk)
db.SaveEvent(ctx, &evt)
}

{
evt := nostr.Event{
Kind: 1,
Content: fmt.Sprintf("custom hello %d", i),
CreatedAt: nostr.Now(),
Tags: nostr.Tags{},
}
evt.Sign(sk)
db.SaveEvent(ctx, &evt)
}
}

err := nip77.NegentropySync(ctx,
local, "ws://localhost:7777", nostr.Filter{})
if err != nil {
panic(err)
}

data, err := local.QuerySync(ctx, nostr.Filter{})
if err != nil {
panic(err)
}

fmt.Println("total local events:", len(data))
time.Sleep(time.Second * 10)
}
}
Loading

0 comments on commit 9e53225

Please sign in to comment.