Skip to content

Commit

Permalink
Optimization GraphQL::AnyCable::Cleaner. Use a separate key for stori…
Browse files Browse the repository at this point in the history
…ng created_time of subscriptions and channels
  • Loading branch information
prog-supdex committed Sep 19, 2023
1 parent f16a4bb commit 38b0aaa
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 28 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ To avoid filling Redis storage with stale subscription data:
Heroku users should set up `use_redis_object_on_cleanup` setting to `false` due to [limitations in Heroku Redis](https://devcenter.heroku.com/articles/heroku-redis#connection-permissions).
### Recommendations
You should run `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean` periodically because it helps to avoid swelling of RAM consumption,
but before using `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean`, you should configure `subscription_expiration_seconds`
and `use_redis_object_on_cleanup` settings
## Configuration
GraphQL-AnyCable uses [anyway_config] to configure itself. There are several possibilities to configure this gem:
Expand Down
53 changes: 43 additions & 10 deletions lib/graphql/anycable/cleaner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module AnyCable
module Cleaner
extend self

MAX_RECORDS_AT_ONCE = 1_000

def clean
clean_channels
clean_subscriptions
Expand All @@ -16,24 +18,28 @@ def clean_channels
return unless config.subscription_expiration_seconds
return unless config.use_redis_object_on_cleanup

redis.scan_each(match: "#{redis_key(adapter::CHANNEL_PREFIX)}*") do |key|
idle = redis.object("IDLETIME", key)
next if idle&.<= config.subscription_expiration_seconds
store_name = redis_key(adapter::CHANNELS_STORAGE_TIME)

redis.del(key)
end
remove_old_objects(store_name)
end

def clean_subscriptions
return unless config.subscription_expiration_seconds
return unless config.use_redis_object_on_cleanup

redis.scan_each(match: "#{redis_key(adapter::SUBSCRIPTION_PREFIX)}*") do |key|
idle = redis.object("IDLETIME", key)
next if idle&.<= config.subscription_expiration_seconds
store_name = redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME)

redis.del(key)
end
remove_old_objects(store_name)
end

# For cases, when we need to clear only `subscription time storage`
def clean_subscription_time_storage
clean_created_time_storage(redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME))
end

# For cases, when we need to clear only `channel time storage`
def clean_channel_time_storage
clean_created_time_storage(redis_key(adapter::CHANNELS_STORAGE_TIME))
end

def clean_fingerprint_subscriptions
Expand Down Expand Up @@ -74,6 +80,33 @@ def config
def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end

def remove_old_objects(store_name)
# Determine the time point before which the keys should be deleted
time_point = (Time.now - config.subscription_expiration_seconds).to_i

# iterating per 1000 records
loop do
# fetches keys, which need to be deleted
keys = redis.zrangebyscore(store_name, "-inf", time_point, limit: [0, MAX_RECORDS_AT_ONCE])

break if keys.empty?

redis.multi do |pipeline|
pipeline.del(*keys)
pipeline.zrem(store_name, keys)
end
end
end

# For cases, when the key was dropped, but it remains in the `subscription/channel time storage`
def clean_created_time_storage(storage_name)
redis.zscan_each(storage_name, count: MAX_RECORDS_AT_ONCE) do |key|
next if redis.exists?(key)

redis.zrem(storage_name, key)
end
end
end
end
end
49 changes: 35 additions & 14 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
require "anycable"
require "graphql/subscriptions"
require "graphql/anycable/errors"

# rubocop: disable Metrics/AbcSize, Metrics/LineLength, Metrics/MethodLength

# A subscriptions implementation that sends data as AnyCable broadcastings.
Expand Down Expand Up @@ -56,10 +55,12 @@ class AnyCableSubscriptions < GraphQL::Subscriptions

def_delegators :"GraphQL::AnyCable", :redis, :config

SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
SUBSCRIPTIONS_STORAGE_TIME = "subscription-storage-time" # ZSET: Stores name and created_time of subscriptions
CHANNELS_STORAGE_TIME = "channel-storage-time" # ZSET: Stores name and created_time of channels

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
Expand Down Expand Up @@ -131,7 +132,6 @@ def write_subscription(query, events)
# Store subscription_id in the channel state to cleanup on disconnect
write_subscription_id(channel, channel_uniq_id)


events.each do |event|
channel.stream_from(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint)
end
Expand All @@ -145,15 +145,29 @@ def write_subscription(query, events)
}

redis.multi do |pipeline|
pipeline.sadd(redis_key(CHANNEL_PREFIX) + channel_uniq_id, [subscription_id])
pipeline.mapped_hmset(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, data)
full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}"
full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_uniq_id}"

pipeline.sadd(full_channel_id, [subscription_id])
pipeline.mapped_hmset(full_subscription_id, data)

events.each do |event|
pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint)
pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id])
end
next unless config.subscription_expiration_seconds
pipeline.expire(redis_key(CHANNEL_PREFIX) + channel_uniq_id, config.subscription_expiration_seconds)
pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds)

# add the records to the storages if subscription_expiration_seconds is nil
unless config.subscription_expiration_seconds
current_timestamp = Time.now.to_i

pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id)
pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id)

next
end

pipeline.expire(full_channel_id, config.subscription_expiration_seconds)
pipeline.expire(full_subscription_id, config.subscription_expiration_seconds)
end
end

Expand Down Expand Up @@ -182,7 +196,10 @@ def delete_subscription(subscription_id)
fingerprint_subscriptions[redis_key(FINGERPRINTS_PREFIX) + topic] = score
end
# Delete subscription itself
pipeline.del(redis_key(SUBSCRIPTION_PREFIX) + subscription_id)
full_subscription_id = "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}"

pipeline.del(full_subscription_id)
pipeline.zrem(redis_key(SUBSCRIPTIONS_STORAGE_TIME), full_subscription_id)
end
# Clean up fingerprints that doesn't have any subscriptions left
redis.pipelined do |pipeline|
Expand All @@ -200,10 +217,14 @@ def delete_channel_subscriptions(channel_or_id)
# Missing in case disconnect happens before #execute
return unless channel_id

redis.smembers(redis_key(CHANNEL_PREFIX) + channel_id).each do |subscription_id|
full_channel_id = "#{redis_key(CHANNEL_PREFIX)}#{channel_id}"

redis.smembers(full_channel_id).each do |subscription_id|
delete_subscription(subscription_id)
end
redis.del(redis_key(CHANNEL_PREFIX) + channel_id)

redis.del(full_channel_id)
redis.zrem(redis_key(CHANNELS_STORAGE_TIME), full_channel_id)
end

private
Expand Down
15 changes: 11 additions & 4 deletions spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,20 @@
end

it "removes subscription from redis" do
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be true
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be true
subscription = "graphql-subscription:#{subscription_id}"
channel = "graphql-channel:#{subscription_id}"

expect(redis.exists?(subscription)).to be true
expect(redis.exists?(channel)).to be true
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be true
expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be true
expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be true
subject
expect(redis.exists?("graphql-channel:some-truly-random-number")).to be false
expect(redis.exists?(channel)).to be false
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be false
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be false
expect(redis.exists?(subscription)).to be false
expect(redis.zrange("graphql-subscription-storage-time", 0, -1).member?(subscription)).to be false
expect(redis.zrange("graphql-channel-storage-time", 0, -1).member?(channel)).to be false
end
end

Expand Down

0 comments on commit 38b0aaa

Please sign in to comment.