Skip to content

Commit

Permalink
threadsafe sync map used to avoid concurrent access
Browse files Browse the repository at this point in the history
  • Loading branch information
chandanpasunoori committed Dec 23, 2023
1 parent 734311b commit 1a2d92a
Showing 1 changed file with 6 additions and 10 deletions.
16 changes: 6 additions & 10 deletions pkg/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,8 @@ func writeToBlob(ctx context.Context, et time.Time, timeKey string, filter Filte
metrics.SyncEvent.Add(float64(items))
metrics.SyncEventWithLabel.With(prometheus.Labels{"type": filter.Name}).Add(float64(items))
}
writerMutex.Lock()
delete(timeKeyWriters, timeKey)
writerMutex.Unlock()

timeKeyWritersMap.Delete(timeKey)
close(ch)
log.Info().Msg("writer stopped")

Expand Down Expand Up @@ -405,8 +404,7 @@ func loadToBigQueryJob(job Job, log zerolog.Logger, blobs []string, filter Filte
}

var (
timeKeyWriters = make(map[string]chan *pubsub.Message)
writerMutex sync.Mutex
timeKeyWritersMap sync.Map
)

func WaitAndGoogleStorageSync(ctx context.Context, wg *sync.WaitGroup, job Job, eventChannel chan *pubsub.Message, loadToBigQuery bool) {
Expand Down Expand Up @@ -496,14 +494,12 @@ func WaitAndGoogleStorageSync(ctx context.Context, wg *sync.WaitGroup, job Job,
timeKey := fmt.Sprintf("%s-%s", filter.Name, et.Format("2006-01-02-15-04"))
timeKey = timeKey[:len(timeKey)-1] //10 minute buckets

wr, ok := timeKeyWriters[timeKey]
wr, ok := timeKeyWritersMap.Load(timeKey)
if !ok {
writerMutex.Lock()
wr = writeToBlob(ctx, et, timeKey, filter, job, storageClient, bulkSize, loadToBigQuery)
timeKeyWriters[timeKey] = wr
writerMutex.Unlock()
timeKeyWritersMap.Store(timeKey, wr)
}
wr <- mx
wr.(chan *pubsub.Message) <- mx
it++
}
}(filter)
Expand Down

0 comments on commit 1a2d92a

Please sign in to comment.