来聊聊去中心化 Redis 集群节点如何完成通信

一、写在文章开头

今天我们来聊点有意思的,关于redis中集群间通信的设计与实现,本文将从源码的角度分析redis集群节点如何利用Gossip协议完成节点间的通信与传播,希望对你有帮助。

二、详解Redis集群节点通信的设计与实现

1. 详解Gossip协议

在此之前我们先简单介绍一下Gossip协议,该协议是分布式集群的一种通信协议,我们都知道管理集群的方式有中心化和去中心化两种方式,中心化的方式是通过第一个第三方的管理中心,例如zookeeper等来维护一份集群节点的信息、状态:

而redis采用的是去中心化的方式实现集群节点通信,即通过Gossip协议进行节点通信,让各个节点之间两两通信,广播与自己保持交流的节点,由此将节点串联起来构成一张关系网:

我们以一个简单的场景为例介绍一下Gossip协议,默认情况下我们的当前有3个节点的集群,各个节点彼此按照通信要求发送自己的信息和与自己保持交流的节点,由此将有限的资源共享出去构成一个集群。

此时,我们需要横向扩展一个节点4,我们只需配置/redis-cli --cluster add-node 新节点IP:新节点端口 任意存活节点IP:任意存活节点端口,这个存活节点后续和其他节点通信时,就会将当前新添加的节点4发送出去,由此其他节点收到这个消息并存储下来,经过各个节点的不断反复通信,这个集群中的各个节点就会拥有集群中所有节点的信息。

2. 集群消息协定

任何通信都是需要按照协议规范进行,redis集群也一样,为了保证节点间通信的规范,redis要求集群节点通信的消息的类型可以是以下几种:

ping消息,用来向其他节点发送节点信息。回复ping的pong消息。如果当前节点中存在新添加的节点,则通过meet格式的消息发送给其他节点。如果节点出现故障,则发送fail消息告知集群其他节点。

对此我们给出消息的宏定义代码,位于cluster.h中:

复制
//集群中的ping #define CLUSTERMSG_TYPE_PING 0 /* Ping */ //集群中的pong #define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */ //想加入集群的节点 #define CLUSTERMSG_TYPE_MEET 2 /* Meet "lets join" message */ //某个节点有故障 #define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */1.2.3.4.5.6.7.8.
3. 集群节点消息体

后续集群都会通过clusterMsg来表示一条消息,它记录消息长度以及发送节点名称、负责的slots以及节点端口号等信息:

