From de5127d59ce9fa5b900adf9c1a01b7f0994626c0 Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Wed, 12 Feb 2025 12:22:01 -0500 Subject: [PATCH 1/9] Fix nested shadow updates --- Project.toml | 2 +- src/ShadowFramework.jl | 214 +++++++++++++++------------- test/shadow_framework_integ_test.jl | 175 ++++++++++++++++++++--- test/shadow_framework_test.jl | 133 ++++++++++------- 4 files changed, 351 insertions(+), 173 deletions(-) diff --git a/Project.toml b/Project.toml index 2b70376..d394737 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "AWSCRT" uuid = "df31ea59-17a4-4ebd-9d69-4f45266dc2c7" -version = "0.4.2" +version = "0.5.0" [deps] CountDownLatches = "621fb831-fdad-4fff-93ac-1af7b7ed19e3" diff --git a/src/ShadowFramework.jl b/src/ShadowFramework.jl index d984a3f..548a56d 100644 --- a/src/ShadowFramework.jl +++ b/src/ShadowFramework.jl @@ -1,3 +1,24 @@ +""" + shadow_document_property_pre_update_function(shadow_document::AbstractDict, key::String, value)::Bool + +A function invoked when updating a property in the shadow document. +This allows you to replace the update behavior in this package if you want to implement custom update behavior for +a given property. +The parent [`ShadowFramework`](@ref) will be locked when this function is invoked. The lock is reentrant. + + !!! warning "Warning" + This function may only update the given `key`. This function cannot modify other properties in the `shadow_document`. + +Arguments: + + - `shadow_document (AbstractDict)`: The shadow document being updated. + - `key (String)`: The key in the `shadow_document` for the value being updated. + - `value (Any)`: The new value for the `key`. + +Returns a `Bool` indicating whether an update was done. +""" +const ShadowDocumentPropertyPreUpdateFunction = Function + """ shadow_document_property_update_callback(value) @@ -6,7 +27,7 @@ The parent [`ShadowFramework`](@ref) will be locked when this callback is invoke Arguments: - - `value (Any)`: The value of the shadow property that was just set in the shadow document. + - `value (Any)`: The new value of the property in the shadow document. """ const ShadowDocumentPropertyUpdateCallback = Function @@ -23,7 +44,7 @@ Arguments: const ShadowDocumentPreUpdateCallback = Function """ - shadow_document_post_update_callback(shadow_document::T) + shadow_document_post_update_callback(shadow_document::AbstractDict) A callback invoked after the shadow document is updated. The parent [`ShadowFramework`](@ref) will not be locked when this callback is invoked. @@ -31,41 +52,36 @@ This is a good place to persist the shadow document to disk. Arguments: - - `shadow_document (T)`: The updated shadow document. + - `shadow_document (AbstractDict)`: The updated shadow document. """ const ShadowDocumentPostUpdateCallback = Function -mutable struct ShadowFramework{T} +mutable struct ShadowFramework const _id::Int const _shadow_document_lock::ReentrantLock const _shadow_client::Union{ShadowClient,Nothing} # set to nothing in unit tests - const _shadow_document::T + const _shadow_document::AbstractDict const _shadow_document_property_callbacks::Dict{String,ShadowDocumentPropertyUpdateCallback} const _shadow_document_pre_update_callback::ShadowDocumentPreUpdateCallback const _shadow_document_post_update_callback::ShadowDocumentPostUpdateCallback + const _shadow_document_property_pre_update_funcs::Dict{String,ShadowDocumentPropertyPreUpdateFunction} _sync_latch::CountDownLatch function ShadowFramework( id::Int, shadow_document_lock::ReentrantLock, shadow_client::Union{ShadowClient,Nothing}, - shadow_document::T, - shadow_document_property_callbacks::Dict{String,ShadowDocumentPropertyUpdateCallback}, + shadow_document::AbstractDict, + shadow_document_property_callbacks::Dict{String,<:ShadowDocumentPropertyUpdateCallback}, shadow_document_pre_update_callback::ShadowDocumentPreUpdateCallback, shadow_document_post_update_callback::ShadowDocumentPostUpdateCallback, - ) where {T} - if T <: AbstractDict - if !haskey(shadow_document, "version") - shadow_document["version"] = 1 - end - elseif !hasproperty(shadow_document, :version) - error("The given shadow document type $T must have a property `version::Int` used for storing the \ - shadow document version.") - elseif !ismutabletype(T) - error("The given shadow document type $T must be mutable.") + shadow_document_property_pre_update_funcs::Dict{String,<:ShadowDocumentPropertyPreUpdateFunction}, + ) + if !haskey(shadow_document, "version") + shadow_document["version"] = 1 end - return new{T}( + return new( id, shadow_document_lock, shadow_client, @@ -73,6 +89,7 @@ mutable struct ShadowFramework{T} shadow_document_property_callbacks, shadow_document_pre_update_callback, shadow_document_post_update_callback, + shadow_document_property_pre_update_funcs, CountDownLatch(1), ) end @@ -100,25 +117,23 @@ Arguments: - `connection (MQTTConnection)`: The connection. - `thing_name (String)`: Name of the Thing in AWS IoT under which the shadow document will exist. - `shadow_name (Union{String,Nothing})`: Shadow name for a named shadow document or `nothing` for an unnamed shadow document. - - `shadow_document (T)`: The local shadow document. This can be an `AbstractDict` or a mutable struct. - This must include keys/properties for all keys in the shadow documents published by the broker. - If this type is not an AbstractDict and is missing a property of the desired shadow document state, an error will - be logged and there will be a permanent difference between the reported and desired state. - This must also include a `version (Int)` key/property which will store the shadow document version. - It is recommended that you persist this to disk. - You can write the latest state to disk inside `shadow_document_post_update_callback`. - You should also then load it from disk and pass it as this parameter during the start of your application. + - `shadow_document (AbstractDict)`: The local shadow document. This must include all keys in the + shadow documents published by the broker. This must also include a `version (Int)` key which will store the + shadow document version. It is recommended that you persist this to disk. You can write the latest state to + disk inside `shadow_document_post_update_callback`. You should also then load it from disk and pass it as + this parameter during the start of your application. - `shadow_document_property_callbacks (Dict{String,ShadowDocumentPropertyUpdateCallback})`: An optional set of callbacks. A given callback will be fired for each update to the shadow document property matching the given key. Note that the callback is fired only when shadow properties are changed. A shadow property change occurs when the value of the shadow property is changed to a new value which is not equal to the prior value. This is implemented using `!isequal()`. - Please ensure a satisfactory definition (satifactory to your application's needs) of `isequal` for all types + Please ensure a satisfactory definition (satisfactory to your application's needs) of `isequal` for all types used in the shadow document. You will only need to worry about this if you are using custom JSON deserialization. - `shadow_document_pre_update_callback (ShadowDocumentPreUpdateCallback)`: An optional callback which will be fired immediately before updating any shadow document properties. This is always fired, even if no shadow properties will be changed. - `shadow_document_post_update_callback (ShadowDocumentPostUpdateCallback)`: An optional callback which will be fired immediately after updating any shadow document properties. This is fired only if shadow properties were changed. + - `shadow_document_property_pre_update_funcs (Dict{String,ShadowDocumentPropertyPreUpdateFunction})`: An optional set of functions which customize the update behavior of certain shadow document properties. See [`ShadowDocumentPropertyPreUpdateFunction`](@ref) for more information. - `id (Int)`: A unique ID which disambiguates log messages from multiple shadow frameworks. See also [`ShadowDocumentPropertyUpdateCallback`](@ref), [`ShadowDocumentPreUpdateCallback`](@ref), [`ShadowDocumentPostUpdateCallback`](@ref), [`MQTTConnection`](@ref). @@ -127,15 +142,19 @@ function ShadowFramework( connection::MQTTConnection, thing_name::String, shadow_name::Union{String,Nothing}, - shadow_document::T; + shadow_document::AbstractDict; shadow_document_property_callbacks::Dict{String,ShadowDocumentPropertyUpdateCallback} = Dict{ String, ShadowDocumentPropertyUpdateCallback, }(), shadow_document_pre_update_callback::ShadowDocumentPreUpdateCallback = v -> nothing, shadow_document_post_update_callback::ShadowDocumentPostUpdateCallback = v -> nothing, + shadow_document_property_pre_update_funcs::Dict{String,ShadowDocumentPropertyPreUpdateFunction} = Dict{ + String, + ShadowDocumentPropertyPreUpdateFunction, + }(), id = 1, -) where {T} +) return ShadowFramework( id, ReentrantLock(), @@ -144,6 +163,7 @@ function ShadowFramework( shadow_document_property_callbacks, shadow_document_pre_update_callback, shadow_document_post_update_callback, + shadow_document_property_pre_update_funcs, ) end @@ -176,7 +196,7 @@ Returns the [`ShadowClient`](@ref) for `sf`. shadow_client(sf::ShadowFramework) = sf._shadow_client """ - subscribe(sf::ShadowFramework{T}) where {T} + subscribe(sf::ShadowFramework) Subscribes to the shadow document's topics and begins processing updates. The `sf` is always locked before reading/writing from/to the shadow document. @@ -187,7 +207,7 @@ You can call `wait_until_synced(sf)` if you need to wait until this synchronizat $publish_return_docs """ -function subscribe(sf::ShadowFramework{T}) where {T} +function subscribe(sf::ShadowFramework) sf._sync_latch = CountDownLatch(1) callback = _create_sf_callback(sf) task, id = subscribe(sf._shadow_client, AWS_MQTT_QOS_AT_LEAST_ONCE, callback) @@ -202,7 +222,7 @@ function subscribe(sf::ShadowFramework{T}) where {T} end """ - unsubscribe(sf::ShadowFramework{T}) where {T} + unsubscribe(sf::ShadowFramework) Unsubscribes from the shadow document's topics and stops processing updates. After calling this, `wait_until_synced(sf)` will again block until the first publish in response to @@ -210,11 +230,13 @@ calling `subscribe(sf)`. $_iot_shadow_unsubscribe_return_docs """ -function unsubscribe(sf::ShadowFramework{T}) where {T} +function unsubscribe(sf::ShadowFramework) return unsubscribe(sf._shadow_client) end """ + publish_current_state(sf::ShadowFramework; include_version::Bool = true) + Publishes the current state of the shadow document. Arguments: @@ -222,7 +244,7 @@ Arguments: $publish_return_docs """ -function publish_current_state(sf::ShadowFramework{T}; include_version::Bool = true) where {T} +function publish_current_state(sf::ShadowFramework; include_version::Bool = true) current_state = _create_reported_state_payload(sf; include_version) @debug "SF-$(sf._id): publishing shadow update" current_state return publish(sf._shadow_client, "/update", current_state, AWS_MQTT_QOS_AT_LEAST_ONCE) @@ -255,7 +277,7 @@ function wait_until_synced(f::Function, sf::ShadowFramework) return nothing end -function _create_sf_callback(sf::ShadowFramework{T}) where {T} +function _create_sf_callback(sf::ShadowFramework) return function shadow_callback( shadow_client::ShadowClient, topic::String, @@ -268,7 +290,7 @@ function _create_sf_callback(sf::ShadowFramework{T}) where {T} if endswith(topic, "/get/accepted") # process any delta state from when we last reported our current state. if something changed, report our # current state again. there's a chance the delta state is permanent due to the user's configuration - # (isequals implementation, struct definition, etc.). we need to avoid endless communications. + # (isequals implementation, etc.). we need to avoid endless communications. updated = _update_local_shadow_from_get!(sf, payload) if updated publish_current_state(sf) @@ -287,7 +309,7 @@ function _create_sf_callback(sf::ShadowFramework{T}) where {T} # and publish our new state updated = _update_local_shadow_from_delta!(sf, payload) # we still need to check updated here, because there's a chance the delta state is permanent due to the - # user's configuration (isequals implementation, struct definition, etc.). we need to avoid endless communications. + # user's configuration (isequals implementation, etc.). we need to avoid endless communications. if updated publish_current_state(sf) else @@ -304,12 +326,12 @@ function _create_sf_callback(sf::ShadowFramework{T}) where {T} end """ - _update_local_shadow_from_get!(sf::ShadowFramework{T}, payload_str::String) where {T} + _update_local_shadow_from_get!(sf::ShadowFramework, payload_str::String) Performs a local shadow update using the delta state from a /get/accepted document. Returns `true` if the local shadow was updated. """ -function _update_local_shadow_from_get!(sf::ShadowFramework{T}, payload_str::String) where {T} +function _update_local_shadow_from_get!(sf::ShadowFramework, payload_str::String) payload = JSON.parse(payload_str) version = get(payload, "version", nothing) return lock(sf) do @@ -317,9 +339,20 @@ function _update_local_shadow_from_get!(sf::ShadowFramework{T}, payload_str::Str _set_version!(sf._shadow_document, version) state = get(payload, "state", nothing) if state !== nothing + # first sync with the previously reported state to ensure that if our local copy and what was previously + # reported somehow got out of sync, we can catch up even if there is no delta state + reported = get(state, "reported", nothing) delta = get(state, "delta", nothing) - if delta !== nothing - return _do_local_shadow_update!(sf, delta) + if reported !== nothing + # also sync with the delta state to avoid having multiple shadow updates + if delta !== nothing + _recursive_merge!(reported, delta) + end + return _do_local_shadow_update!(sf, reported) + else + if delta !== nothing + return _do_local_shadow_update!(sf, delta) + end end end else @@ -329,13 +362,26 @@ function _update_local_shadow_from_get!(sf::ShadowFramework{T}, payload_str::Str end end +function _recursive_merge!(d::AbstractDict, other::AbstractDict) + for (k, v) in other + if haskey(d, k) + d[k] = _recursive_merge!(d[k], v) + else + d[k] = v + end + end + return d +end + +_recursive_merge!(d, other) = other + """ - _update_local_shadow_from_delta!(sf::ShadowFramework{T}, payload_str::String) where {T} + _update_local_shadow_from_delta!(sf::ShadowFramework, payload_str::String) Performs a local shadow update using the delta state from an /update/delta document. Returns `true` if the local shadow was updated. """ -function _update_local_shadow_from_delta!(sf::ShadowFramework{T}, payload_str::String) where {T} +function _update_local_shadow_from_delta!(sf::ShadowFramework, payload_str::String) payload = JSON.parse(payload_str) version = get(payload, "version", nothing) return lock(sf) do @@ -353,21 +399,21 @@ function _update_local_shadow_from_delta!(sf::ShadowFramework{T}, payload_str::S end """ - _do_local_shadow_update!(sf::ShadowFramework{T}, state::Dict{String,<:Any}) where {T} + _do_local_shadow_update!(sf::ShadowFramework, state::Dict{String,<:Any}) 1. Fires the pre-update callback 2. Updates each shadow property from `state` and fires its callback if an update occured 3. Fires the post-update callback 4. Returns `true` if any updated occured. """ -function _do_local_shadow_update!(sf::ShadowFramework{T}, state::Dict{String,<:Any}) where {T} +function _do_local_shadow_update!(sf::ShadowFramework, state::Dict{String,<:Any}) return lock(sf) do _fire_pre_update_callback(sf, state) any_updates = false for (k, v) in state - updated = _update_shadow_property!(sf, k, v) + updated = _update_shadow_property!(sf, sf._shadow_document, k, v) if updated - _fire_callback(sf, k, v) + _fire_callback(sf, k, sf._shadow_document[k]) any_updates = true end end @@ -376,7 +422,7 @@ function _do_local_shadow_update!(sf::ShadowFramework{T}, state::Dict{String,<:A end end -function _fire_pre_update_callback(sf::ShadowFramework{T}, state::Dict{String,<:Any}) where {T} +function _fire_pre_update_callback(sf::ShadowFramework, state::Dict{String,<:Any}) try sf._shadow_document_pre_update_callback(state) catch ex @@ -385,7 +431,7 @@ function _fire_pre_update_callback(sf::ShadowFramework{T}, state::Dict{String,<: return nothing end -function _fire_post_update_callback(sf::ShadowFramework{T}) where {T} +function _fire_post_update_callback(sf::ShadowFramework) try sf._shadow_document_post_update_callback(sf._shadow_document) catch ex @@ -394,7 +440,7 @@ function _fire_post_update_callback(sf::ShadowFramework{T}) where {T} return nothing end -function _fire_callback(sf::ShadowFramework{T}, key::String, value) where {T} +function _fire_callback(sf::ShadowFramework, key::String, value) if haskey(sf._shadow_document_property_callbacks, key) callback = sf._shadow_document_property_callbacks[key] try @@ -407,7 +453,7 @@ function _fire_callback(sf::ShadowFramework{T}, key::String, value) where {T} return nothing end -function _create_reported_state_payload(sf::ShadowFramework{T}; include_version = true) where {T} +function _create_reported_state_payload(sf::ShadowFramework; include_version = true) return lock(sf) do d = Dict() d["state"] = Dict("reported" => Dict(_get_shadow_property_pairs(sf._shadow_document))) @@ -420,64 +466,40 @@ end _get_shadow_property_pairs(doc::AbstractDict) = filter(it -> it[1] != "version", collect(doc)) -function _get_shadow_property_pairs(doc::T) where {T} - names = fieldnames(T) - out = Vector{Pair{String,Any}}(undef, length(names) - 1) - for i in eachindex(names) - fieldname = names[i] - if fieldname != :version - out[i] = String(fieldname) => getfield(doc, fieldname) - end - end - return out -end - """ - _update_shadow_property!(sf::ShadowFramework{<:AbstractDict}, key::String, value) + _update_shadow_property!(sf::ShadowFramework, doc::AbstractDict, key::String, value) Updates the shadow property if the new `value` is not equal to the current value at the `key`. +If the `value` is an `AbstractDict`, it is merged into the `doc` instead of overwriting the `key`. Returns `true` if an update occured. """ -function _update_shadow_property!(sf::ShadowFramework{<:AbstractDict}, key::String, value) - if haskey(sf._shadow_document, key) - if !isequal(sf._shadow_document[key], value) - sf._shadow_document[key] = value - return true +function _update_shadow_property!(sf::ShadowFramework, doc::AbstractDict, key::String, value) + return if value isa AbstractDict + updated = false + for (k, v) in collect(value) + updated |= _update_shadow_property!(sf, doc[key], k, v) end + updated else - sf._shadow_document[key] = value - return true - end - return false -end - -""" - _update_shadow_property!(sf::ShadowFramework{<:Any}, key::String, value) - -Updates the shadow property if the new `value` is not equal to the current value at the `key`. -Returns `true` if an update occured. -""" -function _update_shadow_property!(sf::ShadowFramework{<:Any}, key::String, value) - try - sym = Symbol(key) - v = getproperty(sf._shadow_document, sym) - if !isequal(v, value) - setproperty!(sf._shadow_document, sym, value) - return true + if haskey(sf._shadow_document_property_pre_update_funcs, key) + sf._shadow_document_property_pre_update_funcs[key](doc, key, value) + else + if !haskey(doc, key) || !isequal(doc[key], value) + doc[key] = value + true + else + false + end end - catch ex - @error "SF-$(sf._id): failed to update shadow property key=$key value=$value (you probably need to extend \ - your struct type with an additional property for this key)" exception = (ex, catch_backtrace()) end - return false end """ - _sync_version!(doc::T, payload_str::String) where {T} + _sync_version!(doc::AbstractDict, payload_str::String) Updates the local shadow's version number using the version in the `payload`. """ -function _sync_version!(doc::T, payload_str::String) where {T} +function _sync_version!(doc::AbstractDict, payload_str::String) payload = JSON.parse(payload_str) version = get(payload, "version", nothing) if _version_allows_update(doc, version) @@ -486,8 +508,8 @@ function _sync_version!(doc::T, payload_str::String) where {T} return nothing end -_version_allows_update(doc::T, version::Int) where {T} = version >= _version(doc) -_version_allows_update(doc::T, version::Nothing) where {T} = false +_version_allows_update(doc::AbstractDict, version::Int) = version >= _version(doc) +_version_allows_update(doc::AbstractDict, version::Nothing) = false _version(doc::AbstractDict) = doc["version"] _version(doc) = doc.version diff --git a/test/shadow_framework_integ_test.jl b/test/shadow_framework_integ_test.jl index 5dc987f..e0647e7 100644 --- a/test/shadow_framework_integ_test.jl +++ b/test/shadow_framework_integ_test.jl @@ -49,11 +49,6 @@ function maybe_get(d, keys...) end end -mutable struct ShadowDocMissingProperty - foo::Int - version::Int -end - @testset "unsubscribe" begin connection = new_mqtt_connection() shadow_name = random_shadow_name() @@ -370,6 +365,145 @@ end end end +@testset "updating a field in a nested dict causes only that field to get updated" begin + connection = new_mqtt_connection() + shadow_name = random_shadow_name() + doc = Dict("foo" => Dict("bar" => 1)) + + values_foo = [] + foo_cb = x -> push!(values_foo, x) + + values_post_update = [] + post_update_cb = x -> begin + push!(values_post_update, x) + end + + sf = ShadowFramework( + connection, + THING1_NAME, + shadow_name, + doc; + shadow_document_post_update_callback = post_update_cb, + shadow_document_property_callbacks = Dict{String,Function}("foo" => foo_cb), + ) + sc = shadow_client(sf) + + oobc = new_mqtt_connection() + oobsc = OOBShadowClient(oobc, THING1_NAME, shadow_name) + + try + @info "subscribing" + # wait for the first publish to finish, otherwise we will race it with our next update, which could arrive + # first and break this test + wait_until_synced(sf) do + fetch(subscribe(sf)[1]) + end + + # add foo.baz=2. foo.bar=1 should remain + @info "publishing out of band /update" + fetch( + publish( + oobsc.shadow_client, + "/update", + json(Dict("state" => Dict("desired" => Dict("foo" => Dict("baz" => 2))))), + AWS_MQTT_QOS_AT_LEAST_ONCE, + )[1], + ) + wait_for(() -> !isempty(values_post_update)) + @test doc["foo"] == Dict("bar" => 1, "baz" => 2) + + empty!(values_post_update) + + # remove foo.baz. foo.bar=1 should remain + @info "publishing out of band /update" + fetch( + publish( + oobsc.shadow_client, + "/update", + json(Dict("state" => Dict("desired" => Dict("foo" => Dict("baz" => nothing))))), + AWS_MQTT_QOS_AT_LEAST_ONCE, + )[1], + ) + wait_for(() -> !isempty(values_post_update)) + @test doc["foo"] == Dict("bar" => 1) + + @info "unsubscribing" + fetch(unsubscribe(sf)[1]) + @info "done unsubscribing" + fetch(unsubscribe(oobsc.shadow_client)[1]) + finally + fetch(publish(sc, "/delete", "", AWS_MQTT_QOS_AT_LEAST_ONCE)[1]) + end +end + +@testset "the initial sync must sync the local shadow document even if there is no delta state" begin + connection = new_mqtt_connection() + shadow_name = random_shadow_name() + doc = Dict("foo" => 1) + + values_foo = [] + foo_cb = x -> push!(values_foo, x) + + values_post_update = [] + post_update_cb = x -> begin + push!(values_post_update, x) + end + + sf = ShadowFramework( + connection, + THING1_NAME, + shadow_name, + doc; + shadow_document_post_update_callback = post_update_cb, + shadow_document_property_callbacks = Dict{String,Function}("foo" => foo_cb), + ) + sc = shadow_client(sf) + + oobc = new_mqtt_connection() + oobsc = OOBShadowClient(oobc, THING1_NAME, shadow_name) + + update_msgs = [] + task, id = subscribe( + oobsc.shadow_client, + "/update", + AWS_MQTT_QOS_AT_LEAST_ONCE, + (topic::String, payload::String, dup::Bool, qos::aws_mqtt_qos, retain::Bool) -> + push!(update_msgs, (; topic, payload, dup, qos, retain)), + ) + fetch(task) + + try + # if we have reported state that's out of sync with our local shadow doc, and no delta state + @info "publishing out of band /update" + fetch( + publish( + oobsc.shadow_client, + "/update", + json(Dict("state" => Dict("reported" => Dict("foo" => 2), "desired" => Dict("foo" => 2)))), + AWS_MQTT_QOS_AT_LEAST_ONCE, + )[1], + ) + wait_for(() -> !isempty(update_msgs)) + + @info "subscribing" + # wait for the first publish to finish, otherwise we will race it with our next update, which could arrive + # first and break this test + wait_until_synced(sf) do + fetch(subscribe(sf)[1]) + end + + # our local copy should get updated even though there is no delta state + @test doc == Dict("foo" => 2) + + @info "unsubscribing" + fetch(unsubscribe(sf)[1]) + @info "done unsubscribing" + fetch(unsubscribe(oobsc.shadow_client)[1]) + finally + fetch(publish(sc, "/delete", "", AWS_MQTT_QOS_AT_LEAST_ONCE)[1]) + end +end + @testset "updates are published only if the reported state changed" begin @testset "happy path" begin connection = new_mqtt_connection() @@ -463,10 +597,9 @@ end end @testset "desired state that can't be reconciled with the local shadow doesn't cause excessive publishing" begin - @warn "Missing property errors are expected in this test" connection = new_mqtt_connection() shadow_name = random_shadow_name() - doc = ShadowDocMissingProperty(1, 0) + doc = Dict("foo" => rand()) values_foo = [] foo_cb = x -> push!(values_foo, x) @@ -488,7 +621,11 @@ end doc; shadow_document_pre_update_callback = pre_update_cb, shadow_document_post_update_callback = post_update_cb, - shadow_document_property_callbacks = Dict{String,Function}("foo" => foo_cb), + shadow_document_property_callbacks = Dict("foo" => foo_cb), + shadow_document_property_pre_update_funcs = Dict("foo" => (doc, k, v) -> begin + doc[k] = rand() + return false # pretend there was no update to cause a permanent different + end), ) sc = shadow_client(sf) @@ -525,15 +662,13 @@ end fetch(subscribe(sf)[1]) end - # publish an /update which adds bar=1. this should be rejected because bar is not present in the struct. - # the local shadow should not be updated. an /update should not be published. + # publish an /update which wants foo=1. the local shadow should not be updated. an /update should not be published. @info "publishing out of band /update" - @info "the following shadow property error is expected" fetch( publish( oobsc.shadow_client, "/update", - json(Dict("state" => Dict("desired" => Dict("bar" => 1)))), + json(Dict("state" => Dict("desired" => Dict("foo" => 1)))), AWS_MQTT_QOS_AT_LEAST_ONCE, )[1], ) @@ -542,22 +677,21 @@ end @test length(update_msgs) == 2 @show update_msgs payloads = [JSON.parse(it.payload) for it in update_msgs] - @test any(it -> maybe_get(it, "state", "reported", "foo") == 1, payloads) # from the initial update since the shadow doc didn't exist - @test any(it -> maybe_get(it, "state", "desired", "bar") == 1, payloads) # from our desired state update above - # there should not be any other update because the bar update should not have been accepted + @test any(it -> maybe_get(it, "state", "reported", "foo") != 1, payloads) # from the initial update since the shadow doc didn't exist + @test any(it -> maybe_get(it, "state", "desired", "foo") == 1, payloads) # from our desired state update above + # there should not be any other update because the foo update should not have been accepted update_msgs = [] - # publish an /update which changes foo=2 (with bar=1 still set). this should be accepted. + # publish an /update which adds bar=2. this should be accepted. # the local shadow should be updated. an /update should be published. the broker will respond with another # /update/delta which should be ignored. @info "publishing second out of band /update" - @info "the following shadow property error is expected" fetch( publish( oobsc.shadow_client, "/update", - json(Dict("state" => Dict("desired" => Dict("foo" => 2)))), + json(Dict("state" => Dict("desired" => Dict("bar" => 2)))), AWS_MQTT_QOS_AT_LEAST_ONCE, )[1], ) @@ -566,9 +700,8 @@ end @test length(update_msgs) == 2 @show update_msgs payloads = [JSON.parse(it.payload) for it in update_msgs] - @test any(it -> maybe_get(it, "state", "desired", "foo") == 2, payloads) # from our desired state update above - @test any(it -> maybe_get(it, "state", "reported", "foo") == 2, payloads) # the response to our update - # there should not be any other update because the bar update should not have been accepted + @test any(it -> maybe_get(it, "state", "desired", "bar") == 2, payloads) # from our desired state update above + @test any(it -> maybe_get(it, "state", "reported", "bar") == 2, payloads) # the response to our update @info "unsubscribing" fetch(unsubscribe(sf)[1]) diff --git a/test/shadow_framework_test.jl b/test/shadow_framework_test.jl index d4692ad..6efc8a2 100644 --- a/test/shadow_framework_test.jl +++ b/test/shadow_framework_test.jl @@ -1,28 +1,12 @@ -mutable struct ExampleShadowDocument <: Comparable - foo::Int - bar::String - version::Int -end - -mutable struct BadShadowDocWithoutVersion - foo::Int - bar::String -end - -struct ImmutableShadowDocWithVersion - version::Int -end - -const EXAMPLE_SD_1_DICT = Dict("foo" => 1, "bar" => "a", "version" => 2) -const EXAMPLE_SD_1_STRUCT = ExampleShadowDocument(1, "a", 2) -const ALL_EXAMPLE_SD_1 = [EXAMPLE_SD_1_DICT, EXAMPLE_SD_1_STRUCT] -const EXAMPLE_UPDATE_1_STATE = Dict("foo" => 23849, "bar" => "dmwofsh") +EXAMPLE_SD_1_DICT = Dict("foo" => 1, "bar" => "a", "version" => 2) +EXAMPLE_UPDATE_1_STATE = Dict("foo" => 23849, "bar" => "dmwofsh") function empty_shadow_framework( shadow_document; shadow_document_property_callbacks = Dict{String,Function}(), shadow_document_pre_update_callback = (v) -> nothing, shadow_document_post_update_callback = (v) -> nothing, + shadow_document_property_pre_update_funcs = Dict{String,Function}(), ) return ShadowFramework( 1, @@ -32,6 +16,7 @@ function empty_shadow_framework( shadow_document_property_callbacks, shadow_document_pre_update_callback, shadow_document_post_update_callback, + shadow_document_property_pre_update_funcs, ) end @@ -47,14 +32,6 @@ end @test d["version"] == 1 end -@testset "create ShadowFramework with struct without version number" begin - @test_throws ErrorException empty_shadow_framework(BadShadowDocWithoutVersion(1, "")) -end - -@testset "create ShadowFramework with immutable struct" begin - @test_throws ErrorException empty_shadow_framework(ImmutableShadowDocWithVersion(1)) -end - @testset "_sync_version!" begin d = Dict("version" => 1) AWSCRT._sync_version!(d, json(Dict("version" => 0))) @@ -74,37 +51,26 @@ end @test !AWSCRT._version_allows_update(d, 0) @test AWSCRT._version_allows_update(d, 1) @test AWSCRT._version_allows_update(d, 2) - - doc = ExampleShadowDocument(0, "", 1) - @test !AWSCRT._version_allows_update(doc, 0) - @test AWSCRT._version_allows_update(doc, 1) - @test AWSCRT._version_allows_update(doc, 2) end @testset "_update_shadow_property!" begin d = Dict() sf = empty_shadow_framework(d) - @test AWSCRT._update_shadow_property!(sf, "foo", 1) - @test !AWSCRT._update_shadow_property!(sf, "foo", 1) # no update now that it's set - @test AWSCRT._update_shadow_property!(sf, "bar", "a") - @test AWSCRT._update_shadow_property!(sf, "baz", "b") # new properties can be introduced + @test AWSCRT._update_shadow_property!(sf, d, "foo", 1) + @test !AWSCRT._update_shadow_property!(sf, d, "foo", 1) # no update now that it's set + @test AWSCRT._update_shadow_property!(sf, d, "bar", "a") + @test AWSCRT._update_shadow_property!(sf, d, "baz", "b") # new properties can be introduced @test d == Dict("foo" => 1, "bar" => "a", "baz" => "b", "version" => 1) - - doc = ExampleShadowDocument(0, "", 1) - sf = empty_shadow_framework(doc) - @test AWSCRT._update_shadow_property!(sf, "foo", 1) - @test !AWSCRT._update_shadow_property!(sf, "foo", 1) # no update now that it's set - @test AWSCRT._update_shadow_property!(sf, "bar", "a") - @test @test_logs (:error,) !AWSCRT._update_shadow_property!(sf, "baz", "b") # new properties fail with structs. returns false because there is no update - @test doc == ExampleShadowDocument(1, "a", 1) end -@testset "_get_shadow_property_pairs" for doc in ALL_EXAMPLE_SD_1 +@testset "_get_shadow_property_pairs" begin + doc = EXAMPLE_SD_1_DICT expected = ["foo" => 1, "bar" => "a"] @test sort_pairs(expected) == sort_pairs(AWSCRT._get_shadow_property_pairs(doc)) end -@testset "_create_reported_state_payload" for doc in ALL_EXAMPLE_SD_1 +@testset "_create_reported_state_payload" begin + doc = EXAMPLE_SD_1_DICT sf = empty_shadow_framework(doc) expected = Dict("state" => Dict("reported" => Dict("foo" => 1, "bar" => "a")), "version" => 2) @test json(expected) == AWSCRT._create_reported_state_payload(sf) @@ -113,7 +79,8 @@ end @test json(expected) == AWSCRT._create_reported_state_payload(sf; include_version = false) end -@testset "_fire_callback (happy path)" for doc in ALL_EXAMPLE_SD_1 +@testset "_fire_callback (happy path)" begin + doc = EXAMPLE_SD_1_DICT values = [] sf = empty_shadow_framework( deepcopy(doc); @@ -123,7 +90,8 @@ end @test values == [0] end -@testset "_fire_callback (callback throws)" for doc in ALL_EXAMPLE_SD_1 +@testset "_fire_callback (callback throws)" begin + doc = EXAMPLE_SD_1_DICT values = [] sf = empty_shadow_framework( doc; @@ -133,7 +101,8 @@ end @test isempty(values) end -@testset "_fire_callback (no matching callback)" for doc in ALL_EXAMPLE_SD_1 +@testset "_fire_callback (no matching callback)" begin + doc = EXAMPLE_SD_1_DICT values = [] sf = empty_shadow_framework( doc; @@ -143,12 +112,14 @@ end @test isempty(values) end -@testset "_fire_callback (no callbacks)" for doc in ALL_EXAMPLE_SD_1 +@testset "_fire_callback (no callbacks)" begin + doc = EXAMPLE_SD_1_DICT sf = empty_shadow_framework(doc) AWSCRT._fire_callback(sf, "bar", 0) end -@testset "_do_local_shadow_update!" for doc in ALL_EXAMPLE_SD_1 +@testset "_do_local_shadow_update!" begin + doc = EXAMPLE_SD_1_DICT values = [] doc_copy = deepcopy(doc) sf = empty_shadow_framework( @@ -179,7 +150,58 @@ end @test @test_logs (:error,) match_mode = :any !AWSCRT._do_local_shadow_update!(sf, state) # no update the second time end -@testset "_update_local_shadow_from_get!" for doc in ALL_EXAMPLE_SD_1 +@testset "use pre-update function instead of default behavior" begin + @testset "regular update" begin + doc = Dict("a" => 1) + sf = empty_shadow_framework( + doc, + shadow_document_property_pre_update_funcs = Dict("a" => (doc, key, value) -> begin + if key != "a" + error("bad key $key") + end + doc[key] = 3 + return true + end), + ) + @test AWSCRT._do_local_shadow_update!(sf, Dict("a" => 2)) + @test doc == Dict("a" => 3, "version" => 1) + end + + @testset "no update" begin + doc = Dict("a" => 1) + sf = empty_shadow_framework( + doc, + shadow_document_property_pre_update_funcs = Dict("a" => (doc, key, value) -> begin + if key != "a" + error("bad key $key") + end + doc[key] = 3 + return false + end), + ) + @test !AWSCRT._do_local_shadow_update!(sf, Dict("a" => 2)) + @test doc == Dict("a" => 3, "version" => 1) + end + + @testset "nested key" begin + doc = Dict{String,Any}("a" => Dict("b" => 1, "c" => 1)) + sf = empty_shadow_framework( + doc, + shadow_document_property_pre_update_funcs = Dict("b" => (doc, key, value) -> begin + if key != "b" + error("bad key $key") + end + doc[key] = 3 + return true + end), + ) + @test AWSCRT._do_local_shadow_update!(sf, Dict("a" => Dict("b" => 2))) + @test doc == Dict("a" => Dict("b" => 3, "c" => 1), "version" => 1) + end +end + +@testset "_update_local_shadow_from_get!" begin + doc = EXAMPLE_SD_1_DICT doc_copy = deepcopy(doc) sf = empty_shadow_framework(doc_copy) @test AWSCRT._update_local_shadow_from_get!( @@ -189,15 +211,16 @@ end @test are_shadow_states_equal_without_version(doc_copy, Dict("foo" => 2, "bar" => "a")) end -@testset "_update_local_shadow_from_delta!" for doc in ALL_EXAMPLE_SD_1 +@testset "_update_local_shadow_from_delta!" begin + doc = EXAMPLE_SD_1_DICT doc_copy = deepcopy(doc) sf = empty_shadow_framework(doc_copy) @test AWSCRT._update_local_shadow_from_delta!(sf, json(Dict("state" => Dict("foo" => 2), "version" => 3))) @test are_shadow_states_equal_without_version(doc_copy, Dict("foo" => 2, "bar" => "a")) end -@testset "out of order messages, version is respected" for doc in ALL_EXAMPLE_SD_1, - update_type in [:get_accepted, :delta] +@testset "out of order messages, version is respected" for update_type in [:get_accepted, :delta] + doc = EXAMPLE_SD_1_DICT update_func = if update_type == :get_accepted AWSCRT._update_local_shadow_from_get! From 08fcb35b3c0142be96aa25be86386467974bb95a Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Wed, 12 Feb 2025 12:24:25 -0500 Subject: [PATCH 2/9] add export --- src/AWSCRT.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/AWSCRT.jl b/src/AWSCRT.jl index 1c0dc58..98b42a8 100644 --- a/src/AWSCRT.jl +++ b/src/AWSCRT.jl @@ -96,6 +96,7 @@ export OnShadowMessage include("ShadowFramework.jl") export ShadowFramework +export ShadowDocumentPropertyPreUpdateFunction export ShadowDocumentPropertyUpdateCallback export ShadowDocumentPreUpdateCallback export ShadowDocumentPostUpdateCallback From ca549589bcb44868a0638e13951b33872758dc9d Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Wed, 12 Feb 2025 12:30:34 -0500 Subject: [PATCH 3/9] fix typecheck --- src/ShadowFramework.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ShadowFramework.jl b/src/ShadowFramework.jl index 548a56d..4e9ed7a 100644 --- a/src/ShadowFramework.jl +++ b/src/ShadowFramework.jl @@ -143,13 +143,13 @@ function ShadowFramework( thing_name::String, shadow_name::Union{String,Nothing}, shadow_document::AbstractDict; - shadow_document_property_callbacks::Dict{String,ShadowDocumentPropertyUpdateCallback} = Dict{ + shadow_document_property_callbacks::Dict{String,<:ShadowDocumentPropertyUpdateCallback} = Dict{ String, ShadowDocumentPropertyUpdateCallback, }(), shadow_document_pre_update_callback::ShadowDocumentPreUpdateCallback = v -> nothing, shadow_document_post_update_callback::ShadowDocumentPostUpdateCallback = v -> nothing, - shadow_document_property_pre_update_funcs::Dict{String,ShadowDocumentPropertyPreUpdateFunction} = Dict{ + shadow_document_property_pre_update_funcs::Dict{String,<:ShadowDocumentPropertyPreUpdateFunction} = Dict{ String, ShadowDocumentPropertyPreUpdateFunction, }(), From 87d8fbb8289b1508734d8d8d1b59eb008601ce1b Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Wed, 12 Feb 2025 12:31:35 -0500 Subject: [PATCH 4/9] fix types --- test/shadow_framework_integ_test.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/shadow_framework_integ_test.jl b/test/shadow_framework_integ_test.jl index e0647e7..e6aef28 100644 --- a/test/shadow_framework_integ_test.jl +++ b/test/shadow_framework_integ_test.jl @@ -368,7 +368,7 @@ end @testset "updating a field in a nested dict causes only that field to get updated" begin connection = new_mqtt_connection() shadow_name = random_shadow_name() - doc = Dict("foo" => Dict("bar" => 1)) + doc = Dict{String,Any}("foo" => Dict("bar" => 1)) values_foo = [] foo_cb = x -> push!(values_foo, x) @@ -493,7 +493,7 @@ end end # our local copy should get updated even though there is no delta state - @test doc == Dict("foo" => 2) + @test doc == Dict("foo" => 2, "version" => 2) @info "unsubscribing" fetch(unsubscribe(sf)[1]) From 44cbf63195af876b216a1d7b19ff898806dce3ad Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Wed, 12 Feb 2025 12:48:09 -0500 Subject: [PATCH 5/9] disable unsupported test for now --- src/ShadowFramework.jl | 4 ++++ test/shadow_framework_integ_test.jl | 25 +++++++++++++------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/ShadowFramework.jl b/src/ShadowFramework.jl index 4e9ed7a..51a029b 100644 --- a/src/ShadowFramework.jl +++ b/src/ShadowFramework.jl @@ -137,6 +137,10 @@ Arguments: - `id (Int)`: A unique ID which disambiguates log messages from multiple shadow frameworks. See also [`ShadowDocumentPropertyUpdateCallback`](@ref), [`ShadowDocumentPreUpdateCallback`](@ref), [`ShadowDocumentPostUpdateCallback`](@ref), [`MQTTConnection`](@ref). + + !!! note "Limitations" + Removing properties by setting their desired value to `null` is not currently supported. AWS IoT will remove + that `null` property from the desired state, but the property will remain in the reported state. """ function ShadowFramework( connection::MQTTConnection, diff --git a/test/shadow_framework_integ_test.jl b/test/shadow_framework_integ_test.jl index e6aef28..2502b33 100644 --- a/test/shadow_framework_integ_test.jl +++ b/test/shadow_framework_integ_test.jl @@ -414,18 +414,19 @@ end empty!(values_post_update) - # remove foo.baz. foo.bar=1 should remain - @info "publishing out of band /update" - fetch( - publish( - oobsc.shadow_client, - "/update", - json(Dict("state" => Dict("desired" => Dict("foo" => Dict("baz" => nothing))))), - AWS_MQTT_QOS_AT_LEAST_ONCE, - )[1], - ) - wait_for(() -> !isempty(values_post_update)) - @test doc["foo"] == Dict("bar" => 1) + # TODO this is not supported yet + # # remove foo.baz. foo.bar=1 should remain + # @info "publishing out of band /update" + # fetch( + # publish( + # oobsc.shadow_client, + # "/update", + # json(Dict("state" => Dict("desired" => Dict("foo" => Dict("baz" => nothing))))), + # AWS_MQTT_QOS_AT_LEAST_ONCE, + # )[1], + # ) + # wait_for(() -> !isempty(values_post_update)) + # @test doc["foo"] == Dict("bar" => 1) @info "unsubscribing" fetch(unsubscribe(sf)[1]) From a56a9e47ae8836af6cbab7bf12e9efc2ad090d7e Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Thu, 13 Feb 2025 11:15:22 -0500 Subject: [PATCH 6/9] pre-update functions should also run for the parents of nested updates --- src/ShadowFramework.jl | 16 +++--- test/shadow_framework_integ_test.jl | 80 +++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 9 deletions(-) diff --git a/src/ShadowFramework.jl b/src/ShadowFramework.jl index 51a029b..046d2f3 100644 --- a/src/ShadowFramework.jl +++ b/src/ShadowFramework.jl @@ -478,22 +478,20 @@ If the `value` is an `AbstractDict`, it is merged into the `doc` instead of over Returns `true` if an update occured. """ function _update_shadow_property!(sf::ShadowFramework, doc::AbstractDict, key::String, value) - return if value isa AbstractDict + return if haskey(sf._shadow_document_property_pre_update_funcs, key) + sf._shadow_document_property_pre_update_funcs[key](doc, key, value) + elseif value isa AbstractDict updated = false for (k, v) in collect(value) updated |= _update_shadow_property!(sf, doc[key], k, v) end updated else - if haskey(sf._shadow_document_property_pre_update_funcs, key) - sf._shadow_document_property_pre_update_funcs[key](doc, key, value) + if !haskey(doc, key) || !isequal(doc[key], value) + doc[key] = value + true else - if !haskey(doc, key) || !isequal(doc[key], value) - doc[key] = value - true - else - false - end + false end end end diff --git a/test/shadow_framework_integ_test.jl b/test/shadow_framework_integ_test.jl index 2502b33..7987d7c 100644 --- a/test/shadow_framework_integ_test.jl +++ b/test/shadow_framework_integ_test.jl @@ -712,4 +712,84 @@ end fetch(publish(sc, "/delete", "", AWS_MQTT_QOS_AT_LEAST_ONCE)[1]) end end + + @testset "the pre-update function can be used for the parents of nested properties" begin + connection = new_mqtt_connection() + shadow_name = random_shadow_name() + doc = Dict("foo" => Dict("bar" => 1)) + bar_latch = CountDownLatch(1) + + sf = ShadowFramework( + connection, + THING1_NAME, + shadow_name, + doc; + shadow_document_property_pre_update_funcs = Dict( + "foo" => (doc, k, v) -> begin + doc[k] = Dict("bar" => 3) + return true + end, + "bar" => (doc, k, v) -> begin + count_down(bar_latch) + error("this should not run") + end, + ), + ) + sc = shadow_client(sf) + + msgs = [] + function shadow_callback( + shadow_client::ShadowClient, + topic::String, + payload::String, + dup::Bool, + qos::aws_mqtt_qos, + retain::Bool, + ) + push!(msgs, (; shadow_client, topic, payload, dup, qos, retain)) + end + + oobc = new_mqtt_connection() + oobsc = OOBShadowClient(oobc, THING1_NAME, shadow_name) + + update_msgs = [] + task, id = subscribe( + oobsc.shadow_client, + "/update", + AWS_MQTT_QOS_AT_LEAST_ONCE, + (topic::String, payload::String, dup::Bool, qos::aws_mqtt_qos, retain::Bool) -> + push!(update_msgs, (; topic, payload, dup, qos, retain)), + ) + fetch(task) + + try + # make the remote shadow different from the local one to cause a sync on the first subscribe + @info "publishing out of band /update" + fetch( + publish( + oobsc.shadow_client, + "/update", + json(Dict("state" => Dict("reported" => Dict("foo" => Dict("bar" => 2))))), + AWS_MQTT_QOS_AT_LEAST_ONCE, + )[1], + ) + wait_for(() -> length(update_msgs) >= 2) + + @info "subscribing" + wait_until_synced(sf) do + fetch(subscribe(sf)[1]) + end + + # the pre-update function should have ran and changed the nested property foo.bar + @test doc == Dict("foo" => Dict("bar" => 3), "version" => 2) + @test get_count(bar_latch) == 1 + + @info "unsubscribing" + fetch(unsubscribe(sf)[1]) + @info "done unsubscribing" + fetch(unsubscribe(oobsc.shadow_client)[1]) + finally + fetch(publish(sc, "/delete", "", AWS_MQTT_QOS_AT_LEAST_ONCE)[1]) + end + end end From cdb1cdcbd0fa91f07b47b7c8a881e484240466fa Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Thu, 13 Feb 2025 11:25:00 -0500 Subject: [PATCH 7/9] fix type --- test/shadow_framework_integ_test.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/shadow_framework_integ_test.jl b/test/shadow_framework_integ_test.jl index 7987d7c..e01b1b0 100644 --- a/test/shadow_framework_integ_test.jl +++ b/test/shadow_framework_integ_test.jl @@ -716,7 +716,7 @@ end @testset "the pre-update function can be used for the parents of nested properties" begin connection = new_mqtt_connection() shadow_name = random_shadow_name() - doc = Dict("foo" => Dict("bar" => 1)) + doc = Dict{String,Any}("foo" => Dict("bar" => 1)) bar_latch = CountDownLatch(1) sf = ShadowFramework( From c4ec7ed6f184e438a34426a91e80c7247e23be30 Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Thu, 13 Feb 2025 11:29:52 -0500 Subject: [PATCH 8/9] fixup! pre-update functions should also run for the parents of nested updates --- test/shadow_framework_integ_test.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/shadow_framework_integ_test.jl b/test/shadow_framework_integ_test.jl index e01b1b0..cc1a6e1 100644 --- a/test/shadow_framework_integ_test.jl +++ b/test/shadow_framework_integ_test.jl @@ -773,7 +773,7 @@ end AWS_MQTT_QOS_AT_LEAST_ONCE, )[1], ) - wait_for(() -> length(update_msgs) >= 2) + wait_for(() -> length(update_msgs) >= 1) @info "subscribing" wait_until_synced(sf) do From c14789c2237d62ea2f4040af37398ec8a8ebd3c3 Mon Sep 17 00:00:00 2001 From: Octogonapus Date: Thu, 13 Feb 2025 11:38:33 -0500 Subject: [PATCH 9/9] fixup! pre-update functions should also run for the parents of nested updates --- test/shadow_framework_integ_test.jl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/shadow_framework_integ_test.jl b/test/shadow_framework_integ_test.jl index cc1a6e1..f5e84bb 100644 --- a/test/shadow_framework_integ_test.jl +++ b/test/shadow_framework_integ_test.jl @@ -494,7 +494,12 @@ end end # our local copy should get updated even though there is no delta state - @test doc == Dict("foo" => 2, "version" => 2) + try + wait_for(() -> doc == Dict("foo" => 2, "version" => 2)) + catch + @show doc + rethrow() + end @info "unsubscribing" fetch(unsubscribe(sf)[1])