Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread-safe implementation of the sentinel connection pool #4

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ Accessing redis-py's Sentinel instance
Change log
----------

v2.1.0
~~~~~~

* Removed the thread-local variable for sentinel connection pool. If you want
to use sentinel with multiple threads, you need to use a patched
version of redis-py.
* Added `disconnect()` method for resetting the connection pool

v2.0.1
~~~~~~

Expand Down
88 changes: 60 additions & 28 deletions flask_redis_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
# limitations under the License.

import six
import logging
import inspect
import threading
import redis
import redis.sentinel
import redis_sentinel_url
from flask import current_app
from werkzeug.local import Local, LocalProxy
from werkzeug.local import LocalProxy
from werkzeug.utils import import_string

logger = logging.getLogger(__name__)


_EXTENSION_KEY = 'redissentinel'

Expand All @@ -33,24 +37,34 @@ def __init__(self, url, client_class, client_options, sentinel_class, sentinel_o
self.client_options = client_options
self.sentinel_class = sentinel_class
self.sentinel_options = sentinel_options
self.local = Local()
self.connection = None
self.master_connections = {}
self.slave_connections = {}
self._connect_lock = threading.Lock()
self._connect()
if self.local.connection[0] is None:
# if there is no sentinel, we don't need to use thread-local storage
self.connection = self.local.connection
self.local = self

def _connect(self):
try:
return self.local.connection
except AttributeError:
with self._connect_lock:
if self.connection is not None:
return self.connection

conn = redis_sentinel_url.connect(
self.url,
sentinel_class=self.sentinel_class, sentinel_options=self.sentinel_options,
client_class=self.client_class, client_options=self.client_options)
self.local.connection = conn
self.connection = conn
return conn

def _iter_connections(self):
if self.connection is not None:
for conn in self.connection:
if conn is not None:
yield conn
for conn in six.itervalues(self.master_connections):
yield conn
for conn in six.itervalues(self.slave_connections):
yield conn

@property
def sentinel(self):
return self._connect()[0]
Expand All @@ -60,38 +74,53 @@ def default_connection(self):
return self._connect()[1]

def master_for(self, service_name, **kwargs):
try:
return self.local.master_connections[service_name]
except AttributeError:
self.local.master_connections = {}
except KeyError:
pass
with self._connect_lock:
try:
return self.master_connections[service_name]
except KeyError:
pass

sentinel = self.sentinel
if sentinel is None:
msg = 'Cannot get master {} using non-sentinel configuration'
raise RuntimeError(msg.format(service_name))

conn = sentinel.master_for(service_name, redis_class=self.client_class, **kwargs)
self.local.master_connections[service_name] = conn
return conn
with self._connect_lock:
try:
return self.master_connections[service_name]
except KeyError:
pass

conn = sentinel.master_for(service_name, redis_class=self.client_class, **kwargs)
self.master_connections[service_name] = conn
return conn

def slave_for(self, service_name, **kwargs):
try:
return self.local.slave_connections[service_name]
except AttributeError:
self.local.slave_connections = {}
except KeyError:
pass
with self._connect_lock:
try:
return self.slave_connections[service_name]
except KeyError:
pass

sentinel = self.sentinel
if sentinel is None:
msg = 'Cannot get slave {} using non-sentinel configuration'
raise RuntimeError(msg.format(service_name))

conn = sentinel.slave_for(service_name, redis_class=self.client_class, **kwargs)
self.local.slave_connections[service_name] = conn
return conn
with self._connect_lock:
try:
return self.slave_connections[service_name]
except KeyError:
pass

conn = sentinel.slave_for(service_name, redis_class=self.client_class, **kwargs)
self.slave_connections[service_name] = conn
return conn

def disconnect(self):
with self._connect_lock:
for conn in self._iter_connections():
conn.connection_pool.disconnect()


class RedisSentinel(object):
Expand Down Expand Up @@ -176,5 +205,8 @@ def master_for(self, service_name, **kwargs):
def slave_for(self, service_name, **kwargs):
return LocalProxy(lambda: self.get_instance().slave_for(service_name, **kwargs))

def disconnect(self):
return self.get_instance().disconnect()


SentinelExtension = RedisSentinel # for backwards-compatibility
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import os
from setuptools import setup


def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()


setup(
name='Flask-Redis-Sentinel',
py_modules=['flask_redis_sentinel'],
version='2.0.1',
version='2.1.0',
install_requires=['Flask>=0.10.1', 'redis>=2.10.3', 'redis_sentinel_url>=1.0.0,<2.0.0', 'six'],
description='Redis-Sentinel integration for Flask',
long_description=read('README.rst'),
Expand All @@ -33,4 +35,3 @@ def read(fname):
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

14 changes: 9 additions & 5 deletions test_flask_redis_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ def test_sentinel_threads(self):
sentinel.init_app(self.app)

connections = self._check_threads(sentinel)
self.assertIsNot(connections['from_another_thread'], connections['from_main_thread'])
self.assertIsNot(connections['from_another_thread'], connections['from_main_thread_later'])
self.assertIs(connections['from_another_thread'], connections['from_main_thread'])
self.assertIs(connections['from_another_thread'], connections['from_main_thread_later'])
self.assertIs(connections['from_main_thread'], connections['from_main_thread_later'])

def test_redis_threads(self):
Expand All @@ -324,6 +324,9 @@ def test_mixed_apps(self):
sentinel2 = SentinelExtension(app=self.app2, config_prefix='CUSTOM_REDIS', client_class=FakeRedis)
conn2 = sentinel2.default_connection

with self.app2.app_context():
conn2._get_current_object()

self.app3 = Flask('test3')

with self.app2.app_context():
Expand Down Expand Up @@ -357,8 +360,9 @@ def test_named_master_no_sentinel(self):
conn = sentinel.master_for('othermaster', db=6)
with self.app.app_context():
self.assertIsNone(sentinel.sentinel._get_current_object())
with self.assertRaisesRegexp(RuntimeError, 'Cannot get master othermaster using non-sentinel configuration'):
inst = conn._get_current_object()
msg = 'Cannot get master othermaster using non-sentinel configuration'
with self.assertRaisesRegexp(RuntimeError, msg):
conn._get_current_object()

def test_named_slave(self):
sentinel = SentinelExtension(client_class=FakeRedis, sentinel_class=FakeSentinel)
Expand All @@ -382,4 +386,4 @@ def test_named_slave_no_sentinel(self):
with self.app.app_context():
self.assertIsNone(sentinel.sentinel._get_current_object())
with self.assertRaisesRegexp(RuntimeError, 'Cannot get slave otherslave using non-sentinel configuration'):
inst = conn._get_current_object()
conn._get_current_object()
10 changes: 9 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
[tox]
envlist = py27,py33,py34,py35,py36,pypy

[testenv]
deps =
mock
nose
coverage
commands=nosetests --with-coverage --cover-package=flask_redis_sentinel
flake8
commands=
flake8
nosetests --with-coverage --cover-package=flask_redis_sentinel

[flake8]
max-line-length = 120
exclude=env,venv,.tox,.idea