复制
typedef struct { char sig[4]; //消息总长度 uint32_t totlen; //...... //消息类型 uint16_t type; //...... //发送节点的名称 char sender[REDIS_CLUSTER_NAMELEN]; //发送节点负责的slots unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; //...... char notused1[32]; //节点端口 uint16_t port; //...... //记录消息的消息体 union clusterMsgData data; } clusterMsg;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.

这里我们对这个消息体clusterMsgData进行展开说明一下,可以看到他用一段共用体维护各种类型消息的结构,这其中我们只需要了解的是ping消息,从注释可以看到ping消息这个结构体可以发送ping、meet、pong等类型消息,ping消息类型其内部用clusterMsgDataGossip数组维护,这一点这个消息可以包含多个节点信息存于数组中:

复制
union clusterMsgData { //可以发送ping meet pong的消息,该结构体内部有clusterMsgDataGossip数组,这意味这个结构体可以存放多个节点的消息 struct { /* Array of N clusterMsgDataGossip structures */ clusterMsgDataGossip gossip[1]; } ping; //...... };1.2.3.4.5.6.7.8.9.

步入clusterMsgDataGossip即可看到这个结构体存储的是需要发送给它人的节点名称、ping和收到ping的时间以及端口号等信息:

复制
typedef struct { char nodename[REDIS_CLUSTER_NAMELEN];//节点名称 uint32_t ping_sent; //发送ping的事件 uint32_t pong_received;//收到pong的事件 char ip[REDIS_IP_STR_LEN]; //广播的节点ip uint16_t port; //节点与客户端进行通信的端口 //...... } clusterMsgDataGossip;1.2.3.4.5.6.7.8.

我们来简单小结一下,假设我们的某个节点向其他节点发送ping消息告知自己维护的节点信息和状态,那么对应的消息格式大体如下图所示:

4. 详解集群节点ping流程

集群节点的指向流程也是交由redis的时间事件serverCron执行,它会每个100ms执行一次集群的定任务clusterCron方法,其内部会检查这个定时任务是否执行了10次,一旦执行10次(也就是100ms*10即每1秒)后就会随机从当前节点维护的其他节点信息字典表中抽取5个节点,找到最早回复pong给当前节点发送一条ping消息:

对此我们给出定时执行的serverCron函数,可以看到其内部每100ms执行一次集群定时任务clusterCron:

复制
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { //...... //100ms执行一次集群的函数 run_with_period(100) { if (server.cluster_enabled) clusterCron(); } //...... }1.2.3.4.5.6.7.8.

我们步入clusterCron即可看到,该定时任务会随机抽取5个节点然后找到最早给该节点发送pong的节点发送ping消息包:

复制
void clusterCron(void) { //...... // 每10次即每过去1s执行一次这段逻辑 if (!(iteration % 10)) { int j; //随机选出5个节点 for (j = 0; j < 5; j++) { de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); /* Dont ping nodes disconnected or with a ping currently active. */ //断连、或者自己、或者正在握手的节点不处理 if (this->link == NULL || this->ping_sent != 0) continue; if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; //选择最早收到pong的节点 if (min_pong_node == NULL || min_pong > this->pong_received) { min_pong_node = this; min_pong = this->pong_received; } } //向最早收到pong的调用clusterSendPing发送消息 if (min_pong_node) { redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name); clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); } } //...... }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.

步入clusterSendPing即可看到我们所说的核心逻辑,即按照公式计算出要发送给最早回复pong的节点对应节点数,然后封装成消息发送出去:

复制
void clusterSendPing(clusterLink *link, int type) { //...... //我们希望添加的最大节点数,集群总是减去自己和正在握手的 int freshnodes = dictSize(server.cluster->nodes)-2; //...... //计算wanted wanted = floor(dictSize(server.cluster->nodes)/10); if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; //...... /* Populate the header. */ //设置ping消息头,构建端口号、slot等信息 if (link->node && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime(); clusterBuildMessageHdr(hdr,type); /* Populate the gossip fields */ int maxiterations = wanted*3; //基于maxiterations进行循环随机抽取自己维护的节点信息并组装 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) { dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); clusterMsgDataGossip *gossip; int j; //如果是自己则跳过 if (this == myself) continue; //故障节点不发送 if (maxiterations > wanted*2 && !(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) continue; //.... freshnodes--; //组装当前节点的名称、ip、端口等信息存到hdr所指向的消息结构体 //指向gossip某个索引位置设置名称、ip、端口等 gossip = &(hdr->data.ping.gossip[gossipcount]); memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN); gossip->ping_sent = htonl(this->ping_sent); gossip->pong_received = htonl(this->pong_received); memcpy(gossip->ip,this->ip,sizeof(this->ip)); gossip->port = htons(this->port); gossip->flags = htons(this->flags); gossip->notused1 = 0; gossip->notused2 = 0; gossipcount++; } //...... //创建一个发送事件提交给redis发送出去 clusterSendMessage(link,buf,totlen); zfree(buf); }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.
5. 等待pong消息回复并解析

每个集群的节点都会定时检查和对端链接的连接是否断开,如果断开的尝试异步非阻塞向其发送建立连接请求,并注册一个处理器clusterReadHandler处理对端的ping等消息,所以我们上文的ping消息实际上就是通过这个函数进行解析读取:

对此我们给出这段源码的入口即可集群的定时任务clusterCron方法,可以看到其内部会便利当前节点通信的节点,查看连接是否为空,若为空则发起连接并注册clusterReadHandler处理消息:

复制
void clusterCron(void) { //...... di = dictGetSafeIterator(server.cluster->nodes); //遍历与当前节点保持通信的节点 while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); //如果连接为空则非阻塞发起连接,然后注册clusterReadHandler处理对端节点的消息 if (node->link == NULL) { int fd; mstime_t old_ping_sent; clusterLink *link; fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR); //...... //创建链接对应存储数据的空间 link = createClusterLink(node); link->fd = fd; node->link = link; //为这个链接注册clusterReadHandler处理发送的消息 aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link); //...... } } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.

步入clusterReadHandler即可看到redis服务端解析消息存储到buf并通过clusterProcessPacket解析的逻辑:

复制
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { //...... while(1) { /* Read as long as there is data to read. */ //...... //hdr指向link->rcvbuf hdr = (clusterMsg*) link->rcvbuf; //读取消息到buf即link->rcvbuf中 nread = read(fd,buf,readlen); //...... if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) { //调用clusterProcessPacket解析这个连接的消息,即 link->rcvbuf if (clusterProcessPacket(link)) { sdsfree(link->rcvbuf); link->rcvbuf = sdsempty(); } else { return; /* Link no longer valid. */ } } } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.

而clusterProcessPacket即是该方法的核心所在,它会将对端节点发送的消息进行解析与处理,这里我们就以收到pong消息为例说明一下流程,假设回复pong的是master节点,它会更新收到这条网络连接pong响应时间,然后解析报文内容,如果发现有个节点不在我们的节点列表中,将其存入node字典表中:

复制
int clusterProcessPacket(clusterLink *link) { //...... /* Perform sanity checks */ //消息完整性校验 //...... /* Check if the sender is a known node. */ //检查发送节点是否是已知节点 sender = clusterLookupNode(hdr->sender); //...... //...... /* PING, PONG, MEET:消息处理逻辑 */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { //...... //如果收到pong则更新pong_received为当前时间 if (link->node && type == CLUSTERMSG_TYPE_PONG) { link->node->pong_received = mstime(); link->node->ping_sent = 0; //...... } //...... //如果当前节点是已知节点,则调用clusterProcessGossipSection查看当前pong消息中的内容是否包含未知的、新加入的节点 if (sender) clusterProcessGossipSection(hdr,link); } else if (type == CLUSTERMSG_TYPE_FAIL) { //...... } //...... return 1; }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.

步入clusterProcessGossipSection即可看到该函数会遍历消息中的节点,一旦发现该节点是新添加节点则调用clusterStartHandshake其存入nodes字典表中:

复制
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { uint16_t count = ntohs(hdr->count); //解析当前节点gossip消息内容 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip; clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender); //遍历node while(count--) { //...... //打印当前节点信息 redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s", g->nodename, g->ip, ntohs(g->port), ci); node = clusterLookupNode(g->nodename); if (node) {//已知节点处理,如果不可通信才握手重连 //...... } else {//未知节点则发起握手,若握手建立通信成功则将其存入nodes字典中 //...... if (sender && !(flags & REDIS_NODE_NOADDR) && !clusterBlacklistExists(g->nodename)) { clusterStartHandshake(g->ip,ntohs(g->port)); } } //走到下一个节点 g++; } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.

我们给出clusterStartHandshake中将其存入server的cluster的nodes字典表的逻辑:

复制
int clusterStartHandshake(char *ip, int port) { //...... //如果处于握手中,则说明之前已经发现并进行通信了,直接返回 if (clusterHandshakeInProgress(norm_ip,port)) { errno = EAGAIN; return 0; } //基于消息创建node结构其,并调用clusterAddNode将其存入server.cluster->nodes字典表中 n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET); memcpy(n->ip,norm_ip,sizeof(n->ip)); n->port = port; clusterAddNode(n); return 1; }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.

三、小结

来简单小结一下Redis集群节点如何通过Gossip协议构建集群网络的:

新节点通过meet和集群中某个节点a建立连接。当前节点执行clusterCron定时任务时,随机抽取5个节点并找到最早回复pong的实例,假设是节点a,发送ping消息。注册clusterReadHandler处理器其他节点发送的消息。收到节点a的pong消息回复,判断查看该节点是否是已知节点,如果是则调用clusterProcessGossipSection解析报文内容,如果存在新节点则进行握手通信,如果连接建立成功则将该节点存入当前实例的nodes节点中。

阅读剩余
THE END