Creating a Simple Work Queue Behind an Flask API⚓︎
Like all of us I am always looking for ways to stop wasting my time. I often find myself needing to a run a command over and over again to test something (e.g. simulate load). My preference is always that I have a button that I can just click and it does the thing I need it to do. The below are the steps involved in setting up a simple Flask API (with swagger page), with a simple in memory work queue, that a configured number of workers monitor to perform the required task.
First install gunicorn which we will use the run the web server, and the flask framework.
python3 -mpip install gunicorn flask-restx
Create the App⚓︎
Then on to creating the application.
Setup Work Queue and Configure Workers⚓︎
from queue import Queue
from threading import Thread
import time
number_of_threads = 5 ### obviously this is configurable
my_work_queue = Queue()
def doFileUploads(i, q):
while True:
print("%s: Looking for the next record" % i)
record = q.get()
print("%s: Uploading:" % i, record)
time.sleep(10) ## Do the thing that needs to be done
for i in range(number_of_threads):
worker = Thread(target=doFileUploads, args=(i, my_work_queue,)) ### setup the workers -- give them work
Create the Flask App and Add The Routes⚓︎
from flask import Flask
from flask_restx import Resource, Api, reqparse
app = Flask(__name__)
api = Api(app)
ns = api.namespace("queue", description="Simple Queue API operations")
parser = reqparse.RequestParser()
"recordDetails", type=int, help="Rate cannot be converted", location="json"
"numberOfQueueEntriesToCreate", required=True, type=int, location="args"
"priority", choices=(1, 2), type=int, help="Bad choice: {error_msg}", location="args"
class Add_Queue_Record(Resource):
def put(self):
Add a record to the queue
args = parser.parse_args()
for x in range(0, args['numberOfQueueEntriesToCreate']):
my_work_queue.put({"data": args['recordDetails']})
return {
"message": "Record Added to Work Queue",
"queue_size": my_work_queue.qsize(),
"args": args,
def delete(self):
Remove everything from the queue
while my_work_queue.qsize() > 0:
return {"message": "Work Queue Emptied", "queue_size": my_work_queue.qsize()}
class Count_Queue_Records(Resource):
def get(self):
Count the number of records in the queue
return {"queue_size": my_work_queue.qsize()}
Configure Interface and Port⚓︎
if __name__ == '__main__':, host='', port='9999')
Start the Web Server⚓︎
gunicorn app:app --reload -b '' -w 1 -t 300
That's it. Noting that only one gunicorn worker can be used as if we start mulitple each would have their own work queue.