Skip to content

Commit

Permalink
Rework concurrency for pipeline stages (#209)
Browse files Browse the repository at this point in the history
* controler: make makeStageChannel() capable of creating buffered and unbuffered channels

* Rework preprocessor concurrency (#211)

* preprocessor: using fan-in-fan-out pattern instead of dynamic workers pattern ; controler: make the reactor output channel buffered of size WorkersCount

* preprocessor: log wording consistency

* Rework archiver concurrency (#212)

* archiver: using fan-in-fan-out pattern instead of dynamic workers pattern

* cmd,config,archiver: rename MaxConcurrentAssets to MaxConcurrentAssetsPerWorker to make it more explicit that this limit is (to be) enforced PER worker

* Revert "cmd,config,archiver: rename MaxConcurrentAssets to MaxConcurrentAssetsPerWorker to make it more explicit that this limit is (to be) enforced PER worker"

This reverts commit 175af1e.

* preprocessor: use struct pointer for worker() method instead of global variable

* preprocessor: replace preprocessor.run by preprocessor.worker in the fieldedLogger

* Rework postprocessor concurrency (#214)

* postprocessor: using fan-in-fan-out pattern instead of dynamic workers pattern

* controler: make archiver and preprocessor channel buffered by size of WorkersCount

* archiver: check if context is done before passing seeds to the next stage

* Rework finisher concurrency (#219)

* stats: add counters for Finisher routines

* controler: make postprocessor, finisherFinish and finisherProduce chans buffered by size WorkersCount ; consume and discard finisherFinish and finisherProduce when HQ is not used

* finisher: make the finisher concurrent using fan-in-fan-out pattern
  • Loading branch information
equals215 authored Feb 12, 2025
1 parent 6c4db80 commit 37df6c0
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 271 deletions.
93 changes: 44 additions & 49 deletions internal/pkg/archiver/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
logger *log.FieldedLogger
)

// This functions starts the archiver responsible for capturing the URLs
// Start initializes the internal archiver structure, start the WARC writer and start routines, should only be called once and returns an error if called more than once
func Start(inputChan, outputChan chan *models.Item) error {
var done bool

Expand All @@ -68,8 +68,13 @@ func Start(inputChan, outputChan chan *models.Item) error {
startWARCWriter()
go watchWARCWritingQueue(250 * time.Millisecond)

globalArchiver.wg.Add(1)
go run()
logger.Debug("WARC writer started")

for i := 0; i < config.Get().WorkersCount; i++ {
globalArchiver.wg.Add(1)
go globalArchiver.worker(strconv.Itoa(i))
}

logger.Info("started")
done = true
})
Expand All @@ -81,6 +86,7 @@ func Start(inputChan, outputChan chan *models.Item) error {
return nil
}

// Stop stops the archiver routines and the WARC writer
func Stop() {
if globalArchiver != nil {
globalArchiver.cancel()
Expand Down Expand Up @@ -113,75 +119,62 @@ func Stop() {
}
}

func run() {
func (a *archiver) worker(workerID string) {
defer a.wg.Done()

logger := log.NewFieldedLogger(&log.Fields{
"component": "archiver.run",
"component": "archiver.worker",
"worker_id": workerID,
})

defer globalArchiver.wg.Done()

// Create a context to manage goroutines
ctx, cancel := context.WithCancel(globalArchiver.ctx)
defer cancel()

// Create a wait group to wait for all goroutines to finish
var wg sync.WaitGroup

// Guard to limit the number of concurrent archiver routines
guard := make(chan struct{}, config.Get().WorkersCount)
defer logger.Debug("worker stopped")

// Subscribe to the pause controler
controlChans := pause.Subscribe()
defer pause.Unsubscribe(controlChans)

stats.ArchiverRoutinesIncr()
defer stats.ArchiverRoutinesDecr()

for {
select {
case <-a.ctx.Done():
logger.Debug("shutting down")
return
case <-controlChans.PauseCh:
logger.Debug("received pause event")
controlChans.ResumeCh <- struct{}{}
logger.Debug("received resume event")
case item, ok := <-globalArchiver.inputCh:
case seed, ok := <-a.inputCh:
if ok {
logger.Debug("received item", "item", item.GetShortID(), "depth", item.GetDepth(), "hops", item.GetURL().GetHops())
guard <- struct{}{}
wg.Add(1)
stats.ArchiverRoutinesIncr()
go func(ctx context.Context) {
defer wg.Done()
defer func() { <-guard }()
defer stats.ArchiverRoutinesDecr()

if err := item.CheckConsistency(); err != nil {
panic(fmt.Sprintf("item consistency check failed with err: %s, item id %s", err.Error(), item.GetShortID()))
}

if item.GetStatus() != models.ItemPreProcessed && item.GetStatus() != models.ItemGotRedirected && item.GetStatus() != models.ItemGotChildren {
logger.Debug("skipping item", "item", item.GetShortID(), "depth", item.GetDepth(), "hops", item.GetURL().GetHops(), "status", item.GetStatus().String())
} else {
archive(item)
}

select {
case globalArchiver.outputCh <- item:
case <-ctx.Done():
logger.Debug("aborting item due to stop", "item", item.GetShortID(), "depth", item.GetDepth(), "hops", item.GetURL().GetHops())
return
}
}(ctx)
logger.Debug("received seed", "seed", seed.GetShortID(), "depth", seed.GetDepth(), "hops", seed.GetURL().GetHops())

if err := seed.CheckConsistency(); err != nil {
panic(fmt.Sprintf("seed consistency check failed with err: %s, seed id %s", err.Error(), seed.GetShortID()))
}

if seed.GetStatus() != models.ItemPreProcessed && seed.GetStatus() != models.ItemGotRedirected && seed.GetStatus() != models.ItemGotChildren {
logger.Debug("skipping seed", "seed", seed.GetShortID(), "depth", seed.GetDepth(), "hops", seed.GetURL().GetHops(), "status", seed.GetStatus().String())
} else {
archive(workerID, seed)
}

select {
case <-a.ctx.Done():
logger.Debug("aborting seed due to stop", "seed", seed.GetShortID(), "depth", seed.GetDepth(), "hops", seed.GetURL().GetHops())
return
case a.outputCh <- seed:
}
}
case <-globalArchiver.ctx.Done():
logger.Debug("shutting down")
wg.Wait()
return
}
stats.WarcWritingQueueSizeSet(int64(GetWARCWritingQueueSize()))
}
}

func archive(seed *models.Item) {
func archive(workerID string, seed *models.Item) {
// TODO: rate limiting handling
logger := log.NewFieldedLogger(&log.Fields{
"component": "archiver.archive",
"worker_id": workerID,
})

var (
Expand Down Expand Up @@ -251,4 +244,6 @@ func archive(seed *models.Item) {

// Wait for all goroutines to finish
wg.Wait()

return
}
14 changes: 12 additions & 2 deletions internal/pkg/controler/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ var (
stageChannels []chan *models.Item
)

func makeStageChannel() chan *models.Item {
ch := make(chan *models.Item)
func makeStageChannel(bufferSize ...int) chan *models.Item {
var parsedSize int

if len(bufferSize) == 0 {
parsedSize = 0
} else if len(bufferSize) == 1 {
parsedSize = bufferSize[0]
} else {
panic("makeStageChannel: too many arguments, variadic argument should be omitted or a single integer")
}

ch := make(chan *models.Item, parsedSize)
stageChannels = append(stageChannels, ch)
return ch
}
Expand Down
32 changes: 23 additions & 9 deletions internal/pkg/controler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func startPipeline() {
go watchers.WatchDiskSpace(config.Get().JobPath, 5*time.Second)

// Start the reactor that will receive
reactorOutputChan := makeStageChannel()
reactorOutputChan := makeStageChannel(config.Get().WorkersCount)
err = reactor.Start(config.Get().WorkersCount, reactorOutputChan)
if err != nil {
logger.Error("error starting reactor", "err", err.Error())
Expand All @@ -62,14 +62,14 @@ func startPipeline() {
}
}

preprocessorOutputChan := makeStageChannel()
preprocessorOutputChan := makeStageChannel(config.Get().WorkersCount)
err = preprocessor.Start(reactorOutputChan, preprocessorOutputChan)
if err != nil {
logger.Error("error starting preprocessor", "err", err.Error())
panic(err)
}

archiverOutputChan := makeStageChannel()
archiverOutputChan := makeStageChannel(config.Get().WorkersCount)
err = archiver.Start(preprocessorOutputChan, archiverOutputChan)
if err != nil {
logger.Error("error starting archiver", "err", err.Error())
Expand All @@ -79,25 +79,39 @@ func startPipeline() {
// Start the WARC writing queue watcher
go watchers.WatchWARCWritingQueue(5 * time.Second)

postprocessorOutputChan := makeStageChannel()
postprocessorOutputChan := makeStageChannel(config.Get().WorkersCount)
err = postprocessor.Start(archiverOutputChan, postprocessorOutputChan)
if err != nil {
logger.Error("error starting postprocessor", "err", err.Error())
panic(err)
}

var finisherFinishChan, finisherProduceChan chan *models.Item
finisherFinishChan := makeStageChannel(config.Get().WorkersCount)
finisherProduceChan := makeStageChannel(config.Get().WorkersCount)

if config.Get().UseHQ {
logger.Info("starting hq")

finisherFinishChan = makeStageChannel()
finisherProduceChan = makeStageChannel()

err = hq.Start(finisherFinishChan, finisherProduceChan)
if err != nil {
logger.Error("error starting hq source, retrying", "err", err.Error())
panic(err)
}
} else {
// Means we're using the to-be-implemented local queue, for the moment we're just gonna consume the channels
go func() {
for {
select {
case _, ok := <-finisherFinishChan:
if !ok {
return
}
case _, ok := <-finisherProduceChan:
if !ok {
return
}
}
}
}()
}

err = finisher.Start(postprocessorOutputChan, finisherFinishChan, finisherProduceChan)
Expand Down
101 changes: 56 additions & 45 deletions internal/pkg/finisher/finisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package finisher
import (
"context"
"fmt"
"strconv"
"sync"

"github.com/internetarchive/Zeno/internal/pkg/config"
"github.com/internetarchive/Zeno/internal/pkg/controler/pause"
"github.com/internetarchive/Zeno/internal/pkg/log"
"github.com/internetarchive/Zeno/internal/pkg/reactor"
Expand Down Expand Up @@ -48,8 +50,10 @@ func Start(inputChan, sourceFinishedChan, sourceProducedChan chan *models.Item)
wg: sync.WaitGroup{},
}
logger.Debug("initialized")
globalFinisher.wg.Add(1)
go globalFinisher.run()
for i := 0; i < config.Get().WorkersCount; i++ {
globalFinisher.wg.Add(1)
go globalFinisher.worker(strconv.Itoa(i))
}
logger.Info("started")
done = true
})
Expand All @@ -73,68 +77,75 @@ func Stop() {
}
}

func (f *finisher) run() {
func (f *finisher) worker(workerID string) {
defer f.wg.Done()
logger := log.NewFieldedLogger(&log.Fields{
"component": "finisher.worker",
"worker_id": workerID,
})

controlChans := pause.Subscribe()
defer pause.Unsubscribe(controlChans)
defer f.wg.Done()

for {
select {
case <-f.ctx.Done():
logger.Debug("shutting down")
return
case <-controlChans.PauseCh:
logger.Debug("received pause event")
controlChans.ResumeCh <- struct{}{}
logger.Debug("received resume event")
case item := <-f.inputCh:
if item == nil {
panic("received nil item")
}
case seed, ok := <-f.inputCh:
if ok {
if seed == nil {
panic("received nil seed")
}

if !item.IsSeed() {
panic("received non-seed item")
}
if !seed.IsSeed() {
panic("received non-seed item")
}

logger.Debug("received item", "item", item.GetShortID())
logger.Debug("received seed", "seed", seed.GetShortID())

if err := item.CheckConsistency(); err != nil {
panic(fmt.Sprintf("item consistency check failed with err: %s, item id %s", err.Error(), item.GetShortID()))
}
if err := seed.CheckConsistency(); err != nil {
panic(fmt.Sprintf("seed consistency check failed with err: %s, seed id %s, worker id %s", err.Error(), seed.GetShortID(), workerID))
}

// If the item is fresh, send it to the source
if item.GetStatus() == models.ItemFresh {
logger.Debug("fresh item received", "item", item)
f.sourceProducedCh <- item
continue
}
// If the seed is fresh, send it to the source
if seed.GetStatus() == models.ItemFresh {
logger.Debug("fresh seed received", "seed", seed)
f.sourceProducedCh <- seed
continue
}

// If the item has fresh children, send it to feedback
isComplete := item.CompleteAndCheck()
if !isComplete {
logger.Debug("item has fresh children", "item", item.GetShortID())
err := reactor.ReceiveFeedback(item)
if err != nil && err != reactor.ErrReactorFrozen {
// If the seed has fresh children, send it to feedback
isComplete := seed.CompleteAndCheck()
if !isComplete {
logger.Debug("seed has fresh children", "seed", seed.GetShortID())
err := reactor.ReceiveFeedback(seed)
if err != nil && err != reactor.ErrReactorFrozen {
panic(err)
}
continue
}

// If the seed has no fresh redirection or children, mark it as finished
logger.Debug("seed has no fresh redirection or children", "seed", seed.GetShortID())
err := reactor.MarkAsFinished(seed)
if err != nil {
panic(err)
}
continue
}

// If the item has no fresh redirection or children, mark it as finished
logger.Debug("item has no fresh redirection or children", "item", item.GetShortID())
err := reactor.MarkAsFinished(item)
if err != nil {
panic(err)
}
// Notify the source that the seed has been finished
// E.g.: to delete the seed in Crawl HQ
if f.sourceFinishedCh != nil {
f.sourceFinishedCh <- seed
}

// Notify the source that the item has been finished
// E.g.: to delete the item in Crawl HQ
if f.sourceFinishedCh != nil {
f.sourceFinishedCh <- item
stats.SeedsFinishedIncr()
logger.Debug("seed finished", "seed", seed.GetShortID())
}

stats.SeedsFinishedIncr()
logger.Debug("item finished", "item", item.GetShortID())
case <-f.ctx.Done():
logger.Debug("shutting down")
return
}
}
}
Loading

0 comments on commit 37df6c0

Please sign in to comment.