Published on 00/00/0000
Last updated on 00/00/0000
In part 2 of this series, we took a deep dive into configuring a multi-tier reference infrastructure that can host an AIoT application. In the concluding part, we will see how to design and deploy a reference AIoT application on this infrastructure.
I will start by laying out the primary use case of this application. Then, using detailed design artifacts such as event diagrams and deployment topology models, I will explain the system design. I will show how to code, build, containerize, deploy, and orchestrate AIoT modules and services as MLOps pipelines on ARM64 devices.
I will also show you how to design and code the IoT device firmware to perform inferences using the TFLM library. The electrical circuit schematics and calculations for the industrial IoT setup are also included in this post.
The reference application simulates a predictive maintenance scenario wherein an industrial induction motor is monitored by an AIoT application. In the real world, this motor could be powering a mud pump for an oil rig, a conveyor drive, or a cooling fan for a wind turbine’s thermal management system.
The primary use case of this application is summarized below:
The system design that fulfils the requirements of the primary use cases is organized into three sections
The smallest deployable unit of this application is a container. Every microservice and module of this application is packaged as a container. The containers are managed by K3S and the container workflows by Argo.
To conserve resources on constrained devices, the services are packaged as lightweight distroless containers.
The deployment target hardware for the reference application services in this tier is one Nvidia Jetson Nano and two Raspberry Pi devices.
Device | Jetson Nano DevKit |
---|---|
Node Name | agentnode-nvidia-jetson |
Cluster size | 1 |
Accelerator | GPU |
OS | Ubuntu 18.04.6 LTS / 4.9.253-tegra |
Packaging | Container |
Storage | Longhorn CSI Plugin |
Orchestration | Argo Workflows |
Workloads Deployed | MLOps Training Pipeline |
As an example let's see how a training task from the training pipeline gets deployed to this device. The training workload deployment is managed by the Argo workflow platform. The training pipelines and the dependencies are expressed as workflow DAGs. Each workload gets scheduled on the node with the required hardware accelerator using a declarative syntax of Kubernetes advanced scheduling directives such as pod affinity, node selectors, taints, and tolerations. See the section on labels and taints from the previous article.
nodeSelector:
  gpuAccelerator: "true"
The Train module is packaged as a container and is then deployed by the Argo workflow engine as a task in the ML pipeline. Here is a snippet that shows how a containerized task is expressed in an Argo DAG:
- name: train-template
  inputs:
    parameters:
    - name: message
  container:
    image: docker.<IP Address>.nip.io:5000/training-module:latest
    env:
    - name: MODEL_REGISTRY_URL
      value: "https://<IP Address>:30007/uploadModel"
  nodeSelector:
    gpuAccelerator: "true"
The gpuAccelerator label ensures that the training workloads are scheduled only to nodes with GPU accelerators.
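To make the matching rule concrete, here is a minimal Python sketch of what a node selector expresses. The `node_matches` helper and the sample label sets are my own illustrations, not Kubernetes code: a node is eligible only if it carries every key/value pair listed in the selector.

```python
def node_matches(node_labels: dict, node_selector: dict) -> bool:
    """Return True when the node carries every label the selector asks for."""
    return all(node_labels.get(key) == value for key, value in node_selector.items())


# Hypothetical label sets for two of the cluster nodes.
jetson_labels = {"kubernetes.io/hostname": "agentnode-nvidia-jetson", "gpuAccelerator": "true"}
raspi_labels = {"kubernetes.io/hostname": "agentnode-raspi1"}

selector = {"gpuAccelerator": "true"}
candidates = [("agentnode-nvidia-jetson", jetson_labels), ("agentnode-raspi1", raspi_labels)]
eligible = [name for name, labels in candidates if node_matches(labels, selector)]
print(eligible)
```

An empty selector matches every node, which is why a task without a `nodeSelector` can land anywhere in the cluster.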
To see how the entire training pipeline is built and deployed, see this section
Device | Raspberry Pi |
---|---|
Node Name | agentnode-raspi1 |
Cluster size | 1 of 2 |
Accelerator | None |
OS | Debian GNU/Linux 10 (buster)/5.10.63-v8+ |
Packaging | Container |
Storage | Longhorn CSI Plugin |
Orchestration | K3S |
Services Deployed | Embedded Go MQTT Broker, MQTT-Kafka Protocol Bridge, Model Registry μService, Device Registry μService, Training Datastore μService, Data Ingest μService |
Device | Raspberry Pi |
---|---|
Node Name | agentnode-raspi12 |
Cluster size | 2 of 2 |
Accelerator | None |
OS | Debian GNU/Linux 10 (buster)/5.10.63-v8+ |
Storage | Longhorn CSI Plugin |
Services Deployed | K3S Server, Argo Workflows Server, Strimzi, Longhorn, Docker container registry |
We get into more detail on how to build and deploy these services in the subsequent sections.
The inference components of the reference app get deployed on two classes of devices - Coral Edge TPU and ESP32S.
Device | Coral Dev Board |
---|---|
Node Name | agentnode-coral-tpu1, agentnode-coral-tpu2, agentnode-coral-tpu3 |
Cluster size | 3 |
Accelerator | TPU |
OS | Mendel GNU/Linux 5 (Eagle)/ 4.14.98-imx |
Packaging | Container |
Storage | Longhorn CSI Plugin |
Orchestration | K3S |
Workloads Deployed | PyCoral TF Lite Inference Module, Go Streaming API Sidecar |
The Coral Edge TPU deployments are managed by K3S. Each one runs a PyCoral inference module that uses a streaming API sidecar to get event streams from the Kafka broker. The inference module and the sidecar are configured to run in the same pod and communicate over a TCP/IP socket. The required configuration is expressed in this YAML file.
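The sidecar pattern is easier to picture with a small example. The sketch below shows one way an inference module could read events handed over by a sidecar on a local socket. The wire format (one JSON document per line) and the `read_events` helper are assumptions of mine, and `socket.socketpair()` merely stands in for the TCP connection inside the pod.

```python
import json
import socket


def read_events(conn: socket.socket):
    """Yield one decoded JSON event per newline-terminated line from the socket."""
    buffer = b""
    while True:
        chunk = conn.recv(4096)
        if not chunk:          # the peer closed the connection
            break
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)


# Stand-in for the sidecar and the inference module sharing a pod.
sidecar, inference = socket.socketpair()
sidecar.sendall(b'{"deviceID": "14333616", "current": 26.56}\n')
sidecar.sendall(b'{"deviceID": "14333616", "current": 27.10}\n')
sidecar.close()

events = list(read_events(inference))
inference.close()
print(len(events), events[0]["deviceID"])
```

Keeping the Kafka client in the sidecar means the inference module only has to understand this simple local stream.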
Device | ESP32 SoC |
---|---|
Accelerator | None |
No. of devices | 1 |
OS | ESP-IDF FreeRTOS |
Packaging | Firmware |
Deployment | ESP32 OTA |
Workloads Deployed | TFLM Module |
The primary deployment target on the things tier is the ESP32 SoC firmware that runs the ESP-IDF FreeRTOS. This is the only tier that does not use containerization and hence is not managed by K3S. The control and communication for modules within this tier are based on lightweight pub/sub using MQTT. The ESP32 firmware embeds a TFLM C++ inference module. After the initial flash of the firmware, the TF Lite model is downloaded from the model registry. The details on how to code, build and flash the firmware are in this section.
Using the reference architecture blueprint, this application is organized and modularized into multiple components that run on separate tiers and communicate using both synchronous and asynchronous APIs. Let's zoom into these interactions using a sequence diagram. In this sequence diagram, you will see how various components of the application interact and exchange messages in order to carry out the functionality of the primary use case.
Various control and data interactions take place over two separate pub/sub brokers - MQTT and Kafka. The brokers are bridged together by a custom protocol bridge microservice. The control events, shown in green lines, flow on the control-topic, and the data messages, shown in purple lines, on the data-topic.
This diagram might seem complex and overwhelming, but I will walk you step by step and explain each interaction and message exchange in sufficient detail, tier by tier. So grab a cup of joe and stay with me - I am already on my third ristretto.
Components in this tier exchange messages exclusively over the MQTT broker. These messages are then relayed to the inference and platform tier components by the mqtt-kafka protocol bridge.
Topic Name | Type | Mode | Protocol | Format | Broker |
---|---|---|---|---|---|
control-message | control | Subscribe | MQTT | JSON | MQTT Broker |
shaded-pole-motor-sensor_data | data | Publish | MQTT | JSON | MQTT Broker |
motor-anomaly-level1-inference | data | Publish | MQTT | JSON | MQTT Broker |
Name | URL | Type |
---|---|---|
Model OTA URL | https://<HOST:30007>/quantized | Consumer |
Device Activation URL | https://<HOST:30006>/confirmActivation | Consumer |
MQTT Broker | tcp://<MQTT BROKER>:30005 | Consumer |
Here are the main interactions between the IoT components in this tier and the rest of the tiers:
{ "command": "download-model", "payload": "2022-05-06-20:36:11-model.tflite" }
Filtered sensor data: The filtered sensor data is published as an MQTT message over the topic shaded-pole-motor-sensor_data. Here is an example of this message
{
"deviceID": "14333616",
"current": 26.56,
"temperature": 32.81,
"vibration": 32.81,
"sound": 32.81,
"fft_data": "true"
}
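A consumer of this topic has to decode and type-check the message before using it. Here is a minimal Python sketch of that step. The `SensorReading` class and `parse_reading` function are hypothetical names; only the field names come from the sample message above.

```python
import json
from dataclasses import dataclass


@dataclass
class SensorReading:
    device_id: str
    current: float
    temperature: float
    vibration: float
    sound: float
    fft_data: bool


def parse_reading(raw: str) -> SensorReading:
    """Decode one sensor message; raises KeyError or ValueError on malformed input."""
    doc = json.loads(raw)
    return SensorReading(
        device_id=str(doc["deviceID"]),
        current=float(doc["current"]),
        temperature=float(doc["temperature"]),
        vibration=float(doc["vibration"]),
        sound=float(doc["sound"]),
        # The sample message carries this flag as the string "true".
        fft_data=str(doc["fft_data"]).lower() == "true",
    )


sample = ('{"deviceID": "14333616", "current": 26.56, "temperature": 32.81, '
          '"vibration": 32.81, "sound": 32.81, "fft_data": "true"}')
reading = parse_reading(sample)
print(reading)
```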
This tier has components running on two classes of devices - Coral Dev board (Linux/ARM) and ESP32S (RTOS/MCU) devices.
Topic Name | Type | Mode | Protocol | Format | Broker |
---|---|---|---|---|---|
Coral Dev Board | |||||
shaded-pole-motor-sensor_data | Data | Subscribe | Kafka Stream | JSON | Kafka Broker |
motor-failure-alert-level2-inference | Data | Publish | Kafka Stream | JSON | Kafka Broker |
control-message | Control | Subscribe | Kafka Stream | JSON | Kafka Broker |
ESP32 SoC | |||||
shaded-pole-motor-sensor_data | Data | Subscribe | MQTT | JSON | MQTT Broker |
control-message | Control | Subscribe | MQTT | JSON | MQTT Broker |
motor-failure-alert-level2-inference | Data | Publish | MQTT | JSON | MQTT Broker |
Name | URL | Type |
---|---|---|
Model OTA URL | https://<HOST:30007>/quantized | Consumer |
MQTT Broker | tcp://<MQTT BROKER>:1883 | Consumer |
Kafka Broker | tcp://<Strimzi Endpoint>:32199 | Consumer |
There are four main interactions between the ML inference components and the rest of the system:
New Model Message: The devices in this tier subscribe to the control topic control-message. On this topic, any updated TFLite model information is sent to the device as a JSON key-value pair. As an example, the device will get this message if the MLOps pipeline generates a new model. The name of the model file is in the payload. Here is a sample control message:
{
"command": "download-model",
"payload": "2022-05-06-20:36:11-model.tflite"
}
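As a rough sketch of how a device might react to this message, the Python snippet below turns the payload into a download URL. The URL layout (`/quantized/<file name>`), the host name `registry.example`, and the function name are assumptions for illustration; the real firmware does this in C++.

```python
import json
from urllib.parse import quote


def model_url_from_control_message(raw: str, registry_base: str):
    """Return the download URL for a download-model command, or None for other commands."""
    message = json.loads(raw)
    if message.get("command") != "download-model":
        return None
    # Percent-encode the file name because it contains ':' characters.
    return "{}/quantized/{}".format(registry_base.rstrip("/"), quote(message["payload"]))


control = '{"command": "download-model", "payload": "2022-05-06-20:36:11-model.tflite"}'
url = model_url_from_control_message(control, "https://registry.example:30007")
print(url)
```

Messages with any other command return `None`, so a device can share the control topic with the pipeline tasks and simply ignore what is not addressed to it.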
The platform tier hosts the components that provide the following services:
These services are hosted on a cluster comprising one Nvidia Jetson Nano and two Raspberry Pi SBCs. All the MLOps pipelines run on this platform and interact using the following events, endpoints, and dataflow.
Topic Name | Type | Mode | Protocol | Format | Broker |
---|---|---|---|---|---|
shaded-pole-motor-sensor_data | Data | Subscribe | Kafka Stream | JSON | Kafka Broker |
control-message | Control | Subscribe | Kafka Stream | JSON | Kafka Broker |
Name | VERB | URL | Type |
---|---|---|---|
Training Datastore - Raw data | GET | https://<HOST:30008>/<fileName> | Provider |
Training Datastore - Raw data | POST | https://<HOST:30008>/upload | Provider |
Model Registry - Normalized data | POST | https://<HOST:30007>/uploadNormalizedData | Provider |
Model Registry - Normalized data | GET | https://<HOST:30007>/normalized_training_data | Provider |
Model Registry - Frozen graph | POST | https://<HOST:30007>/uploadModel | Provider |
Model Registry - Frozen graph | GET | https://<HOST:30007>/full | Provider |
Model Registry - Quantized model | POST | https://<HOST:30007>/uploadQuantizedModel | Provider |
Model Registry - Quantized model | GET | https://<HOST:30007>/quantized | Provider |
MQTT Broker | | tcp://<MQTT BROKER>:30005 | Provider |
Kafka Broker | | tcp://<Strimzi Endpoint>:32199 | Provider |
The MLOps training tasks are orchestrated by Argo workflow and interact in the following sequences with the rest of the system:
Publish Control Message: The ingest μService publishes a control message extract-data that triggers the Argo workflow training pipeline. The message is in the following format.
{
"command": "extract-data",
"payload": "raw_sensor_training_data_2022-05-06T16:14:48:1651853688477.csv"
}
Upload Quantized Model: The quantize task uploads the quantized model to Model Registry - Quantized Model endpoint.
Publish Control Message: The quantize task publishes the download-model control message on the Kafka Broker. The MQTT-Kafka protocol bridge converts this message into an MQTT message and publishes it as a download-model MQTT control message. Any IoT devices in the things tier or any inference devices in the inference tier, that subscribe to this topic, will get this message. Such devices, in response to this message, will download the latest quantized model from the Model Registry - Quantized Model endpoint. Here is an example of this message
{
"command": "download-model",
"payload": "2022-05-06-20:36:11-model.tflite"
}
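The relay step can be pictured with two in-memory stand-ins for the brokers. This is only a sketch: `InMemoryBroker` is a toy class of mine, not the Kafka or MQTT client API, and the real bridge is a separate microservice.

```python
import json
from collections import defaultdict


class InMemoryBroker:
    """Tiny stand-in for a pub/sub broker: topic name -> list of subscriber callbacks."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, payload: bytes):
        for callback in self.subscribers[topic]:
            callback(topic, payload)


kafka, mqtt = InMemoryBroker(), InMemoryBroker()

# Bridge: forward every control message from the Kafka side to the MQTT side.
kafka.subscribe("control-message", lambda topic, payload: mqtt.publish(topic, payload))

# A device on the MQTT side records the model file it is asked to download.
requested = []


def on_control(topic, payload):
    message = json.loads(payload)
    if message["command"] == "download-model":
        requested.append(message["payload"])


mqtt.subscribe("control-message", on_control)

# The quantize task publishes on the Kafka side only.
kafka.publish("control-message", json.dumps(
    {"command": "download-model", "payload": "2022-05-06-20:36:11-model.tflite"}).encode())
print(requested)
```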
The application is composed of several microservices, tasks, and orchestration workflows that operate concurrently on various infrastructure tiers. Let's see how to code, containerize, build, and deploy these components.
All of the platform microservices will use the following TLS certificates. The certs will be mounted in the Kubernetes pod as a secret volume.
Using OpenSSL, create a private key and a self-signed certificate:
openssl genrsa -out server.key 2048
openssl req -new -x509 -sha256 -key server.key -out server.crt -days 30
Using this pair, create a Kubernetes secret named ssh-keys-secret:
kubectl -n architectsguide2aiot create secret generic ssh-keys-secret --from-file=ssh-privatekey=server.key --from-file=ssh-publickey=server.crt
Configure the YAML file to mount the secret volume:
spec:
  containers:
  - volumeMounts:
    - name: secret-volume
      mountPath: /keys
  volumes:
  - name: secret-volume
    secret:
      secretName: ssh-keys-secret
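With this mount in place, each key of the secret shows up as a file under /keys. A service can check for both files at startup and fail early with a clear message if the secret is missing. The Python sketch below illustrates the idea; the `tls_file_paths` helper is hypothetical, and a temporary directory stands in for the /keys mount.

```python
import tempfile
from pathlib import Path


def tls_file_paths(mount_dir: str):
    """Return (certificate, private key) paths from the mounted secret, failing early if absent."""
    cert = Path(mount_dir) / "ssh-publickey"
    key = Path(mount_dir) / "ssh-privatekey"
    missing = [str(p) for p in (cert, key) if not p.is_file()]
    if missing:
        raise FileNotFoundError("missing TLS material: " + ", ".join(missing))
    return cert, key


# Simulate the /keys mount with a temporary directory.
with tempfile.TemporaryDirectory() as keys_dir:
    (Path(keys_dir) / "ssh-publickey").write_text("placeholder certificate")
    (Path(keys_dir) / "ssh-privatekey").write_text("placeholder key")
    cert_path, key_path = tls_file_paths(keys_dir)
    found = (cert_path.name, key_path.name)
print(found)
```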
Deployment Target : Platform Tier - Nvidia Jetson Nano Device
This module performs the following functions
rand.Seed(time.Now().UTC().UnixNano())

// Kafka connection settings, overridable through environment variables
topic := lookupStringEnv("CONTROL-TOPIC", "control-message")
brokerAddress := lookupStringEnv("KAFKA-BROKER", "<IP Address>:32199")
kafka_reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{brokerAddress},
    Topic:   topic,
})
ctx := context.Background()

// Endpoints for downloading the raw data and uploading the normalized data
downloadUrl := lookupStringEnv("RAW_TRAINING_DATA_DOWNLOAD_REGISTRY_URL", "https://localhost:8081/")
uploadUrl := lookupStringEnv("NORMALIZED_DATA_UPLOAD_REGISTRY_URL", "https://localhost:8080/uploadNormalizedData")

// The control message carries the name of the raw training data file
trainingFileName := listenForControlMessage(kafka_reader, ctx, topic)
fmt.Println(trainingFileName)

// Download, normalize, and upload the training data
rawDataRows := downloadRawData(downloadUrl, trainingFileName)
if rawDataRows != nil {
    normalizedData := normalizeData(rawDataRows)
    uploadToModelRegistry(uploadUrl, normalizedData)
    fmt.Println(rawDataRows[0])
}
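The snippet does not show what `normalizeData` does internally. As one plausible reading, here is a Python sketch of min-max scaling, which maps every column into the range [0, 1]. Treat the technique, the function name, and the sample rows as assumptions rather than the module's actual logic.

```python
def min_max_normalize(rows):
    """Scale every column of a list of numeric rows into the range [0, 1]."""
    columns = list(zip(*rows))
    lows = [min(col) for col in columns]
    highs = [max(col) for col in columns]
    normalized = []
    for row in rows:
        normalized.append([
            # A constant column has no spread, so map it to 0.0 instead of dividing by zero.
            (value - low) / (high - low) if high != low else 0.0
            for value, low, high in zip(row, lows, highs)
        ])
    return normalized


# Hypothetical readings: current, temperature, vibration, sound.
raw_rows = [
    [20.0, 30.0, 1.0, 50.0],
    [25.0, 35.0, 3.0, 50.0],
    [30.0, 40.0, 5.0, 50.0],
]
print(min_max_normalize(raw_rows))
```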
In order to run this container on an Nvidia Jetson Nano device, which is ARM64, we need to build an ARM64 compatible image. We first cross-compile for ARM64 and then containerize it as a distroless container using the following Dockerfile
FROM golang as builder
ENV USER=appuser
ENV UID=10001
WORKDIR /app
COPY go.mod ./
RUN go mod download
COPY *.go ./
# Build the binary for ARM64
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags='-w -s -extldflags "-static"' -a -o /go/bin/extract main.go
FROM scratch
COPY --from=builder /go/bin/extract /go/bin/extract
EXPOSE 8080
ENTRYPOINT ["/go/bin/extract"]
Build the image, tag it, and then push it to the private Docker registry:
docker build -t extract_module .
docker tag extract_module:latest docker.<IP Address>.nip.io:5000/extract_module:latest
docker push docker.<IP Address>.nip.io:5000/extract_module:latest
The Argo DAG specification then manages this container as a task in the workflow pipeline using the following configuration
- name: extract-template
  inputs:
    parameters:
    - name: message
  container:
    image: docker.<IP Address>.nip.io:5000/extract_module:latest
    securityContext:
      privileged: true
    env:
    - name: NORMALIZED_DATA_UPLOAD_REGISTRY_URL
      value: "https://<IP Address>:30007/uploadNormalizedData"
    - name: RAW_TRAINING_DATA_DOWNLOAD_REGISTRY_URL
      value: "https://<IP Address>:30008/"
    - name: CONTROL-TOPIC
      value: "control-message"
    - name: KAFKA-BROKER
      value: "<IP Address>:32199"
  nodeSelector:
    kubernetes.io/hostname: "agentnode-raspi1"
Deployment Target : Platform Tier - Nvidia Jetson Nano Device
This module primarily uses the tensorflow_data_validation library along with the following libs
import tensorflow_data_validation as tfdv
import sys
import os
import urllib
import json
from kafka import KafkaProducer
This module performs the following functions:
Run various validation routines on this data to detect drift.
train_stats = tfdv.generate_statistics_from_csv(data_location=training_data_set)
schema = tfdv.infer_schema(train_stats)
anomalies = tfdv.validate_statistics(statistics=train_stats, schema=schema)
if <drift criteria> > DRIFT_THRESHOLD:
    publishControlMessage(training_data_set)
If the drift exceeds a certain threshold then publish a control message train-model
def publishControlMessage(trainingFileName):
    fileName = os.path.basename(trainingFileName)
    producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER)
    json_data = {"command": "train-model", "payload": fileName}
    message = json.dumps(json_data)
    bytesMessage = message.encode()
    producer.send(CONTROL_TOPIC, bytesMessage)
    # send() is asynchronous; flush so the message leaves before the task exits
    producer.flush()
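The `<drift criteria>` placeholder is left open on purpose, because the right measure depends on your data. Purely as an illustration, and not as what TFDV computes, here is a standard-library sketch that scores drift as the shift of a feature's mean, measured in standard deviations of a baseline batch. The threshold value and the sample numbers are made up.

```python
from statistics import mean, pstdev


def drift_score(baseline, current):
    """Shift of the current mean away from the baseline mean, in baseline standard deviations."""
    spread = pstdev(baseline)
    if spread == 0:
        return 0.0 if mean(current) == mean(baseline) else float("inf")
    return abs(mean(current) - mean(baseline)) / spread


DRIFT_THRESHOLD = 2.0  # hypothetical cut-off

baseline_current = [24.0, 26.0, 25.0, 27.0, 23.0]
stable_batch = [25.0, 26.0, 24.0]
shifted_batch = [31.0, 32.0, 33.0]

print(drift_score(baseline_current, stable_batch) > DRIFT_THRESHOLD)
print(drift_score(baseline_current, shifted_batch) > DRIFT_THRESHOLD)
```

Only the shifted batch crosses the threshold, so only that batch would trigger the train-model control message.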
This module is containerized as a distroless container using the following Dockerfile
FROM debian:buster-slim AS build
RUN apt-get update && \
apt-get install --no-install-suggests --no-install-recommends --yes python3-venv gcc libpython3-dev && \
python3 -m venv /venv && \
/venv/bin/pip install --upgrade pip
FROM build AS build-venv
COPY requirements.txt /requirements.txt
RUN /venv/bin/pip install --disable-pip-version-check -r /requirements.txt
# distroless python image
FROM gcr.io/distroless/python3-debian10
COPY --from=build-venv /venv /venv
COPY . /app
WORKDIR /app
ENTRYPOINT ["/venv/bin/python3", "validate.py"]
The Argo DAG specification then manages this container as a task in the workflow pipeline using the following configuration:
- name: detect-drift-template
  inputs:
    parameters:
    - name: message
  container:
    image: docker.<IP Address>.nip.io:5000/validation_module:latest
    env:
    - name: TRAINING_DATA_URL
      value: "https://<IP Address>:30007/normalized_training_data/"
  nodeSelector:
    kubernetes.io/hostname: ""
Deployment Target : Platform Tier - Nvidia Jetson Nano Device
The Train module uses the following libraries
import sklearn
import pandas
import tensorflow as tf
It's important to keep sklearn as the first import on the Nvidia Jetson Nano and to set the environment variable OPENBLAS_CORETYPE to ARMV8.
The module performs the following functions:
Downloads the training data from the Model Registry - Normalized Training Data endpoint and trains the model.
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import datetime

# Columns 0-3 are the input features, column 4 is the label
dataset = dataframe.values
X = dataset[:,0:4].astype(float)
y = dataset[:,4]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)

# Small fully connected binary classifier
model = Sequential()
model.add(Dense(60, input_dim=4, activation='relu'))
model.add(Dense(30, activation='relu'))
model.add(Dense(10, activation='relu'))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=int(EPOCS), batch_size=int(BATCH_SIZE), verbose=0)

zip_file_name = "{}/{}-model.zip".format(dir_path,datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S') )
save_dir_name = "{}/{}-savedDir".format(dir_path,datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S') )
model.save(save_dir_name)
Uploads the frozen graph to the Model Registry - Frozen Graph endpoint.
upload_url = MODEL_REGISTRY_URL
test_response = requests.post(upload_url, files = {"file": model_file})
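Since this model eventually has to fit on a microcontroller, it is worth a quick back-of-the-envelope check of its size. The sketch below counts the trainable parameters of the four Dense layers defined above. The `dense_params` helper is my own, and the byte estimate assumes 32-bit floats and ignores file format overhead.

```python
def dense_params(inputs: int, units: int) -> int:
    """Weights plus biases of one fully connected layer."""
    return inputs * units + units


layer_sizes = [4, 60, 30, 10, 1]   # input width followed by the units of each Dense layer
per_layer = [dense_params(i, o) for i, o in zip(layer_sizes, layer_sizes[1:])]
total = sum(per_layer)
print(per_layer, total)
print("approx. float32 weight bytes:", total * 4)
```

A network of this size stays in the range of a few kilobytes of weights, which is what makes it a reasonable candidate for an ESP32 class device.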
This module is containerized using the nvidia l4t-tensorflow image.
FROM nvcr.io/nvidia/l4t-tensorflow:r32.6.1-tf2.5-py3
RUN mkdir /tensorflow
WORKDIR /tensorflow
COPY train.py .
COPY loop.sh .
COPY requirements.txt .
RUN pip3 install --upgrade setuptools
RUN python3 -m pip install --upgrade pip
RUN pip3 install -r requirements.txt
ENV OPENBLAS_CORETYPE ARMV8
CMD [ "python3", "/tensorflow/train.py" ]
Build the image, tag it and then push it to the private docker registry
docker build -t training-module .
docker tag training-module:latest docker.<IP Address>.nip.io:5000/training-module:latest
docker push docker.<IP Address>.nip.io:5000/training-module:latest
The Argo DAG specification then manages this container as a task in the workflow pipeline using the following configuration. The environment vars for MODEL_REGISTRY_URL, EPOCS, BATCH_SIZE are specified in this file.
- name: train-template
  inputs:
    parameters:
  container:
    image: docker.<IP Address>.nip.io:5000/training-module:latest
    env:
    - name: MODEL_REGISTRY_URL
      value: "https://<IP Address>:30007/uploadModel"
    - name: EPOCS
      value: "2"
    - name: BATCH_SIZE
      value: "32"
  nodeSelector:
    kubernetes.io/hostname: "agentnode-nvidia-jetson"
Deployment Target : Platform Tier - Nvidia Jetson Nano Device
This module performs the following functions:
Uploads the quantized file to the Model Registry - Quantized Model endpoint.
quantized_file_name = "{}/{}-model.tflite".format(dir_path,datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S') )
converter = tf.lite.TFLiteConverter.from_saved_model(fileName)
tflite_model = converter.convert()
# Use a context manager so the file is closed before it is uploaded
with open(quantized_file_name, "wb") as f:
    f.write(tflite_model)
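The converter call above hides the arithmetic, so here is a small sketch of what 8-bit affine quantization means in general: a real value is approximated as (q - zero_point) * scale, with q stored as an int8. The helper names and the sample value range are assumptions for illustration; this is the textbook scheme, not a trace of what the converter does for this particular model.

```python
def quantization_params(low: float, high: float, qmin: int = -128, qmax: int = 127):
    """Scale and zero point that map the real range [low, high] onto [qmin, qmax]."""
    low, high = min(low, 0.0), max(high, 0.0)   # the range must contain 0.0
    if high == low:
        raise ValueError("cannot quantize an empty value range")
    scale = (high - low) / (qmax - qmin)
    zero_point = int(round(qmin - low / scale))
    return scale, zero_point


def quantize(x: float, scale: float, zero_point: int, qmin: int = -128, qmax: int = 127) -> int:
    """Map a real value to the nearest int8 code, clamping values outside the range."""
    q = int(round(x / scale)) + zero_point
    return max(qmin, min(qmax, q))


def dequantize(q: int, scale: float, zero_point: int) -> float:
    """Recover the approximate real value from an int8 code."""
    return (q - zero_point) * scale


scale, zero_point = quantization_params(-1.0, 1.55)
q = quantize(0.5, scale, zero_point)
print(q, dequantize(q, scale, zero_point))
```

The round trip loses at most about one `scale` step per value, which is the price paid for storing each weight in a single byte.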
This module is containerized using the nvidia l4t-tensorflow image.
FROM nvcr.io/nvidia/l4t-tensorflow:r32.6.1-tf2.5-py3
RUN mkdir /tensorflow
WORKDIR /tensorflow
COPY quantize.py .
COPY loop.sh .
COPY requirements.txt .
CMD [ "python3", "/tensorflow/quantize.py" ]
Build the image, tag it and then push it to the private docker registry
docker build -t quantize-module .
docker tag quantize-module:latest docker.<IP Address>.nip.io:5000/quantize-module:latest
docker push docker.<IP Address>.nip.io:5000/quantize-module:latest
The Argo DAG specification then manages this container as a task in the workflow pipeline using the following configuration
- name: quantize-template
  inputs:
    parameters:
    - name: message
  container:
    image: docker.<IP Address>.nip.io:5000/quantize-module:latest
    securityContext:
      privileged: true
    env:
    - name: MODEL_DOWNLOAD_REGISTRY_URL
      value: "https://<IP Address>:30007/full"
    - name: MODEL_UPLOAD_REGISTRY_URL
      value: "https://<IP Address>:30007/uploadQuantizedModel"
    - name: CONTROL-TOPIC
      value: "control-message"
    - name: KAFKA-BROKER
      value: "<IP Address>:32199"
  nodeSelector:
    kubernetes.io/hostname: "agentnode-nvidia-jetson"
Deployment Target: Platform Tier - Raspberry Pi 4 Device
These edge learning tasks can now be assembled as a pipeline, and the pipeline can be expressed as a Directed Acyclic Graph (DAG).
Configure the tasks as steps (nodes) and the dependencies between them as edges in the Argo DAG YAML file.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: kubecon-aiotdemo-dag-
spec:
  entrypoint: kubecon-aiotdemo-dag
  templates:
  - name: kubecon-aiotdemo-dag
    dag:
      tasks:
      - name: extract
        template: extract-template
        arguments:
          parameters:
          - name: message
            value: ""
      - name: detect-drift
        dependencies: [extract]
        template: detect-drift-template
        arguments:
          parameters:
          - name: message
            value: ""
      - name: train
        dependencies: [detect-drift, extract]
        template: train-template
        arguments:
          parameters:
          - name: message
            value: ""
      - name: quantize
        dependencies: [train]
        template: quantize-template
        arguments:
          parameters:
          - name: message
            value: ""
  - name: extract-template
    inputs:
      parameters:
      - name: message
    container:
      image: docker.<IP Address>.nip.io:5000/extract_module:latest
      securityContext:
        privileged: true
      env:
      - name: NORMALIZED_DATA_UPLOAD_REGISTRY_URL
        value: "https://<IP Address>:30007/uploadNormalizedData"
      - name: GCP_BUCKET
        value: "architectsguide2aiot-aiot-mlops-demo"
      - name: RAW_TRAINING_DATA_DOWNLOAD_REGISTRY_URL
        value: "https://<IP Address>:30008/"
      - name: CONTROL-TOPIC
        value: "control-message"
      - name: KAFKA-BROKER
        value: "<IP Address>:32199"
      volumeMounts:
      - name: secret-volume
        mountPath: /keys
    nodeSelector:
      kubernetes.io/hostname: "agentnode-raspi1"
  - name: detect-drift-template
    inputs:
      parameters:
      - name: message
    container:
      image: docker.<IP Address>.nip.io:5000/validation_module:latest
      env:
      - name: TRAINING_DATA_URL
        value: "https://<IP Address>:30007/normalized_training_data/"
    nodeSelector:
      kubernetes.io/hostname: ""
  - name: train-template
    inputs:
      parameters:
      - name: message
    container:
      image: docker.<IP Address>.nip.io:5000/training-module:latest
      env:
      - name: MODEL_REGISTRY_URL
        value: "https://<IP Address>:30007/uploadModel"
      - name: EPOCS
        value: "2"
      - name: BATCH_SIZE
        value: "32"
      - name: TRAINING_DATA_URL
        value: "https://<IP Address>:30007/normalized_training_data/"
    nodeSelector:
      kubernetes.io/hostname: "agentnode-nvidia-jetson"
  - name: quantize-template
    inputs:
      parameters:
      - name: message
    container:
      image: docker.<IP Address>.nip.io:5000/quantize-module:latest
      securityContext:
        privileged: true
      env:
      - name: MODEL_DOWNLOAD_REGISTRY_URL
        value: "https://<IP Address>:30007/full"
      - name: MODEL_UPLOAD_REGISTRY_URL
        value: "https://<IP Address>:30007/uploadQuantizedModel"
      - name: CONTROL-TOPIC
        value: "control-message"
      - name: KAFKA-BROKER
        value: "<IP Address>:32199"
    nodeSelector:
      kubernetes.io/hostname: "agentnode-nvidia-jetson"
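To see the order that these `dependencies` entries imply, you can feed them to a topological sort. The sketch below uses Python's standard `graphlib` module (Python 3.9 or later). It only illustrates the ordering and says nothing about how Argo schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Task name -> tasks it depends on, copied from the `dependencies` fields of the DAG.
dependencies = {
    "extract": [],
    "detect-drift": ["extract"],
    "train": ["detect-drift", "extract"],
    "quantize": ["train"],
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because every task depends on the one before it, this particular DAG has exactly one valid order and no two tasks can run in parallel.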
Use the argo CLI to deploy the learning pipeline
argo submit -n architectsguide2aiot --serviceaccount argo --watch ../demo_DAG.yaml
This is what I see on my console
Name: kubecon-aiotdemo-dag-2m5wz
Namespace: architectsguide2aiot
ServiceAccount: argo
Status: Succeeded
Conditions:
PodRunning False
Completed True
Created: Fri May 27 20:06:28 +0000 (7 minutes ago)
Started: Fri May 27 20:06:28 +0000 (7 minutes ago)
Finished: Fri May 27 20:13:40 +0000 (now)
Duration: 7 minutes 12 seconds
Progress: 4/4
ResourcesDuration: 12m48s*(100Mi memory),12m48s*(1 cpu)
STEP TEMPLATE PODNAME DURATION MESSAGE
✔ kubecon-aiotdemo-dag-2m5wz kubecon-aiotdemo-dag
├─✔ extract extract-template kubecon-aiotdemo-dag-2m5wz-3861752307 5m
├─✔ detect-drift detect-drift-template kubecon-aiotdemo-dag-2m5wz-2571201847 22s
├─✔ train train-template kubecon-aiotdemo-dag-2m5wz-2497119492 31s
└─✔ quantize quantize-template kubecon-aiotdemo-dag-2m5wz-1935646649 28s
You can also monitor and manage the workflow from the Argo dashboard. Open the Argo console in your browser (make sure to follow the installation instructions from the previous post).
Deployment Target : Platform Tier - Raspberry Pi 4 Device
The ingest microservice performs the following functions:
This module uses the segmentio kafka-go client to connect to the Kafka broker.
msg, err := kafka_reader.ReadMessage(ctx)
if err != nil {
    panic(err)
}
var rawSensorData RawSensorData
// A malformed message leaves DeviceID empty and is skipped below
json.Unmarshal(msg.Value, &rawSensorData)
t := time.Now()
if rawSensorData.TimeStamp == "" {
    rawSensorData.TimeStamp = fmt.Sprintf("%02d", t.UnixNano()/int64(time.Millisecond))
}
if rawSensorData.DeviceID != "" {
    if counter == 0 { // create a new file and write the header
        f, err := os.Create(fileName)
        if err != nil {
            panic(err)
        }
        testDataFile = f
        testDataFile.WriteString("deviceID,timeStamp,current,temperature,vibration,sound\n")
    }
    testDataFile.WriteString(fmt.Sprintf("%s,%s,%.1f,%.1f,%.1f,%.1f\n", rawSensorData.DeviceID, rawSensorData.TimeStamp, rawSensorData.Current, rawSensorData.Temperature, rawSensorData.Vibration, rawSensorData.Sound))
    counter = counter + 1
    if maxRows == counter { // upload the file
        counter = 0 // reset
        testDataFile.Close()
        uploadFileToModelRegistry(fileName, upload_url)
        publishControlMessage(fileName, kafka_writer, ctx)
    }
}
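The batching logic is the heart of this service, so here it is again in isolation as a Python sketch. `RowBatcher` and its callback are hypothetical names; the callback stands in for the upload and the control message, and an in-memory buffer stands in for the CSV file.

```python
import csv
import io


class RowBatcher:
    """Collect readings and hand over a CSV document every `max_rows` rows."""

    HEADER = ["deviceID", "timeStamp", "current", "temperature", "vibration", "sound"]

    def __init__(self, max_rows, on_full):
        self.max_rows = max_rows
        self.on_full = on_full      # called with the CSV text of a full batch
        self._start_batch()

    def _start_batch(self):
        self.count = 0
        self.buffer = io.StringIO()
        self.writer = csv.writer(self.buffer, lineterminator="\n")
        self.writer.writerow(self.HEADER)

    def add(self, row):
        self.writer.writerow(row)
        self.count += 1
        if self.count == self.max_rows:
            self.on_full(self.buffer.getvalue())
            self._start_batch()


uploaded = []
batcher = RowBatcher(max_rows=2, on_full=uploaded.append)
for i in range(5):
    batcher.add(["14333616", str(1000 + i), 26.5, 32.8, 0.4, 61.0])
print(len(uploaded), batcher.count)
```

Rows that have not yet filled a batch stay in the buffer, so the size of `max_rows` decides how much data is at risk if the service restarts.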
Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry
docker build -t ingest_service .
docker tag ingest_service:latest docker.<IP Address>.nip.io:5000/ingest_service:latest
docker push docker.<IP Address>.nip.io:5000/ingest_service:latest
The kubernetes.yaml file is configured to set the image name and the env vars as per the operating environment. The node selector label is set to run this service on one of the Raspberry Pi devices.
---
containers:
- name: ingest-microservice
  image: docker.<IP Address>.nip.io:5000/ingest_service:latest
  env:
  - name: TRAINING_DATA_UPLOAD_REGISTRY_URL
    value: "https://<IP Address>:30008/upload"
  - name: MAX_ROWS
    value: "3000"
  - name: DATA-TOPIC
    value: "shaded-pole-motor-sensor_data"
  - name: CONTROL-TOPIC
    value: "control-message"
  - name: KAFKA-BROKER
    value: "<IP Address>:32199"
nodeSelector:
  kubernetes.io/hostname: "agentnode-raspi1"
Deployment Target : Platform Tier - Raspberry Pi 4 Device
The model registry services are exposed as the following REST endpoints:
Name | VERB | URL |
---|---|---|
Model Registry - Frozen Graph | POST | https://<HOST:30007>/uploadModel |
Model Registry - Frozen Graph | GET | https://<HOST:30007>/full |
Model Registry - Quantized Model | POST | https://<HOST:30007>/uploadQuantizedModel |
Model Registry - Quantized Model | GET | https://<HOST:30007>/quantized |
Model Registry - Normalized Training Data | POST | https://<HOST:30007>/uploadNormalizedData |
Model Registry - Normalized Training Data | GET | https://<HOST:30007>/normalized_training_data |
This Golang module uses handlers and a ServeMux from the net/http package.
mux := http.NewServeMux()

// Create the directories that back the registry endpoints
os.MkdirAll("./model_store/full", os.ModePerm)
os.MkdirAll("./model_store/quantized", os.ModePerm)
os.MkdirAll("./model_store/normalized_training_data", os.ModePerm)
os.MkdirAll("./model_store/OTA_bin", os.ModePerm)

// Upload handlers
mux.HandleFunc("/uploadModel", uploadModelHandler)
mux.HandleFunc("/uploadQuantizedModel", uploadModelQuantizedHandler)
mux.HandleFunc("/uploadNormalizedData", uploadTrainingDataHandler)

// Everything else is served as static files from model_store
fileServerHtml := http.FileServer(http.Dir("model_store"))
mux.Handle("/", fileServerHtml)

log.Printf("Serving Model Registry on port: %s\n", port)
if err := http.ListenAndServeTLS(":"+port, "/keys/ssh-publickey", "/keys/ssh-privatekey", mux); err != nil {
    log.Fatal(err)
}
Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry
docker build -t model-registry .
docker tag model-registry:latest docker.<IP Address>.nip.io:5000/model-registry:latest
docker push docker.<IP Address>.nip.io:5000/model-registry:latest
The kubernetes.yaml file is configured to set the image name to the correct location and publish this app as a Kubernetes service to an external IP address at port 30007 using a NodePort. Specify the TLS certs volume mount as shown in this section.
image: docker.<IP Address>.nip.io:5000/model-registry:latest
spec:
  type: NodePort
  selector:
    app: model-registry
  ports:
  - protocol: TCP
    port: 8080
    targetPort: 8080
    nodePort: 30007
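Several services in this application are published on fixed NodePorts, so a quick consistency check can save some debugging. The sketch below verifies that each port is unique and falls inside the default Kubernetes NodePort range of 30000 to 32767. The service names used as dictionary keys are just labels I chose for illustration.

```python
NODE_PORT_RANGE = range(30000, 32768)   # default Kubernetes NodePort range

node_ports = {
    "mqtt-broker": 30005,
    "device-registry": 30006,
    "model-registry": 30007,
    "training-datastore": 30008,
}


def check_node_ports(ports: dict) -> list:
    """Return a list of human-readable problems; an empty list means the plan is consistent."""
    problems = []
    seen = {}
    for service, port in ports.items():
        if port not in NODE_PORT_RANGE:
            problems.append(f"{service}: {port} is outside the NodePort range")
        if port in seen:
            problems.append(f"{service}: {port} is already used by {seen[port]}")
        seen.setdefault(port, service)
    return problems


print(check_node_ports(node_ports))
```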
Deployment Target : Platform Tier - Raspberry Pi 4 Device
The training datastore services are exposed as the following REST endpoints:
Name | VERB | URL |
---|---|---|
Training Datastore - Raw data | GET | https://<HOST:30008>/<fileName> |
Training Datastore - Raw data | POST | https://<HOST:30008>/upload |
This Golang module uses handlers and a ServeMux from the net/http package.
mux := http.NewServeMux()
mux.HandleFunc("/upload", uploadTrainingDataHandler)
fileServerHtml := http.FileServer(http.Dir("/data/raw_training_data"))
mux.Handle("/", fileServerHtml)
port := lookupStringEnv("PORT", "8081")
log.Printf("Serving TrainingDataStore service on port: %s\n", port)
if err := http.ListenAndServeTLS(":"+port, "/keys/ssh-publickey", "/keys/ssh-privatekey", mux); err != nil {
    log.Fatal(err)
}
Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry
docker build -t training-datastore .
docker tag training-datastore:latest docker.<IP Address>.nip.io:5000/training-datastore:latest
docker push docker.<IP Address>.nip.io:5000/training-datastore:latest
The kubernetes.yaml file is configured to set the image name to the correct location and publish this app as a Kubernetes service to an external IP address at port 30008 using a NodePort. Specify the TLS certs volume mount as shown in this section.
image: docker.<IP Address>.nip.io:5000/training-datastore:latest
spec:
  type: NodePort
  selector:
    app: model-registry
  ports:
  - protocol: TCP
    port: 8080
    targetPort: 8080
    nodePort: 30008
Deployment Target : Platform Tier - Raspberry Pi 4 Device
The device registry service provisions new IoT devices and validates their activation through the following REST endpoints:
Name | VERB | URL |
---|---|---|
Device Registry - Provision Device | POST | https://<IP Address>:30006/provisionDevice |
Device Registry - Confirm Activation | POST | https://<IP Address>:30006/confirmActivation |
This Golang module uses handlers and a ServeMux from the net/http package.
mux := http.NewServeMux()
mux.HandleFunc("/confirmActivation", confirmActivation)
mux.HandleFunc("/provisionDevice", provisionDevice)
fileServerHtml := http.FileServer(http.Dir("/data/deviceRegistry"))
mux.Handle("/", fileServerHtml)
port := lookupStringEnv("PORT", "8082")
log.Printf("Serving device Registry on port: %s\n", port)
if err := http.ListenAndServeTLS(":"+port, "/keys/ssh-publickey", "/keys/ssh-privatekey", mux); err != nil {
    log.Fatal(err)
}
Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it, and then push it to the private Docker registry:
docker build -t device-registry .
docker tag device-registry:latest docker.<IP Address>.nip.io:5000/device-registry:latest
docker push docker.<IP Address>.nip.io:5000/device-registry:latest
The kubernetes.yaml file is configured to set the image name to the correct location and publish this app as a Kubernetes service to an external IP address at port 30006 using a NodePort. Specify the TLS certs volume mount as shown in this section.
image: docker.<IP Address>.nip.io:5000/device-registry:latest
.
.
.
apiVersion: v1
kind: Service
metadata:
  name: device-registry-service
spec:
  type: NodePort
  selector:
    app: device-registry
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30006
Deployment Target : Platform Tier - Raspberry Pi 4 Device
This microservice provides lightweight and high performance MQTT broker services.
The implementation is based on an embedded open-source Go MQTT broker.
Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry
docker build -t go_amr64_mqtt_broker .
docker tag go_amr64_mqtt_broker:latest docker.<IP Address>.nip.io:5000/go_amr64_mqtt_broker:latest
docker push docker.<IP Address>.nip.io:5000/go_amr64_mqtt_broker:latest
The kubernetes.yaml file is configured to set the image name to the correct location and publish this app as a Kubernetes service to an external IP address at port 30005 using a NodePort.
---
spec:
  containers:
    - name: broker-container
      image: docker.<IP Address>.nip.io:5000/go_amr64_mqtt_broker:latest
      ports:
        - containerPort: 1883
  nodeSelector:
    kubernetes.io/hostname: ""
---
apiVersion: v1
kind: Service
metadata:
  name: mqtt-broker-service
spec:
  type: NodePort
  selector:
    app: mqtt-broker
  ports:
    - protocol: TCP
      port: 1883
      targetPort: 1883
      nodePort: 30005
Deployment Target : Platform Tier - Raspberry Pi 4 Device
This microservice bridges the MQTT messages with Kafka streams.
This module uses Sarama as the client library for Apache Kafka and Eclipse Paho as the client library for MQTT. We first need to set up and connect to the Kafka broker as a publisher:
kafka_topic := lookupStringEnv("KAFKA-TOPIC", "shaded-pole-motor-sensor_data")
kafka_brokerAddress := lookupStringEnv("KAFKA-BROKER", "<IP Address>:32199")
signals := make(chan os.Signal, 1)
// SIGKILL cannot be caught by a process, so listen for SIGTERM instead
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
producerConfig := sarama.NewConfig()
producerConfig.Producer.RequiredAcks = sarama.WaitForLocal // same value (1) as before: wait for the leader's ack
producerConfig.Producer.Return.Successes = true            // required by the SyncProducer
producer, err := sarama.NewSyncProducer([]string{kafka_brokerAddress}, producerConfig)
if err != nil {
    log.Fatalf("could not create the Kafka producer: %v", err)
}
end := make(chan int, 1)
Now we need to set up an MQTT subscription handler. The handler uses a Go channel to wait asynchronously for MQTT messages. When it receives a message, it calls the Kafka publisher handler "publishMqttMessageToKafka", which is passed to it as an anonymous function.
func startMqttSubscriber(opts *MQTT.ClientOptions, publishMqttMessageToKafka func(string)) {
    qos := 0
    mqtt_topic := lookupStringEnv("MQTT-TOPIC", "shaded-pole-motor-sensor_data")
    // choke carries {topic, payload} pairs from the MQTT callback to the loop below
    choke := make(chan [2]string)
    opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
        choke <- [2]string{msg.Topic(), string(msg.Payload())}
    })
    client := MQTT.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    // Deferred so it runs if the function unwinds; a statement placed after the endless loop is unreachable
    defer client.Disconnect(250)
    if token := client.Subscribe(mqtt_topic, byte(qos), nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
    for {
        incoming := <-choke
        publishMqttMessageToKafka(incoming[1])
    }
}
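The hand-off used by this subscriber (a callback pushes each message onto a channel, and a loop drains the channel and calls the injected publisher) can be tried out without a broker. The following Python sketch is only an illustration of that pattern: it uses a `queue.Queue` in place of the Go channel, and `make_on_message`, `run_bridge`, and the `None` shutdown sentinel are hypothetical names that do not appear in the reference code.

```python
import queue
from typing import Callable, Optional, Tuple

# Stand-in for the Go channel `choke`: carries (topic, payload) pairs.
Message = Tuple[str, str]


def make_on_message(inbox: "queue.Queue[Optional[Message]]") -> Callable[[str, bytes], None]:
    """Build the callback an MQTT client would invoke for every message."""
    def on_mqtt_message(topic: str, payload: bytes) -> None:
        inbox.put((topic, payload.decode("utf-8")))
    return on_mqtt_message


def run_bridge(inbox: "queue.Queue[Optional[Message]]", publish: Callable[[str], None]) -> int:
    """Drain the inbox and forward each payload until a None sentinel arrives.

    Returns the number of forwarded messages.
    """
    forwarded = 0
    while True:
        item = inbox.get()
        if item is None:  # sentinel: lets the loop end so cleanup code can run
            return forwarded
        _topic, payload = item
        publish(payload)
        forwarded += 1
```

Injecting `publish` as a parameter mirrors the Go design: the subscriber loop does not need to know whether the payload ends up in Kafka, a log file, or a test list.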
Using this MQTT handler, a closure is set up with the body of the anonymous function "publishMqttMessageToKafka", which publishes the Kafka message:
startMqttSubscriber(mqtt_opts, func(messageVal string) {
    msg := &sarama.ProducerMessage{
        Topic: kafka_topic,
        Value: sarama.StringEncoder(messageVal),
    }
    if _, _, err := producer.SendMessage(msg); err != nil {
        log.Printf("could not publish to Kafka: %v", err)
    }
})
Conversely, when this service gets a Kafka message, it publishes this message as an MQTT message.
mqtt_client := MQTT.NewClient(opts)
if token := mqtt_client.Connect(); token.Wait() && token.Error() != nil {
    panic(token.Error())
}
ctx := context.Background()
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{kafka_broker},
    Topic:   kafka_topic,
})
for {
    msg, err := r.ReadMessage(ctx)
    if err != nil {
        panic("could not read message " + err.Error())
    }
    retry(100, 4, func() error {
        fmt.Println("Sample Publisher Started")
        token := mqtt_client.Publish(mqtt_pub_topic, byte(qos), false, string(msg.Value))
        token.Wait()
        // Return the publish error (nil on success) so that retry can try again on failure
        return token.Error()
    })
}
Follow the steps in this section for an ARM64 compatible Dockerfile. Build the image, tag it and then push it to the private docker registry
docker build -t protocol_bridge .
docker tag protocol_bridge:latest docker.<IP Address>.nip.io:5000/protocol_bridge:latest
docker push docker.<IP Address>.nip.io:5000/protocol_bridge:latest
The kubernetes.yaml file is configured to set the image name to the correct location and the env vars to the appropriate MQTT and Kafka settings:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: protocol-bridge-deployment
spec:
  selector:
    matchLabels:
      app: protocol-bridge
  replicas: 1
  template:
    metadata:
      labels:
        app: protocol-bridge
    spec:
      containers:
        - name: protocol-bridge
          image: docker.<IP Address>.nip.io:5000/protocol_bridge:latest
          env:
            - name: MQTT-BROKER
              value: "tcp://<IP Address>:30005"
            - name: MQTT-ID
              value: "architectsguide2aiot_mqtt-id"
            - name: DATA-TOPIC
              value: "shaded-pole-motor-sensor_data"
            - name: CONTROL-TOPIC
              value: "control-message"
            - name: KAFKA-BROKER
              value: "<IP Address>:32199"
Deployment Target : Inference Tier - Coral Edge TPU Devices
The inference module running on the Coral Edge devkit cluster is built using the PyCoral API. This module gets event streams from the Kafka broker via a streaming API sidecar.
Import the following PyCoral adapters:
from pycoral.adapters import classify
from pycoral.adapters import common
from pycoral.utils.dataset import read_label_file
from pycoral.utils.edgetpu import make_interpreter
While testing on hardware that does not have a TPU accelerator, use the TF Lite imports and comment out the pycoral imports.
import numpy as np
import tensorflow as tf
Subscribe to the control message and in response to the "download-model" message, get the latest TF Lite quantized model file from the Model Registry and use the Interpreter to load it
# use this for testing on non TPU h/w
# interpreter = tf.lite.Interpreter(model_path= dir_path + '/' + latest_model_file_name)
interpreter = make_interpreter(dir_path + '/' + latest_model_file_name)
Set up the input data, invoke the inference, and get the output:
interpreter.allocate_tensors()
sensor_data = json.loads(msg, object_hook=lambda d: SimpleNamespace(**d))
# Build a 1 x 4 float32 batch: current, temperature, vibration, sound
sensor_data_arr = np.array([[sensor_data.current, sensor_data.temperature, sensor_data.vibration, sensor_data.sound]], dtype=np.float32)
inp_details = interpreter.get_input_details()
out_details = interpreter.get_output_details()
interpreter.set_tensor(inp_details[0]['index'], sensor_data_arr)
interpreter.invoke()
predictions = interpreter.get_tensor(out_details[0]['index'])
# The output tensor holds a single value; reduce it to a Python float
inference_val = float(np.squeeze(predictions))
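Because the model only sees positions in the input tensor, the order of the four features matters as much as their values. The sketch below, which uses only the standard library, shows one way to turn a sensor message into a 1 x 4 batch with an explicit field order and a check for missing fields. `FEATURE_ORDER` and `to_feature_row` are hypothetical names; the order itself is the one used in the snippet above and is assumed to match the order used during training.

```python
import json
from typing import List

# Field order taken from the inference snippet above; assumed to match
# the order of the columns the model was trained on.
FEATURE_ORDER = ("current", "temperature", "vibration", "sound")


def to_feature_row(msg: str) -> List[List[float]]:
    """Parse one JSON sensor message into a 1 x 4 batch of floats."""
    reading = json.loads(msg)
    missing = [name for name in FEATURE_ORDER if name not in reading]
    if missing:
        raise ValueError("sensor message is missing fields: " + ", ".join(missing))
    # Extra fields such as deviceID or fft_data are ignored.
    return [[float(reading[name]) for name in FEATURE_ORDER]]
```

The returned nested list can then be converted with `np.array(row, dtype=np.float32)` before it is handed to the interpreter.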
The input data arrives as a JSON object over a TCP/IP socket from the streaming API sidecar. The inference module sets up a TCP server endpoint:
HOST = '127.0.0.1'  # Standard loopback interface address (localhost)
PORT = int(SIDECAR_PORT)  # Port to listen on (non-privileged ports are > 1023)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen()
    while True:
        try:
            conn, addr = s.accept()
            with conn:
                logToFile('Connected by ' + addr[0])
                while True:
                    data = conn.recv(1024).decode()
                    # An empty read means the sidecar closed the connection
                    if not data:
                        break
                    logToFile("Received message from go client{}".format(data))
                    ret = infer(interpreter, data)
                    strRet = "inference value from the edge tpu = {}".format(ret)
                    conn.sendall(strRet.encode())
        except Exception as inst:
            print(inst)
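The per-connection part of this server can be exercised without a TPU or a Kafka broker if it is factored into a function that takes the inference step as a parameter. The following is a minimal sketch under that assumption; `handle_connection` and the `infer` callable passed to it are hypothetical and only mirror the request/reply exchange shown above.

```python
import socket
from typing import Callable


def handle_connection(conn: socket.socket, infer: Callable[[str], object]) -> None:
    """Serve one client: read a message, run infer, send the reply, repeat until EOF."""
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:  # the peer closed the connection
                break
            result = infer(data.decode())
            reply = "inference value from the edge tpu = {}".format(result)
            conn.sendall(reply.encode())
```

On a workstation, `socket.socketpair()` gives two connected endpoints, so one end can be handed to `handle_connection` in a thread while the other plays the role of the sidecar.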
This Golang module uses the Segment kafka-go package to connect to the broker. It subscribes to the Kafka topic and waits synchronously to receive the messages.
connHost := "127.0.0.1"
connPort := lookupStringEnv("PORT", "9898")
topic := lookupStringEnv("TOPIC", "my-topic")
brokerAddress := lookupStringEnv("KAFKA-BROKER", "<IP Address>:32199")
ctx := context.Background()
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{brokerAddress},
    Topic:   topic,
})
msg, err := r.ReadMessage(ctx)
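A retry function with a wait interval between attempts is set up to make message delivery more reliable:
// stop wraps an error that should end the retries immediately
// (assumed helper type; its definition is not shown in this excerpt)
type stop struct {
    error
}

func retry(attempts int, sleep time.Duration, fn func() error) error {
    if err := fn(); err != nil {
        if s, ok := err.(stop); ok {
            // Return the original error for later checking
            return s.error
        }
        if attempts--; attempts > 0 {
            time.Sleep(sleep * time.Second)
            return retry(attempts, 1*sleep, fn)
        }
        return err
    }
    return nil
}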
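Note that the Go function multiplies the interval by 1, so it waits the same time before every attempt. If a growing interval is wanted, the multiplier can be raised. The Python sketch below shows that variant for illustration only: the doubling `factor`, the `Stop` exception, and the injectable `sleeper` are assumptions made for this example and are not part of the reference code.

```python
import time
from typing import Callable


class Stop(Exception):
    """Raise from fn to abort retrying immediately (hypothetical helper)."""


def retry(attempts: int, sleep_s: float, fn: Callable[[], None],
          factor: float = 2.0,
          sleeper: Callable[[float], None] = time.sleep) -> None:
    """Call fn until it succeeds, waiting longer after each failure."""
    delay = sleep_s
    for remaining in range(attempts, 0, -1):
        try:
            fn()
            return
        except Stop:
            raise  # the caller asked us not to retry
        except Exception:
            if remaining == 1:
                raise  # out of attempts: surface the last error
            sleeper(delay)
            delay *= factor
```

Passing `sleeper` explicitly keeps the function testable: a test can record the requested waits instead of actually sleeping.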
The retry function is then used with a closure that sends the received messages as JSON objects to the inference module over the TCP/IP endpoint it exposes.
retry(100, 4, func() error {
    socketConnection, err := net.Dial(connType, connHost+":"+connPort)
    if err != nil {
        return err // let retry try again
    }
    defer socketConnection.Close()
    if _, err := socketConnection.Write(msg.Value); err != nil {
        return err
    }
    buff2 := make([]byte, 1024)
    n, err := socketConnection.Read(buff2)
    if err != nil {
        return err
    }
    log.Printf("Receive: %s", buff2[:n])
    return nil
})
The inference module is containerized using a debian:buster image.
FROM debian:buster
RUN mkdir /coral
WORKDIR /coral
RUN apt update && \
apt-get install curl gnupg ca-certificates -y
RUN echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | tee /etc/apt/sources.list.d/coral-edgetpu.list
RUN echo "deb https://packages.cloud.google.com/apt coral-cloud-stable main" | tee /etc/apt/sources.list.d/coral-cloud.list
RUN curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
RUN apt-get update
RUN apt-get update && \
apt-get install python3 python3-pip -y
RUN apt-get install python3-pycoral -y
COPY loop.sh ./
COPY infer_tflite_socket.py ./
COPY requirements.txt ./
RUN pip3 install -r requirements.txt
EXPOSE 9898
CMD [ "python3", "/coral/infer_tflite_socket.py" ]
To run this container on a Coral TPU device, which is ARM64, we need to build an ARM64 compatible image. We do this with docker buildx, following these steps.
Tag the image and push it to the private Docker registry:
docker buildx build -t asheeshgoja/edge-tpu-inference-engine:latest --platform linux/arm64 --push .
docker pull docker.io/asheeshgoja/edge-tpu-inference-engine:latest
docker tag docker.io/asheeshgoja/edge-tpu-inference-engine:latest docker.<IP Address>.nip.io:5000/edge-tpu-inference-engine:latest
docker push docker.<IP Address>.nip.io:5000/edge-tpu-inference-engine:latest
The streaming API sidecar is cross-compiled for ARM64 and then containerized as a distroless container using the following two-stage Docker build:
# Stage 1
FROM golang AS builder
ENV USER=appuser
ENV UID=10001
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY main.go ./
# Build the binary
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags='-w -s -extldflags "-static"' -a -o /go/bin/golang_kafka_consumer main.go
# Stage 2
FROM scratch
COPY --from=builder /go/bin/golang_kafka_consumer /go/bin/golang_kafka_consumer
ENTRYPOINT ["/go/bin/golang_kafka_consumer"]
Build the image, tag it and then push it to the private docker registry
docker build -t golang-api-sidecar -f Dockerfile .
docker tag golang-api-sidecar:latest docker.<IP Address>.nip.io:5000/golang-api-sidecar:latest
docker push docker.<IP Address>.nip.io:5000/golang-api-sidecar:latest
The inference module and the streaming API sidecar get deployed as co-containers on the same pod. They share a common file mount and communicate over a local TCP/IP link.
---
containers:
  - name: edge-tpu-inference-engine
    image: docker.<IP Address>.nip.io:5000/edge-tpu-inference-engine:latest
    securityContext:
      privileged: true
    ports:
      - containerPort: 9898
    env:
      - name: STREAM_GRP_ID
        value: work-load-B
      - name: SIDECAR_PORT
        value: "9898"
      - name: MODEL_REGISTRY_URL
        value: "https://<IP Address>:30007/quantized"
  - name: golang-api-sidecar
    image: docker.<IP Address>.nip.io:5000/golang-api-sidecar:latest
    ports:
      - containerPort: 9898
    securityContext:
      privileged: true
    env:
      - name: STREAM_GRP_ID
        value: "work-load-A"
      - name: PORT
        value: "9898"
      - name: TOPIC
        value: "shaded-pole-motor-sensor_data"
      - name: KAFKA-BROKER
        value: "<IP Address>:32199"
nodeSelector:
  tpuAccelerator: "true"
The tpuAccelerator: "true" label ensures that this pod gets scheduled only on TPU accelerated hardware.
Deployment Target : Things Tier - ESP32 SoC
The IDE used to build the firmware is VSCode with the PlatformIO extension. Install this extension and then open the IDE from the iot-gateway-firmware folder. Set the board, ports, dependencies, and the baud correctly by using the following configuration in the platformio.ini file.
[env:esp32dev]
platform = espressif32
board = esp32dev
framework = arduino
upload_port = /dev/cu.SLAB_USBtoUART
monitor_port = /dev/cu.SLAB_USBtoUART
monitor_speed = 115200
lib_deps =
knolleary/PubSubClient@^2.8.0
openenergymonitor/EmonLib@^1.1.0
bblanchon/ArduinoJson@^6.18.5
Copy the contents of the server.crt created in the previous section into a static char array and name it reg_svr_pub_key. We will subsequently use this key in the HTTPClient operations.
const char* reg_svr_pub_key= \
"-----BEGIN CERTIFICATE-----\n" \
"MIICSDCCAc6gAwIBAgIUDXRzo8SpZJeJqZmgNP1BpyllvHkwCgYIKoZIzj0EAwIw\n" \
.
.
.
"-----END CERTIFICATE-----\n" ;
The inference module for the ESP32 MCU is built using the TensorFlow Lite for Microcontrollers C++ library. You need to first download and include the TFLM C++ libraries into your PlatformIO project under the lib folder. Your lib structure should look like this.
Include the TFLM header files in your module
#include "tensorflow/lite/micro/all_ops_resolver.h"
#include "tensorflow/lite/micro/micro_error_reporter.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/schema/schema_generated.h"
#include "tensorflow/lite/version.h"
Download the latest TFLite file from the Model Registry endpoint (in response to the control message download-model):
int downloadTFLiteModel(uint8_t **tfLiteModel, const char *tfLiteFileName)
{
    char tfliteFileURL[255] = {}; // the file name in the control message payload
    snprintf(tfliteFileURL, 255, "%s%s", modelRegistry, tfLiteFileName);
    pHTTPClient->begin(tfliteFileURL, reg_svr_pub_key);
    int httpResponseCode = pHTTPClient->GET();
    int len = -1; // -1 tells the caller that the download failed
    if (httpResponseCode == 200)
    {
        len = 11148; // model size in bytes, hard-coded in this example
        *tfLiteModel = (byte *)malloc(len);
        pHTTPClient->getStream().readBytes(*tfLiteModel, len);
    }
    pHTTPClient->end(); // release the connection on both paths
    return len;
}
The tfLiteModel array is the TFLite quantized model and can now be used to perform the inferences. Map and load the model and its tensors
tflite::ErrorReporter *error_reporter;
const tflite::Model *model;
tflite::MicroInterpreter *interpreter;
TfLiteTensor *input;
TfLiteTensor *output;
int inference_count;
HTTPClient *pHTTPClient;
char modelRegistry[255];
model = tflite::GetModel(tfLiteModel);
static tflite::AllOpsResolver resolver;
static tflite::MicroInterpreter static_interpreter(model, resolver, tensor_arena, kTensorArenaSize, error_reporter);
interpreter = &static_interpreter;
// Allocate memory from the tensor_arena for the model's tensors.
TfLiteStatus allocate_status = interpreter->AllocateTensors();
// Obtain pointers to the model's input and output tensors.
input = interpreter->input(0);
output = interpreter->output(0);
The module is now ready to make inferences
input->data.f[0] = current;
input->data.f[1] = temperature;
input->data.f[2] = vibration;
input->data.f[3] = sound;
interpreter->Invoke();
return output->data.f[0]; // this is the inference value used to determine if the motor is faulty
Add the PubSubClient@^2.8.0 library to your project. Then include the following header files:
#include <PubSubClient.h>
#include <ArduinoJson.h>
Set up a callback handler for download-model messages on the control topic. Call the TFLM_Module::setNewModelFileName to download and map the new TFLite model.
char buf[255] = "";
String jsonMessage(buf);
StaticJsonDocument<255> jsonBuffer;
DeserializationError error = deserializeJson(jsonBuffer, jsonMessage);
const char *command = jsonBuffer["command"];
const char *cmd_payload = jsonBuffer["payload"];
// Skip malformed messages: strcmp on a null pointer would crash
if (!error && command != nullptr && cmd_payload != nullptr && strcmp(command, "download-model") == 0)
{
    TFLM_Module::setNewModelFileName(cmd_payload);
}
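The control message is a small JSON document with a "command" and a "payload" field. When testing the platform tier from a workstation, it can help to have the same dispatch logic available off-device. The sketch below is one possible way to do that in Python; `handle_control_message` and the `handlers` table are hypothetical, and the file name used in the usage note is made up for the example.

```python
import json
from typing import Callable, Dict


def handle_control_message(raw: bytes, handlers: Dict[str, Callable[[str], None]]) -> bool:
    """Dispatch one control message; return True if a handler ran."""
    try:
        doc = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False  # not valid JSON: ignore the message
    if not isinstance(doc, dict):
        return False
    command = doc.get("command")
    payload = doc.get("payload")
    handler = handlers.get(command) if isinstance(command, str) else None
    if handler is None or not isinstance(payload, str):
        return False  # unknown command or missing payload
    handler(payload)
    return True
```

For example, `handle_control_message(b'{"command": "download-model", "payload": "model.tflite"}', {"download-model": download})` would call `download("model.tflite")`, while malformed or unknown messages are ignored.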
Get the Arduino FFT library and copy it to the lib folder. Add the following header file to your module and set the sample size and frequency to these values
#include "arduinoFFT.h"
const double signalFrequency = 1000;
const double samplingFrequency = 13095; // find using the function getMaxSamplingFrequency
Find the max sampling frequency of your device by using this code
long newTime = micros();
for (int i = 0; i < 1000000; i++)
{
analogRead(ANY_ANALOG_PIN);
}
float t = (micros() - newTime) / 1000000.0;
long samplingFrequency = (1.0 / t) * 1000000;
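The arithmetic behind this measurement is easy to check off-device. The sketch below restates it in Python and adds two derived quantities: the per-sample period in microseconds and the highest frequency the FFT can resolve (half the sampling frequency). The function names are hypothetical, and rounding the period to whole microseconds is an assumption based on how arduinoFFT examples commonly derive it.

```python
def sampling_frequency_hz(num_reads: int, elapsed_us: int) -> float:
    """Reads per second, given how long num_reads analog reads took."""
    elapsed_s = elapsed_us / 1_000_000.0
    return num_reads / elapsed_s


def sampling_period_us(frequency_hz: float) -> int:
    """Time budget per sample in whole microseconds."""
    return round(1_000_000.0 / frequency_hz)


def nyquist_hz(frequency_hz: float) -> float:
    """Highest signal frequency that can be resolved at this sampling rate."""
    return frequency_hz / 2.0
```

With the value used above, `sampling_period_us(13095)` gives 76 microseconds per sample and `nyquist_hz(13095)` gives 6547.5 Hz, comfortably above the 1000 Hz signal frequency.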
Record the sensor data and apply the FFT transform to get the major peak
ptrFFT->Windowing(vReal, MAX_SAMPLE_SIZE, FFT_WIN_TYP_HAMMING, FFT_FORWARD); /* Weigh data */
ptrFFT->Compute(vReal, vImag, MAX_SAMPLE_SIZE, FFT_FORWARD); /* Compute FFT */
ptrFFT->ComplexToMagnitude(vReal, vImag, MAX_SAMPLE_SIZE); /* Compute magnitudes */;
majorPeakParabola(vReal);
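The same three steps (window, transform, magnitude) followed by a parabolic fit around the strongest bin can be reproduced on a workstation to sanity-check the firmware's output. The sketch below uses a naive DFT from the standard library rather than the arduinoFFT implementation, so it illustrates the technique and is not a port of the library; all function names are hypothetical.

```python
import cmath
import math
from typing import List


def hamming(n: int) -> List[float]:
    """Hamming window coefficients for n samples."""
    return [0.54 - 0.46 * math.cos(2.0 * math.pi * i / (n - 1)) for i in range(n)]


def magnitudes(samples: List[float]) -> List[float]:
    """Naive O(n^2) DFT magnitudes for bins 0 .. n/2 - 1 (fine for a small demo)."""
    n = len(samples)
    out = []
    for k in range(n // 2):
        acc = sum(x * cmath.exp(-2j * math.pi * k * i / n) for i, x in enumerate(samples))
        out.append(abs(acc))
    return out


def major_peak_hz(samples: List[float], sampling_hz: float) -> float:
    """Estimate the dominant frequency with a parabolic fit around the peak bin."""
    n = len(samples)
    windowed = [x * w for x, w in zip(samples, hamming(n))]
    mags = magnitudes(windowed)
    # Skip bin 0 (DC) and the last bin so that both neighbours exist.
    k = max(range(1, len(mags) - 1), key=lambda i: mags[i])
    a, b, c = mags[k - 1], mags[k], mags[k + 1]
    denom = a - 2.0 * b + c
    delta = 0.0 if denom == 0.0 else 0.5 * (a - c) / denom
    return (k + delta) * sampling_hz / n
```

Feeding it 128 samples of a 1000 Hz sine sampled at 13095 Hz returns an estimate within one bin width (about 102 Hz) of the true frequency; more samples narrow the bins and tighten the estimate.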
Use the chip ID of the ESP32S SoC as the device key.
uint32_t Activation_Module::getChipID()
{
uint32_t c_id = 0;
for (int i = 0; i < 17; i = i + 8)
{
c_id |= ((ESP.getEfuseMac() >> (40 - i)) & 0xff) << i;
}
return c_id;
}
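The loop above picks three bytes out of the 48-bit eFuse MAC value and packs them into a 24-bit ID. A small Python restatement makes the bit manipulation easy to verify by hand; `chip_id_from_efuse_mac` is a hypothetical name, and the sample MAC value in the usage note is made up.

```python
def chip_id_from_efuse_mac(efuse_mac: int) -> int:
    """Pack bits 40-47, 32-39 and 24-31 of the MAC value into a 24-bit ID."""
    chip_id = 0
    for shift in (0, 8, 16):  # same steps as i = 0, 8, 16 in the firmware loop
        chip_id |= ((efuse_mac >> (40 - shift)) & 0xFF) << shift
    return chip_id
```

For the value `0x112233445566`, the function picks `0x11`, `0x22`, and `0x33` and returns `0x332211`. The result always fits in 24 bits, which is consistent with the device ID shown in the serial output later in this post.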
Pass this key to the REST Device Registry endpoint https://<IP Address>:30006/confirmActivation and confirm activation
int Activation_Module::isDeviceActivated()
{
uint32_t chipID = getChipID();
pHTTPClient->begin(activationServer.c_str(), reg_svr_pub_key);
pHTTPClient->addHeader("Content-Type", "application/x-www-form-urlencoded");
char payload[255] = {};
snprintf(payload, 255, "%s=%d", "device_id", chipID);
int httpResponseCode = pHTTPClient->POST(payload);
if (httpResponseCode == 200)
{
String activationCode = pHTTPClient->getString();
pHTTPClient->end();
return strcmp(activationCode.c_str() , "TRUE") == 0 ? 1 : 0;
}
else
{
pHTTPClient->end();
return 0;
}
return 0;
}
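When exercising the Device Registry from a workstation, the request body and the success check can be reproduced in a few lines. The sketch below only builds the form-encoded body and interprets a response; it sends nothing over the network. `activation_payload` and `is_activated` are hypothetical names, and the "TRUE" body is the value the firmware above compares against.

```python
from urllib.parse import urlencode


def activation_payload(chip_id: int) -> str:
    """Form-encoded body, matching the firmware's "device_id=<chip id>" format."""
    return urlencode({"device_id": chip_id})


def is_activated(status_code: int, body: str) -> bool:
    """A device counts as activated only on HTTP 200 with the body "TRUE"."""
    return status_code == 200 and body == "TRUE"
```

The body produced by `activation_payload` is meant to be sent with the `Content-Type: application/x-www-form-urlencoded` header, as the firmware does.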
Get the Esp32 Servo library and copy it to the lib folder. Add the header file to your module and use the Servo class to control the motor.
Servo_Module::Servo_Module(int pin)
{
pHydraulic_valve_servo = new Servo();
pHydraulic_valve_servo->attach(pin);
}
void Servo_Module::turnValveOn()
{
for (int posDegrees = 0; posDegrees <= 180; posDegrees++)
{
pHydraulic_valve_servo->write(posDegrees);
delay(20);
}
}
void Servo_Module::turnValveOff()
{
for (int posDegrees = 180; posDegrees >= 0; posDegrees--)
{
pHydraulic_valve_servo->write(posDegrees);
delay(20);
}
}
Include all the libs and custom modules in the main file:
#include "tflm_module.h"
#include "motor_sensors.h"
#include "mqtt_module.h"
#include "fft_module.h"
#include "servo_module.h"
#include "aggregation.h"
#include "activation_module.h"
//open source libs
HTTPClient http_client_activation , http_client_model;
WiFiClient espClient;
//custom modules
TFLM_Module tflm_module;
Sensors_Module sensors_module;
Mqtt_Module mqtt_module;
Fft_Module fft_module;
Servo_Module hydraulicValveController(SERVO_PIN);
Aggregation_Module aggregationModule;
Activation_Module activationModule;
Initialize the modules and confirm device activation
void setup()
{
Serial.begin(115200);
pinMode(LED_BUILTIN, OUTPUT);
setupWifi();
activationModule.init(&http_client_activation, deviceRegistry);
chipId = activationModule.getChipID();
isValidDevice = activationModule.isDeviceActivated();
Serial.printf("Device Activation code from server = %d\n", isValidDevice);
if (0 == isValidDevice)
{
Serial.printf("Device Activation failed!!\n");
}
tflm_module.init(&http_client_model, modelRegistry);
sensors_module.init(TMP_PIN, CURRENT_PIN, VIBRATION_PIN, SOUND_PIN);
mqtt_module.init(espClient, broker, broker_port, commandTopic, dataTopic, String(chipId));
hydraulicValveController.turnValveOff();
}
In the super loop get the sensor data, apply the FFT filter and then the TFLM inference. If the inference value is above a certain threshold then turn on the servo controlling the hydraulic valve.
void loop()
{
if (isValidDevice == 0)
{
Serial.printf("Device not registered, retrying ... !\n");
delay(3000);
isValidDevice = activationModule.isDeviceActivated();
return;
}
mqtt_module.reconnect();
float tempVal, vibrationVal, soundVal, currentVal;
tempVal = vibrationVal = soundVal = currentVal = 0;
int maxSampleSize = fft_module.getMaxSampleSize();
int samplingPeriod = fft_module.getSamplingPeriod();
long sampleStartMicros = 0;
for (int i = 0; i < maxSampleSize; i++)
{
sampleStartMicros = micros();
aggregationModule.aggregateData(sensors_module, currentVal, tempVal, vibrationVal, soundVal);
fft_module.recordSample(currentVal, tempVal, vibrationVal, soundVal);
while ((micros() - sampleStartMicros) < samplingPeriod)
{ /* spin */
}
}
float temp_fft, current_fft, vibr_fft, sound_fft;
temp_fft = current_fft = vibr_fft = sound_fft = 0;
fft_module.perform_fft(&current_fft, &temp_fft, &vibr_fft, &sound_fft);
char sensor_data_fft[255];
snprintf(sensor_data_fft, 255, "{\"deviceID\": \"%d\", \"current\": %.2f, \"temperature\": %.2f, \"vibration\": %.2f, \"sound\": %.2f, \"fft_data\": \"true\"}",
chipId, current_fft, temp_fft, vibr_fft, sound_fft);
mqtt_module.publish(sensor_data_fft);
fft_module.resetSampleCounter();
float inference_val = tflm_module.predict(current_fft, temp_fft, vibr_fft, sound_fft);
if (inference_val > 8.0)
{
hydraulicValveController.turnValveOn();
}
}
Connect the ESP32 devkit
Use the Upload and Build option to flash the firmware. You may need to toggle the reset button to set the device in flash mode. After the flashing is complete and the device starts, this is what I see on my serial monitor
{"deviceID": "14339204", "current": 567.15, "temperature": 28.30, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 549.70, "temperature": 28.30, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 541.80, "temperature": 28.30, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 549.33, "temperature": 27.91, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 529.03, "temperature": 28.40, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 508.29, "temperature": 29.77, "vibration": 0.00, "sound": 20971516.00}
{"deviceID": "14339204", "current": 1269.85, "temperature": 77.81, "vibration": 850.28, "sound": 635.15, "fft_data": "true"}
Publishing data success
TFLM inference val : 0.53, data : Current: 1269.85, Temp: 77.81, Vibration: 850.28, Sound: 635.15
Publishing data success
The IoT sensors sense the vibration, sound, temperature, and current on the motor. These sensors are placed on and around the motor.
Component | Part Name |
---|---|
MCU | ESP32S SoC |
Current Sensor | CT 013 |
Sound Sensor | MAX 4465 |
Vibration Sensor | Ceramic Piezo Vibration Sensor |
Induction Motor | Shaded Pole C-Frame Motor |
Wire the sensors to the ESP32 GPIO analog ports
The source code for all the modules is in this git repo.
This post concludes the three part series on how to architect, build and deploy AIoT applications. I hope this series equips you with the principles, patterns, best practices, and tools necessary to venture into the universe of Edge AI and build "real-world" AIoT applications.
See ya on the Edge!