Creating a Simple Work Queue Behind an Flask API⚓︎
Summary⚓︎
Like all of us I am always looking for ways to stop wasting my time. I often find myself needing to a run a command over and over again to test something (e.g. simulate load). My preference is always that I have a button that I can just click and it does the thing I need it to do. The below are the steps involved in setting up a simple Flask API (with swagger page), with a simple in memory work queue, that a configured number of workers monitor to perform the required task.
Setup⚓︎
First install gunicorn which we will use the run the web server, and the flask framework.
python3 -mpip install gunicorn flask-restx
Create the App⚓︎
Then on to creating the application.
Setup Work Queue and Configure Workers⚓︎
from queue import Queue
from threading import Thread
import time
number_of_threads = 5 ### obviously this is configurable
my_work_queue = Queue()
def doFileUploads(i, q):
while True:
print("%s: Looking for the next record" % i)
record = q.get()
print("%s: Uploading:" % i, record)
time.sleep(10) ## Do the thing that needs to be done
q.task_done()
for i in range(number_of_threads):
worker = Thread(target=doFileUploads, args=(i, my_work_queue,)) ### setup the workers -- give them work
worker.setDaemon(True)
worker.start()
Create the Flask App and Add The Routes⚓︎
from flask import Flask
from flask_restx import Resource, Api, reqparse
app = Flask(__name__)
api = Api(app)
ns = api.namespace("queue", description="Simple Queue API operations")
parser = reqparse.RequestParser()
parser.add_argument(
"recordDetails", type=int, help="Rate cannot be converted", location="json"
)
parser.add_argument(
"numberOfQueueEntriesToCreate", required=True, type=int, location="args"
)
parser.add_argument(
"priority", choices=(1, 2), type=int, help="Bad choice: {error_msg}", location="args"
)
@ns.route("/")
class Add_Queue_Record(Resource):
@api.expect(parser)
def put(self):
"""
Add a record to the queue
"""
args = parser.parse_args()
for x in range(0, args['numberOfQueueEntriesToCreate']):
my_work_queue.put({"data": args['recordDetails']})
return {
"message": "Record Added to Work Queue",
"queue_size": my_work_queue.qsize(),
"args": args,
}
def delete(self):
"""
Remove everything from the queue
"""
while my_work_queue.qsize() > 0:
my_work_queue.get()
my_work_queue.task_done()
return {"message": "Work Queue Emptied", "queue_size": my_work_queue.qsize()}
@ns.route("/count")
class Count_Queue_Records(Resource):
def get(self):
"""
Count the number of records in the queue
"""
return {"queue_size": my_work_queue.qsize()}
Configure Interface and Port⚓︎
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port='9999')
Start the Web Server⚓︎
gunicorn app:app --reload -b '0.0.0.0:9999' -w 1 -t 300
That's it. Noting that only one gunicorn worker can be used as if we start mulitple each would have their own work queue.