Example psycopg2_pool.pyΒΆ

  1from __future__ import print_function
  2# gevent-test-requires-resource: psycopg2
  3# pylint:disable=import-error,broad-except,bare-except
  4import sys
  5import contextlib
  6
  7import gevent
  8from gevent.queue import Queue
  9from gevent.socket import wait_read, wait_write
 10from psycopg2 import extensions, OperationalError, connect
 11
 12
 13if sys.version_info[0] >= 3:
 14    integer_types = (int,)
 15else:
 16    import __builtin__
 17    integer_types = (int, __builtin__.long)
 18
 19
 20def gevent_wait_callback(conn, timeout=None):
 21    """A wait callback useful to allow gevent to work with Psycopg."""
 22    while 1:
 23        state = conn.poll()
 24        if state == extensions.POLL_OK:
 25            break
 26        elif state == extensions.POLL_READ:
 27            wait_read(conn.fileno(), timeout=timeout)
 28        elif state == extensions.POLL_WRITE:
 29            wait_write(conn.fileno(), timeout=timeout)
 30        else:
 31            raise OperationalError(
 32                "Bad result from poll: %r" % state)
 33
 34
 35extensions.set_wait_callback(gevent_wait_callback)
 36
 37
 38class AbstractDatabaseConnectionPool(object):
 39
 40    def __init__(self, maxsize=100):
 41        if not isinstance(maxsize, integer_types):
 42            raise TypeError('Expected integer, got %r' % (maxsize, ))
 43        self.maxsize = maxsize
 44        self.pool = Queue()
 45        self.size = 0
 46
 47    def create_connection(self):
 48        raise NotImplementedError()
 49
 50    def get(self):
 51        pool = self.pool
 52        if self.size >= self.maxsize or pool.qsize():
 53            return pool.get()
 54
 55        self.size += 1
 56        try:
 57            new_item = self.create_connection()
 58        except:
 59            self.size -= 1
 60            raise
 61        return new_item
 62
 63    def put(self, item):
 64        self.pool.put(item)
 65
 66    def closeall(self):
 67        while not self.pool.empty():
 68            conn = self.pool.get_nowait()
 69            try:
 70                conn.close()
 71            except Exception:
 72                pass
 73
 74    @contextlib.contextmanager
 75    def connection(self, isolation_level=None):
 76        conn = self.get()
 77        try:
 78            if isolation_level is not None:
 79                if conn.isolation_level == isolation_level:
 80                    isolation_level = None
 81                else:
 82                    conn.set_isolation_level(isolation_level)
 83            yield conn
 84        except:
 85            if conn.closed:
 86                conn = None
 87                self.closeall()
 88            else:
 89                conn = self._rollback(conn)
 90            raise
 91        else:
 92            if conn.closed:
 93                raise OperationalError("Cannot commit because connection was closed: %r" % (conn, ))
 94            conn.commit()
 95        finally:
 96            if conn is not None and not conn.closed:
 97                if isolation_level is not None:
 98                    conn.set_isolation_level(isolation_level)
 99                self.put(conn)
100
101    @contextlib.contextmanager
102    def cursor(self, *args, **kwargs):
103        isolation_level = kwargs.pop('isolation_level', None)
104        with self.connection(isolation_level) as conn:
105            yield conn.cursor(*args, **kwargs)
106
107    def _rollback(self, conn):
108        try:
109            conn.rollback()
110        except:
111            gevent.get_hub().handle_error(conn, *sys.exc_info())
112            return
113        return conn
114
115    def execute(self, *args, **kwargs):
116        with self.cursor(**kwargs) as cursor:
117            cursor.execute(*args)
118            return cursor.rowcount
119
120    def fetchone(self, *args, **kwargs):
121        with self.cursor(**kwargs) as cursor:
122            cursor.execute(*args)
123            return cursor.fetchone()
124
125    def fetchall(self, *args, **kwargs):
126        with self.cursor(**kwargs) as cursor:
127            cursor.execute(*args)
128            return cursor.fetchall()
129
130    def fetchiter(self, *args, **kwargs):
131        with self.cursor(**kwargs) as cursor:
132            cursor.execute(*args)
133            while True:
134                items = cursor.fetchmany()
135                if not items:
136                    break
137                for item in items:
138                    yield item
139
140
141class PostgresConnectionPool(AbstractDatabaseConnectionPool):
142
143    def __init__(self, *args, **kwargs):
144        self.connect = kwargs.pop('connect', connect)
145        maxsize = kwargs.pop('maxsize', None)
146        self.args = args
147        self.kwargs = kwargs
148        AbstractDatabaseConnectionPool.__init__(self, maxsize)
149
150    def create_connection(self):
151        return self.connect(*self.args, **self.kwargs)
152
153
154def main():
155    import time
156    pool = PostgresConnectionPool("dbname=postgres", maxsize=3)
157    start = time.time()
158    for _ in range(4):
159        gevent.spawn(pool.execute, 'select pg_sleep(1);')
160    gevent.wait()
161    delay = time.time() - start
162    print('Running "select pg_sleep(1);" 4 times with 3 connections. Should take about 2 seconds: %.2fs' % delay)
163
164if __name__ == '__main__':
165    main()

Current source

Next page: Example threadpool.py