Commit 6a9878f

feat: v3.1.0

1 parent 45cc1f5 commit 6a9878f

File tree

2 files changed: +74 −45 lines


README.md (+9 −4)
@@ -28,8 +28,6 @@ Processing Time: 6m53s. Tested with a Ryzen 5800x3d
 
 Version 2.0 of the One Billion Row Challenge Processor introduces significant optimizations, leading to a substantial reduction in processing time. This release focuses on enhancing concurrency handling and reducing contention, along with other performance improvements.
 
-## Performance Enhancements
-
 - **Concurrent Map Implementation:** Introduced a sharded concurrent map to reduce lock contention. This allows for more efficient updates to the data structure in a multi-threaded environment.
 - **Hash-Based Sharding:** Implemented hash-based sharding for distributing data across multiple shards, further reducing the chance of lock conflicts.
 - **Optimized String Processing:** Refined the string handling logic to minimize overhead during file parsing.
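The two sharding bullets above map directly onto the Shard and StationMap types in main.go below. As a minimal standalone sketch of the idea, assuming an illustrative shardedMap type and shardCount constant that are not names from this repo, key-to-shard routing with FNV-1a looks roughly like this:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// Illustrative only: shardCount and shardedMap are not names from this repo.
const shardCount = 32

type shard struct {
	mu   sync.Mutex
	data map[string]float64
}

type shardedMap struct {
	shards [shardCount]*shard
}

func newShardedMap() *shardedMap {
	m := &shardedMap{}
	for i := range m.shards {
		m.shards[i] = &shard{data: make(map[string]float64)}
	}
	return m
}

// shardFor routes a key to one shard via FNV-1a, the same idea as GetShard in main.go below.
func (m *shardedMap) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return m.shards[h.Sum32()%shardCount]
}

// add locks only the shard that owns the key, not the whole map.
func (m *shardedMap) add(key string, v float64) {
	s := m.shardFor(key)
	s.mu.Lock()
	s.data[key] += v
	s.mu.Unlock()
}

func main() {
	m := newShardedMap()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.add("Hamburg", 1)
			m.add("Bulawayo", 1)
		}()
	}
	wg.Wait()
	fmt.Println(m.shardFor("Hamburg").data["Hamburg"]) // 8
}
```

Because different keys usually land in different shards, goroutines rarely queue on the same mutex, which is the contention reduction the bullet describes.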
@@ -40,14 +38,21 @@ Processing Time 5m19s. Tested with a Ryzen 5800x3d
 
 ## v3.0.0
 
-## Key Enhancements
-
 - **Parallel File Processing:** Implemented an advanced parallel processing approach where the input file is divided into chunks and processed independently in parallel, drastically reducing the I/O bottleneck.
 - **Optimized Memory Management:** Refined memory usage by processing data in chunks and employing local maps for data aggregation to reduce memory overhead.
 - **Improved Data Aggregation:** Enhanced the efficiency of data aggregation through the use of sharded data structures, minimizing lock contention.
 
 Processing Time: 1m3s. Tested with a Ryzen 5800x3d and 32 gigs of RAM
 
+## v3.1.0
+
+- **Reduced calls to the global map**
+- **Implemented a function to determine chunk bounds**
+
+Processing Time: 59.6s. Tested with a Ryzen 5800x3d and 32 gigs of RAM
+
+I got this down to 59 seconds and hit my goal of under 1 minute, which I am pretty happy with for a single-day coding session. Further improvements could be made; if I kept working on it, I would probably mmap the file directly via a syscall and use the 8-byte hash of the station id as the key for an unsafe maphash. And maybe write some tests.
+
 ## Requirements
 
 - Go Runtime ofc (1.21)
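The v3.1.0 note above mentions two possible next steps: mapping the input file with a raw mmap syscall and keying the map by an 8-byte hash of the station id rather than the string. As a rough sketch of that direction only, not part of this commit, assuming a measurements.txt input and using the standard hash/maphash package in place of the "unsafe maphash" mentioned, it could look like this on Linux/macOS:

```go
package main

import (
	"bytes"
	"fmt"
	"hash/maphash"
	"os"
	"syscall"
)

func main() {
	// Assumption: the input file is called measurements.txt.
	f, err := os.Open("measurements.txt")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		panic(err)
	}

	// Map the whole file into memory instead of streaming it through bufio.
	mem, err := syscall.Mmap(int(f.Fd()), 0, int(fi.Size()), syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	defer syscall.Munmap(mem)

	seed := maphash.MakeSeed()
	counts := make(map[uint64]int) // keyed by the 64-bit hash of the station id, not the string

	data := mem
	for len(data) > 0 {
		nl := bytes.IndexByte(data, '\n')
		var line []byte
		if nl < 0 {
			line, data = data, nil
		} else {
			line, data = data[:nl], data[nl+1:]
		}
		if sep := bytes.IndexByte(line, ';'); sep > 0 {
			counts[maphash.Bytes(seed, line[:sep])]++ // hash the raw id bytes, no string allocation
		}
	}
	fmt.Println("distinct station hashes:", len(counts))
}
```

Keying by a uint64 hash avoids allocating a string per record; the trade-off is that distinct station names could in principle collide on the same hash.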

main.go (+65 −41)
@@ -4,7 +4,6 @@ import (
 	"bufio"
 	"fmt"
 	"hash/fnv"
-	"io"
 	"os"
 	"sort"
 	"strconv"
@@ -13,29 +12,28 @@ import (
 	"time"
 )
 
-// StationData holds the temperature data for a station.
+// StationData holds the temperature data for a specific station.
 type StationData struct {
 	min, max, sum, count float64
 }
 
-// Constants for the number of workers and shards.
 const (
-	numWorkers = 16
-	numShards  = 32
+	numWorkers = 16   // Number of concurrent workers
+	numShards  = 2048 // Number of shards for distributing data
 )
 
-// Shard contains a map of station data and a mutex for concurrent access.
+// Shard represents a concurrent-safe structure holding station data.
 type Shard struct {
 	data map[string]*StationData
 	lock sync.Mutex
 }
 
-// StationMap holds shards for concurrent access to station data.
+// StationMap aggregates multiple shards for station data.
 type StationMap struct {
 	shards [numShards]*Shard
 }
 
-// NewStationMap initializes a new StationMap with the specified number of shards.
+// NewStationMap initializes a StationMap with predefined shards.
 func NewStationMap() *StationMap {
 	sm := &StationMap{}
 	for i := 0; i < numShards; i++ {
@@ -44,97 +42,123 @@ func NewStationMap() *StationMap {
 	return sm
 }
 
-// GetShard returns the shard for a given station key.
+// GetShard returns a specific shard based on the station key.
 func (sm *StationMap) GetShard(key string) *Shard {
 	hash := fnv.New32a()
 	hash.Write([]byte(key))
 	return sm.shards[hash.Sum32()%numShards]
 }
 
-// main is the entry point of the program.
 func main() {
 	startTime := time.Now()
 
 	if len(os.Args) < 2 {
-		fmt.Println("Usage: brc <file_path>")
+		fmt.Println("Usage: <program_name> <file_path>")
 		os.Exit(1)
 	}
 	fileName := os.Args[1]
 
 	stationMap := processFile(fileName)
-
 	printResults(stationMap)
 
 	duration := time.Since(startTime)
 	fmt.Printf("Processing completed in %s\n", duration)
 }
 
-// processFile processes the file and returns a populated StationMap.
+// processFile handles the file processing and returns a StationMap.
 func processFile(fileName string) *StationMap {
-	fileInfo, err := os.Stat(fileName)
+	file, err := os.Open(fileName)
+	if err != nil {
+		panic(err)
+	}
+	defer file.Close()
+
+	fileInfo, err := file.Stat()
 	if err != nil {
 		panic(err)
 	}
 
 	fileSize := fileInfo.Size()
 	chunkSize := fileSize / int64(numWorkers)
-	var wg sync.WaitGroup
-
 	sMap := NewStationMap()
+	var wg sync.WaitGroup
 
 	for i := 0; i < numWorkers; i++ {
 		wg.Add(1)
 		go func(chunkStart int64) {
 			defer wg.Done()
-			processChunk(fileName, chunkStart, chunkSize, sMap)
+			f, err := os.Open(fileName)
+			if err != nil {
+				panic(err)
+			}
+			defer f.Close()
+
+			actualStart, actualEnd := determineChunkBounds(f, chunkStart, chunkSize)
+			processChunk(f, actualStart, actualEnd, sMap)
 		}(int64(i) * chunkSize)
 	}
 
 	wg.Wait()
 	return sMap
 }
 
-// processChunk processes a chunk of the file.
-func processChunk(fileName string, offset, size int64, sMap *StationMap) {
-	file, err := os.Open(fileName)
+// determineChunkBounds calculates the actual boundaries of a file chunk.
+func determineChunkBounds(f *os.File, chunkStart, chunkSize int64) (int64, int64) {
+	var actualStart, actualEnd int64
+
+	if chunkStart != 0 {
+		_, err := f.Seek(chunkStart, 0)
+		if err != nil {
+			panic(err)
+		}
+
+		scanner := bufio.NewScanner(f)
+		scanner.Scan()
+		actualStart = chunkStart + int64(len(scanner.Bytes())) + 1
+	}
+
+	_, err := f.Seek(chunkStart+chunkSize, 0)
 	if err != nil {
 		panic(err)
 	}
-	defer file.Close()
 
-	if _, err = file.Seek(offset, 0); err != nil {
+	scanner := bufio.NewScanner(f)
+	scanner.Scan()
+	actualEnd = chunkStart + chunkSize + int64(len(scanner.Bytes())) + 1
+
+	return actualStart, actualEnd
+}
+
+// processChunk handles the processing of a specific file chunk.
+func processChunk(f *os.File, start, end int64, sMap *StationMap) {
+	_, err := f.Seek(start, 0)
+	if err != nil {
 		panic(err)
 	}
 
-	reader := bufio.NewReader(file)
+	scanner := bufio.NewScanner(f)
 	localMap := make(map[string]*StationData)
 
-	if offset != 0 {
-		_, err := reader.ReadString('\n')
-		if err != nil && err != io.EOF {
-			panic(err)
-		}
-	}
-
-	var bytesRead int64
-	for {
-		line, err := reader.ReadString('\n')
-		bytesRead += int64(len(line))
+	var currentPos int64 = start
+	for scanner.Scan() {
+		line := scanner.Text()
+		currentPos += int64(len(line) + 1)
 
-		if err == io.EOF || (offset+bytesRead) >= (offset+size) {
+		if currentPos > end {
 			break
 		}
-		if err != nil {
-			panic(err)
-		}
 
 		processLine(strings.TrimSpace(line), localMap)
 	}
 
+	if err := scanner.Err(); err != nil {
+		panic(err)
+	}
+
 	mergeLocalMap(localMap, sMap)
 }
 
-// mergeLocalMap merges a local map of station data into the global StationMap.
+// mergeLocalMap merges local station data into the global StationMap.
 func mergeLocalMap(localMap map[string]*StationData, sm *StationMap) {
 	for station, data := range localMap {
 		shard := sm.GetShard(station)
@@ -151,7 +175,7 @@ func mergeLocalMap(localMap map[string]*StationData, sm *StationMap) {
 	}
 }
 
-// processLine processes a single line of input and updates the local map.
+// processLine processes a single line of the file.
 func processLine(line string, localMap map[string]*StationData) {
 	parts := strings.SplitN(line, ";", 2)
 	if len(parts) != 2 {
@@ -180,7 +204,7 @@ func processLine(line string, localMap map[string]*StationData) {
 	}
 }
 
-// printResults prints the aggregated results from the StationMap.
+// printResults outputs the aggregated station data.
 func printResults(sm *StationMap) {
 	consolidatedData := make(map[string]*StationData)
 	for _, shard := range sm.shards {
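The v3.1.0 note above also mentions possibly writing tests. A hypothetical first test for determineChunkBounds, assuming it sits alongside main.go in package main and using made-up station lines, could check that no chunk ever starts in the middle of a record:

```go
package main

import (
	"os"
	"testing"
)

func TestDetermineChunkBoundsStartsOnRecord(t *testing.T) {
	// Made-up input in the station;temperature format.
	content := "Hamburg;12.0\nBulawayo;8.9\nPalembang;38.8\nSt. John's;15.2\nCracow;12.6\n"

	tmp, err := os.CreateTemp(t.TempDir(), "brc-*.txt")
	if err != nil {
		t.Fatal(err)
	}
	if _, err := tmp.WriteString(content); err != nil {
		t.Fatal(err)
	}
	tmp.Close()

	size := int64(len(content))
	const workers = 3
	chunkSize := size / workers

	for i := int64(0); i < workers; i++ {
		f, err := os.Open(tmp.Name())
		if err != nil {
			t.Fatal(err)
		}
		start, end := determineChunkBounds(f, i*chunkSize, chunkSize)
		f.Close()

		// A valid start is offset 0 or the byte right after a newline.
		if start != 0 && content[start-1] != '\n' {
			t.Errorf("chunk %d starts mid-record at offset %d", i, start)
		}
		if end < start {
			t.Errorf("chunk %d: end %d is before start %d", i, end, start)
		}
	}
}
```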
