Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEVP-254] Prevent potential state inconsistency #7

Merged
merged 1 commit into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 46 additions & 40 deletions src/prevayler_clj_aws/core.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
(ns prevayler-clj-aws.core
(:require
[base64-clj.core :as base64]
[clojure.java.io :as io]
[cognitect.aws.client.api :as aws]
[prevayler-clj.prevayler4 :refer [Prevayler]]
[taoensso.nippy :as nippy]
[prevayler-clj-aws.util :as util])
[base64-clj.core :as base64]
[clojure.java.io :as io]
[cognitect.aws.client.api :as aws]
[prevayler-clj.prevayler4 :refer [Prevayler]]
[taoensso.nippy :as nippy]
[prevayler-clj-aws.util :as util])
(:import
[clojure.lang IDeref]
[java.io ByteArrayOutputStream Closeable]))
[clojure.lang IDeref]
[java.io ByteArrayOutputStream Closeable]))

(defn- marshal [value]
(-> (nippy/freeze value)
Expand All @@ -23,56 +23,60 @@
nippy/thaw))

(defn- snapshot-exists? [s3-cli bucket snapshot-path]
(->> (util/aws-invoke s3-cli {:op :ListObjects
(->> (util/aws-invoke s3-cli {:op :ListObjects
:request {:Bucket bucket
:Prefix snapshot-path}})
:Contents
(some #(= snapshot-path (:Key %)))))

(defn- read-snapshot [s3-cli bucket snapshot-path]
(if (snapshot-exists? s3-cli bucket snapshot-path)
(-> (util/aws-invoke s3-cli {:op :GetObject
(-> (util/aws-invoke s3-cli {:op :GetObject
:request {:Bucket bucket
:Key snapshot-path}})
:Key snapshot-path}})
:Body
unmarshal)
{:partkey 0}))

(defn- save-snapshot! [s3-cli bucket snapshot-path snapshot]
(util/aws-invoke s3-cli {:op :PutObject
(util/aws-invoke s3-cli {:op :PutObject
:request {:Bucket bucket
:Key snapshot-path
:Body (marshal snapshot)}}))
:Key snapshot-path
:Body (marshal snapshot)}}))

(defn- read-items [dynamo-cli table partkey page-size]
(letfn [(read-page [exclusive-start-key]
(let [result (util/aws-invoke
dynamo-cli
{:op :Query
:request {:TableName table
:KeyConditionExpression "partkey = :partkey"
:ExpressionAttributeValues {":partkey" {:N (str partkey)}}
:Limit page-size
:ExclusiveStartKey exclusive-start-key}})
dynamo-cli
{:op :Query
:request {:TableName table
:KeyConditionExpression "partkey = :partkey"
:ExpressionAttributeValues {":partkey" {:N (str partkey)}}
:Limit page-size
:ExclusiveStartKey exclusive-start-key}})
{items :Items last-key :LastEvaluatedKey} result]
(lazy-cat
(map (comp unmarshal :B :content) items)
(if (seq last-key)
(read-page last-key)
[]))))]
(map (comp unmarshal :B :content) items)
(if (seq last-key)
(read-page last-key)
[]))))]
(read-page {:order {:N "0"} :partkey {:N (str partkey)}})))

(defn- restore-events! [dynamo-cli handler state-atom table partkey page-size]
(let [items (read-items dynamo-cli table partkey page-size)]
(doseq [[timestamp event] items]
(swap! state-atom handler event timestamp))))
(doseq [[timestamp event expected-state-hash] items]
(let [state (swap! state-atom handler event timestamp)]
(when (and expected-state-hash ; Old journals don't have this state hash saved (2023-10-25)
(not= (hash state) expected-state-hash))
(println "Inconsistent state detected after restoring event:\n" event)
(throw (IllegalStateException. "Inconsistent state detected during event journal replay. https://github.com/klauswuestefeld/prevayler-clj/blob/master/reference.md#inconsistent-state-detected")))))))

(defn- write-event! [dynamo-cli table partkey order event]
(util/aws-invoke dynamo-cli {:op :PutItem
(util/aws-invoke dynamo-cli {:op :PutItem
:request {:TableName table
:Item {:partkey {:N (str partkey)}
:order {:N (str order)}
:content {:B (marshal event)}}}}))
:Item {:partkey {:N (str partkey)}
:order {:N (str order)}
:content {:B (marshal event)}}}}))

(defn prevayler!
"Creates a new prevayler instance.
Expand All @@ -88,13 +92,13 @@
`:s3-bucket`: the name of the bucket where the snapshot will be stored
`:snapshot-path` (optional): the path inside the s3-bucket where the snapshot will be stored, default is \"snapshot\""
[{:keys [initial-state business-fn timestamp-fn aws-opts]
:or {initial-state {}
timestamp-fn #(System/currentTimeMillis)}}]
:or {initial-state {}
timestamp-fn #(System/currentTimeMillis)}}]
(let [{:keys [dynamodb-client s3-client dynamodb-table snapshot-path s3-bucket page-size]
:or {dynamodb-client (aws/client {:api :dynamodb})
s3-client (aws/client {:api :s3})
snapshot-path "snapshot"
page-size 1000}} aws-opts
:or {dynamodb-client (aws/client {:api :dynamodb})
s3-client (aws/client {:api :s3})
snapshot-path "snapshot"
page-size 1000}} aws-opts
{state :state old-partkey :partkey} (read-snapshot s3-client s3-bucket snapshot-path)
state-atom (atom (or state initial-state))
new-partkey (inc old-partkey)
Expand All @@ -109,11 +113,13 @@
(reify
Prevayler
(handle! [this event]
(locking this ; (I)solation: strict serializability.
(locking this ; (I)solation: strict serializability.
(let [timestamp (timestamp-fn)
new-state (business-fn @state-atom event timestamp)] ; (C)onsistency: must be guaranteed by the handler. The event won't be journalled when the handler throws an exception.)
(write-event! dynamodb-client dynamodb-table new-partkey (swap! order-atom inc) [timestamp event]) ; (D)urability
(reset! state-atom new-state)))) ; (A)tomicity
(write-event! dynamodb-client dynamodb-table new-partkey
(swap! order-atom inc)
[timestamp event (hash new-state)]) ; (D)urability
(reset! state-atom new-state)))) ; (A)tomicity
(timestamp [_] (timestamp-fn))

IDeref (deref [_] @state-atom)
Expand Down
10 changes: 5 additions & 5 deletions test/prevayler_clj_aws/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@
(testing "only replay events since last restart"
(let [opts (gen-opts :initial-state [] :business-fn (fn [state event _] (conj state event)))
prev1 (prev! opts)
_ (prevayler/handle! prev1 :previous-event)
_ (prevayler/handle! prev1 1)
prev2 (prev! opts)
_ (prevayler/handle! prev2 :latest-event)
prev3 (prev! (assoc opts :business-fn (fn [state event _] {:state state :event event})))]
(is (= {:state [:previous-event] :event :latest-event} @prev3))))
(testing "replay all events since last restart"
_ (prevayler/handle! prev2 2)
prev3 (prev! opts)]
(is (= [1 2] @prev3))))
(testing "replay more than one page"
(let [opts (gen-opts :initial-state []
:business-fn (fn [state event _] (conj state event))
:aws-opts {:page-size 1})
Expand Down