上一篇简单OTA系统设计与实现 | 四叶草の博客里我们聊了这套 OTA 系统的“现在”。如果说上一篇主要是在解释“它为什么长这样”,那么这一篇就是要把他从一个玩具变为一个真正的工程。作为一个在实验室里临时搭建的系统,能在内网跑通只是第一步,要让它真正成为一个健壮的工程,笔者还有很长的路要走。


1. 任务队列

register/registerServer.py 的代码中,我们采用了递归的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def updateNext():
if (len(dM.updateList) > 0):
# ...
status = dM.update(updatePackage) # 执行更新分发
if (not status): # 失败分支
# ... 记录日志 ...
dM.updateList.pop(0) # 移除失败任务
updateNext() # 递归调用,继续处理下一个 <--- 这里有递归
return
dM.updateList.pop(0) # 成功分支,移除成功任务
updateNext()
else:
# 队列处理完毕
dM.isUpdating = False

问题深度剖析

采用递归方式来处理任务队列,在工程上是不可接受的。这意味着我们把一个后台任务强行和 Web 请求绑在了一起。

  1. 栈溢出风险 (Stack Overflow)

    假设你需要给 2000 台设备推送更新。递归调用的原理是:函数 A 调用函数 A’,A 必须等待 A’ 执行完才能结束;A’ 又调用 A’’… 这就像一层层叠罗汉。Python 解释器为了防止内存被耗尽,默认设置了最大递归深度(通常是 1000 层)。当你处理到第 1001 台设备时,程序会直接抛出 RecursionError 并崩溃。这会导致整个服务停止运行,剩下的 999 台设备无法收到更新。

  2. 主线程阻塞 (Main Thread Blocking)

    假设每台设备的网络连接和发送请求平均耗时 2 秒。你要更新 50 台设备。updateNext 是在 Flask 的主线程中同步执行的。这意味着,当管理员点击“开始更新”按钮后,网页会一直转圈加载,通过 HTTP 的连接会保持打开状态长达 100 秒(50 * 2)。在此期间,服务器将无法响应任何其他请求(如其他设备上报的心跳、管理员刷新页面),直到所有更新发完为止。如果在这个过程中浏览器因为超时断开了连接,可能会不仅导致用户体验极差,还可能导致 Web 服务器进程挂死。


1.1 什么是 Celery?

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统。它专注于实时处理的任务队列,同时也支持任务调度。在我们的场景中,它将扮演“调度指挥官”的角色。

1.2 核心原理

Celery 的工作流非常适合我们的 OTA 分发场景:

  • Producer (Flask Server): 当管理员点击“更新”时,Flask 不再亲自去连接设备(这是耗时操作),而是生成一条消息:“给 ID=1001 的设备发送更新包”,然后把这条消息“丢”进 Broker(中间件),随即立马给前端返回成功。
  • Broker (使用 Redis): 它像一个邮局,暂存这些待处理的消息。
  • Consumer (Worker): 这是后台运行的独立进程,它时刻盯着 Broker。一旦有新消息,就取出来,在后台独立线程中慢慢执行耗时的网络请求。

这种架构实现了 “发送”与“执行”的完全解耦

1.3 代码重构实战

我们将分三步走,将原本脆弱的递归调度替换为 Celery 异步任务。

Step 1: 实例化 Celery (register/celery_worker.py)

1
2
3
4
5
6
7
8
9
from celery import Celery

