forked from Jonathan-Rosenberg/delta-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathversion_log.go
112 lines (94 loc) · 2.3 KB
/
version_log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package deltago
import (
"github.com/csimplestring/delta-go/action"
"github.com/csimplestring/delta-go/iter"
"github.com/csimplestring/delta-go/store"
)
type VersionLog interface {
Version() int64
Actions() ([]action.Action, error)
ActionIter() (iter.Iter[action.Action], error)
}
var _ VersionLog = &InMemVersionLog{}
var _ VersionLog = &MemOptimizedVersionLog{}
type InMemVersionLog struct {
version int64
actions []action.Action
}
func (v *InMemVersionLog) Version() int64 {
return v.version
}
func (v *InMemVersionLog) Actions() ([]action.Action, error) {
return v.actions, nil
}
func (v *InMemVersionLog) ActionIter() (iter.Iter[action.Action], error) {
return iter.FromSlice(v.actions), nil
}
type MemOptimizedVersionLog struct {
version int64
path string
store store.Store
}
func (m *MemOptimizedVersionLog) Version() int64 {
return m.version
}
func (m *MemOptimizedVersionLog) Actions() ([]action.Action, error) {
i, err := m.store.Read(m.path)
if err != nil {
return nil, err
}
defer i.Close()
return iter.Map(i, func(t string) (action.Action, error) {
return action.FromJson(t)
})
}
func (m *MemOptimizedVersionLog) ActionIter() (iter.Iter[action.Action], error) {
i, err := m.store.Read(m.path)
if err != nil {
return nil, err
}
defer i.Close()
mapIter := &iter.MapIter[string, action.Action]{
It: i,
Mapper: func(s string) (action.Action, error) {
return action.FromJson(s)
},
}
return mapIter, nil
}
type MemOptimizedCheckpoint struct {
version int64
path string
store store.Store
cr *checkpointReader
}
func (m *MemOptimizedCheckpoint) Version() int64 {
return m.version * -1
}
func (m *MemOptimizedCheckpoint) Actions() ([]action.Action, error) {
cr := *(m.cr)
i, err := cr.Read(m.path)
if err != nil {
return nil, err
}
defer i.Close()
return iter.Map(i, func(a action.Action) (action.Action, error) {
if a.Wrap().MetaData != nil {
md := a.Wrap().MetaData
if md.Configuration == nil {
md.Configuration = map[string]string{}
}
if md.PartitionColumns == nil {
md.PartitionColumns = []string{}
}
if md.Format.Options == nil {
md.Format.Options = map[string]string{}
}
}
return a, nil
})
}
func (m *MemOptimizedCheckpoint) ActionIter() (iter.Iter[action.Action], error) {
cr := *(m.cr)
return cr.Read(m.path)
}