forked from citusdata/cstore_fdw
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cstore_reader.c
1341 lines (1116 loc) · 41.4 KB
/
cstore_reader.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*-------------------------------------------------------------------------
*
* cstore_reader.c
*
* This file contains function definitions for reading cstore files. This
* includes the logic for reading file level metadata, reading row stripes,
* and skipping unrelated row blocks and columns.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cstore_fdw.h"
#include "cstore_metadata_serialization.h"
#include "access/nbtree.h"
#include "access/skey.h"
#include "commands/defrem.h"
#include "nodes/makefuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/predtest.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "port.h"
#include "storage/fd.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
/* static function declarations */
static StripeBuffers * LoadFilteredStripeBuffers(FILE *tableFile,
StripeMetadata *stripeMetadata,
TupleDesc tupleDescriptor,
List *projectedColumnList,
List *whereClauseList);
static void ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
uint64 blockIndex, uint64 blockRowIndex,
ColumnBlockData **blockDataArray,
Datum *columnValues, bool *columnNulls);
static ColumnBuffers * LoadColumnBuffers(FILE *tableFile,
ColumnBlockSkipNode *blockSkipNodeArray,
uint32 blockCount, uint64 existsFileOffset,
uint64 valueFileOffset,
Form_pg_attribute attributeForm);
static StripeFooter * LoadStripeFooter(FILE *tableFile, StripeMetadata *stripeMetadata,
uint32 columnCount);
static StripeSkipList * LoadStripeSkipList(FILE *tableFile,
StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter,
uint32 columnCount,
Form_pg_attribute *attributeFormArray);
static bool * SelectedBlockMask(StripeSkipList *stripeSkipList,
List *projectedColumnList, List *whereClauseList);
static List * BuildRestrictInfoList(List *whereClauseList);
static Node * BuildBaseConstraint(Var *variable);
static OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
static void UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue);
static StripeSkipList * SelectedBlockSkipList(StripeSkipList *stripeSkipList,
bool *selectedBlockMask);
static uint32 StripeSkipListRowCount(StripeSkipList *stripeSkipList);
static bool * ProjectedColumnMask(uint32 columnCount, List *projectedColumnList);
static void DeserializeBoolArray(StringInfo boolArrayBuffer, bool *boolArray,
uint32 boolArrayLength);
static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray,
uint32 datumCount, bool datumTypeByValue,
int datumTypeLength, char datumTypeAlign,
Datum *datumArray);
static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex,
Form_pg_attribute *attributeFormArray,
uint32 rowCount, ColumnBlockData **blockDataArray,
TupleDesc tupleDescriptor);
static Datum ColumnDefaultValue(TupleConstr *tupleConstraints,
Form_pg_attribute attributeForm);
static int64 FileSize(FILE *file);
static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size);
static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray,
uint32 columnCount);
static uint64 StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata);
/*
* CStoreBeginRead initializes a cstore read operation. This function returns a
* read handle that's used during reading rows and finishing the read operation.
*/
TableReadState *
CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor,
List *projectedColumnList, List *whereClauseList)
{
TableReadState *readState = NULL;
TableFooter *tableFooter = NULL;
FILE *tableFile = NULL;
MemoryContext stripeReadContext = NULL;
uint32 columnCount = 0;
bool *projectedColumnMask = NULL;
ColumnBlockData **blockDataArray = NULL;
StringInfo tableFooterFilename = makeStringInfo();
appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX);
tableFooter = CStoreReadFooter(tableFooterFilename);
pfree(tableFooterFilename->data);
pfree(tableFooterFilename);
tableFile = AllocateFile(filename, PG_BINARY_R);
if (tableFile == NULL)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
filename)));
}
/*
* We allocate all stripe specific data in the stripeReadContext, and reset
* this memory context before loading a new stripe. This is to avoid memory
* leaks.
*/
stripeReadContext = AllocSetContextCreate(CurrentMemoryContext,
"Stripe Read Memory Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
columnCount = tupleDescriptor->natts;
projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
blockDataArray = CreateEmptyBlockDataArray(columnCount, projectedColumnMask,
tableFooter->blockRowCount);
readState = palloc0(sizeof(TableReadState));
readState->tableFile = tableFile;
readState->tableFooter = tableFooter;
readState->projectedColumnList = projectedColumnList;
readState->whereClauseList = whereClauseList;
readState->stripeBuffers = NULL;
readState->readStripeCount = 0;
readState->stripeReadRowCount = 0;
readState->tupleDescriptor = tupleDescriptor;
readState->stripeReadContext = stripeReadContext;
readState->blockDataArray = blockDataArray;
readState->deserializedBlockIndex = -1;
return readState;
}
/*
* CStoreReadFooter reads the cstore file footer from the given file. First, the
* function reads the last byte of the file as the postscript size. Then, the
* function reads the postscript. Last, the function reads and deserializes the
* footer.
*/
TableFooter *
CStoreReadFooter(StringInfo tableFooterFilename)
{
TableFooter *tableFooter = NULL;
FILE *tableFooterFile = NULL;
uint64 footerOffset = 0;
uint64 footerLength = 0;
StringInfo postscriptBuffer = NULL;
StringInfo postscriptSizeBuffer = NULL;
uint64 postscriptSizeOffset = 0;
uint8 postscriptSize = 0;
uint64 footerFileSize = 0;
uint64 postscriptOffset = 0;
StringInfo footerBuffer = NULL;
int freeResult = 0;
tableFooterFile = AllocateFile(tableFooterFilename->data, PG_BINARY_R);
if (tableFooterFile == NULL)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
tableFooterFilename->data),
errhint("Try copying in data to the table.")));
}
footerFileSize = FileSize(tableFooterFile);
if (footerFileSize < CSTORE_POSTSCRIPT_SIZE_LENGTH)
{
ereport(ERROR, (errmsg("invalid cstore file")));
}
postscriptSizeOffset = footerFileSize - CSTORE_POSTSCRIPT_SIZE_LENGTH;
postscriptSizeBuffer = ReadFromFile(tableFooterFile, postscriptSizeOffset,
CSTORE_POSTSCRIPT_SIZE_LENGTH);
memcpy(&postscriptSize, postscriptSizeBuffer->data, CSTORE_POSTSCRIPT_SIZE_LENGTH);
if (postscriptSize + CSTORE_POSTSCRIPT_SIZE_LENGTH > footerFileSize)
{
ereport(ERROR, (errmsg("invalid postscript size")));
}
postscriptOffset = footerFileSize - (CSTORE_POSTSCRIPT_SIZE_LENGTH + postscriptSize);
postscriptBuffer = ReadFromFile(tableFooterFile, postscriptOffset, postscriptSize);
DeserializePostScript(postscriptBuffer, &footerLength);
if (footerLength + postscriptSize + CSTORE_POSTSCRIPT_SIZE_LENGTH > footerFileSize)
{
ereport(ERROR, (errmsg("invalid footer size")));
}
footerOffset = postscriptOffset - footerLength;
footerBuffer = ReadFromFile(tableFooterFile, footerOffset, footerLength);
tableFooter = DeserializeTableFooter(footerBuffer);
freeResult = FreeFile(tableFooterFile);
if (freeResult != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not close file: %m")));
}
return tableFooter;
}
/*
* CStoreReadNextRow tries to read a row from the cstore file. On success, it sets
* column values and nulls, and returns true. If there are no more rows to read,
* the function returns false.
*/
bool
CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNulls)
{
uint32 blockIndex = 0;
uint32 blockRowIndex = 0;
TableFooter *tableFooter = readState->tableFooter;
Form_pg_attribute *attributeFormArray = readState->tupleDescriptor->attrs;
MemoryContext oldContext = NULL;
/*
* If no stripes are loaded, load the next non-empty stripe. Note that when
* loading stripes, we skip over blocks whose contents can be filtered with
* the query's restriction qualifiers. So, even when a stripe is physically
* not empty, we may end up loading it as an empty stripe.
*/
while (readState->stripeBuffers == NULL)
{
StripeBuffers *stripeBuffers = NULL;
StripeMetadata *stripeMetadata = NULL;
List *stripeMetadataList = tableFooter->stripeMetadataList;
uint32 stripeCount = list_length(stripeMetadataList);
/* if we have read all stripes, return false */
if (readState->readStripeCount == stripeCount)
{
return false;
}
oldContext = MemoryContextSwitchTo(readState->stripeReadContext);
MemoryContextReset(readState->stripeReadContext);
stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount);
stripeBuffers = LoadFilteredStripeBuffers(readState->tableFile, stripeMetadata,
readState->tupleDescriptor,
readState->projectedColumnList,
readState->whereClauseList);
readState->readStripeCount++;
MemoryContextSwitchTo(oldContext);
if (stripeBuffers->rowCount != 0)
{
readState->stripeBuffers = stripeBuffers;
readState->stripeReadRowCount = 0;
readState->deserializedBlockIndex = -1;
ResetUncompressedBlockData(readState->blockDataArray,
stripeBuffers->columnCount);
break;
}
}
blockIndex = readState->stripeReadRowCount / tableFooter->blockRowCount;
blockRowIndex = readState->stripeReadRowCount % tableFooter->blockRowCount;
if (blockIndex != readState->deserializedBlockIndex)
{
uint32 lastBlockIndex = 0;
uint32 blockRowCount = 0;
uint32 stripeRowCount = 0;
stripeRowCount = readState->stripeBuffers->rowCount;
lastBlockIndex = stripeRowCount / tableFooter->blockRowCount;
if (blockIndex == lastBlockIndex)
{
blockRowCount = stripeRowCount % tableFooter->blockRowCount;
}
else
{
blockRowCount = tableFooter->blockRowCount;
}
oldContext = MemoryContextSwitchTo(readState->stripeReadContext);
DeserializeBlockData(readState->stripeBuffers, blockIndex, attributeFormArray,
blockRowCount, readState->blockDataArray,
readState->tupleDescriptor);
MemoryContextSwitchTo(oldContext);
readState->deserializedBlockIndex = blockIndex;
}
ReadStripeNextRow(readState->stripeBuffers, readState->projectedColumnList,
blockIndex, blockRowIndex, readState->blockDataArray,
columnValues, columnNulls);
/*
* If we finished reading the current stripe, set stripe data to NULL. That
* way, we will load a new stripe the next time this function gets called.
*/
readState->stripeReadRowCount++;
if (readState->stripeReadRowCount == readState->stripeBuffers->rowCount)
{
readState->stripeBuffers = NULL;
}
return true;
}
/* Finishes a cstore read operation. */
void
CStoreEndRead(TableReadState *readState)
{
int columnCount = readState->tupleDescriptor->natts;
MemoryContextDelete(readState->stripeReadContext);
FreeFile(readState->tableFile);
list_free_deep(readState->tableFooter->stripeMetadataList);
FreeColumnBlockDataArray(readState->blockDataArray, columnCount);
pfree(readState->tableFooter);
pfree(readState);
}
/*
* CreateEmptyBlockDataArray creates data buffers to keep deserialized exist and
* value arrays for requested columns in columnMask.
*/
ColumnBlockData **
CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask, uint32 blockRowCount)
{
uint32 columnIndex = 0;
ColumnBlockData **blockDataArray = palloc0(columnCount * sizeof(ColumnBlockData*));
/* allocate block memory for deserialized data */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
if (columnMask[columnIndex])
{
ColumnBlockData *blockData = palloc0(sizeof(ColumnBlockData));
blockData->existsArray = palloc0(blockRowCount * sizeof(bool));
blockData->valueArray = palloc0(blockRowCount * sizeof(Datum));
blockData->valueBuffer = NULL;
blockDataArray[columnIndex] = blockData;
}
}
return blockDataArray;
}
/*
* FreeColumnBlockDataArray deallocates data buffers to keep deserialized exist and
* value arrays for requested columns in columnMask.
* ColumnBlockData->serializedValueBuffer lives in memory read/write context
* so it is deallocated automatically when the context is deleted.
*/
void
FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount)
{
uint32 columnIndex = 0;
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
ColumnBlockData *blockData = blockDataArray[columnIndex];
if (blockData != NULL)
{
pfree(blockData->existsArray);
pfree(blockData->valueArray);
pfree(blockData);
}
}
pfree(blockDataArray);
}
/* CStoreTableRowCount returns the exact row count of a table using skiplists */
uint64
CStoreTableRowCount(const char *filename)
{
TableFooter *tableFooter = NULL;
FILE *tableFile;
ListCell *stripeMetadataCell = NULL;
uint64 totalRowCount = 0;
StringInfo tableFooterFilename = makeStringInfo();
appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX);
tableFooter = CStoreReadFooter(tableFooterFilename);
pfree(tableFooterFilename->data);
pfree(tableFooterFilename);
tableFile = AllocateFile(filename, PG_BINARY_R);
if (tableFile == NULL)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m", filename)));
}
foreach(stripeMetadataCell, tableFooter->stripeMetadataList)
{
StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell);
totalRowCount += StripeRowCount(tableFile, stripeMetadata);
}
FreeFile(tableFile);
return totalRowCount;
}
/*
* StripeRowCount reads serialized stripe footer, the first column's
* skip list, and returns number of rows for given stripe.
*/
static uint64
StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata)
{
uint64 rowCount = 0;
StripeFooter *stripeFooter = NULL;
StringInfo footerBuffer = NULL;
StringInfo firstColumnSkipListBuffer = NULL;
uint64 footerOffset = 0;
footerOffset += stripeMetadata->fileOffset;
footerOffset += stripeMetadata->skipListLength;
footerOffset += stripeMetadata->dataLength;
footerBuffer = ReadFromFile(tableFile, footerOffset, stripeMetadata->footerLength);
stripeFooter = DeserializeStripeFooter(footerBuffer);
firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset,
stripeFooter->skipListSizeArray[0]);
rowCount = DeserializeRowCount(firstColumnSkipListBuffer);
return rowCount;
}
/*
* LoadFilteredStripeBuffers reads serialized stripe data from the given file.
* The function skips over blocks whose rows are refuted by restriction qualifiers,
* and only loads columns that are projected in the query.
*/
static StripeBuffers *
LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata,
TupleDesc tupleDescriptor, List *projectedColumnList,
List *whereClauseList)
{
StripeBuffers *stripeBuffers = NULL;
ColumnBuffers **columnBuffersArray = NULL;
uint64 currentColumnFileOffset = 0;
uint32 columnIndex = 0;
Form_pg_attribute *attributeFormArray = tupleDescriptor->attrs;
uint32 columnCount = tupleDescriptor->natts;
StripeFooter *stripeFooter = LoadStripeFooter(tableFile, stripeMetadata,
columnCount);
StripeSkipList *stripeSkipList = LoadStripeSkipList(tableFile, stripeMetadata,
stripeFooter, columnCount,
attributeFormArray);
bool *projectedColumnMask = ProjectedColumnMask(columnCount, projectedColumnList);
bool *selectedBlockMask = SelectedBlockMask(stripeSkipList, projectedColumnList,
whereClauseList);
StripeSkipList *selectedBlockSkipList = SelectedBlockSkipList(stripeSkipList,
selectedBlockMask);
/* load column data for projected columns */
columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
currentColumnFileOffset = stripeMetadata->fileOffset + stripeMetadata->skipListLength;
for (columnIndex = 0; columnIndex < stripeFooter->columnCount; columnIndex++)
{
uint64 existsSize = stripeFooter->existsSizeArray[columnIndex];
uint64 valueSize = stripeFooter->valueSizeArray[columnIndex];
uint64 existsFileOffset = currentColumnFileOffset;
uint64 valueFileOffset = currentColumnFileOffset + existsSize;
if (projectedColumnMask[columnIndex])
{
ColumnBlockSkipNode *blockSkipNode =
selectedBlockSkipList->blockSkipNodeArray[columnIndex];
Form_pg_attribute attributeForm = attributeFormArray[columnIndex];
uint32 blockCount = selectedBlockSkipList->blockCount;
ColumnBuffers *columnBuffers = LoadColumnBuffers(tableFile, blockSkipNode,
blockCount,
existsFileOffset,
valueFileOffset,
attributeForm);
columnBuffersArray[columnIndex] = columnBuffers;
}
currentColumnFileOffset += existsSize;
currentColumnFileOffset += valueSize;
}
stripeBuffers = palloc0(sizeof(StripeBuffers));
stripeBuffers->columnCount = columnCount;
stripeBuffers->rowCount = StripeSkipListRowCount(selectedBlockSkipList);
stripeBuffers->columnBuffersArray = columnBuffersArray;
return stripeBuffers;
}
/*
* ReadStripeNextRow reads the next row from the given stripe, finds the projected
* column values within this row, and accordingly sets the column values and nulls.
* Note that this function sets the values for all non-projected columns to null.
*/
static void
ReadStripeNextRow(StripeBuffers *stripeBuffers, List *projectedColumnList,
uint64 blockIndex, uint64 blockRowIndex,
ColumnBlockData **blockDataArray, Datum *columnValues,
bool *columnNulls)
{
ListCell *projectedColumnCell = NULL;
/* set all columns to null by default */
memset(columnNulls, 1, stripeBuffers->columnCount * sizeof(bool));
foreach(projectedColumnCell, projectedColumnList)
{
Var *projectedColumn = lfirst(projectedColumnCell);
uint32 projectedColumnIndex = projectedColumn->varattno - 1;
ColumnBlockData *blockData = blockDataArray[projectedColumnIndex];
if (blockData->existsArray[blockRowIndex])
{
columnValues[projectedColumnIndex] = blockData->valueArray[blockRowIndex];
columnNulls[projectedColumnIndex] = false;
}
}
}
/*
* LoadColumnBuffers reads serialized column data from the given file. These
* column data are laid out as sequential blocks in the file; and block positions
* and lengths are retrieved from the column block skip node array.
*/
static ColumnBuffers *
LoadColumnBuffers(FILE *tableFile, ColumnBlockSkipNode *blockSkipNodeArray,
uint32 blockCount, uint64 existsFileOffset, uint64 valueFileOffset,
Form_pg_attribute attributeForm)
{
ColumnBuffers *columnBuffers = NULL;
uint32 blockIndex = 0;
ColumnBlockBuffers **blockBuffersArray =
palloc0(blockCount * sizeof(ColumnBlockBuffers *));
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
blockBuffersArray[blockIndex] = palloc0(sizeof(ColumnBlockBuffers));
}
/*
* We first read the "exists" blocks. We don't read "values" array here,
* because "exists" blocks are stored sequentially on disk, and we want to
* minimize disk seeks.
*/
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
uint64 existsOffset = existsFileOffset + blockSkipNode->existsBlockOffset;
StringInfo rawExistsBuffer = ReadFromFile(tableFile, existsOffset,
blockSkipNode->existsLength);
blockBuffersArray[blockIndex]->existsBuffer = rawExistsBuffer;
}
/* then read "values" blocks, which are also stored sequentially on disk */
for (blockIndex = 0; blockIndex < blockCount; blockIndex++)
{
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
CompressionType compressionType = blockSkipNode->valueCompressionType;
uint64 valueOffset = valueFileOffset + blockSkipNode->valueBlockOffset;
StringInfo rawValueBuffer = ReadFromFile(tableFile, valueOffset,
blockSkipNode->valueLength);
blockBuffersArray[blockIndex]->valueBuffer = rawValueBuffer;
blockBuffersArray[blockIndex]->valueCompressionType = compressionType;
}
columnBuffers = palloc0(sizeof(ColumnBuffers));
columnBuffers->blockBuffersArray = blockBuffersArray;
return columnBuffers;
}
/* Reads and returns the given stripe's footer. */
static StripeFooter *
LoadStripeFooter(FILE *tableFile, StripeMetadata *stripeMetadata,
uint32 columnCount)
{
StripeFooter *stripeFooter = NULL;
StringInfo footerBuffer = NULL;
uint64 footerOffset = 0;
footerOffset += stripeMetadata->fileOffset;
footerOffset += stripeMetadata->skipListLength;
footerOffset += stripeMetadata->dataLength;
footerBuffer = ReadFromFile(tableFile, footerOffset, stripeMetadata->footerLength);
stripeFooter = DeserializeStripeFooter(footerBuffer);
if (stripeFooter->columnCount > columnCount)
{
ereport(ERROR, (errmsg("stripe footer column count and table column count "
"don't match")));
}
return stripeFooter;
}
/* Reads the skip list for the given stripe. */
static StripeSkipList *
LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata,
StripeFooter *stripeFooter, uint32 columnCount,
Form_pg_attribute *attributeFormArray)
{
StripeSkipList *stripeSkipList = NULL;
ColumnBlockSkipNode **blockSkipNodeArray = NULL;
StringInfo firstColumnSkipListBuffer = NULL;
uint64 currentColumnSkipListFileOffset = 0;
uint32 columnIndex = 0;
uint32 stripeBlockCount = 0;
uint32 stripeColumnCount = stripeFooter->columnCount;
/* deserialize block count */
firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset,
stripeFooter->skipListSizeArray[0]);
stripeBlockCount = DeserializeBlockCount(firstColumnSkipListBuffer);
/* deserialize column skip lists */
blockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *));
currentColumnSkipListFileOffset = stripeMetadata->fileOffset;
for (columnIndex = 0; columnIndex < stripeColumnCount; columnIndex++)
{
uint64 columnSkipListSize = stripeFooter->skipListSizeArray[columnIndex];
Form_pg_attribute attributeForm = attributeFormArray[columnIndex];
StringInfo columnSkipListBuffer =
ReadFromFile(tableFile, currentColumnSkipListFileOffset, columnSkipListSize);
ColumnBlockSkipNode *columnSkipList =
DeserializeColumnSkipList(columnSkipListBuffer, attributeForm->attbyval,
attributeForm->attlen, stripeBlockCount);
blockSkipNodeArray[columnIndex] = columnSkipList;
currentColumnSkipListFileOffset += columnSkipListSize;
}
/* table contains additional columns added after this stripe is created */
for (columnIndex = stripeColumnCount; columnIndex < columnCount; columnIndex++)
{
ColumnBlockSkipNode *columnSkipList = NULL;
uint32 blockIndex = 0;
/* create empty ColumnBlockSkipNode for missing columns*/
columnSkipList = palloc0(stripeBlockCount * sizeof(ColumnBlockSkipNode));
for (blockIndex = 0; blockIndex < stripeBlockCount; blockIndex++)
{
columnSkipList->rowCount = 0;
columnSkipList->hasMinMax = false;
columnSkipList->minimumValue = 0;
columnSkipList->maximumValue = 0;
columnSkipList->existsBlockOffset = 0;
columnSkipList->valueBlockOffset = 0;
columnSkipList->existsLength = 0;
columnSkipList->valueLength = 0;
columnSkipList->valueCompressionType = COMPRESSION_NONE;
}
blockSkipNodeArray[columnIndex] = columnSkipList;
}
stripeSkipList = palloc0(sizeof(StripeSkipList));
stripeSkipList->blockSkipNodeArray = blockSkipNodeArray;
stripeSkipList->columnCount = columnCount;
stripeSkipList->blockCount = stripeBlockCount;
return stripeSkipList;
}
/*
* SelectedBlockMask walks over each column's blocks and checks if a block can
* be filtered without reading its data. The filtering happens when all rows in
* the block can be refuted by the given qualifier conditions.
*/
static bool *
SelectedBlockMask(StripeSkipList *stripeSkipList, List *projectedColumnList,
List *whereClauseList)
{
bool *selectedBlockMask = NULL;
ListCell *columnCell = NULL;
uint32 blockIndex = 0;
List *restrictInfoList = BuildRestrictInfoList(whereClauseList);
selectedBlockMask = palloc0(stripeSkipList->blockCount * sizeof(bool));
memset(selectedBlockMask, true, stripeSkipList->blockCount * sizeof(bool));
foreach(columnCell, projectedColumnList)
{
Var *column = lfirst(columnCell);
uint32 columnIndex = column->varattno - 1;
FmgrInfo *comparisonFunction = NULL;
Node *baseConstraint = NULL;
/* if this column's data type doesn't have a comparator, skip it */
comparisonFunction = GetFunctionInfoOrNull(column->vartype, BTREE_AM_OID,
BTORDER_PROC);
if (comparisonFunction == NULL)
{
continue;
}
baseConstraint = BuildBaseConstraint(column);
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
{
bool predicateRefuted = false;
List *constraintList = NIL;
ColumnBlockSkipNode *blockSkipNodeArray =
stripeSkipList->blockSkipNodeArray[columnIndex];
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
/*
* A column block with comparable data type can miss min/max values
* if all values in the block are NULL.
*/
if (!blockSkipNode->hasMinMax)
{
continue;
}
UpdateConstraint(baseConstraint, blockSkipNode->minimumValue,
blockSkipNode->maximumValue);
constraintList = list_make1(baseConstraint);
predicateRefuted = predicate_refuted_by(constraintList, restrictInfoList);
if (predicateRefuted)
{
selectedBlockMask[blockIndex] = false;
}
}
}
return selectedBlockMask;
}
/*
* GetFunctionInfoOrNull first resolves the operator for the given data type,
* access method, and support procedure. The function then uses the resolved
* operator's identifier to fill in a function manager object, and returns
* this object. This function is based on a similar function from CitusDB's code.
*/
FmgrInfo *
GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, int16 procedureId)
{
FmgrInfo *functionInfo = NULL;
Oid operatorClassId = InvalidOid;
Oid operatorFamilyId = InvalidOid;
Oid operatorId = InvalidOid;
/* get default operator class from pg_opclass for datum type */
operatorClassId = GetDefaultOpClass(typeId, accessMethodId);
if (operatorClassId == InvalidOid)
{
return NULL;
}
operatorFamilyId = get_opclass_family(operatorClassId);
if (operatorFamilyId == InvalidOid)
{
return NULL;
}
operatorId = get_opfamily_proc(operatorFamilyId, typeId, typeId, procedureId);
if (operatorId != InvalidOid)
{
functionInfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
/* fill in the FmgrInfo struct using the operatorId */
fmgr_info(operatorId, functionInfo);
}
return functionInfo;
}
/*
* BuildRestrictInfoList builds restrict info list using the selection criteria,
* and then return this list. The function is copied from CitusDB's shard pruning
* logic.
*/
static List *
BuildRestrictInfoList(List *whereClauseList)
{
List *restrictInfoList = NIL;
ListCell *qualCell = NULL;
foreach(qualCell, whereClauseList)
{
RestrictInfo *restrictInfo = NULL;
Node *qualNode = (Node *) lfirst(qualCell);
restrictInfo = make_simple_restrictinfo((Expr *) qualNode);
restrictInfoList = lappend(restrictInfoList, restrictInfo);
}
return restrictInfoList;
}
/*
* BuildBaseConstraint builds and returns a base constraint. This constraint
* implements an expression in the form of (var <= max && var >= min), where
* min and max values represent a block's min and max values. These block
* values are filled in after the constraint is built. This function is based
* on a similar function from CitusDB's shard pruning logic.
*/
static Node *
BuildBaseConstraint(Var *variable)
{
Node *baseConstraint = NULL;
OpExpr *lessThanExpr = NULL;
OpExpr *greaterThanExpr = NULL;
lessThanExpr = MakeOpExpression(variable, BTLessEqualStrategyNumber);
greaterThanExpr = MakeOpExpression(variable, BTGreaterEqualStrategyNumber);
baseConstraint = make_and_qual((Node *) lessThanExpr, (Node *) greaterThanExpr);
return baseConstraint;
}
/*
* MakeOpExpression builds an operator expression node. This operator expression
* implements the operator clause as defined by the variable and the strategy
* number. The function is copied from CitusDB's shard pruning logic.
*/
static OpExpr *
MakeOpExpression(Var *variable, int16 strategyNumber)
{
Oid typeId = variable->vartype;
Oid typeModId = variable->vartypmod;
Oid collationId = variable->varcollid;
Oid accessMethodId = BTREE_AM_OID;
Oid operatorId = InvalidOid;
Const *constantValue = NULL;
OpExpr *expression = NULL;
/* Load the operator from system catalogs */
operatorId = GetOperatorByType(typeId, accessMethodId, strategyNumber);
constantValue = makeNullConst(typeId, typeModId, collationId);
/* Now make the expression with the given variable and a null constant */
expression = (OpExpr *) make_opclause(operatorId,
InvalidOid, /* no result type yet */
false, /* no return set */
(Expr *) variable,
(Expr *) constantValue,
InvalidOid, collationId);
/* Set implementing function id and result type */
expression->opfuncid = get_opcode(operatorId);
expression->opresulttype = get_func_rettype(expression->opfuncid);
return expression;
}
/*
* GetOperatorByType returns operator Oid for the given type, access method,
* and strategy number. Note that this function incorrectly errors out when
* the given type doesn't have its own operator but can use another compatible
* type's default operator. The function is copied from CitusDB's shard pruning
* logic.
*/
static Oid
GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber)
{
/* Get default operator class from pg_opclass */
Oid operatorClassId = GetDefaultOpClass(typeId, accessMethodId);
Oid operatorFamily = get_opclass_family(operatorClassId);
Oid operatorId = get_opfamily_member(operatorFamily, typeId, typeId, strategyNumber);
return operatorId;
}
/*
* UpdateConstraint updates the base constraint with the given min/max values.
* The function is copied from CitusDB's shard pruning logic.
*/
static void
UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue)
{
BoolExpr *andExpr = (BoolExpr *) baseConstraint;
Node *lessThanExpr = (Node *) linitial(andExpr->args);
Node *greaterThanExpr = (Node *) lsecond(andExpr->args);
Node *minNode = get_rightop((Expr *) greaterThanExpr);
Node *maxNode = get_rightop((Expr *) lessThanExpr);
Const *minConstant = NULL;
Const *maxConstant = NULL;
Assert(IsA(minNode, Const));
Assert(IsA(maxNode, Const));
minConstant = (Const *) minNode;
maxConstant = (Const *) maxNode;
minConstant->constvalue = minValue;
maxConstant->constvalue = maxValue;
minConstant->constisnull = false;
maxConstant->constisnull = false;
minConstant->constbyval = true;
maxConstant->constbyval = true;
}
/*
* SelectedBlockSkipList constructs a new StripeSkipList in which the
* non-selected blocks are removed from the given stripeSkipList.
*/
static StripeSkipList *
SelectedBlockSkipList(StripeSkipList *stripeSkipList, bool *selectedBlockMask)
{
StripeSkipList *SelectedBlockSkipList = NULL;
ColumnBlockSkipNode **selectedBlockSkipNodeArray = NULL;
uint32 selectedBlockCount = 0;
uint32 blockIndex = 0;
uint32 columnIndex = 0;
uint32 columnCount = stripeSkipList->columnCount;
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
{
if (selectedBlockMask[blockIndex])
{
selectedBlockCount++;
}
}
selectedBlockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *));
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
uint32 selectedBlockIndex = 0;
selectedBlockSkipNodeArray[columnIndex] = palloc0(selectedBlockCount *
sizeof(ColumnBlockSkipNode));
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
{
if (selectedBlockMask[blockIndex])
{
selectedBlockSkipNodeArray[columnIndex][selectedBlockIndex] =
stripeSkipList->blockSkipNodeArray[columnIndex][blockIndex];
selectedBlockIndex++;
}
}
}
SelectedBlockSkipList = palloc0(sizeof(StripeSkipList));
SelectedBlockSkipList->blockSkipNodeArray = selectedBlockSkipNodeArray;