Examples of aiopg usage
Below is a list of examples from the aiopg/examples directory.
Every example is a small, correct Python program.
async/await style
Low-level API
import asyncio
import aiopg
# Connection string (DSN) handed to aiopg.create_pool() below.
dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'
async def test_select():
    """Run ``SELECT 1`` through a pooled connection and check the result.

    A pool is created from ``dsn``; one connection and one cursor are
    acquired from it, every row produced by the cursor is collected, and
    the collected rows are asserted to be exactly ``[(1,)]``.
    """
    async with aiopg.create_pool(dsn) as db_pool:
        async with db_pool.acquire() as connection:
            async with connection.cursor() as cursor:
                await cursor.execute("SELECT 1")
                # Gather the rows one by one via asynchronous iteration.
                fetched = []
                async for record in cursor:
                    fetched.append(record)
                assert fetched == [(1,)]
    print("ALL DONE")
# Script entry point: block on the event loop until test_select() finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(test_select())
Usage of LISTEN/NOTIFY commands
import asyncio
import aiopg
# Connection string (DSN) handed to aiopg.create_pool() in main().
dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'
async def notify(conn):
    """Send five numbered notifications on ``channel``, then ``'finish'``.

    :param conn: connection used to open the cursor that issues NOTIFY.
    """
    async with conn.cursor() as cursor:
        for number in range(5):
            text = "message {}".format(number)
            print('Send ->', text)
            # The payload is passed as a bound parameter, not interpolated.
            await cursor.execute("NOTIFY channel, %s", (text,))
        # Sentinel payload that tells the listener to stop.
        await cursor.execute("NOTIFY channel, 'finish'")
async def listen(conn):
    """Subscribe to ``channel`` and print payloads until ``'finish'`` arrives.

    :param conn: connection whose ``notifies`` queue delivers the messages.
    """
    async with conn.cursor() as cursor:
        await cursor.execute("LISTEN channel")
        notification = await conn.notifies.get()
        # Keep printing until the sentinel payload shows up.
        while notification.payload != 'finish':
            print('Receive <-', notification.payload)
            notification = await conn.notifies.get()
async def main():
    """Run the listener and the notifier concurrently on two connections.

    Both connections come from one pool created from ``dsn``; the two
    coroutines are awaited together with ``asyncio.gather``.
    """
    async with aiopg.create_pool(dsn) as pool:
        # conn1 is acquired first, then conn2; both are released on exit.
        async with pool.acquire() as conn1, pool.acquire() as conn2:
            await asyncio.gather(listen(conn1), notify(conn2))
    print("ALL DONE")
# Script entry point: block on the event loop until main() finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Simple sqlalchemy usage
import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa
# MetaData container that the table definition below is registered with.
metadata = sa.MetaData()

# SQLAlchemy description of table ``tbl``: an integer primary key ``id``
# and a string column ``val`` declared as String(255).
tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))
async def create_table(conn):
    """Drop ``tbl`` if it exists and create it again.

    :param conn: connection whose ``execute()`` runs the raw SQL statements.
    """
    statements = (
        'DROP TABLE IF EXISTS tbl',
        '''CREATE TABLE tbl (
id serial PRIMARY KEY,
val varchar(255))''',
    )
    # Order matters: the old table has to be gone before it is recreated.
    for sql in statements:
        await conn.execute(sql)
async def go():
    """Create ``tbl``, insert one row and print every row of the table.

    The table is created on one acquired connection; the insert and the
    select run on a second acquisition from the same engine.
    """
    connect_kwargs = dict(user='aiopg',
                          database='aiopg',
                          host='127.0.0.1',
                          password='passwd')
    async with create_engine(**connect_kwargs) as engine:
        async with engine.acquire() as conn:
            await create_table(conn)

        async with engine.acquire() as conn:
            insert_stmt = tbl.insert().values(val='abc')
            await conn.execute(insert_stmt)

            select_stmt = tbl.select()
            async for record in conn.execute(select_stmt):
                print(record.id, record.val)
# Script entry point: block on the event loop until go() finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(go())
Complex sqlalchemy queries
import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa
import random
import datetime
# MetaData container that both table definitions below are registered with.
metadata = sa.MetaData()

# ``users``: integer primary key, a name and a birthday timestamp.
users = sa.Table('users', metadata,
                 sa.Column('id', sa.Integer, primary_key=True),
                 sa.Column('name', sa.String(255)),
                 sa.Column('birthday', sa.DateTime))

