Skip to content

Commit

Permalink
Fix for deadlock caused when reading history logs
Browse files Browse the repository at this point in the history
  • Loading branch information
rkbalgi committed Apr 30, 2020
1 parent d01aad2 commit 741453a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 49 deletions.
2 changes: 1 addition & 1 deletion cmd/isosim/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main

var version = "0.8.0"
var build = "2287e50f"
var build = "d01aad2c"
101 changes: 53 additions & 48 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"isosim/internal/services/v0/data"
"time"
)
Expand Down Expand Up @@ -58,6 +59,7 @@ func Write(dbMsg DbMessage) error {
if err != nil {
return err
}
defer tx.Rollback()

bkt, err := tx.CreateBucketIfNotExists([]byte(fmt.Sprintf("%d_%d", dbMsg.SpecID, dbMsg.MsgID)))
if err != nil {
Expand All @@ -81,65 +83,68 @@ func Write(dbMsg DbMessage) error {
if err := tx.Commit(); err != nil {
return err
}

return nil

}

// ReadLast reads last 'n' messages for spec and msg
func ReadLast(specID int, msgID int, n int) ([]string, error) {

tx, err := bdb.Begin(false)
if err != nil {
return nil, err
}
bktName := fmt.Sprintf("%d_%d", specID, msgID)
bkt := tx.Bucket([]byte(bktName))
if bkt == nil {
log.Debugf("No bucket for spec/msg - %d:%d", specID, msgID)
return nil, nil
}

res := make([]string, 0)
retrieved := 0
now := time.Now()
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()
for {
//hourly buckets
bktName := now.Format(timeFormat)
tBkt := bkt.Bucket([]byte(bktName))
if tBkt != nil {
//start from the last on the latest bucket
c := tBkt.Cursor()
k, v := c.Last()

if k == nil || v == nil {
continue
}
for len(res) < n {
res = append(res, string(v))
retrieved++
if len(res) == n {
return res, nil
}
k, v = c.Prev()

err := bdb.View(func(tx *bolt.Tx) error {

bktName := fmt.Sprintf("%d_%d", specID, msgID)
bkt := tx.Bucket([]byte(bktName))
if bkt == nil {
log.Debugf("No bucket for spec/msg - %d:%d", specID, msgID)
return nil
}

retrieved := 0
now := time.Now()
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()
for {
//hourly buckets
bktName := now.Format(timeFormat)
tBkt := bkt.Bucket([]byte(bktName))
if tBkt != nil {
//start from the last on the latest bucket
c := tBkt.Cursor()
k, v := c.Last()

if k == nil || v == nil {
// nothing more in this hour,
// break out of this loop
goto PREV_HOUR
continue
}
for len(res) < n {
res = append(res, string(v))
retrieved++
if len(res) == n {
return nil
}
k, v = c.Prev()
if k == nil || v == nil {
// nothing more in this hour,
// break out of this loop
goto PREV_HOUR
}
}
}

}
PREV_HOUR:
// we cannot keep looking endlessly
select {
case <-ctx.Done():
return nil
default:
break
}
now = now.Add(-1 * time.Hour)
}
PREV_HOUR:
// we cannot keep looking endlessly
select {
case <-ctx.Done():
return res, nil
default:
break
}
now = now.Add(-1 * time.Hour)
}
})

return res, err

}

0 comments on commit 741453a

Please sign in to comment.