-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathpymysqlpool.py
61 lines (53 loc) · 1.89 KB
/
pymysqlpool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import threading
import pymysql
def new_close(conn):
    """Replacement for ``Connection.close`` that recycles pooled connections.

    If ``conn`` carries a ``pooling`` attribute that is not ``None`` the
    connection is handed back to that pool via ``pooling.put(conn)`` instead
    of being closed. Otherwise the saved original ``old_close`` is invoked,
    when one is available.

    Args:
        conn: the connection object ``close()`` was called on.
    """
    # This function replaces close() for every connection of the patched
    # class, including ones that were never tagged with a ``pooling``
    # attribute, so the attribute has to be looked up defensively.
    pool = getattr(conn, "pooling", None)
    if pool is not None:
        pool.put(conn)
        return

    original_close = getattr(conn, "old_close", None)
    if original_close is not None:
        original_close()
class Pool(object):
    """Bounded pool of connection objects guarded by a single lock.

    Connections are created lazily through the ``create_instance`` factory,
    tracked in ``in_use_list`` while handed out and in ``free_list`` while
    idle. Constructing a pool also replaces ``pymysql``'s
    ``Connection.close`` with ``new_close`` so that closing a pooled
    connection returns it to its pool.
    """

    def __init__(self, create_instance, max_count=10, timeout=10):
        """Set up an empty pool.

        Args:
            create_instance: zero-argument callable returning a new
                connection; must not be ``None``.
            max_count: upper bound checked against the number of handed-out
                connections before creating a new one. Falsy values
                (``None``, ``0``) fall back to 10.
            timeout: seconds ``get()`` waits for a free connection once the
                pool is exhausted. Falsy values fall back to 10.
        """
        # Validate before touching any state or patching pymysql.
        assert create_instance is not None
        self.lock = threading.Lock()
        # The condition shares ``self.lock`` so waiting in get() releases
        # the same lock that put() needs to acquire.
        self.condition = threading.Condition(self.lock)
        self.in_use_list = set()
        self.free_list = set()
        self.max_count = max_count or 10
        self.timeout = timeout or 10
        self.new_instance = create_instance
        self.__change_pymysql_close()

    def __change_pymysql_close(self):
        """Install ``new_close`` as ``Connection.close`` (at most once).

        The original method is kept on the class as ``old_close`` so that
        ``new_close`` can still call it.
        """
        connection_cls = pymysql.connections.Connection
        old_close = connection_cls.close
        if old_close is new_close:
            # Already patched; patching again would overwrite ``old_close``
            # with ``new_close`` and lose the real close().
            return
        connection_cls.close = new_close
        connection_cls.old_close = old_close

    def get(self):
        """Return a connection, creating or waiting for one as needed.

        An idle connection is reused when available. Otherwise a new one is
        created while fewer than ``max_count`` are in use (the factory runs
        with the pool lock held). Otherwise the call blocks for up to
        ``timeout`` seconds until a connection is returned via ``put()``.

        Raises:
            TimeoutError: no connection became free within ``timeout``.
        """
        with self.lock:
            if not self.free_list and len(self.in_use_list) < self.max_count:
                one = self.new_instance()
                # Tag the connection so new_close() knows where to return it.
                one.pooling = self
                self.in_use_list.add(one)
                return one

            # wait_for() re-checks the predicate after every wake-up and
            # keeps waiting for the remaining time, so a waiter that loses
            # the race for a returned connection (put() wakes all waiters)
            # does not fail before the timeout has actually elapsed.
            if not self.condition.wait_for(
                lambda: bool(self.free_list), self.timeout
            ):
                raise TimeoutError()

            one = self.free_list.pop()
            self.in_use_list.add(one)
            return one

    def put(self, value):
        """Move ``value`` from the in-use set to the free set and wake waiters.

        Raises:
            KeyError: ``value`` is not currently marked as in use.
        """
        with self.lock:
            self.in_use_list.remove(value)
            self.free_list.add(value)
            self.condition.notify_all()

    def size(self):
        """Return the number of connections the pool holds (idle + in use)."""
        with self.lock:
            return len(self.free_list) + len(self.in_use_list)

    def max_size(self):
        """Return the configured ``max_count``."""
        return self.max_count