-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathprojection.go
190 lines (169 loc) · 7.04 KB
/
projection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package dogma
import (
"context"
"time"
)
// A ProjectionMessageHandler builds a projection from events.
//
// The term "read-model" is often used interchangeably with "projection".
//
// Projections use an optimistic concurrency control (OCC) protocol to ensure
// that the engine applies each event to the projection exactly once.
//
// The OCC protocol uses a key/value store that associates engine-defined
// "resources" with their "version". These are they keys and values,
// respectively.
//
// The OCC store can be challenging to implement. The
// [github.com/dogmatiq/projectionkit] module provides adaptors that implement
// the OCC protocol using popular database systems, such as PostgreSQL and
// DynamoDB.
type ProjectionMessageHandler interface {
// Configure describes the handler's configuration to the engine.
Configure(ProjectionConfigurer)
// HandleEvent updates the projection to reflect the occurrence of an event.
//
// r, c and n are the inputs to the OCC store.
//
// - r is a key that identifies some engine-defined resource
// - c is engine's perception of the current version of r
// - n is the next version of r, made by handling this event
//
// If c is the current version of r in the OCC store, the method MUST
// attempt to atomically update the projection and the version of r to be n.
// On success, ok is true and err is nil.
//
// If c is not the current version of r an OCC conflict has occurred. The
// method MUST return with ok set to false and without updating the
// projection.
//
// r, c and n are engine-defined; the application SHOULD NOT infer any
// meaning from their content. The "current" version of a new resource is
// the empty byte-slice. nil and empty slices are interchangeable.
//
// The engine MAY provide specific guarantees about the order in which it
// supplies events to the handler. To maximize portability across engines,
// the handler SHOULD NOT assume any specific ordering. The engine MAY call
// this method concurrently from separate goroutines or operating system
// processes.
HandleEvent(
ctx context.Context,
r, c, n []byte,
s ProjectionEventScope,
e Event,
) (ok bool, err error)
// ResourceVersion returns the current version of a resource.
//
// It returns an empty slice if r is not in the OCC store.
ResourceVersion(ctx context.Context, r []byte) ([]byte, error)
// CloseResource informs the handler that the engine has no further use for
// a resource.
//
// If r is present in the OCC store the handler SHOULD remove it.
CloseResource(ctx context.Context, r []byte) error
// Compact attempts to reduce the size of the projection.
//
// For example, it may delete unused data, or merge overly granular data.
//
// The handler SHOULD compact the projection incrementally such that it
// makes some progress even if the context's deadline expires.
Compact(context.Context, ProjectionCompactScope) error
}
// A ProjectionConfigurer configures the engine for use with a specific
// projection message handler.
type ProjectionConfigurer interface {
// Identity configures the handler's identity.
//
// n is a short human-readable name. It MUST be unique within the
// application at any given time, but MAY change over the handler's
// lifetime. It MUST contain solely printable, non-space UTF-8 characters.
// It must be between 1 and 255 bytes (not characters) in length.
//
// k is a unique key used to associate engine state with the handler. The
// key SHOULD NOT change over the handler's lifetime. k MUST be an RFC 4122
// UUID, such as "5195fe85-eb3f-4121-84b0-be72cbc5722f".
//
// Use of hard-coded literals for both values is RECOMMENDED.
Identity(n string, k string)
// Routes configures the engine to route certain message types to and from
// the handler.
//
// Projection handlers support the HandlesEvent() route type.
Routes(...ProjectionRoute)
// DeliveryPolicy configures how the engine delivers events to the handler.
//
// The default policy is UnicastProjectionDeliveryPolicy.
DeliveryPolicy(ProjectionDeliveryPolicy)
// Disable prevents the handler from receiving any messages.
//
// The engine MUST NOT call any methods other than Configure() on a disabled
// handler.
//
// Disabling a handler is useful when the handler's configuration prevents
// it from operating, such as when it's missing a required dependency,
// without requiring the user to conditionally register the handler with the
// application.
Disable(...DisableOption)
}
// ProjectionEventScope performs engine operations within the context of a call
// to the HandleEvent() method of a [ProjectionMessageHandler].
type ProjectionEventScope interface {
// RecordedAt returns the time at which the event occurred.
RecordedAt() time.Time
// IsPrimaryDelivery returns true on one of the application instances that
// receive the event, and false on all other instances.
//
// This method is useful when the projection must perform some specific
// operation once per event, such as updating a shared resource that's used
// by all applications, while still delivering the event to all instances of
// the application.
IsPrimaryDelivery() bool
// Log records an informational message.
Log(format string, args ...any)
}
// ProjectionCompactScope performs engine operations within the context of a
// call to the Compact() method of a [ProjectionMessageHandler].
type ProjectionCompactScope interface {
// Now returns the current engine time.
//
// The handler SHOULD use the returned time to implement compaction logic
// that has some time-based component, such as removing data older than a
// certain age.
//
// Under normal operating conditions the engine SHOULD return the current
// local time. The engine MAY return a different time under some
// circumstances, such as when executing tests.
Now() time.Time
// Log records an informational message.
Log(format string, args ...any)
}
// NoCompactBehavior is an embeddable type for [ProjectionMessageHandler]
// implementations that do not require compaction.
type NoCompactBehavior struct{}
// Compact does nothing.
func (NoCompactBehavior) Compact(context.Context, ProjectionCompactScope) error {
return nil
}
type (
// A ProjectionDeliveryPolicy describes how to deliver events to a
// projection message handler on engines that support concurrent or
// distributed execution of a single Dogma application.
ProjectionDeliveryPolicy interface{ isProjectionDeliveryPolicy() }
// UnicastProjectionDeliveryPolicy is the default
// [ProjectionDeliveryPolicy]. It delivers each event to a single instance
// of the application.
UnicastProjectionDeliveryPolicy struct{}
// BroadcastProjectionDeliveryPolicy is a [ProjectionDeliveryPolicy] that
// delivers each event to a all instance of the application.
BroadcastProjectionDeliveryPolicy struct {
// PrimaryFirst defers "secondary delivery" of events until after the
// "primary delivery" has completed.
PrimaryFirst bool
}
)
// ProjectionRoute describes a message type that's routed to a
// [ProjectionMessageHandler].
type ProjectionRoute interface {
Route
isProjectionRoute()
}