1from __future__ import print_function
2# gevent-test-requires-resource: psycopg2
3# pylint:disable=import-error,broad-except,bare-except
4import sys
5import contextlib
6
7import gevent
8from gevent.queue import Queue
9from gevent.socket import wait_read, wait_write
10from psycopg2 import extensions, OperationalError, connect
11
12
13if sys.version_info[0] >= 3:
14 integer_types = (int,)
15else:
16 import __builtin__
17 integer_types = (int, __builtin__.long)
18
19
20def gevent_wait_callback(conn, timeout=None):
21 """A wait callback useful to allow gevent to work with Psycopg."""
22 while 1:
23 state = conn.poll()
24 if state == extensions.POLL_OK:
25 break
26 elif state == extensions.POLL_READ:
27 wait_read(conn.fileno(), timeout=timeout)
28 elif state == extensions.POLL_WRITE:
29 wait_write(conn.fileno(), timeout=timeout)
30 else:
31 raise OperationalError(
32 "Bad result from poll: %r" % state)
33
34
35extensions.set_wait_callback(gevent_wait_callback)
36
37
38class AbstractDatabaseConnectionPool(object):
39
40 def __init__(self, maxsize=100):
41 if not isinstance(maxsize, integer_types):
42 raise TypeError('Expected integer, got %r' % (maxsize, ))
43 self.maxsize = maxsize
44 self.pool = Queue()
45 self.size = 0
46
47 def create_connection(self):
48 raise NotImplementedError()
49
50 def get(self):
51 pool = self.pool
52 if self.size >= self.maxsize or pool.qsize():
53 return pool.get()
54
55 self.size += 1
56 try:
57 new_item = self.create_connection()
58 except:
59 self.size -= 1
60 raise
61 return new_item
62
63 def put(self, item):
64 self.pool.put(item)
65
66 def closeall(self):
67 while not self.pool.empty():
68 conn = self.pool.get_nowait()
69 try:
70 conn.close()
71 except Exception:
72 pass
73
74 @contextlib.contextmanager
75 def connection(self, isolation_level=None):
76 conn = self.get()
77 try:
78 if isolation_level is not None:
79 if conn.isolation_level == isolation_level:
80 isolation_level = None
81 else:
82 conn.set_isolation_level(isolation_level)
83 yield conn
84 except:
85 if conn.closed:
86 conn = None
87 self.closeall()
88 else:
89 conn = self._rollback(conn)
90 raise
91 else:
92 if conn.closed:
93 raise OperationalError("Cannot commit because connection was closed: %r" % (conn, ))
94 conn.commit()
95 finally:
96 if conn is not None and not conn.closed:
97 if isolation_level is not None:
98 conn.set_isolation_level(isolation_level)
99 self.put(conn)
100
101 @contextlib.contextmanager
102 def cursor(self, *args, **kwargs):
103 isolation_level = kwargs.pop('isolation_level', None)
104 with self.connection(isolation_level) as conn:
105 yield conn.cursor(*args, **kwargs)
106
107 def _rollback(self, conn):
108 try:
109 conn.rollback()
110 except:
111 gevent.get_hub().handle_error(conn, *sys.exc_info())
112 return
113 return conn
114
115 def execute(self, *args, **kwargs):
116 with self.cursor(**kwargs) as cursor:
117 cursor.execute(*args)
118 return cursor.rowcount
119
120 def fetchone(self, *args, **kwargs):
121 with self.cursor(**kwargs) as cursor:
122 cursor.execute(*args)
123 return cursor.fetchone()
124
125 def fetchall(self, *args, **kwargs):
126 with self.cursor(**kwargs) as cursor:
127 cursor.execute(*args)
128 return cursor.fetchall()
129
130 def fetchiter(self, *args, **kwargs):
131 with self.cursor(**kwargs) as cursor:
132 cursor.execute(*args)
133 while True:
134 items = cursor.fetchmany()
135 if not items:
136 break
137 for item in items:
138 yield item
139
140
141class PostgresConnectionPool(AbstractDatabaseConnectionPool):
142
143 def __init__(self, *args, **kwargs):
144 self.connect = kwargs.pop('connect', connect)
145 maxsize = kwargs.pop('maxsize', None)
146 self.args = args
147 self.kwargs = kwargs
148 AbstractDatabaseConnectionPool.__init__(self, maxsize)
149
150 def create_connection(self):
151 return self.connect(*self.args, **self.kwargs)
152
153
154def main():
155 import time
156 pool = PostgresConnectionPool("dbname=postgres", maxsize=3)
157 start = time.time()
158 for _ in range(4):
159 gevent.spawn(pool.execute, 'select pg_sleep(1);')
160 gevent.wait()
161 delay = time.time() - start
162 print('Running "select pg_sleep(1);" 4 times with 3 connections. Should take about 2 seconds: %.2fs' % delay)
163
164if __name__ == '__main__':
165 main()
Next page: Example threadpool.py