# ``emails``: one row per address. ``user_id`` is declared with type None
# and a ForeignKey to users.id; ``email`` and ``private`` are non-nullable.
emails = sa.Table('emails', metadata,
                  sa.Column('id', sa.Integer, primary_key=True),
                  sa.Column('user_id', None, sa.ForeignKey('users.id')),
                  sa.Column('email', sa.String(255), nullable=False),
                  sa.Column('private', sa.Boolean, nullable=False))
async def create_tables(conn):
    """Drop and recreate the ``users`` and ``emails`` tables.

    ``emails`` is dropped first because it references ``users``. The DDL
    is kept in line with the SQLAlchemy ``users``/``emails`` definitions of
    this example: ``emails.id`` is a primary key, ``email`` is
    ``varchar(255)`` and ``email``/``private`` are ``NOT NULL``.

    :param conn: connection whose ``execute()`` runs the raw SQL statements.
    """
    await conn.execute('DROP TABLE IF EXISTS emails')
    await conn.execute('DROP TABLE IF EXISTS users')
    await conn.execute('''CREATE TABLE users (
                              id serial PRIMARY KEY,
                              name varchar(255),
                              birthday timestamp)''')
    await conn.execute('''CREATE TABLE emails (
                              id serial PRIMARY KEY,
                              user_id int references users(id),
                              email varchar(255) NOT NULL,
                              private bool NOT NULL)''')
# Pool of first names used to populate the ``users`` table.
names = {
    'Abbi', 'Alex', 'Andrew', 'Anna', 'Bob',
    'Clare', 'Dave', 'Doug', 'Eva', 'Ivy',
    'Jenni', 'Jessica', 'John', 'Julia', 'Lina',
    'Lori', 'Margo', 'Maria', 'Martin', 'Matt',
    'Nick', 'Olga', 'Rita', 'Rosa', 'Vitaly',
}
def gen_birthday():
    """Return a random midnight ``datetime`` 20 to 30 years before this year.

    The day is drawn from 1..28 so the date is valid for every month.
    """
    this_year = datetime.datetime.now().year
    # Keyword arguments are evaluated left to right: year, month, day.
    return datetime.datetime(
        year=random.randint(this_year - 30, this_year - 20),
        month=random.randint(1, 12),
        day=random.randint(1, 28),
    )
async def fill_data(conn):
    """Insert every name as a user, each with a random number of emails.

    All inserts run inside a single transaction opened with
    ``conn.begin()``.

    :param conn: connection used for the transaction and the inserts.
    """
    async with conn.begin():
        # random.sample() no longer accepts a set (TypeError on Python
        # 3.11+), so sample from a sorted list; taking len(names) items
        # yields the names in random order.
        for name in random.sample(sorted(names), len(names)):
            uid = await conn.scalar(
                users.insert().values(name=name, birthday=gen_birthday()))
            # paretovariate() can occasionally return a very large value;
            # cap it so it never exceeds the population sampled below.
            emails_count = min(int(random.paretovariate(2)), 10000)
            for num in random.sample(range(10000), emails_count):
                is_private = random.uniform(0, 1) < 0.8
                await conn.execute(emails.insert().values(
                    user_id=uid,
                    email='{}+{}@gmail.com'.format(name, num),
                    private=is_private))
async def count(conn):
    """Print row counts for both tables and list users with a public email.

    The counts use ``func.count()`` with ``select_from`` because the
    ``Table.count()`` shortcut is not available in SQLAlchemy 1.4.

    :param conn: connection used to execute the queries.
    """
    users_total = sa.select([sa.func.count()]).select_from(users)
    emails_total = sa.select([sa.func.count()]).select_from(emails)
    c1 = await conn.scalar(users_total)
    c2 = await conn.scalar(emails_total)
    print("Population consists of", c1, "people with",
          c2, "emails in total")

    join = sa.join(emails, users, users.c.id == emails.c.user_id)
    # "== False" is intentional: it builds a SQL comparison expression.
    query = (sa.select([users.c.name])
             .select_from(join)
             .where(emails.c.private == False)  # noqa
             .group_by(users.c.name)
             .having(sa.func.count(emails.c.private) > 0))

    print("Users with public emails:")
    async for row in conn.execute(query):
        print(row.name)

    print()
