
Commit 22293f9

feat: v3.0.0

Parent: f05b114

3 files changed: +190 -122 lines

README.md (+10)
@@ -38,6 +38,16 @@ Version 2.0 of the One Billion Row Challenge Processor introduces significant optimizations
 
 Processing Time 5m19s. Tested with a Ryzen 5800x3d
 
+## v3.0.0
+
+### Key Enhancements
+
+- **Parallel File Processing:** The input file is split into chunks that are read and processed independently in parallel, drastically reducing the I/O bottleneck.
+- **Optimized Memory Management:** Each worker aggregates its chunk into a local map before merging, keeping memory overhead low.
+- **Improved Data Aggregation:** Merged results go into a sharded data structure, minimizing lock contention.
+
+Processing Time: 1m3s. Tested with a Ryzen 5800x3d and 32 GB of RAM
+
 ## Requirements
 
 - Go Runtime ofc (1.21)
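
All three enhancements meet in one pattern in main.go (shown in full below): each worker aggregates its chunk into a private map with no locking, and only the final merge touches the sharded global map, so a shard lock is taken once per station per worker rather than once per input line. Here is a minimal, runnable sketch of that merge pattern; shardedMap, stats, and merge are simplified stand-ins for the committed StationMap, StationData, and mergeLocalMap:

```go
// Sketch of the v3 aggregation pattern (simplified stand-in names,
// not the repository's code): workers fill private maps lock-free,
// then fold them into a hash-sharded global map.
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shards = 4

type stats struct{ min, max, sum, count float64 }

type shardedMap struct {
	data  [shards]map[string]*stats
	locks [shards]sync.Mutex
}

func newShardedMap() *shardedMap {
	m := &shardedMap{}
	for i := range m.data {
		m.data[i] = make(map[string]*stats)
	}
	return m
}

// merge folds one worker's local results into the global map,
// taking each shard lock once per station rather than once per line.
func (m *shardedMap) merge(local map[string]*stats) {
	for station, s := range local {
		h := fnv.New32a()
		h.Write([]byte(station))
		i := h.Sum32() % shards
		m.locks[i].Lock()
		if g, ok := m.data[i][station]; ok {
			g.sum += s.sum
			g.count += s.count
			if s.min < g.min {
				g.min = s.min
			}
			if s.max > g.max {
				g.max = s.max
			}
		} else {
			m.data[i][station] = s
		}
		m.locks[i].Unlock()
	}
}

func main() {
	m := newShardedMap()
	// Two workers' local results for the same station.
	m.merge(map[string]*stats{"Hamburg": {min: 12.0, max: 34.2, sum: 46.2, count: 2}})
	m.merge(map[string]*stats{"Hamburg": {min: 8.9, max: 8.9, sum: 8.9, count: 1}})
	for i := range m.data {
		for station, s := range m.data[i] {
			fmt.Printf("%s=%.1f/%.1f/%.1f\n", station, s.min, s.sum/s.count, s.max)
		}
	}
}
```

Because the number of distinct stations is tiny relative to a billion input rows, the merge and its locking are negligible next to the per-line parsing work.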

go.sum

Whitespace-only changes.

main.go (+180 -122)
@@ -1,152 +1,210 @@
 package main
 
 import (
-    "bufio"
-    "fmt"
-    "hash/fnv"
-    "os"
-    "sort"
-    "strconv"
-    "strings"
-    "sync"
-    "time"
+    "bufio"
+    "fmt"
+    "hash/fnv"
+    "io"
+    "os"
+    "sort"
+    "strconv"
+    "strings"
+    "sync"
+    "time"
 )
 
+// StationData holds the temperature data for a station.
 type StationData struct {
-    min, max, sum, count float64
+    min, max, sum, count float64
 }
 
-const numWorkers = 16
-const numShards = 32 // Number of shards in the concurrent map
+// Constants for the number of workers and shards.
+const (
+    numWorkers = 16
+    numShards  = 32
+)
+
+// Shard contains a map of station data and a mutex for concurrent access.
+type Shard struct {
+    data map[string]*StationData
+    lock sync.Mutex
+}
 
-type ConcurrentMap struct {
-    shards [numShards]map[string]*StationData
-    locks  [numShards]*sync.Mutex
+// StationMap holds shards for concurrent access to station data.
+type StationMap struct {
+    shards [numShards]*Shard
 }
 
-func NewConcurrentMap() *ConcurrentMap {
-    cMap := &ConcurrentMap{}
-    for i := 0; i < numShards; i++ {
-        cMap.shards[i] = make(map[string]*StationData)
-        cMap.locks[i] = &sync.Mutex{}
-    }
-    return cMap
+// NewStationMap initializes a new StationMap with the specified number of shards.
+func NewStationMap() *StationMap {
+    sm := &StationMap{}
+    for i := 0; i < numShards; i++ {
+        sm.shards[i] = &Shard{data: make(map[string]*StationData)}
+    }
+    return sm
 }
 
-func (cMap *ConcurrentMap) GetShard(key string) (shard map[string]*StationData, lock *sync.Mutex) {
-    hash := fnv.New32()
-    hash.Write([]byte(key))
-    shardIndex := hash.Sum32() % numShards
-    return cMap.shards[shardIndex], cMap.locks[shardIndex]
+// GetShard returns the shard for a given station key.
+func (sm *StationMap) GetShard(key string) *Shard {
+    hash := fnv.New32a()
+    hash.Write([]byte(key))
+    return sm.shards[hash.Sum32()%numShards]
 }
 
+// main is the entry point of the program.
 func main() {
-    startTime := time.Now()
+    startTime := time.Now()
 
-    if len(os.Args) < 2 {
-        fmt.Println("Usage: brc <file_path>")
-        os.Exit(1)
-    }
-    fileName := os.Args[1]
+    if len(os.Args) < 2 {
+        fmt.Println("Usage: brc <file_path>")
+        os.Exit(1)
+    }
+    fileName := os.Args[1]
 
-    stationData := processFile(fileName)
+    stationMap := processFile(fileName)
 
-    printResults(stationData)
+    printResults(stationMap)
 
-    duration := time.Since(startTime)
-    fmt.Printf("Processing completed in %s\n", duration)
+    duration := time.Since(startTime)
+    fmt.Printf("Processing completed in %s\n", duration)
 }
 
-func processFile(fileName string) *ConcurrentMap {
-    linesCh := make(chan string, 1000)
-    var wg sync.WaitGroup
-    wg.Add(numWorkers)
-
-    cMap := NewConcurrentMap()
-
-    for i := 0; i < numWorkers; i++ {
-        go worker(&wg, linesCh, cMap)
-    }
-
-    file, err := os.Open(fileName)
-    if err != nil {
-        panic(err)
-    }
-    defer file.Close()
-
-    scanner := bufio.NewScanner(file)
-    for scanner.Scan() {
-        linesCh <- scanner.Text()
-    }
-    close(linesCh)
-    wg.Wait()
+// processFile processes the file and returns a populated StationMap.
+func processFile(fileName string) *StationMap {
+    fileInfo, err := os.Stat(fileName)
+    if err != nil {
+        panic(err)
+    }
+
+    fileSize := fileInfo.Size()
+    chunkSize := fileSize / int64(numWorkers)
+    var wg sync.WaitGroup
+
+    sMap := NewStationMap()
+
+    for i := 0; i < numWorkers; i++ {
+        wg.Add(1)
+        go func(chunkStart int64) {
+            defer wg.Done()
+            processChunk(fileName, chunkStart, chunkSize, sMap)
+        }(int64(i) * chunkSize)
+    }
+
+    wg.Wait()
+    return sMap
+}
 
-    return cMap
+// processChunk processes a chunk of the file.
+func processChunk(fileName string, offset, size int64, sMap *StationMap) {
+    file, err := os.Open(fileName)
+    if err != nil {
+        panic(err)
+    }
+    defer file.Close()
+
+    if _, err = file.Seek(offset, 0); err != nil {
+        panic(err)
+    }
+
+    reader := bufio.NewReader(file)
+    localMap := make(map[string]*StationData)
+
+    if offset != 0 {
+        _, err := reader.ReadString('\n')
+        if err != nil && err != io.EOF {
+            panic(err)
+        }
+    }
+
+    var bytesRead int64
+    for {
+        line, err := reader.ReadString('\n')
+        bytesRead += int64(len(line))
+
+        if err == io.EOF || (offset+bytesRead) >= (offset+size) {
+            break
+        }
+        if err != nil {
+            panic(err)
+        }
+
+        processLine(strings.TrimSpace(line), localMap)
+    }
+
+    mergeLocalMap(localMap, sMap)
 }
 
-func worker(wg *sync.WaitGroup, lines <-chan string, cMap *ConcurrentMap) {
-    defer wg.Done()
-    for line := range lines {
-        processLine(line, cMap)
-    }
+// mergeLocalMap merges a local map of station data into the global StationMap.
+func mergeLocalMap(localMap map[string]*StationData, sm *StationMap) {
+    for station, data := range localMap {
+        shard := sm.GetShard(station)
+        shard.lock.Lock()
+        if sd, exists := shard.data[station]; exists {
+            sd.sum += data.sum
+            sd.count += data.count
+            sd.min = min(sd.min, data.min)
+            sd.max = max(sd.max, data.max)
+        } else {
+            shard.data[station] = data
+        }
+        shard.lock.Unlock()
+    }
 }
 
-func processLine(line string, cMap *ConcurrentMap) {
-    parts := strings.Split(line, ";")
-    if len(parts) != 2 {
-        return
-    }
-
-    station, tempStr := parts[0], parts[1]
-    temp, err := strconv.ParseFloat(tempStr, 64)
-    if err != nil {
-        return
-    }
-
-    shard, lock := cMap.GetShard(station)
-    lock.Lock()
-    data, exists := shard[station]
-    if !exists {
-        data = &StationData{min: temp, max: temp, sum: temp, count: 1}
-        shard[station] = data
-    } else {
-        data.sum += temp
-        data.count++
-        if temp < data.min {
-            data.min = temp
-        }
-        if temp > data.max {
-            data.max = temp
-        }
-    }
-    lock.Unlock()
+// processLine processes a single line of input and updates the local map.
+func processLine(line string, localMap map[string]*StationData) {
+    parts := strings.SplitN(line, ";", 2)
+    if len(parts) != 2 {
+        return
+    }
+
+    station, tempStr := parts[0], parts[1]
+    temp, err := strconv.ParseFloat(tempStr, 64)
+    if err != nil {
+        return
+    }
+
+    sd, exists := localMap[station]
+    if !exists {
+        sd = &StationData{min: temp, max: temp, sum: temp, count: 1}
+        localMap[station] = sd
+    } else {
+        sd.sum += temp
+        sd.count++
+        if temp < sd.min {
+            sd.min = temp
+        }
+        if temp > sd.max {
+            sd.max = temp
+        }
+    }
 }
 
-func printResults(cMap *ConcurrentMap) {
-    // Consolidate data from shards
-    consolidatedData := make(map[string]*StationData)
-    for _, shard := range cMap.shards {
-        for station, data := range shard {
-            consolidatedData[station] = data
-        }
-    }
-
-    // Sort the station names
-    keys := make([]string, 0, len(consolidatedData))
-    for station := range consolidatedData {
-        keys = append(keys, station)
-    }
-    sort.Strings(keys)
-
-    // Print sorted results
-    fmt.Print("{")
-    for i, key := range keys {
-        data := consolidatedData[key]
-        mean := data.sum / data.count
-        fmt.Printf("%s=%.1f/%.1f/%.1f", key, data.min, mean, data.max)
-        if i < len(keys)-1 {
-            fmt.Print(", ")
-        }
-    }
-    fmt.Println("}")
+// printResults prints the aggregated results from the StationMap.
+func printResults(sm *StationMap) {
+    consolidatedData := make(map[string]*StationData)
+    for _, shard := range sm.shards {
+        shard.lock.Lock()
+        for station, data := range shard.data {
+            consolidatedData[station] = data
+        }
+        shard.lock.Unlock()
+    }
+
+    var keys []string
+    for k := range consolidatedData {
+        keys = append(keys, k)
+    }
+    sort.Strings(keys)
+
+    fmt.Print("{")
+    for i, key := range keys {
+        sd := consolidatedData[key]
+        mean := sd.sum / sd.count
+        fmt.Printf("%s=%.1f/%.1f/%.1f", key, sd.min, mean, sd.max)
+        if i < len(keys)-1 {
+            fmt.Print(", ")
+        }
+    }
+    fmt.Println("}")
 }
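
One subtlety in the committed processChunk is worth flagging. The break condition (offset+bytesRead) >= (offset+size) is just bytesRead >= size, and because the loop breaks before handing the just-read line to processLine, while the next worker discards everything up to its first newline, one or two lines are lost at every chunk boundary; a final line without a trailing newline is likewise dropped by the io.EOF break, and the truncated chunkSize leaves fileSize mod numWorkers bytes after the last chunk unread. Across a billion rows that is only a handful of lines, but a boundary-safe reader is a small change. The sketch below is an editorial illustration, not part of this commit; readChunkLines and handle are hypothetical names:

```go
// Editorial sketch (not part of this commit): a chunk reader that
// assigns every line to exactly one worker by reading past the chunk
// end to finish the line it started.
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
)

// readChunkLines hands every line whose first byte lies in
// [offset, offset+size) to handle.
func readChunkLines(fileName string, offset, size int64, handle func(string)) error {
	f, err := os.Open(fileName)
	if err != nil {
		return err
	}
	defer f.Close()

	seekTo := offset
	if offset > 0 {
		seekTo-- // back up one byte so a line starting exactly at offset is kept
	}
	if _, err = f.Seek(seekTo, io.SeekStart); err != nil {
		return err
	}
	r := bufio.NewReader(f)

	var consumed int64 // bytes consumed, counted from offset
	if offset > 0 {
		// Discard through the previous '\n'; if offset sits on a line
		// start, this consumes only that newline byte.
		skipped, err := r.ReadString('\n')
		if err == io.EOF {
			return nil // chunk begins inside the file's unterminated final line
		}
		if err != nil {
			return err
		}
		consumed = int64(len(skipped)) - 1
	}

	for consumed < size { // the next line starts inside this chunk
		line, err := r.ReadString('\n')
		consumed += int64(len(line))
		if len(line) > 0 {
			handle(strings.TrimRight(line, "\n"))
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	count := 0
	if err := readChunkLines(os.Args[1], 0, 1<<20, func(string) { count++ }); err != nil {
		panic(err)
	}
	fmt.Println("lines starting in the first 1 MiB:", count)
}
```

To cover the remainder bytes as well, the caller would give the last worker size = fileSize - offset rather than the truncated chunkSize.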
