At Wehkamp we use AWS Lambda to classify images on S3. The Lambda is triggered when a new image is uploaded to the S3 bucket. Currently we have over 6,400,000 images in the bucket. Now we would like to run the Lambda for all images that are already in the bucket. In this blog I’ll show how we did this with a Python 3.6 script that sends every object in the bucket to an SQS queue.

The idea

The basic idea is as follows:

  • Keep messages short: {bucket-name}/{key}/{to}/{item}
  • Send multiple records at once to SQS using send_message_batch. Unfortunately, 10 is the maximum per call.
  • Use eventlet to make the networking as light and fast as possible. This gives us a nice pool on which we can operate.
  • It would be nice to see some progress.
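
The first two points can be sketched in isolation. The helper names below (build_entry, batched, parse_message) are hypothetical and not part of the script. The parsing side is an assumption about how a consumer might read the message: it splits on the first slash, which works because S3 bucket names cannot contain slashes.

```python
import hashlib
from itertools import islice

SQS_MAX_BATCH = 10  # send_message_batch accepts at most 10 entries per call


def build_entry(bucket_name, key):
    """Build one batch entry with a short '{bucket}/{key}' body."""
    message = '{}/{}'.format(bucket_name, key)
    # Ids must be unique within a batch; an MD5 hex digest of the body is
    # 32 characters long, which fits the 80 character limit of SQS.
    entry_id = hashlib.md5(message.encode('utf-8')).hexdigest()
    return {'Id': entry_id, 'MessageBody': message}


def batched(iterable, size=SQS_MAX_BATCH):
    """Yield lists of at most `size` items from any iterable."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def parse_message(body):
    """Split a message body back into (bucket, key) on the first slash."""
    bucket_name, key = body.split('/', 1)
    return bucket_name, key
```

With 23 keys, batched yields groups of 10, 10 and 3 entries, so the last partial batch is not lost.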

Dependencies

The script has some dependencies (that can be easily installed):

  • pip3 install awscli --upgrade --user to store the connection details for AWS. More about AWS CLI here. Don’t forget to configure it by running aws configure.
  • pip3 install boto3 to interact with S3 and SQS.
  • pip3 install eventlet to use a great concurrency library for non-blocking I/O. Note: at the time of writing, this library did not work on Python 3.7!

The script

I’ve added some comments to explain what’s what. The most important thing is to monkey patch as soon as possible!

#!/usr/bin/python3

# patch all to use green threads
from eventlet import GreenPool, monkey_patch
monkey_patch()

import argparse, boto3, hashlib, os, sys, time

# init aws clients and resources
s3 = boto3.resource('s3')
sqs = boto3.client('sqs')
start_time = time.time()

# sends a list of entries into the queue. Note: max size is 10
def send_to_queue(sqs_url, entries):
    sqs.send_message_batch(QueueUrl=sqs_url, Entries=entries)


# reads the bucket and enqueues all objects in batches of 10 on
# a green thread pool with fast network IO
def enqueue_all_bucket_objects(sqs_url, bucket_name, pool_size):
    sqs_name=sqs_url.split('/')[-1]
    print("\nSending objects from S3 Bucket '{}' to SQS '{}':".format(bucket_name, sqs_name))

    processed = 0
    pool = GreenPool(size=pool_size)
    bucket = s3.Bucket(bucket_name)
    entries = []

    for obj in bucket.objects.all():

        message = '{}/{}'.format(bucket.name, obj.key)
        # the id only has to be unique within a single batch
        entry_id = hashlib.md5(message.encode('utf-8')).hexdigest()
        entries.append({'Id': entry_id, 'MessageBody': message})

        # send the entries in batches of 10 on the green pool
        if len(entries) == 10:
            pool.spawn_n(send_to_queue, sqs_url, entries[:])
            processed += 10
            entries = []

            # don't update UI too often, slows down performance;
            # checked only after a batch, so each total is printed once
            if processed % 250 == 0:
                print_progress(processed)

    if len(entries) != 0:
        pool.spawn_n(send_to_queue, sqs_url, entries)
        processed += len(entries)

    print_progress(processed, False)
    print('Waiting for pool to finish...')
    pool.waitall()


# prints the formatted total
def print_progress(total, overwritable=True):
    elapsed_time = time.time() - start_time
    elapsed = time.strftime('%H:%M:%S', time.gmtime(elapsed_time))
    end = ('\r' if overwritable else '\n')
    print('Total objects queued: {:,} in {}  '.format(total, elapsed), end = end)


def main(argv):
    # use the ArgumentParser to tell humans what's going on
    parser = argparse.ArgumentParser(description="Enqueues all object in the S3 bucket to the SQS bucket.")
    parser.add_argument('-b', '--bucket', help='the name of the S3 bucket', required=True)
    parser.add_argument('-s', '--sqs', help='the URL to the SQS queue', required=True)
    parser.add_argument('-p', '--pool_size', help='set pool size for concurrency. Default is 20.', type=int, default=20)

    args = parser.parse_args(argv)

    try:
        enqueue_all_bucket_objects(args.sqs, args.bucket, args.pool_size)
    except KeyboardInterrupt:
        # prevent the interrupt stack trace from showing when user cancels
        print('\nProgram terminated by user.\n')
        sys.exit(0)

    print('Done!\n')
        
        
if __name__ == "__main__":
    main(sys.argv[1:])
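
One thing the script does not do is look at the response of send_message_batch. A batch call can succeed as a whole while individual entries are rejected; those are reported in the Failed list of the response. A minimal sketch of a retrying variant could look like this. The function name send_with_retry, the max_attempts parameter and the choice to raise a RuntimeError are assumptions for illustration, not part of the original script.

```python
def send_with_retry(sqs_client, sqs_url, entries, max_attempts=3):
    """Send a batch and resend only the entries SQS reports as failed.

    `sqs_client` is expected to behave like boto3.client('sqs').
    """
    pending = list(entries)
    for _ in range(max_attempts):
        response = sqs_client.send_message_batch(QueueUrl=sqs_url, Entries=pending)
        # 'Failed' is a list of dicts that carry the Id of each rejected entry
        failed_ids = {failure['Id'] for failure in response.get('Failed', [])}
        if not failed_ids:
            return
        # keep only the entries that were rejected and try those again
        pending = [entry for entry in pending if entry['Id'] in failed_ids]
    raise RuntimeError('{} entries could not be queued'.format(len(pending)))
```

Passing the client in as a parameter keeps the function easy to try out with a stub before pointing it at a real queue.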

Run the script

We now have a script that we can run against any S3 bucket to any SQS queue. The ArgumentParser provides a nice little help feature:

PS C:\scripts> py .\send-bucket-to-sqs.py --help                      
usage: send-bucket-to-sqs.py [-h] -b BUCKET -s SQS [-p POOL_SIZE]

Enqueues all object in the S3 bucket to the SQS bucket.

optional arguments:
  -h, --help            show this help message and exit
  -b BUCKET, --bucket BUCKET
                        the name of the S3 bucket
  -s SQS, --sqs SQS     the URL to the SQS queue
  -p POOL_SIZE, --pool_size POOL_SIZE
                        set pool size for concurrency. Default is 20.

When I run the script on my local Windows machine, it goes like this:

(Screenshot: speed on Windows.)
With this speed, queueing all records would take (6,400,000 / (14,000 / 18)) / 60 = ±137 minutes.

When I run it on a CentOS / T3 Medium Instance on AWS it looks like this:

(Screenshot: speed on AWS EC2 T3 Medium.)
With this speed, queueing all records would take (6,400,000 / (20,000 / 18)) / 60 = 96 minutes, which is ±30% faster.
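
The two estimates can be reproduced with a few lines of Python. This sketch reads the figures in the formulas above as roughly 14,000 and 20,000 objects queued in 18 seconds, which is an interpretation of the formula rather than something stated explicitly. estimate_minutes is a hypothetical helper name.

```python
def estimate_minutes(total_objects, objects_sent, seconds_taken):
    """Extrapolate the total run time in minutes from a measured sample."""
    objects_per_second = objects_sent / seconds_taken
    return total_objects / objects_per_second / 60


TOTAL_OBJECTS = 6_400_000

windows = estimate_minutes(TOTAL_OBJECTS, 14_000, 18)
ec2 = estimate_minutes(TOTAL_OBJECTS, 20_000, 18)
time_saved = 1 - ec2 / windows

print('Windows: {:.0f} min, EC2: {:.0f} min, {:.0%} less time'.format(windows, ec2, time_saved))
# prints: Windows: 137 min, EC2: 96 min, 30% less time
```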