async def show_julia(conn):
    """Print name, birthday, email and privacy flag for each email of Julia.

    :param conn: connection used to execute the joined users/emails query.
    """
    print("Lookup for Julia:")
    users_with_emails = sa.join(emails, users,
                                users.c.id == emails.c.user_id)
    is_julia = users.c.name == 'Julia'
    # use_labels=True is what gives the result columns the table-prefixed
    # names (users_name, emails_email, ...) read below.
    stmt = sa.select([users, emails], use_labels=True)
    stmt = stmt.select_from(users_with_emails).where(is_julia)
    async for record in conn.execute(stmt):
        print(record.users_name, record.users_birthday,
              record.emails_email, record.emails_private)
    print()
async def ave_age(conn):
    """Print the average age of all users, raw and as approximate years.

    :param conn: connection used to execute the aggregate query.
    """
    age_of_user = sa.func.age(users.c.birthday)
    stmt = sa.select([sa.func.avg(age_of_user)]).select_from(users)
    ave = await conn.scalar(stmt)
    # The result exposes ``.days``; 365 days are counted as one year.
    approx_years = int(ave.days / 365)
    print("Average age of population is", ave,
          "or ~", approx_years, "years")
    print()
async def go():
    """Run the whole demo on a single connection of a fresh engine.

    The steps run in order: create tables, fill them, print counts, look
    up Julia, print the average age.
    """
    engine = await create_engine(user='aiopg',
                                 database='aiopg',
                                 host='127.0.0.1',
                                 password='passwd')
    steps = (create_tables, fill_data, count, show_julia, ave_age)
    async with engine:
        async with engine.acquire() as conn:
            for step in steps:
                await step(conn)
# Script entry point: block on the event loop until go() finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(go())
yield from/@coroutine style
Old style Low-level API
import asyncio
import aiopg
# Connection string (DSN) handed to aiopg.create_pool() below.
dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'
@asyncio.coroutine
def test_select():
    """Run ``SELECT 1`` on a pooled cursor and check the fetched row.

    The pool is closed in a ``finally`` block so its connections are
    released even when the query or the assertion fails.
    """
    pool = yield from aiopg.create_pool(dsn)
    try:
        with (yield from pool.cursor()) as cur:
            yield from cur.execute("SELECT 1")
            ret = yield from cur.fetchone()
            assert ret == (1,)
    finally:
        # close() only marks the pool closed; wait_closed() waits for
        # the connections to actually be shut down.
        pool.close()
        yield from pool.wait_closed()
    print("ALL DONE")
# Script entry point: block on the event loop until test_select() finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(test_select())
Usage of LISTEN/NOTIFY commands using old-style API
import asyncio
import aiopg
# Connection string (DSN) handed to aiopg.create_pool() in main().
dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'
@asyncio.coroutine
def notify(conn):
    """Send five numbered notifications on ``channel``, then ``'finish'``.

    :param conn: connection used to open the cursor that issues NOTIFY.
    """
    cursor = yield from conn.cursor()
    try:
        for number in range(5):
            text = "message {}".format(number)
            print('Send ->', text)
            yield from cursor.execute("NOTIFY channel, %s", (text,))
        # Sentinel payload that tells the listener to stop.
        yield from cursor.execute("NOTIFY channel, 'finish'")
    finally:
        # Always release the cursor, even if a NOTIFY fails.
        cursor.close()
@asyncio.coroutine
def listen(conn):
    """Subscribe to ``channel`` and print payloads until ``'finish'`` arrives.

    :param conn: connection whose ``notifies`` queue delivers the messages.
    """
    cursor = yield from conn.cursor()
    try:
        yield from cursor.execute("LISTEN channel")
        notification = yield from conn.notifies.get()
        # Keep printing until the sentinel payload shows up.
        while notification.payload != 'finish':
            print('Receive <-', notification.payload)
            notification = yield from conn.notifies.get()
    finally:
        # Always release the cursor, whichever way the loop ends.
        cursor.close()
@asyncio.coroutine
def main():
    """Run the listener and the notifier concurrently on two connections.

    Both connections are taken from one pool created from ``dsn``. The
    pool is closed in a ``finally`` block so it is released even when one
    of the coroutines fails.
    """
    pool = yield from aiopg.create_pool(dsn)
    try:
        with (yield from pool) as conn1:
            listener = listen(conn1)
            with (yield from pool) as conn2:
                notifier = notify(conn2)
                yield from asyncio.gather(listener, notifier)
    finally:
        # close() only marks the pool closed; wait_closed() waits for
        # the connections to actually be shut down.
        pool.close()
        yield from pool.wait_closed()
    print("ALL DONE")
