Note

This is Part 2 of a three-part series.

  • Part 1 - pattern motivation and theory
  • Part 2 - naïve implementation
  • Part 3 - pattern implementation


Introduction

Let’s quickly review Part 1 of the series.

In Part 1, we:

  • Made the case for extensibility as a quality attribute to seek in designing algorithm-centric production workflows.
  • Presented a fictional use case for a containerized analysis worker environment that reads images from a queue and runs algorithms on them, a common kind of setup in real-world applications.
  • Came up with an initial design for the worker, naïve with respect to extensibility.
  • Analyzed what made the initial design less extensible and used our conclusions to come up with a design pattern that solves for extensibility.

In Part 2, we’ll go through implementing the naïve design. This is useful mostly as a precursor to Part 3 where we’ll re-implement our worker using the final design pattern. By first having the initial design coded and functioning, we’ll be able to apply the pattern to it, test that it still works and look at how much easier it is to extend in practical terms.

A note on implementation and code structure

As explained in Part 1, we’ll implement everything in Python. I include some code snippets throughout the article, tailored to aid discussion, but you can find the complete working code for the example in this GitHub repo. Code for the initial design presented here is available in the initial branch.

The code for all components is placed in a single repository. Looking at the repo, you’ll find a directory for each component with its source files, Dockerfile and a build.sh script. The worker and producer directories are there to assist in running and testing everything locally.
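For orientation, the layout looks roughly like this (directory names here are indicative; check the repo for the exact structure):

controller/        # controller source, Dockerfile and build.sh
meme-classifier/   # classifier source, model files, Dockerfile and build.sh
producer/          # helper script for publishing test messages to the queue
worker/            # docker-compose file for running the whole setup locally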

Implementation

As a quick reminder, the initial design we’ll implement here consists of a Controller component that reads images from a queue and sends them to a Runner component. The latter houses the actual algorithm code and exposes it via an auxiliary HTTP server for the controller to send requests to.

Controller

We’ll be using a RabbitMQ queue called tasks in our example setup, so we set up our controller with pika to consume from the tasks queue.

import pika

QUEUE_NAME = 'tasks'

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME)

We define a callback to be run on each consumed message. This callback does the controller work: call the runner, check for errors in the response and do something with the result or the error. In our example, we just log the result if the image was processed successfully and the error otherwise.

import os

import requests

RUNNER_HOST = os.environ.get('RUNNER_HOST', 'localhost')
RUNNER_PORT = os.environ.get('RUNNER_PORT', 5000)

def callback(ch, method, properties, body):
    print('Received message, calling runner')
    headers = {'Content-Type': 'application/json'}
    url = 'http://{}:{}'.format(RUNNER_HOST, RUNNER_PORT)
    response = requests.post(url, headers=headers, data=body)
    if response.ok:
        print(f'Received result from runner: {response.json()}')
    else:
        print(f'Received error response from runner: {response.status_code} {response.json()}')
        # Handle error (retry/requeue/send to dead-letter exchange)

Lastly, we start listening for messages on the queue.

channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback, auto_ack=True)
print(f'Listening for messages on queue {QUEUE_NAME}')
channel.start_consuming()
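As a side note, since we consume with auto_ack=True, every message is acknowledged on delivery and a failed run is only logged. A minimal sketch of a variant (not part of the initial design in the repo) that uses manual acknowledgments, so failed messages can be requeued or routed to a dead-letter exchange, could look like this:

def callback(ch, method, properties, body):
    print('Received message, calling runner')
    response = requests.post(
        'http://{}:{}'.format(RUNNER_HOST, RUNNER_PORT),
        headers={'Content-Type': 'application/json'},
        data=body,
    )
    if response.ok:
        print(f'Received result from runner: {response.json()}')
        ch.basic_ack(delivery_tag=method.delivery_tag)
    else:
        print(f'Received error response from runner: {response.status_code}')
        # Reject without requeueing; with a dead-letter exchange configured on
        # the queue, the message is routed there instead of being dropped
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback, auto_ack=False)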

Classifier

I trained a simple classifier for the purposes of this article. Since our focus here is on the ML Ops side of things, we’ll treat the model as a black box and look at how to deploy and run it. However, you can check out the supported memes, the model, the data and the full training code in this Kaggle notebook. Credit to Keras’s transfer learning guide and to gmor’s meme dataset on Kaggle.

We define a classifier module responsible for loading and running the classifier. Upon module initialization, the model architecture is loaded from its JSON serialization, its weights are loaded from the .h5 file and the model is compiled. In this case, we assume the model files are packaged with the code in a model/ directory at root level.

import logging

import tensorflow as tf

logger = logging.getLogger(__name__)

# Load the model architecture from its JSON serialization
with open('model/meme-classifier-model.json') as f:
    model = tf.keras.models.model_from_json(f.read())

# Load the trained weights and compile the model
model.load_weights('model/meme-classifier-model.h5')
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

For running the model, we expose a run_on_url function. It uses some auxiliary functions to process the input image and the output. We’ll omit the code for these functions here to reduce clutter, but you can find it in the repo (a rough sketch of what they might look like follows the snippet below).

def run_on_url(url):
    logger.debug('Fetching image')
    image_bytes = _get_image_bytes(url)

    logger.debug('Reading and preparing image')
    image_tensor = _get_image_tensor(image_bytes)

    logger.debug('Running on image')
    pred = model.predict(image_tensor)

    return _pred_to_label(pred)
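For reference, here’s a rough idea of what those helpers could look like. This is an illustration only, not the repo’s actual code: IMAGE_SIZE and CLASS_NAMES below are assumptions, and the real values come from how the model was trained.

import io

import numpy as np
import requests
from PIL import Image

IMAGE_SIZE = (224, 224)            # assumed model input size
CLASS_NAMES = ['matrix_morpheus']  # assumed label ordering; the full list lives in the repo

def _get_image_bytes(url):
    # Download the raw image bytes; the real module wraps HTTP failures
    # in exceptions.RequestError so the server can translate them into a 400
    response = requests.get(url)
    response.raise_for_status()
    return response.content

def _get_image_tensor(image_bytes):
    # Decode, resize and batch the image to match the model's input shape
    image = Image.open(io.BytesIO(image_bytes)).convert('RGB').resize(IMAGE_SIZE)
    return np.expand_dims(np.asarray(image, dtype=np.float32), axis=0)

def _pred_to_label(pred):
    # Map the highest-probability class index to its label and score
    index = int(np.argmax(pred[0]))
    return CLASS_NAMES[index], float(pred[0][index])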

We also define a server module responsible for exposing the classifier on a local HTTP endpoint. We use FastAPI and uvicorn to set up a simple API on a local HTTP server that calls classifier.run_on_url upon POST requests to / with a JSON body containing an image URL. It looks something like this.

Some relevant imports and logger setup. exceptions is a local module defining expected exceptions in our application (find it in the repo).

import logging
import traceback

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

import classifier
import exceptions

logger = logging.getLogger(__name__)

Define request and response models using Pydantic.

class ClassificationRequest(BaseModel):
    image_url: str

class ClassificationResponse(BaseModel):
    label: str
    score: float

Define the FastAPI app and the API.

app = FastAPI()

@app.post("/", status_code=200)
async def run_algorithm(request: ClassificationRequest):
    try:
        logger.info(f'Running classifier on URL {request.image_url}')
        label, score = classifier.run_on_url(request.image_url)
        return ClassificationResponse(label=label, score=score)

    except exceptions.RequestError as e:
        raise HTTPException(status_code=400, detail=f'Error fetching request image, received {e.response.status_code}')

    except Exception as e:
        error_str = traceback.format_exc()
        raise HTTPException(status_code=500, detail=error_str)

Finally, we run the server when the module is executed directly.

if __name__ == '__main__':
    uvicorn.run(app)

Notice that we leverage FastAPI’s automatic model validation on our endpoint, and add some error handling for errors we can expect while processing the payload. As we saw above, the controller checks for an error status returned by the runner and handles the failure to process a given message itself (by logging it and/or sending it to a dead-letter exchange). So in implementing the runner, we want to be diligent about capturing expected errors and returning a meaningful status for the controller to handle. This makes failed messages easier to understand later and keeps us robust against unexpected errors.
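If you want to see both paths quickly, you can bring the classifier up on its own and poke the endpoint directly. A small sketch, assuming the server is listening locally on port 5000:

import requests

RUNNER_URL = 'http://localhost:5000/'

# A well-formed request should return 200 with a label and a score
resp = requests.post(RUNNER_URL, json={'image_url': 'https://memegenerator.net/img/instances/39673831.jpg'})
print(resp.status_code, resp.json())

# A payload missing image_url is rejected by FastAPI's validation with a 422
resp = requests.post(RUNNER_URL, json={})
print(resp.status_code, resp.json())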

Dockerization, Compose File

I tend to favor containerizing all components that go into a worker setup as this lends itself very well to deployment using orchestration tools such as AWS ECS or Kubernetes. It also helps in minimizing differences between development, staging and production environments. We’ll use Docker Compose to run our setup locally.

Controller Dockerfile

FROM python:3.7

WORKDIR /opt/project

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY ./* ./

Classifier Dockerfile

FROM tensorflow/tensorflow

WORKDIR /opt/project

COPY ./requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt
RUN pip install pillow

COPY ./ ./

Compose file

We set our environment up with the queue service and both components of our worker. I only show some relevant fields from the file, but you can check it out with all of its details in the repo.

The restart and depends_on clauses in the controller service are there to allow a warmup period for the rabbitmq service after it starts.

version: '3'
services:
  rabbitmq:
    image: rabbitmq:3 
    ports:
      - 5672:5672
      
  meme-classifier:
    image: meme-classifier
    command: python server.py
    
  controller:
    image: controller
    command: python main.py
    restart: on-failure
    depends_on:
      - rabbitmq

Producer

The code in the producer directory is there to aid in testing. It’s set up to connect to the queue and allow us to easily send some test messages for our worker to process.
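The actual script is in the repo, but conceptually it boils down to publishing a JSON payload with an image URL to the tasks queue. A minimal sketch of such a producer:

import json
import sys

import pika

# Connect to the local RabbitMQ instance exposed by the compose setup
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks')

# Publish the image URL passed on the command line as a JSON message
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps({'image_url': sys.argv[1]}),
)
connection.close()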

Testing it out

After building the component images and tagging them appropriately, run docker-compose up -d in the worker directory to spin up the environment. Run docker-compose logs -f to track initialization. You’re likely to see some connection errors from the controller as it fails to connect to the queue while the latter finishes initializing.

Once both the controller and classifier services are listening for messages and requests respectively, we can send some meme image URLs to the queue and get some classification happening.

...
meme-classifier | INFO :: [+] Listening on port 5000
...
controller      | INFO :: [+] Listening for messages on queue tasks

Let’s try one out.

[Image: a “What if I told you / Matrix Morpheus” meme captioned “what if i told you i have no idea how my glasses dont fall out”]

From a terminal at ./producer/:

$ python main.py "https://memegenerator.net/img/instances/39673831.jpg"

Logs:

controller         | INFO :: Received message, calling runner
meme-classifier    | INFO :: Running classifier on URL
controller         | INFO :: Received result from runner: {'label': 'matrix_morpheus', 'score': 0.99989}

Looking good! You can play around with it some more if you like and have some fun looking at memes, as I did 😛.

What’s Next

In Part 3 of the series, we’ll implement the final pattern presented in Part 1 and test it. We’ll then try to further extend the resulting setup with a new algorithm and see, in practical terms, whether we achieved our goal of making extension low-overhead and easy.