def make_celery(app_name=__name__):
# 配置 Celery 使用 Redis 作为 Broker (消息队) 和 Backend (结果存储)
return Celery(app_name,
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1')

celery = make_celery()

Step 2: 定义异步任务 (register/tasks.py)

将原来的 dM.update 业务逻辑包装进 Celery Task:

1
2
3
4
5
6
7
8
from celery_worker import celery
import deviceManager as dM

@celery.task # <--- 这个装饰器赋予了函数被“异步投递”的能力
def send_update_task(update_package):
# 这里执行耗时的网络请求,阻塞也不会影响 Web 服务
status = dM.update(update_package)
return status

Step 3: 在 Flask 中调用 (register/registerServer.py)

删除那个该死的 updateNext 递归函数,改为直接调用 .delay()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from tasks import send_update_task

@app.route("/update", methods=["POST"])
def update():
# ... 解析参数 ...
for item in updatelist:
package_info = {
"id": item["id"],
"package": item["packages"][0]["package"],
# ... 构造参数 ...
}

# [旧代码]: dM.updateList.append(package_info); updateNext()

# [新代码]: 直接丢给 Celery,瞬间完成,不阻塞主线程
send_update_task.delay(package_info)

return json.dumps({"status": "queued", "msg": "更新任务已在后台排队"})

通过这套系统,我们还获得了极高的扩展性——如果设备太多,只需要多启动几个 Celery Worker 进程即可实现并行分发。


2. 数据库连接池

server/versionManager.pyregister/deviceManager.py 中,几乎每一个函数是这样写的:

1
2
3
4
def some_function():
db = pymysql.connect(...) # 建立TCP连接,认证,握手
# 执行业务...
db.close() # 断开连接

性能瓶颈:
MySQL 的连接建立是昂贵的(TCP 三次握手 + 认证交互)。在高频心跳 (/heartbeat) 场景下,每秒几百次的连接创建/销毁会迅速耗尽数据库服务器的资源,导致 Too many connections 错误。

解决方案:数据库连接池 (Connection Pooling)

2.1 连接池原理

在目前的逻辑中,每次执行 SQL(例如接收心跳包)都会经历以下“漫长”的过程:

  1. 客户端发起 TCP 连接(三次握手)。
  2. MySQL 验证用户名密码(认证交互)。
  3. 执行 SQL(这才是唯一的正事,可能只需要 1ms)。
  4. 关闭连接(四次挥手)。

这就像**“为了喝一口水,专门打了一口井,喝完就把井填了”**。在高并发心跳场景下,这种开销是毁灭性的,会导致数据库 CPU 飙升,甚至报出 Too many connections 错误。

连接池的原理非常简单:它在系统启动时预先建立好一批连接(例如 5 个)放在那里。

  • 当函数需要查库时,找池子一个连接。
  • 用完后,不关闭,而是把连接回池子里。
  • 这样,昂贵的“握手”和“挥手”开销就被省去了,剩下的全是纯粹的业务执行。

2.2 引入 DBUtils

我们可以使用 DBUtils 这个库来为现有的 pymysql 加个“外挂”。

Step 1: 初始化全局连接池

server/database_pool.py 中创建一池子连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from dbutils.pooled_db import PooledDB
import pymysql

# 初始化连接池
POOL = PooledDB(
creator=pymysql, # 使用的底层驱动
maxconnections=10, # 连接池允许的最大连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待
host='localhost',
port=3306,
user='root',
# ... 其他配置 ...
)

Step 2: 修改业务代码调用

server/versionManager.py 中,不再自己 connect,而是找 POOL 要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from database_pool import POOL

def getMaxVersion(package: str):
# [旧代码]: db = pymysql.connect(...)

# [新代码]: 从池中获取连接
db = POOL.connection()

try:
cursor = db.cursor()
cursor.execute("SELECT ...")
# ...
finally:
# [重要]: 这里的 close 不是真的关闭 TCP 连接
# 而是把连接“归还”给 POOL,供下一次使用
db.close()

通过这几行改动,数据库操作的 QPS (Queries Per Second) 承载能力通常能提升一个数量级。


3. 微服务化

目前的系统架构耦合度依然较高。Client 的 daemon.py 手动管理着子进程,Register Server 同时承担了 Web 展示和心跳处理,这导致了“不仅代码难管,运维也难管”的现状。

3.1 容器化

问题:
每次部署 Client、Register 或 Server 时,都需要手动安装一堆 Python 依赖(requests, pymysql…),环境不一致(Dev 环境是 Python 3.8,Prod 环境是 3.9)经常导致莫名其妙的报错。这就好比“你把代码给了别人,却忘了给运行代码的电脑”。

解决方案:Docker:
我们将为每个服务编写 Dockerfile。容器化后,每个组件都自带运行环境,“一次构建,到处运行”。

Dockerfile 示例 (client/Dockerfile):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 基础镜像
FROM python:3.9-slim

# 设置工作目录
WORKDIR /app

# 复制依赖说明并安装
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制源代码
COPY . .

# 启动命令
CMD ["python", "daemon.py"]

3.2 技术栈分离

问题:
现在的 Register Server 既要处理 /heartbeat 这种高频 API 请求,又要负责渲染 Jinja2 的 HTML 页面,甚至还把 CSS 和 JS 混在一起。这违背了“单一职责原则”。

重构方案:

  1. 前后端分离: 将前端剥离为独立的 Vue 或 React 项目,通过 Nginx 托管静态资源。
  2. API 独立: Flask 后端只负责输出 JSON 数据,退化为纯粹的 RESTful API Server。

4. 结语

这篇文章介绍了我对于ota系统的一系列改造过程,通过这一系列现代化的改造方案——从递归到 Celery 任务队列、从短连接到数据库连接池、从裸机部署到 Docker 容器化,博主试图将一个典型的学生玩具项目进化为“工程级”系统的。笔者以为真正的工程师,不看你写了多少 Demo,而看你能填多少自己挖的坑,以及你能否从这些坑里爬出来,走向更优雅的架构。