@@ -30,6 +30,9 @@ impl Grouping {
30
30
}
31
31
32
32
/// The aggregate accumulator.
33
+ ///
34
+ /// This transfors disttributed aggregate functions
35
+ /// into a single value.
33
36
#[ derive( Debug ) ]
34
37
struct Accumulator < ' a > {
35
38
target : & ' a AggregateTarget ,
@@ -54,6 +57,7 @@ impl<'a> Accumulator<'a> {
54
57
. collect ( )
55
58
}
56
59
60
+ /// Transform COUNT(*), MIN, MAX, etc., from multiple shards into a single value.
57
61
fn accumulate ( & mut self , row : & DataRow , rd : & RowDescription ) -> Result < ( ) , Error > {
58
62
let column = row. get_column ( self . target . column ( ) , rd) ?;
59
63
if let Some ( column) = column {
@@ -77,6 +81,13 @@ impl<'a> Accumulator<'a> {
77
81
self . datum = column. value ;
78
82
}
79
83
}
84
+ AggregateFunction :: Sum => {
85
+ if !self . datum . is_null ( ) {
86
+ self . datum = self . datum . clone ( ) + column. value ;
87
+ } else {
88
+ self . datum = column. value ;
89
+ }
90
+ }
80
91
_ => ( ) ,
81
92
}
82
93
}
@@ -122,6 +133,15 @@ impl<'a> Aggregates<'a> {
122
133
123
134
let mut rows = VecDeque :: new ( ) ;
124
135
for ( grouping, accumulator) in self . mappings {
136
+ //
137
+ // Aggregate rules in Postgres dictate that the only
138
+ // columns present in the row are either:
139
+ //
140
+ // 1. part of the GROUP BY, which means they are
141
+ // stored in the grouping
142
+ // 2. are aggregate functions, which means they
143
+ // are stored in the accunmulator
144
+ //
125
145
let mut row = DataRow :: new ( ) ;
126
146
for ( idx, datum) in grouping. columns {
127
147
row. insert ( idx, datum) ;
0 commit comments