At Wehkamp we use AWS Lambda to classify images on S3. The Lambda is triggered when a new image is uploaded to the S3 bucket. Currently we have over 6,400,000 images in the bucket, and now we would like to run the Lambda for every image in it. In this blog I'll show how we did this with a Python 3.6 script.
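For context, the trigger side is a regular S3-event Lambda. A minimal sketch of what such a handler looks like, assuming Python; classify_image is a hypothetical stand-in for our actual classification code:
# minimal sketch of an S3-triggered classification Lambda
# (classify_image is hypothetical, not our real implementation)
from urllib.parse import unquote_plus

def classify_image(bucket, key):
    # hypothetical stand-in for the real classification call
    print('classifying s3://{}/{}'.format(bucket, key))

def handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])  # keys arrive URL-encoded
        classify_image(bucket, key)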
The idea
The basic idea is as follows:
- Keep messages short: {bucket-name}/{key/to/item}
- Send multiple records at once to SQS using send_message_batch. Unfortunately 10 is the maximum batch size (see the sketch after this list).
- Use eventlet to make the networking as light and fast as possible. This gives us a nice pool on which we can operate.
- It would be nice to see some progress.
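To give an idea of what such a batch call looks like, here is a minimal, standalone sketch (the queue URL and message bodies are placeholders):
import boto3

sqs = boto3.client('sqs')
queue_url = 'https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue'  # placeholder

# a single batch: at most 10 entries, each with an Id that is unique within the batch
entries = [{'Id': str(i), 'MessageBody': 'my-bucket/images/{}.jpg'.format(i)} for i in range(10)]
sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)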
Dependencies
The script has some dependencies (that can be easily installed):
- pip3 install awscli --upgrade --user to store the connection details for connecting to AWS. More about AWS CLI here. Don't forget to configure it by running aws configure.
- pip3 install boto3 to interact with S3 and SQS.
- pip3 install eventlet to use a great concurrency library for non-blocking I/O (a small example follows below). Note: this library does not work on Python 3.7!
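As a small example of why eventlet is attractive here (independent of the script below): patch the standard library first, then run ordinary blocking I/O concurrently on a GreenPool. The URL is just a placeholder.
import eventlet
eventlet.monkey_patch()  # must happen before anything else touches sockets

import urllib.request

def fetch(url):
    return urllib.request.urlopen(url).read()

# the 5 downloads run concurrently on green threads instead of sequentially
pool = eventlet.GreenPool(size=5)
for body in pool.imap(fetch, ['https://example.com'] * 5):
    print(len(body))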
The script
I've added some comments to explain what's what. The most important thing is to monkey patch as soon as possible!
#!/usr/bin/python3

# patch all to use green threads
from eventlet import *
monkey_patch()

import argparse, boto3, hashlib, os, sys, time

# init aws clients and resources
s3 = boto3.resource('s3')
sqs = boto3.client('sqs')
start_time = time.time()


# sends a list of entries to the queue. Note: max batch size is 10
def send_to_queue(sqs_url, entries):
    sqs.send_message_batch(QueueUrl=sqs_url, Entries=entries)


# reads the bucket and enqueues all objects in batches of 10 on
# a green thread pool with fast network IO
def enqueue_all_bucket_objects(sqs_url, bucket_name, pool_size):
    sqs_name = sqs_url.split('/')[-1]
    print("\nSending objects from S3 Bucket '{}' to SQS '{}':".format(bucket_name, sqs_name))

    processed = 0
    pool = GreenPool(size=pool_size)
    bucket = s3.Bucket(bucket_name)
    entries = []

    for object in bucket.objects.all():
        message = '{}/{}'.format(bucket.name, object.key)
        id = hashlib.md5(message.encode('utf-8')).hexdigest()
        entries.append({'Id': id, 'MessageBody': message})

        # send the entries in batches of 10 on the green pool;
        # pass a copy, because the list is reset right after
        if len(entries) == 10:
            pool.spawn_n(send_to_queue, sqs_url, entries[:])
            processed += 10
            entries = []

            # don't update the UI too often, it slows down performance
            if processed % 250 == 0:
                print_progress(processed)

    # flush the last (incomplete) batch
    if len(entries) != 0:
        pool.spawn_n(send_to_queue, sqs_url, entries)
        processed += len(entries)

    print_progress(processed, False)
    print('Waiting for pool to finish...')
    pool.waitall()


# prints the formatted total
def print_progress(total, overwritable=True):
    elapsed_time = time.time() - start_time
    elapsed = time.strftime('%H:%M:%S', time.gmtime(elapsed_time))
    end = ('\r' if overwritable else '\n')
    print('Total objects queued: {:,} in {} '.format(total, elapsed), end=end)


def main(argv):
    # use the ArgumentParser to tell humans what's going on
    parser = argparse.ArgumentParser(description="Enqueues all objects in the S3 bucket to the SQS queue.")
    parser.add_argument('-b', '--bucket', help='the name of the S3 bucket', required=True)
    parser.add_argument('-s', '--sqs', help='the URL to the SQS queue', required=True)
    parser.add_argument('-p', '--pool_size', help='set pool size for concurrency. Default is 20.', type=int, default=20)
    args = parser.parse_args()

    try:
        enqueue_all_bucket_objects(args.sqs, args.bucket, args.pool_size)
    except KeyboardInterrupt:
        # prevent the interrupt stack trace from showing when the user cancels
        print('\nProgram terminated by user.\n')
        sys.exit(0)

    print('Done!\n')


if __name__ == "__main__":
    main(sys.argv[1:])
Run the script
We now have a script that can send the contents of any S3 bucket to any SQS queue. The ArgumentParser
provides a nice little help feature:
PS C:\scripts> py .\send-bucket-to-sqs.py --help
usage: send-bucket-to-sqs.py [-h] -b BUCKET -s SQS [-p POOL_SIZE]
Enqueues all objects in the S3 bucket to the SQS queue.
optional arguments:
-h, --help show this help message and exit
-b BUCKET, --bucket BUCKET
the name of the S3 bucket
-s SQS, --sqs SQS the URL to the SQS queue
-p POOL_SIZE, --pool_size POOL_SIZE
set pool size for concurrency. Default is 20.
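A typical invocation looks something like this (the bucket name and queue URL are placeholders for your own):
PS C:\scripts> py .\send-bucket-to-sqs.py -b my-image-bucket -s https://sqs.eu-west-1.amazonaws.com/123456789012/my-image-queue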
When I run the script on my local Windows machine, it goes like this:

When I run it on a CentOS / T3 Medium Instance on AWS it looks like this:
