Here is a quick and simple SQS producer/consumer example. The producer puts messages on the queue, sleeping a random 1 to 4 seconds between sends. The consumer repeatedly reads messages off the queue; each receive call waits up to 2 seconds for messages to appear before returning.
Producer
#!/usr/bin/env python3
"""
aiobotocore SQS Producer Example
"""
import asyncio
import random
import sys
import botocore.exceptions
from aiobotocore.session import get_session
QUEUE_NAME = 'test_queue12'
async def go():
    """Send numbered messages to the SQS queue until interrupted.

    Resolves the URL of ``QUEUE_NAME`` in ``us-west-2``, then loops forever:
    sends ``Message #<n>``, prints a confirmation, and sleeps a random
    1-4 seconds before the next send.

    Raises:
        SystemExit: with status 1 when the queue does not exist.
        botocore.exceptions.ClientError: any other error from the URL lookup.
    """
    # Boto should get credentials from ~/.aws/credentials or the environment
    session = get_session()
    async with session.create_client('sqs', region_name='us-west-2') as client:
        try:
            response = await client.get_queue_url(QueueName=QUEUE_NAME)
        except botocore.exceptions.ClientError as err:
            error_code = err.response['Error']['Code']
            if error_code != 'AWS.SimpleQueueService.NonExistentQueue':
                raise
            print(f"Queue {QUEUE_NAME} does not exist")
            sys.exit(1)

        queue_url = response['QueueUrl']

        print('Putting messages on the queue')

        msg_no = 1
        while True:
            try:
                msg_body = f'Message #{msg_no}'
                await client.send_message(
                    QueueUrl=queue_url, MessageBody=msg_body
                )
                msg_no += 1
                print(f'Pushed "{msg_body}" to queue')
                await asyncio.sleep(random.randint(1, 4))
            except (KeyboardInterrupt, asyncio.CancelledError):
                # Ctrl-C usually reaches a coroutine as a task cancellation
                # (CancelledError) rather than as KeyboardInterrupt, so both
                # are treated as a request to stop sending.
                break

        print('Finished')
def main():
    """Run the producer coroutine to completion, exiting quietly on Ctrl-C."""
    try:
        # asyncio.run() creates a fresh event loop and closes it afterwards;
        # asyncio.get_event_loop() without a running loop is deprecated and
        # left the loop unclosed.
        asyncio.run(go())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this script; no traceback.
        pass
# Start the producer only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Consumer
#!/usr/bin/env python3
"""
aiobotocore SQS Consumer Example
"""
import asyncio
import sys
import botocore.exceptions
from aiobotocore.session import get_session
QUEUE_NAME = 'test_queue12'
async def go():
    """Receive, print, and delete messages from the SQS queue until interrupted.

    Resolves the URL of ``QUEUE_NAME`` in ``us-west-2``, then loops forever
    calling ``receive_message`` with a 2-second wait. Each received message
    body is printed and the message is deleted; when a response carries no
    ``Messages`` key, ``No messages in queue`` is printed instead.

    Raises:
        SystemExit: with status 1 when the queue does not exist.
        botocore.exceptions.ClientError: any other error from the URL lookup.
    """
    # Boto should get credentials from ~/.aws/credentials or the environment
    session = get_session()
    async with session.create_client('sqs', region_name='us-west-2') as client:
        try:
            response = await client.get_queue_url(QueueName=QUEUE_NAME)
        except botocore.exceptions.ClientError as err:
            error_code = err.response['Error']['Code']
            if error_code != 'AWS.SimpleQueueService.NonExistentQueue':
                raise
            print(f"Queue {QUEUE_NAME} does not exist")
            sys.exit(1)

        queue_url = response['QueueUrl']

        print('Pulling messages off the queue')

        while True:
            try:
                # This loop won't spin really fast as there is
                # essentially a sleep in the receive_message call
                response = await client.receive_message(
                    QueueUrl=queue_url,
                    WaitTimeSeconds=2,
                )

                if 'Messages' in response:
                    for msg in response['Messages']:
                        print(f'Got msg "{msg["Body"]}"')
                        # Need to remove msg from queue or else it'll reappear
                        await client.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=msg['ReceiptHandle'],
                        )
                else:
                    print('No messages in queue')
            except (KeyboardInterrupt, asyncio.CancelledError):
                # Ctrl-C usually reaches a coroutine as a task cancellation
                # (CancelledError) rather than as KeyboardInterrupt, so both
                # are treated as a request to stop polling.
                break

        print('Finished')
# Start the consumer only when executed as a script, not when imported.
if __name__ == '__main__':
    try:
        asyncio.run(go())
    except KeyboardInterrupt:
        # asyncio.run() re-raises KeyboardInterrupt after tearing down the
        # loop; Ctrl-C is the expected way to stop, so exit without a
        # traceback.
        pass