Skip to content

Commit

Permalink
refactoring the solution for storing created_time of subscriptions/ch…
Browse files Browse the repository at this point in the history
…annels
  • Loading branch information
prog-supdex committed Sep 19, 2023
1 parent 3d0d38a commit 0d32aba
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 53 deletions.
24 changes: 4 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,11 @@ To avoid filling Redis storage with stale subscription data:
Heroku users should set up `use_redis_object_on_cleanup` setting to `false` due to [limitations in Heroku Redis](https://devcenter.heroku.com/articles/heroku-redis#connection-permissions).
### Limitations
### Recommendations
The `GraphQL::AnyCable::Cleaner` uses [IDLETIME](https://redis.io/commands/object-idletime/) for detecting how long the object was inactive.
It is a good way to detect inactive subscriptions, but `IDLETIME` is updated every time, not only when new subscriptions are, but when we read it
It means that `broadcasting` also updates the `IDLETIME`, which does not give us the ability to detect useless subscriptions for cleaning
It will be resolved in the next versions
You should run `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean` periodically because it helps to avoid swelling of RAM consumption,
but before using `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean`, you should configure `subscription_expiration_seconds`
and `use_redis_object_on_cleanup` settings
## Configuration
Expand Down Expand Up @@ -173,21 +172,6 @@ GraphQL-AnyCable uses [anyway_config] to configure itself. There are several pos
And any other way provided by [anyway_config]. Check its documentation!
## Emergency Actions
In situations, when you don't set `subscription_expiration_seconds`, have a lot of inactive subscriptions and `GraphQL::AnyCable::Cleaner` doesn`t help in that, you can do the
next actions for clearing subscriptions
1. Set `config.subscription_expiration_seconds`. After that, the new subscriptions will have `TTL`
2. Through the `redis_prefix` (look at the `Configuration` block) change the redis prefixes, which uses for storing keys
3. Run the script, using the old `redis_prefix` (the default value is `graphql`)
```ruby
redis = GraphQL::AnyCable.redis
redis.scan_each("graphql-subscription:*") do |key|
redis.del(key) if redis.ttl(key) < 0 # Remove it, because it is an old record
end
```

## Data model
As in AnyCable there is no place to store subscription data in-memory, it should be persisted somewhere to be retrieved on `GraphQLSchema.subscriptions.trigger` and sent to subscribed clients. `graphql-anycable` uses the same Redis database as AnyCable itself.
Expand Down
55 changes: 38 additions & 17 deletions lib/graphql/anycable/cleaner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module AnyCable
module Cleaner
extend self

MAX_RECORDS_AT_ONCE = 1_000

def clean
clean_channels
clean_subscriptions
Expand All @@ -16,26 +18,28 @@ def clean_channels
return unless config.subscription_expiration_seconds
return unless config.use_redis_object_on_cleanup

redis.scan_each(match: "#{redis_key(adapter::CHANNEL_PREFIX)}*") do |key|
idle = redis.object("IDLETIME", key)
next if idle&.<= config.subscription_expiration_seconds
store_name = redis_key(adapter::CHANNELS_STORAGE_TIME)

redis.del(key)
end
remove_old_objects(store_name)
end

def clean_subscriptions
return unless config.subscription_expiration_seconds
return unless config.use_redis_object_on_cleanup

redis.scan_each(match: "#{redis_key(adapter::SUBSCRIPTION_PREFIX)}*") do |key|
next unless object_created_time_expired?(key)
store_name = redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME)

redis.multi do |pipeline|
pipeline.del(key)
pipeline.hdel(redis_key(adapter::CREATED_AT_KEY), key)
end
end
remove_old_objects(store_name)
end

# For cases, when we need to clear only `subscription time storage`
def clean_subscription_time_storage
clean_created_time_storage(redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME))
end

# For cases, when we need to clear only `channel time storage`
def clean_channel_time_storage
clean_created_time_storage(redis_key(adapter::CHANNELS_STORAGE_TIME))
end

def clean_fingerprint_subscriptions
Expand Down Expand Up @@ -77,14 +81,31 @@ def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end

def object_created_time_expired?(key)
last_created_time = redis.hget(redis_key(adapter::CREATED_AT_KEY), key)
def remove_old_objects(store_name)
# Determine the time point before which the keys should be deleted
time_point = (Time.now - config.subscription_expiration_seconds).to_i

