项目背景
本次接手的项目是一个物联网项目,需要与大量设备建立长连接、远程下发指令进行群控操作
这些设备使用一套私有协议进行控制,而且只提供有Go语言版本的SDK。但是我们的开发团队主要使用的语言是Java,因此在讨论后选择了使用Go语言对接这套SDK,只暴露控制相关的接口,业务逻辑依旧使用Java来实现
考虑到设备在Go端初始化过后需要保持长连接,必定会消耗大量的内存,所以要支持多节点部署,加一层网关层路由便于调用,并且能进行扩容/缩减
最终架构确定为:Java端(HTTP调用)->Nginx网关(路由)->Go端(控制设备)
实现思路
设备路由
选择Nginx的原因很简单,除了对高并发的支撑,内置的路由策略就可以支持将同台设备始终路由至同一个节点的效果
为了实现这个效果,需要在业务层给所有设备生成一个唯一值(例如deviceId)。在请求中固定添加一个Param参数传入该值,Nginx再读取该参数进行Hash路由
状态恢复
设备在Go端初始化后会创建实例信息在内存,节点在重启之后必须恢复状态,因此需要再添加一个Redis缓存来记录节点初始化的设备列表
缓存的键名最初考虑用节点IP命名, 但考虑到节点可能会被替换(容灾特性)。如果给每个节点设置一个编号会更合理,按照编号缓存
随即新的问题又出现了,一旦节点的数量发生扩容/缩减,设备会被路由至其他节点,所以使用节点编号给缓存的Key命名同样行不通
之后博主在查询资料的时候突然想到可以借鉴Redis集群的实现方案:Redis集群固定了16384个槽(Slot)用于对Key进行Hash分片。每个Key通过Hash算法被映射到一个槽之中,每个槽都由一个Redis节点负责。为保证负载均衡,如果节点数量变化,槽会被尽可能均匀地分配到所有节点上
为了实现相同的效果,需要在Go端增加配置,记录最大槽数、总节点数、当前节点索引,各参数的解释如下:
最大槽数(SlotCount):也代表着最大可以部署的节点数量,确定后就不能再变更
总节点数(SliceCount):实际部署的节点数量,必须小于最大槽数,且满足
最大槽数 % 总节点数 = 0当前节点索引(ClusterIndex):在重启后判断槽是不是该分配给自己
前两个参数固定写进配置即可,当前节点索引可以考虑给每个节点设置一个固定前缀的主机名 + 索引号,索引从0开始,在节点启动时读取
流程梳理
在实现方案确定后,每个执行链路也就明了了
技术难点
由于Hash算法的实现有很多种,一台设备的deviceId在经过Nginx或Go端的Hash计算后得到的值可能不一样,导致Nginx将设备路由到节点A,但节点重启后被恢复到了节点B
所以算法必须统一,保证路由和重启后恢复到的是同一个节点。Nginx使用的算法虽然不能修改,但是在Github的开源项目中有相应的文件:
第205行的注释说明使用的是crc32:
/*
* Hash expression is compatible with Cache::Memcached:
* ((crc32([REHASH] KEY) >> 16) & 0x7fff) + PREV_HASH
* with REHASH omitted at the first iteration.
*/在Go端的相同实现:
/**
* 计算deviceId对应的slot
*/
func calSlot(deviceId string, slotCount int) int {
// 使用CRC32,和nginx一样的算法计算hash
hash := crc32.ChecksumIEEE([]byte(deviceId))
// 右移16位,与0x7fff进行位与操作
hash = (hash >> 16) & 0x7fff
// 对槽的总数取模
return int(hash) % slotCount
}配置示例
Java端
只需要使用任一HTTP框架(OkHttp、HttpClient、Feign、Forest等),请求Nginx网关的地址,在所有请求中携带Param参数deviceId即可
网关层
可以直接参考这里的nginx配置文件
user www-data;
worker_processes auto;
error_log /var/log/nginx/error.log;
include /etc/nginx/modules-enabled/*.conf;
events {
worker_connections 2048;
}
http {
# --- 基本设置 ---
sendfile on;
tcp_nopush on;
types_hash_max_size 2048;
include /etc/nginx/mime.types;
default_type application/octet-stream;
# --- 日志 & Gzip ---
log_format debug '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$upstream_addr"';
access_log /var/log/nginx/access.log debug;
gzip on;
gzip_types text/plain text/css application/javascript application/json;
# --- 上游服务器 ---
upstream backend_pool {
# 使用deviceId参数进行哈希路由
hash $arg_deviceId;
# 节点地址,关闭被动健康检查机制
server 10.0.0.1:8080 max_fails=0;
server 10.0.0.2:8000 max_fails=0;
server 10.0.0.3:8000 max_fails=0;
server 10.0.0.4:8000 max_fails=0;
}
# --- Server 块 ---
server {
listen 80;
# server_name ;
# 上传文件和超时设置
client_max_body_size 200M; # 限制上传大小
client_body_timeout 300s; # 请求体读取超时
client_header_timeout 300s; # 请求头读取超时
send_timeout 300s; # 响应超时
location / {
proxy_pass http://backend_pool;
proxy_next_upstream off; # 禁止自动切换
proxy_connect_timeout 300s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
}配置的关键是在upstream里使用hash算法进行路由,将各节点的IP按照索引顺序配置进来
还要设置max_fails=0关闭被动健康检查机制,并在location块里配置proxy_next_upstream off禁止自动切换,这么做的目的是防止某个节点不可用时Nginx自动切换其他至节点路由(大坑)
Go端
参考以下内容创建一个cluster_config.go
import (
"hash/crc32"
"os"
"strconv"
"strings"
)
type ClusterConfig struct {
SlotCount int // 最大槽数,也代表最大的节点数量,部署后不能再变动
SliceCount int // 总节点数,必须满足:最大槽数 % 总节点数 = 0,若发生变更需要同步配置到nginx
HostNamePrefix string // 主机名前缀,后面跟序号,从0开始
}
var (
clusterConfig *ClusterConfig // 集群配置
hostName string // 当前节点主机名
clusterIndex int // 当前节点索引
)
/**
* 初始化集群配置
*/
func init() {
clusterConfig = &ClusterConfig{}
// TODO 读取配置
err := loadConfig(clusterConfig)
if err != nil {
panic("Init Cluster Config Error: Load Config Fail")
}
// 根据主机名确定当前节点索引
hostName = getHostName()
if clusterConfig.HostNamePrefix == "" || !strings.HasPrefix(hostName, clusterConfig.HostNamePrefix) {
// 当前的主机名前缀不符合配置要求
panic("Init Cluster Config Error: HostNamePrefix Not Available")
} else {
// 当前的主机名前缀符合配置要求,当前节点索引就是在集群里的索引
indexStr := strings.TrimPrefix(hostName, clusterConfig.HostNamePrefix)
clusterIndex, err = strconv.Atoi(indexStr)
if err != nil {
panic("Init Cluster Config Error: Convert clusterIndex Fail")
}
}
// 恢复设备状态
restoreDevice()
}
/**
* 获取主机名
*/
func getHostName() string {
name, err := os.Hostname()
if err != nil {
panic("Get HostName Error")
}
return name
}
/**
* 计算deviceId对应的slot
*/
func calSlot(deviceId string, slotCount int) int {
// 使用CRC32,和nginx一样的算法计算hash
hash := crc32.ChecksumIEEE([]byte(deviceId))
// 右移16位,与0x7fff进行位与操作
hash = (hash >> 16) & 0x7fff
// 对槽的总数取模
return int(hash) % slotCount
}
/**
* 恢复设备状态
*/
func restoreDevice() {
// 遍历0 到 最大槽数 - 1,判断该槽 % 总节点数 = 当前节点索引
for slot := 0; slot < clusterConfig.SlotCount - 1; slot++ {
if slot%clusterConfig.SliceCount == clusterIndex {
// TODO 读取Redis该slot下的deviceId列表并恢复
}
}
}
/**
* 保存deviceId至slot
*/
func PutCache(deviceId string) {
slot := calSlot(deviceId , clusterConfig.SlotCount)
// TODO 保存至Redis该slot下
}维护
如果节点IP变更,将IP替换到Nginx配置即可
如果节点数量发生变化,需要修改配置中的SliceCount,将新的节点IP配置到Nginx