# Script entry point: block on the event loop until main() finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Simple sqlalchemy usage using old-style API
import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa
# MetaData container that the table definition below is registered with.
metadata = sa.MetaData()

# SQLAlchemy description of table ``tbl``: an integer primary key ``id``
# and a string column ``val`` declared as String(255).
tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))
@asyncio.coroutine
def create_table(engine):
    """Drop ``tbl`` if it exists and create it again.

    :param engine: engine from which a connection is taken for the DDL.
    """
    statements = (
        'DROP TABLE IF EXISTS tbl',
        '''CREATE TABLE tbl (
id serial PRIMARY KEY,
val varchar(255))''',
    )
    with (yield from engine) as conn:
        # Order matters: the old table has to be gone before recreation.
        for sql in statements:
            yield from conn.execute(sql)
@asyncio.coroutine
def go():
    """Create ``tbl``, insert one row and print every row of the table.

    The engine is closed in a ``finally`` block so its pooled connections
    are released even when one of the statements fails.
    """
    engine = yield from create_engine(user='aiopg',
                                      database='aiopg',
                                      host='127.0.0.1',
                                      password='passwd')
    try:
        yield from create_table(engine)
        with (yield from engine) as conn:
            yield from conn.execute(tbl.insert().values(val='abc'))

            res = yield from conn.execute(tbl.select())
            for row in res:
                print(row.id, row.val)
    finally:
        # close() only marks the engine closed; wait_closed() waits for
        # its connections to actually be shut down.
        engine.close()
        yield from engine.wait_closed()
# Script entry point: block on the event loop until go() finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(go())
Complex sqlalchemy queries using old-style API
import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa
import random
import datetime
# MetaData container that both table definitions below are registered with.
metadata = sa.MetaData()

# ``users``: integer primary key, a name and a birthday timestamp.
users = sa.Table('users', metadata,
                 sa.Column('id', sa.Integer, primary_key=True),
                 sa.Column('name', sa.String(255)),
                 sa.Column('birthday', sa.DateTime))

# ``emails``: one row per address. ``user_id`` is declared with type None
# and a ForeignKey to users.id; ``email`` and ``private`` are non-nullable.
emails = sa.Table('emails', metadata,
                  sa.Column('id', sa.Integer, primary_key=True),
                  sa.Column('user_id', None, sa.ForeignKey('users.id')),
                  sa.Column('email', sa.String(255), nullable=False),
                  sa.Column('private', sa.Boolean, nullable=False))
@asyncio.coroutine
def create_tables(engine):
    """Drop and recreate the ``users`` and ``emails`` tables.

    ``emails`` is dropped first because it references ``users``. The DDL
    is kept in line with the SQLAlchemy ``users``/``emails`` definitions of
    this example: ``emails.id`` is a primary key, ``email`` is
    ``varchar(255)`` and ``email``/``private`` are ``NOT NULL``.

    :param engine: engine from which a connection is taken for the DDL.
    """
    with (yield from engine) as conn:
        yield from conn.execute('DROP TABLE IF EXISTS emails')
        yield from conn.execute('DROP TABLE IF EXISTS users')
        yield from conn.execute('''CREATE TABLE users (
                                       id serial PRIMARY KEY,
                                       name varchar(255),
                                       birthday timestamp)''')
        yield from conn.execute('''CREATE TABLE emails (
                                       id serial PRIMARY KEY,
                                       user_id int references users(id),
                                       email varchar(255) NOT NULL,
                                       private bool NOT NULL)''')
# Pool of first names used to populate the ``users`` table.
names = {
    'Abbi', 'Alex', 'Andrew', 'Anna', 'Bob',
    'Clare', 'Dave', 'Doug', 'Eva', 'Ivy',
    'Jenni', 'Jessica', 'John', 'Julia', 'Lina',
    'Lori', 'Margo', 'Maria', 'Martin', 'Matt',
    'Nick', 'Olga', 'Rita', 'Rosa', 'Vitaly',
}
def gen_birthday():
    """Return a random midnight ``datetime`` 20 to 30 years before this year.

    The day is drawn from 1..28 so the date is valid for every month.
    """
    this_year = datetime.datetime.now().year
    # Keyword arguments are evaluated left to right: year, month, day.
    return datetime.datetime(
        year=random.randint(this_year - 30, this_year - 20),
        month=random.randint(1, 12),
        day=random.randint(1, 28),
    )