# iterating per 1000 records
loop do
# fetches keys, which need to be deleted
keys = redis.zrangebyscore(store_name, "-inf", time_point, limit: [0, MAX_RECORDS_AT_ONCE])

return false unless last_created_time
break if keys.empty?

redis.multi do |pipeline|
pipeline.del(*keys)
pipeline.zrem(store_name, keys)
end
end
end

expire_date = Time.parse(last_created_time) + config.subscription_expiration_seconds
# For cases, when the key was dropped, but it remains in the `subscription/channel time storage`
def clean_created_time_storage(storage_name)
redis.zscan_each(storage_name, count: MAX_RECORDS_AT_ONCE) do |key|
next if redis.exists?(key)

Time.now >= expire_date
redis.zrem(storage_name, key)
end
end
end
end
Expand Down
34 changes: 24 additions & 10 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class AnyCableSubscriptions < GraphQL::Subscriptions
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
CREATED_AT_KEY = "objects:list-created-times" # HASH: Stores name and created_time of object
SUBSCRIPTIONS_STORAGE_TIME = "subscription-storage-time" # ZSET: Stores name and created_time of subscriptions
CHANNELS_STORAGE_TIME = "channel-storage-time" # ZSET: Stores name and created_time of channels

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
Expand Down Expand Up @@ -145,19 +146,28 @@ def write_subscription(query, events)

redis.multi do |pipeline|
full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}"
full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_uniq_id}"

pipeline.sadd(redis_key(CHANNEL_PREFIX) + channel_uniq_id, [subscription_id])
pipeline.sadd(full_channel_id, [subscription_id])
pipeline.mapped_hmset(full_subscription_id, data)

pipeline.hset(redis_key(CREATED_AT_KEY), full_subscription_id, Time.now.to_s)

events.each do |event|
pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint)
pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id])
end
next unless config.subscription_expiration_seconds
pipeline.expire(redis_key(CHANNEL_PREFIX) + channel_uniq_id, config.subscription_expiration_seconds)
pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds)

# add the records to the storages if subscription_expiration_seconds is nil
unless config.subscription_expiration_seconds
current_timestamp = Time.now.to_i

pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id)
pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id)

next
end

pipeline.expire(full_channel_id, config.subscription_expiration_seconds)
pipeline.expire(full_subscription_id, config.subscription_expiration_seconds)
end
end

Expand Down Expand Up @@ -189,7 +199,7 @@ def delete_subscription(subscription_id)
full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}"

pipeline.del(full_subscription_id)
pipeline.hdel(redis_key(CREATED_AT_KEY), full_subscription_id)
pipeline.zrem(redis_key(SUBSCRIPTIONS_STORAGE_TIME), full_subscription_id)
end
# Clean up fingerprints that doesn't have any subscriptions left
redis.pipelined do |pipeline|
Expand All @@ -207,10 +217,14 @@ def delete_channel_subscriptions(channel_or_id)
# Missing in case disconnect happens before #execute
return unless channel_id

redis.smembers(redis_key(CHANNEL_PREFIX) + channel_id).each do |subscription_id|
full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_id}"

redis.smembers(full_channel_id).each do |subscription_id|
delete_subscription(subscription_id)
end
redis.del(redis_key(CHANNEL_PREFIX) + channel_id)

redis.del(full_channel_id)
redis.zrem(redis_key(CHANNELS_STORAGE_TIME), full_channel_id)
end

private
Expand Down
17 changes: 11 additions & 6 deletions spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,20 @@
end

it "removes subscription from redis" do
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be true
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be true
subscription = "graphql-subscription:#{subscription_id}"
channel = "graphql-channel:#{subscription_id}"

expect(redis.exists?(subscription)).to be true
expect(redis.exists?(channel)).to be true
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be true
expect(redis.hexists("graphql-objects:list-created-times", "graphql-subscription:some-truly-random-number")).to be true
expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be true
expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be true
subject
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be false
expect(redis.exists?(channel)).to be false
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be false
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be false
expect(redis.hexists("graphql-objects:list-created-times", "graphql-subscription:some-truly-random-number")).to be false
expect(redis.exists?(subscription)).to be false
expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be false
expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be false
end
end

Expand Down

0 comments on commit 0d32aba

Please sign in to comment.