From content streaming to worker queues

For the last week, I’ve been working on deploying Style Transfer Playground, a Flask app I built that blends the content of one user-uploaded image with the style of another, letting the user watch the algorithmically generated hybrid image develop one iteration at a time in the browser.

The biggest technical challenge when I built the app was showing the user the intermediate images in real time as they were generated. As explained in this earlier post, my solution used Flask’s content streaming: the HTML template that the user sees while their image is being generated is streamed to them one piece at a time, as the machine learning algorithm completes each new iteration. Each streamed chunk contains JavaScript code that updates the image element with the file location of the latest intermediate file, and the intermediate files themselves are served by a second, simple HTTP server. The architecture of the app looks like this:
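To make the pattern concrete, here is a minimal sketch of the streaming approach; the function name and the `/intermediate/` path scheme are hypothetical stand-ins, not the app's actual code:

```python
# Minimal sketch of the streaming pattern; the path scheme is a made-up example.
def stream_iterations(num_iterations):
    """Yield one HTML chunk per completed iteration of the algorithm."""
    for it in range(num_iterations):
        file_url = '/intermediate/%d.png' % it  # served by the second HTTP server
        # Each chunk carries JavaScript that swaps in the newest image.
        yield ('<script>document.getElementById("my_image").src = "%s";'
               '</script>\n' % file_url)

# In the Flask view, this generator would be wrapped as
# Response(stream_with_context(stream_iterations(n))).
```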


Unfortunately, after deciding to deploy the app on Google App Engine, I discovered that GAE does not support HTTP response streaming. Even though my Flask app thought it was streaming content to the user, Google buffers all of that streamed HTML until the response is complete and only then delivers it, so the user would see only the final image and none of the intermediate ones.

Getting rid of content streaming forced me to completely overhaul the architecture of my application. In the new design, instead of having the main Flask server generate the images upon request, these requests are kicked out to additional “worker” processes. The flow of the app now goes as follows:

  • The client sends a request to /create_image
  • The Flask server receives the request, creates a task for the call to make_image(), and places that task in a queue
  • The Flask server sends a template and the pending task id to the client. The template contains JavaScript code to poll a new endpoint, /status/, at regular intervals
  • When available, a worker fetches the make_image task from the queue and begins to generate the image via a Tensorflow computation
  • After every iteration of the image generation process, the worker updates the task’s status by sending a message through the queue containing the filename of the most recently generated intermediate image
  • When the client polls /status/ and a new update is available, the client grabs the location of the new file and updates the page asynchronously
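The flow above can be sketched in-process using only the standard library; in the real app, Redis replaces the queue and Celery's result backend replaces the status dict, and all names here are made up for illustration:

```python
# In-process sketch of the worker-queue flow (stdlib only; names are made up).
import queue
import threading

tasks = queue.Queue()   # stands in for the Redis-backed broker
status = {}             # stands in for Celery's result backend

def worker():
    # Fetch one task from the queue and "generate" the image.
    task_id, num_iterations = tasks.get()
    for it in range(num_iterations):
        # After each iteration, record the latest intermediate file.
        status[task_id] = {'status': 'working', 'iteration': it,
                           'file_url': '/images/intermediate_%d.png' % it}
    status[task_id] = {'status': 'completed'}
    tasks.task_done()

def create_image(task_id, num_iterations):
    # The endpoint enqueues the work instead of running it inline.
    tasks.put((task_id, num_iterations))

def poll_status(task_id):
    # What the client sees when it polls /status/.
    return status.get(task_id, {'status': 'pending'})

create_image('abc123', 5)
threading.Thread(target=worker).start()
tasks.join()
```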

Initially, I used Google’s Cloud Pub/Sub to manage the queue and their psq package for the workers. Although I got everything working with these tools, I found the publisher/subscriber model unnecessarily complicated for my use case, so I switched to Redis for the queue and Celery for the workers.

The new architecture of the application can be seen below. In addition to the worker/queue changes, for the cloud deployment I also moved the database to Google’s Cloud SQL and the filesystem to Cloud Storage.


Configuring the Celery workers was surprisingly easy. We first establish the Celery client and point its broker at a Redis URL, which can be either a local Redis server or one on Redis Cloud (which is compatible with GAE).

from celery import Celery


# app is the Flask application; its config holds the Redis broker URL
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

In the /create_image endpoint, we assign the call to make_image to the queue by invoking it with apply_async(args), having tagged the function with the @celery.task decorator:

def create_image():
    task = make_image.apply_async((num_iters+1, style_im_path, content_im_path))
    return render_template('create_image.html',
                           task_id=task.id, num_iters=num_iters)

# make_image function
@celery.task(bind=True)
def make_image(self, num_iterations, style_image_file, content_image_file=None):

On each iteration of the loop inside make_image, we update the task’s status:

# inside make_image
    for it in range(num_iterations):
        self.update_state(state="PROGRESS", meta={'status': 'working', 'iteration': it, 'file_url': file_url})
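Since update_state only attaches metadata to the task for later retrieval, the pattern is easy to see with a stand-in object; FakeTask below is a test double that records the latest state and meta, not part of Celery, and the path scheme is hypothetical:

```python
# FakeTask is a test double for Celery's bound task; it just records
# whatever update_state was last called with.
class FakeTask:
    def __init__(self):
        self.state, self.meta = 'PENDING', {}

    def update_state(self, state, meta):
        self.state, self.meta = state, meta

def make_image_sketch(task, num_iterations):
    """Progress-reporting pattern: publish the newest file after each iteration."""
    for it in range(num_iterations):
        file_url = '/images/intermediate_%d.png' % it  # hypothetical path scheme
        task.update_state(state='PROGRESS',
                          meta={'status': 'working', 'iteration': it,
                                'file_url': file_url})
    task.update_state(state='SUCCESS', meta={'status': 'completed'})
```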

The new /status/ endpoint looks up the task’s status and returns a JSON response to the client:

def get_status(task_id):
    task = make_image.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = ...
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'iteration': task.info.get('iteration', 0),
            'file_url': task.info.get('file_url', ''),
            'status': task.info.get('status', '')
        }
    else:
        response = ...
    return jsonify(response)

Finally, the template create_image.html contains JavaScript and jQuery code to poll the /status/ endpoint every 2 seconds and update the image element with the new file URL:

<script type="text/javascript">
    function update_image(task_id) {
        $.getJSON('/status/' + task_id, function(data) {
            if (data['status'] == 'working') {
                document.getElementById("my_image").src = data['file_url'];
                document.getElementById("iter_counter").innerHTML = 'Iterations: ' + data['iteration'] + '/{{num_iters}}';
            }
            if (data['status'] != 'completed') {
                setTimeout(function() {
                    update_image(task_id);
                }, 2000);
            }
        });
    }
</script>


This solution works great running the Flask server and the Celery workers locally on my machine. However, when deployed to Google App Engine, Google automatically kills my workers after 60 seconds (see this post), which is not long enough to run the machine learning algorithm. Google has its own task queue service that allows requests up to 10 minutes, but unfortunately I’d like even more time than that. I’m currently exploring other options, but hopefully this architecture is interesting to those of you with shorter requests to delegate.