@asyncio.coroutine
def fill_data(engine):
    """Insert every name as a user, each with a random number of emails.

    All inserts run in one transaction, which is committed on success and
    rolled back (with the error re-raised) if any insert fails.

    :param engine: engine from which the working connection is taken.
    """
    with (yield from engine) as conn:
        tr = yield from conn.begin()
        try:
            # random.sample() no longer accepts a set (TypeError on
            # Python 3.11+), so sample from a sorted list; taking
            # len(names) items yields the names in random order.
            for name in random.sample(sorted(names), len(names)):
                uid = yield from conn.scalar(
                    users.insert().values(name=name,
                                          birthday=gen_birthday()))
                # paretovariate() can occasionally return a very large
                # value; cap it at the size of the sampled population.
                emails_count = min(int(random.paretovariate(2)), 10000)
                for num in random.sample(range(10000), emails_count):
                    is_private = random.uniform(0, 1) < 0.8
                    yield from conn.execute(emails.insert().values(
                        user_id=uid,
                        email='{}+{}@gmail.com'.format(name, num),
                        private=is_private))
        except Exception:
            # Do not hand a connection with an open transaction back to
            # the pool.
            yield from tr.rollback()
            raise
        else:
            yield from tr.commit()
@asyncio.coroutine
def count(engine):
    """Print row counts for both tables and list users with a public email.

    The counts use ``func.count()`` with ``select_from`` because the
    ``Table.count()`` shortcut is not available in SQLAlchemy 1.4.

    :param engine: engine from which the working connection is taken.
    """
    users_total = sa.select([sa.func.count()]).select_from(users)
    emails_total = sa.select([sa.func.count()]).select_from(emails)
    with (yield from engine) as conn:
        c1 = (yield from conn.scalar(users_total))
        c2 = (yield from conn.scalar(emails_total))
        print("Population consists of", c1, "people with",
              c2, "emails in total")

        join = sa.join(emails, users, users.c.id == emails.c.user_id)
        # "== False" is intentional: it builds a SQL comparison expression.
        query = (sa.select([users.c.name])
                 .select_from(join)
                 .where(emails.c.private == False)  # noqa
                 .group_by(users.c.name)
                 .having(sa.func.count(emails.c.private) > 0))

        print("Users with public emails:")
        ret = yield from conn.execute(query)
        for row in ret:
            print(row.name)

        print()
@asyncio.coroutine
def show_julia(engine):
    """Print name, birthday, email and privacy flag for each email of Julia.

    :param engine: engine from which the working connection is taken.
    """
    with (yield from engine) as conn:
        print("Lookup for Julia:")
        users_with_emails = sa.join(emails, users,
                                    users.c.id == emails.c.user_id)
        is_julia = users.c.name == 'Julia'
        # use_labels=True is what gives the result columns the
        # table-prefixed names (users_name, emails_email, ...) read below.
        stmt = sa.select([users, emails], use_labels=True)
        stmt = stmt.select_from(users_with_emails).where(is_julia)
        records = yield from conn.execute(stmt)
        for record in records:
            print(record.users_name, record.users_birthday,
                  record.emails_email, record.emails_private)
        print()
@asyncio.coroutine
def ave_age(engine):
    """Print the average age of all users, raw and as approximate years.

    :param engine: engine from which the working connection is taken.
    """
    with (yield from engine) as conn:
        age_of_user = sa.func.age(users.c.birthday)
        stmt = sa.select([sa.func.avg(age_of_user)]).select_from(users)
        ave = yield from conn.scalar(stmt)
        # The result exposes ``.days``; 365 days are counted as one year.
        approx_years = int(ave.days / 365)
        print("Average age of population is", ave,
              "or ~", approx_years, "years")
        print()
@asyncio.coroutine
def go():
    """Run the whole demo against a freshly created engine.

    The steps run in order: create tables, fill them, print counts, look
    up Julia, print the average age. The engine is closed in a ``finally``
    block so its pooled connections are released even when a step fails.
    """
    engine = yield from create_engine(user='aiopg',
                                      database='aiopg',
                                      host='127.0.0.1',
                                      password='passwd')
    try:
        yield from create_tables(engine)
        yield from fill_data(engine)
        yield from count(engine)
        yield from show_julia(engine)
        yield from ave_age(engine)
    finally:
        # close() only marks the engine closed; wait_closed() waits for
        # its connections to actually be shut down.
        engine.close()
        yield from engine.wait_closed()
# Script entry point: block on the event loop until go() finishes.
loop = asyncio.get_event_loop()
loop.run_until_complete(go())