Skip to content

Commit c858ef3

Browse files
committed
add: --hq-batch-concurrency
1 parent 6d48952 commit c858ef3

File tree

6 files changed

+56
-10
lines changed

6 files changed

+56
-10
lines changed

cmd/get_hq.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ func getHQCmdFlags(getHQCmd *cobra.Command) {
5050
getHQCmd.PersistentFlags().String("hq-key", "", "Crawl HQ key.")
5151
getHQCmd.PersistentFlags().String("hq-secret", "", "Crawl HQ secret.")
5252
getHQCmd.PersistentFlags().String("hq-project", "", "Crawl HQ project.")
53-
getHQCmd.PersistentFlags().Int64("hq-batch-size", 0, "Crawl HQ feeding batch size.")
5453
getHQCmd.PersistentFlags().Bool("hq-continuous-pull", false, "If turned on, the crawler will pull URLs from Crawl HQ continuously.")
5554
getHQCmd.PersistentFlags().String("hq-strategy", "lifo", "Crawl HQ feeding strategy.")
55+
getHQCmd.PersistentFlags().Int64("hq-batch-size", 0, "Crawl HQ feeding batch size.")
56+
getHQCmd.PersistentFlags().Int64("hq-batch-concurrency", 1, "Number of concurrent requests to do to get the --hq-batch-size, if batch size is 300 and batch-concurrency is 10, 30 requests will be done concurrently.")
5657
getHQCmd.PersistentFlags().Bool("hq-rate-limiting-send-back", false, "If turned on, the crawler will send back URLs that hit a rate limit to crawl HQ.")
5758

5859
getHQCmd.MarkPersistentFlagRequired("hq-address")

config/config.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type Config struct {
3131
HQSecret string `mapstructure:"hq-secret"`
3232
HQProject string `mapstructure:"hq-project"`
3333
HQStrategy string `mapstructure:"hq-strategy"`
34+
HQBatchSize int64 `mapstructure:"hq-batch-size"`
35+
HQBatchConcurrency int `mapstructure:"hq-batch-concurrency"`
3436
LogFileOutputDir string `mapstructure:"log-file-output-dir"`
3537
ElasticSearchUsername string `mapstructure:"es-user"`
3638
ElasticSearchPassword string `mapstructure:"es-password"`
@@ -54,7 +56,6 @@ type Config struct {
5456
MinSpaceRequired int `mapstructure:"min-space-required"`
5557
WARCPoolSize int `mapstructure:"warc-pool-size"`
5658
WARCDedupeSize int `mapstructure:"warc-dedupe-size"`
57-
HQBatchSize int64 `mapstructure:"hq-batch-size"`
5859
KeepCookies bool `mapstructure:"keep-cookies"`
5960
Headless bool `mapstructure:"headless"`
6061
DisableSeencheck bool `mapstructure:"disable-seencheck"`

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/gosuri/uilive v0.0.4
1414
github.com/gosuri/uitable v0.0.4
1515
github.com/grafov/m3u8 v0.12.0
16-
github.com/internetarchive/gocrawlhq v1.2.19
16+
github.com/internetarchive/gocrawlhq v1.2.20
1717
github.com/paulbellamy/ratecounter v0.2.0
1818
github.com/philippgille/gokv/leveldb v0.7.0
1919
github.com/prometheus/client_golang v1.20.4

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ github.com/internetarchive/gocrawlhq v1.2.18 h1:PPe7UqJ2NNOljn70SmUhoKdgPreeqRUk
9494
github.com/internetarchive/gocrawlhq v1.2.18/go.mod h1:Rjkyx2ttWDG4vzXOrl7ilzdtbODJ3XSe2PkO77bxSTs=
9595
github.com/internetarchive/gocrawlhq v1.2.19 h1:bvDliaeWjt97x64bOf+rKXStQX7VE+ZON/I1FS3sQ6A=
9696
github.com/internetarchive/gocrawlhq v1.2.19/go.mod h1:gHrdMewIi5OBWE/xEZGqSrNHyTXPbt+h+XUWpp9fZek=
97+
github.com/internetarchive/gocrawlhq v1.2.20 h1:0mIIt9lhPacKr6L2JeISoopQ8EgzC3dISJ3ITGGbOp4=
98+
github.com/internetarchive/gocrawlhq v1.2.20/go.mod h1:gHrdMewIi5OBWE/xEZGqSrNHyTXPbt+h+XUWpp9fZek=
9799
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
98100
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
99101
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=

internal/pkg/crawl/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ type Crawl struct {
114114
HQKey string
115115
HQSecret string
116116
HQStrategy string
117+
HQBatchConcurrency int
117118
HQBatchSize int
118119
HQContinuousPull bool
119120
HQClient *gocrawlhq.Client
@@ -319,6 +320,7 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
319320
c.HQSecret = config.HQSecret
320321
c.HQStrategy = config.HQStrategy
321322
c.HQBatchSize = int(config.HQBatchSize)
323+
c.HQBatchConcurrency = config.HQBatchConcurrency
322324
c.HQContinuousPull = config.HQContinuousPull
323325
c.HQRateLimitingSendBack = config.HQRateLimitSendBack
324326

internal/pkg/crawl/hq.go

+47-7
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,53 @@ func (c *Crawl) HQConsumer() {
177177

178178
// get batch from crawl HQ
179179
c.HQConsumerState = "waitingOnFeed"
180-
URLs, err := c.HQClient.Feed(HQBatchSize, c.HQStrategy)
181-
if err != nil {
182-
// c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
183-
// "batchSize": HQBatchSize,
184-
// "err": err,
185-
// })).Debug("error getting new URLs from crawl HQ")
186-
continue
180+
var URLs []gocrawlhq.URL
181+
var err error
182+
if c.HQBatchConcurrency == 1 {
183+
URLs, err = c.HQClient.Get(HQBatchSize, c.HQStrategy)
184+
if err != nil {
185+
// c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
186+
// "batchSize": HQBatchSize,
187+
// "err": err,
188+
// })).Debug("error getting new URLs from crawl HQ")
189+
continue
190+
}
191+
} else {
192+
var mu sync.Mutex
193+
var wg sync.WaitGroup
194+
batchSize := HQBatchSize / c.HQBatchConcurrency
195+
URLsChan := make(chan []gocrawlhq.URL, c.HQBatchConcurrency)
196+
197+
// Start goroutines to get URLs from crawl HQ, each will request
198+
// HQBatchSize / HQConcurrentBatch URLs
199+
for i := 0; i < c.HQBatchConcurrency; i++ {
200+
wg.Add(1)
201+
go func() {
202+
defer wg.Done()
203+
URLs, err := c.HQClient.Get(batchSize, c.HQStrategy)
204+
if err != nil {
205+
// c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
206+
// "batchSize": batchSize,
207+
// "err": err,
208+
// })).Debug("error getting new URLs from crawl HQ")
209+
return
210+
}
211+
URLsChan <- URLs
212+
}()
213+
}
214+
215+
// Wait for all goroutines to finish
216+
go func() {
217+
wg.Wait()
218+
close(URLsChan)
219+
}()
220+
221+
// Collect all URLs from the channels
222+
for URLsFromChan := range URLsChan {
223+
mu.Lock()
224+
URLs = append(URLs, URLsFromChan...)
225+
mu.Unlock()
226+
}
187227
}
188228
c.HQConsumerState = "feedCompleted"
189229

0 commit comments

Comments
 (0)