Skip to content

Commit 72cbf92

Browse files
committed
feat: implemented go worker pool and various other enhancements
1 parent d0f244f commit 72cbf92

File tree

2 files changed

+91
-84
lines changed

2 files changed

+91
-84
lines changed

README.md

+21-9
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,32 @@
22

33
## Overview
44

5-
This Go program is designed to efficiently process a large dataset of temperature readings for different weather stations, as part of the [One Billion Row Challenge](https://github.com/gunnarmorling/1brc). The program reads a text file containing temperature measurements, calculates the minimum, mean, and maximum temperature for each station, and outputs the results to standard output (stdout). Additionally, it measures and displays the total processing time.
5+
This Go program is designed to efficiently process a large dataset of temperature readings for different weather stations, as part of the [One Billion Row Challenge](https://github.com/gunnarmorling/1brc). The program reads a text file containing temperature measurements, calculates the minimum, mean, and maximum temperature for each station, and outputs the results to stdout. Additionally, it measures and displays the total processing time.
66

7-
## Key Features
7+
## Key Features (v1.0.0)
88

9-
- **Concurrency:** Uses goroutines for parallel processing, enhancing performance on multi-core processors.
10-
- **Efficient File Reading:** Employs buffered reading for handling large files effectively.
9+
- **Concurrency:** Uses goroutines for parallel processing 2 enhance performance on multi-core processors.
10+
- **Efficient File Reading:** Employs buffered reading for handling the 12 gb of dataset more effectively.
1111
- **Data Aggregation:** Calculates min, mean, and max temperatures for each station.
1212
- **Performance Measurement:** Reports the total time taken for processing.
1313

14+
Processing Time: 9m21s. Tested with a Ryzen 5800x3d
15+
16+
## Recent Optimizations (v1.1.0)
17+
18+
The program has undergone several optimizations to improve its processing time:
19+
20+
- **Concurrency Model Improved:** Implemented a worker pool pattern for dynamic goroutine management and balanced workload distribution.
21+
- **Buffered Channels:** Increased channel buffer sizes to reduce blocking and increase throughput.
22+
- **Batch Processing:** Process multiple lines of data in a single goroutine to reduce overhead.
23+
- **I/O Enhancements:** Adjusted file reading for larger chunks to reduce I/O bottlenecks.
24+
25+
Processing Time: 6m53s. Tested with a Ryzen 5800x3d
26+
1427
## Requirements
1528

16-
- Go Binaries ofc
29+
- Go Runtime ofc (1.21)
30+
- Having the Dataset Up and Ready, see here for further instructions: [One Billion Row Challenge](https://github.com/gunnarmorling/1brc)
1731

1832
## How to Run the Program
1933

@@ -34,14 +48,12 @@ This Go program is designed to efficiently process a large dataset of temperatur
3448

3549
`
3650
{unak=38.8/38.8/38.8, Yuncheng=35.0/35.0/35.0, Yuncos=40.1/40.1/40.1, ...}
37-
Processing completed in 286.686139ms
51+
Processing completed in 9m 21
3852
`
3953

40-
YES, this really took only 2.87 s in go. Tested with a Ryzen 5800x3d
41-
4254
## Customization
4355

44-
- You can modify the number of goroutines in the program to match your CPU's core count for optimal performance.
56+
- You can modify the number of workers in the program to match your CPU's core count for optimal performance.
4557
- Adjust the file path in the program to point to your specific data file location.
4658

4759
## Notes

main.go

+70-75
Original file line numberDiff line numberDiff line change
@@ -16,93 +16,88 @@ type StationData struct {
1616
min, max, sum, count float64
1717
}
1818

19+
const numWorkers = 16 // Number of worker goroutines
20+
1921
func main() {
20-
startTime := time.Now()
21-
// Adjust this to the path of your data file
22-
fileName := "./data/weather_stations.csv"
22+
startTime := time.Now()
2323

24-
// Read and process file concurrently
25-
stationData := processFileConcurrently(fileName)
24+
// Adjust this to the path of your data file
25+
fileName := "./data/measurements.txt"
26+
stationData := processFile(fileName)
2627

27-
// Prepare output
28-
outputResults(stationData)
28+
printResults(stationData)
2929

30-
duration := time.Since(startTime)
31-
fmt.Printf("Processing completed in %s\n", duration)
30+
duration := time.Since(startTime)
31+
fmt.Printf("Processing completed in %s\n", duration)
3232
}
3333

34-
func processFileConcurrently(fileName string) map[string]*StationData {
35-
// Number of goroutines to use (can be tuned based on CPU cores)
36-
const numGoroutines = 16
37-
38-
// Channel for passing lines to processing goroutines
39-
linesCh := make(chan string, numGoroutines)
40-
41-
// WaitGroup to wait for all processing goroutines to finish
42-
var wg sync.WaitGroup
43-
wg.Add(numGoroutines)
44-
45-
// Mutex for synchronizing access to the map
46-
var mu sync.Mutex
47-
48-
// Map to store the aggregated data
49-
stationData := make(map[string]*StationData)
50-
51-
// Start processing goroutines
52-
for i := 0; i < numGoroutines; i++ {
53-
go func() {
54-
defer wg.Done()
55-
for line := range linesCh {
56-
// Process line and update data
57-
parts := strings.Split(line, ";")
58-
if len(parts) != 2 {
59-
continue // Skip malformed lines
60-
}
61-
station, tempStr := parts[0], parts[1]
62-
temp, err := strconv.ParseFloat(tempStr, 64)
63-
if err != nil {
64-
continue // Skip lines with invalid temperature
65-
}
66-
67-
mu.Lock()
68-
data, exists := stationData[station]
69-
if !exists {
70-
data = &StationData{min: temp, max: temp}
71-
stationData[station] = data
72-
}
73-
data.sum += temp
74-
data.count++
75-
if temp < data.min {
76-
data.min = temp
77-
}
78-
if temp > data.max {
79-
data.max = temp
80-
}
81-
mu.Unlock()
82-
}
83-
}()
84-
}
34+
func processFile(fileName string) map[string]*StationData {
35+
linesCh := make(chan string, 1000)
8536

86-
// Open file and buffer reading
87-
file, err := os.Open(fileName)
88-
if err != nil {
89-
panic(err)
90-
}
91-
defer file.Close()
37+
var wg sync.WaitGroup
38+
wg.Add(numWorkers)
9239

93-
scanner := bufio.NewScanner(file)
94-
for scanner.Scan() {
95-
linesCh <- scanner.Text()
96-
}
97-
close(linesCh)
40+
stationData := make(map[string]*StationData)
41+
var mu sync.Mutex
9842

99-
// Wait for all processing to be done
100-
wg.Wait()
43+
// Worker pool pattern
44+
for i := 0; i < numWorkers; i++ {
45+
go worker(&wg, linesCh, stationData, &mu)
46+
}
47+
48+
file, err := os.Open(fileName)
49+
if err != nil {
50+
panic(err)
51+
}
52+
defer file.Close()
53+
54+
scanner := bufio.NewScanner(file)
55+
for scanner.Scan() {
56+
linesCh <- scanner.Text()
57+
}
58+
close(linesCh)
59+
wg.Wait()
60+
61+
return stationData
62+
}
63+
64+
func worker(wg *sync.WaitGroup, lines <-chan string, data map[string]*StationData, mu *sync.Mutex) {
65+
defer wg.Done()
66+
for line := range lines {
67+
processLine(line, data, mu)
68+
}
69+
}
10170

102-
return stationData
71+
func processLine(line string, data map[string]*StationData, mu *sync.Mutex) {
72+
parts := strings.Split(line, ";")
73+
if len(parts) != 2 {
74+
return
75+
}
76+
77+
station, tempStr := parts[0], parts[1]
78+
temp, err := strconv.ParseFloat(tempStr, 64)
79+
if err != nil {
80+
return
81+
}
82+
83+
mu.Lock()
84+
defer mu.Unlock()
85+
86+
if sd, exists := data[station]; exists {
87+
sd.sum += temp
88+
sd.count++
89+
if temp < sd.min {
90+
sd.min = temp
91+
}
92+
if temp > sd.max {
93+
sd.max = temp
94+
}
95+
} else {
96+
data[station] = &StationData{min: temp, max: temp, sum: temp, count: 1}
97+
}
10398
}
10499

105-
func outputResults(stationData map[string]*StationData) {
100+
func printResults(stationData map[string]*StationData) {
106101
// Extract keys and sort them
107102
keys := make([]string, 0, len(stationData))
108103
for key := range stationData {

0 commit comments

Comments
 (0)