该项目自主实现了分布式环境下本地服务在RPC节点上的注册、发布与远程调用功能。主要实现了自定义通信协议、服务注册中心、日志系统及高并发网络模型,具备高性能和良好的扩展性。

RPC介绍:是远程过程调用的缩写,可以通过网络从远程服务器上请求服务。具体功能简要来讲就是用户可以像在本地调用函数一样在客户端从服务器远程调用函数。

以下为需要的前置知识学习,不需要的可以跳过此部分:

ZooKeeper概述

概念:一个分布式的开源的分布式应用程序的协调服务。

功能:1.配置管理2.分布式锁3.集群管理。

  1. 配置管理:对于多个程序,设置一个配置中心,将配置信息写入配置中心中,当需要相应的配置信息时,直接从配置中心拉取对应的配置信息。
  2. 分布式锁:
  • 单机:当一个用户访问数据时,为其加锁,当该用户访问完后解锁,其他用户才能进入并访问。
  • 多用户:设置一个分布式锁的组件,当其中一个事务上锁时,另外一个事务需等待锁释放后才能上锁。
  1. 集群管理:设置注册1中心,provider将地址提供给注册中心,当consumer需要访问provider时,先从注册中心获取provider的地址,然后访问provider。

(本项目仅用到了zookeeper的api配置,所以只了解到这里)

TCP/IP协议的工作原理:

它是一个协议簇,包含四个协议:

应用协议:主要包括HTTP协议(超文本传输协议),SMTP协议(简单邮件传送协议),FTP(文件传输协议)等

传输层协议:TCP(传输控制协议),UDP(用户数据报协议)

网际互联协议:IP, ARP, RARP, ICMP

路由控制协议

(一)TCP协议的核心特性

  1. 面向连接:在数据传输前先建立连接,传输完成后释放连接。
  2. 可靠性:通过序列号、确认应答、超时重传机制确保数据准确到达。
  3. 字节流传输:数据被视为连续的字节流,没有消息边界的概念。
  4. 全双工通信:运行通信双方同时进行数据传输。
  5. 流量控制:通过滑窗机制控制数据发送速率,防止接收双方缓冲区溢出。
  6. 拥塞控制:动态调整数据发送速率,避免网络拥塞。

(二)三次握手概念

第一步:客户发送SYN包:客户端向服务器发送一个TCP数据包,SYN(同步序列号)标志位被设为1,表示这是一个连接请求包。同时客户端随机选择一个初始序列号(ISN),假设为x,将其放入数据包的序列号字段中,客户端发送SYN包后进入SYN_SENT阶段,等待服务器的响应。

第二步:服务器发送SYN-ACK包:服务器接到客户端的SYN包后,会为该连接分配必要的资源,服务器将SYN与ACK标志位都设置为1,表示同意建立连接并对客户端的请求进行确认。服务器也会随机选择一个初始序列号,假设为y,放入数据包的序列号字段中,同时将确认号字段设置为x+1,表示已收到客户端的SYN包,期待接下来收到客户端序列号为x+1的数据包,服务器发送SYN-ACK包后进入SYN-RCYD状态。

第三步:客户端发送ACK包:客户端接收到服务器发送的SYN-ACK包后,会检查确认号是否为x+1,如果是,则认为服务器已正确接受到自己的SYN包,客户端将ACK标志位设置为1,序列号字段设置为x+1,确认号字段设置为y+1,表示已收到服务器的SYN包,期望接下来收到服务器序列号为y+1的数据包。客户端发送ACK包后进入ESTABLISHED状态,此时客户端可以开始向服务器发送数据。

服务器接收到ACK包后,也进入ESTABLISHED状态,双方连接建立成功。

三次握手的设计原理:1.客户端发送SYN确保客户端有发送数据的能力。第二次握手证明服务器具有接收和发送数据的能力。第三次握手证明客户端可以接收数据。

  1. TCP协议通过同步初始序列号来跟踪每个字节的传输速度,并且初始序列号是随机生成的,降低被预测攻击的风险。
  2. 防止历史连接的干扰,服务器在接受到SYN时会发送SYN-ACK包,而当客户端接受到SYN-ACK包时会根据确认号是否正确才会发送最后的ACK包。

(三)TCP优化:

TFO技术:在SYN报文中携带数据,减少一次RTT(往返时间)延迟。

SYN cookie:一种应对SYN flood攻击的技术。SYN flood攻击:攻击者伪造大量的SYN包,耗尽服务器资源,导致正常请求无法处理。SYN cookie技术在SYN-ACK包中嵌入

(四)TCP报文

里面有几个关键字段:

  1. 序号:标识本报文段在发送方字节流中的位置。
  2. 确认号:表示接受方期望接收的下一个字节的序号。如果接收到了序号为x的字节,那么它会返回确认号x+1,确认号只有在ACK标志位为1的报文中有效。
  3. 控制位。包含八个标志位,用于控制 TCP 的各种操作。其中,CWR 标志与后面的 ECE 标志都用于 IP 首部的 ECN 字段,ECE 标志为 1 时,则通知对方已将拥塞窗口缩小;ECE若其值为 1 则会通知对方,从对方到这边的网络有阻塞。在收到数据包的 IP 首部中 ECN 为 1 时将 TCP 首部中的 ECE 设为 1;ACK 标志位表示确认号是否有效;SYN 标志位用于建立连接;FIN 标志位用于关闭连接;RST 标志位用于重置连接;PSH 标志位提示接收方应尽快将数据传递给应用层;URG 标志位表示紧急指针是否有效。

(五)TCP的数据分段与重组

每个网络链路都有一个最大传输单元(NTU),表示该链路能传输的最大数据包大小。TCP在发送数据时,会根据网络的MTU来决定每个段的大小,以避免数据在网络层被分片。TCP通常在三次握手过程中协商最大段大小(MSS)来确定每个段的最大数据量。MSS通常等于MTU减去IP头和TCP头的长度。

(六)可靠传输

  1. ACK延迟:接收方在接收多个报文后发送一个ACK报文,或者有数据要发送时将ACK与数据一同发送,提高传输效率。
  2. PSH标志:接收方在收到有PSH标志的报文时会立刻将缓冲区的数据传递给应用层,不用等待缓冲区满。
  3. 超时重传:设置超时时间,当一段时间内发送方没有接到反馈的ACK包就会重新发送该数据段。TCP超时时间一般由RTT(数据往返时间)确定。
  4. 最大重传次数:当超过一定重传次数后依然没有收到ACK包,发送方则认为接收端出现了问题,则会关闭连接。
  5. 快速重传,当接收方收到失序的段时,会发送重复的ACK包,这样发送方就会得知接收方期望序列的数据包没有发送,从而立即重传该段。
  6. 序列号的作用:1。数据排序。2数据去重。3确认应答。
  7. 数据去重的判断步骤:检查接收到的数据的序列号,若该数据在接收窗口内并且还没被接收,则将其放入缓冲区。以下情况将被丢弃:(1)已被确认接收的数据(2)如果该序列号大于接收窗口的上界,则将其丢弃。

(七)滑动窗口与流量限制

  1. 发送方滑窗设置:已发送并确认,已发送未确认,未发送但可发送,未发送不可发送。
  2. 接收方滑窗:表示可以接收的数据量,由接收方剩余缓冲区空间决定。
  3. 缩放因子:当窗口大小不足以充分利用网络带宽时,接收方通告一个缩放因子n,那么发送方在接收到这个缩放因子后发送的实际窗口大小则为2^n*窗口大小个字节。

(八)拥塞控制机制(四个阶段)

  1. 慢启动:连接建立初期,拥塞窗口初始化为一个最大段大小,每收到一个ACK报文拥塞窗口就增加一个MSS的大小;或者每收到一轮的ACK报文,拥塞窗口就翻倍。公式如下:

$ cwnd=cwnd+MSS $

$ Cwnd=cwnd*2 $

  1. 慢启动阈值:当拥塞窗口增长到慢启动阈值时,结束慢启动阶段,进入拥塞避免阶段。
  2. 拥塞避免阶段:拥塞窗口进入线性增长状态。

$ cwnd=cwnd+(MSS*MSS)/cwnd $4.拥塞判断:(1)超时事件(将拥塞窗口减半,并且重置为一个MSS)(2)发送方接收到三个重复的ACK报文(执行快速重传和快速回复)

5.快速重传:前面已经说过。

6.快速恢复:将慢启动阈值设置为当前拥塞窗口一半,将拥塞窗口设置为慢启动阈值加上3倍的MSS,随后执行拥塞避免算法。

(九)四次挥手

1,主动关闭方发送FIN包:FIN标志位设为1,序列号字段设为u,进入FIN_WAIT_1状态

2.被动关闭方发送ACK包,ACK标志位设为1,序列号字段设为v,确认号字段设为u+1,等待确认,进入CLOSE_WAIT状态,此时被动关闭方依然可以向主动关闭方发送数据

3.被动关闭方发送FIN包:被动关闭方传输完数据后,向主动关闭方发送一个FIN包,标志位设为1,序列号字段为v,确认号字段设为u+1,随后进入LAST_ACK状态。

4.主动关闭方发送ACK包,标志位为1,序列号字段为u+1,确认号字段为v+1,,进入TIME_WAIT阶段,等待一段时间(2倍的最大段生存期2MSL)

被动关闭方收到ACK包后进入CLOSED状态,主动关闭方等待一段时间后也进入CLOSED状态。

(十)TCP与UDP区别

TCP面向连接, 可靠传输,字节流,UDP无连接,不可靠传输,数据报

TCP效率较UDP低,消耗资源较高。

IO模型:

  1. 阻塞型IO:在操作系统完成IO操作前,会中断其他事务的操作,操作简单,适用于并发量小的开发。
  2. 非阻塞型IO:在操作系统完成IO操作前依然可以进行其他事务的操作,但是会使用轮询或事件机制两种机制去确认该IO操作是否完成:1.轮询就是时常询问该IO是否完成,会占用CPU时间2.事件机制就是IO完成后发送中断完成信号。非阻塞IO适用于高并发场景。
  3. 同步型IO:必须等待IO操作完成后才能执行后续代码。(要点:阻塞型IO一定是同步型IO,但是同步型IO不一定是阻塞型IO,比如使用休眠机制等待IO完成)
  4. 异步IO:程序发起IO操作后立即返回,内核负责完成所有操作,完成后通知应用程序。适用于极高并发需求的场景。
  5. 多路复用:允许单个线程通过一个系统调用监视多个文件描述符,当其中任何一个描述符就绪时,系统调用返回。下面讲select,poll以及epoll机制。

(一)多路复用的三种机制:

Select:初始化fd_set集合,调用select并等待,随后select遍历所有fd检查就绪状态,然后处理就绪的fd。

Poll:初始化pollfd数组,调用poll并等待,poll返回后遍历pollfd数组检查revents,返回就绪的fd。

Epoll:创建epoll实例,使用epoll_ctl添加修改删除监视的fd,调用epoll_wait等待事件,处理返回的就绪事件。

以下是三种机制的区别:

  1. 性能和效率
    • Select:在处理大量文件描述符时性能较差,因为每次调用都需要遍历所有文件描述符,且文件描述符数量受限(通常为1024)。
    • Poll:性能较Select有所提升,因为Poll支持更大的文件描述符数量(无上限),但仍然需要遍历所有文件描述符。
    • Epoll:性能最优,尤其是处理大量文件描述符时。Epoll采用事件驱动机制,只处理就绪的文件描述符,避免了遍历所有文件描述符的开销。
  2. 文件描述符数量
    • Select:文件描述符数量有限制(通常为1024)。
    • Poll:文件描述符数量无上限。
    • Epoll:文件描述符数量无上限,适合处理大量连接。
  3. 阻塞和唤醒机制
    • Select:阻塞在select调用,直到有文件描述符就绪。
    • Poll:阻塞在poll调用,直到有文件描述符就绪。
    • Epoll:阻塞在epoll_wait调用,直到有注册的事件发生。Epoll还支持边缘触发(Edge-Triggered)和水平触发(Level-Triggered)模式,进一步优化性能。
  4. 内存使用
    • Select:需要维护一个fd_set集合,内存使用较高。
    • Poll:需要维护一个pollfd数组,内存使用较高。
    • Epoll:内存使用较低,因为Epoll只处理就绪的文件描述符。
  5. 适用场景
    • Select:适用于小规模应用,文件描述符数量较少。
    • Poll:适用于中等规模应用,文件描述符数量较多但不极端。
    • Epoll:适用于大规模应用,特别是需要处理大量并发连接的场景(如高并发服务器)。

总结:

  • Select:简单易用,但性能较差,适用于小型应用。
  • Poll:性能较Select有所提升,适用于中型应用。
  • Epoll:性能最优,适用于大规模高并发场景,是Linux环境下处理大量文件描述符的最佳选择。

HTTP协议:

(一)请求响应步骤:

  1. 客户端与服务器HTTP端口建立TCP套接字连接。
  2. 发送HTTP请求:通过TCP套接字客户端向服务器发送一个文本的请求报文,一个请求报文由请求行,请求头部,空行与请求数据四部分组成。
  3. 服务器接受请求并返回HTTP响应:服务器解析请求,定位请求资源。服务器将资源复本写到TCP套接字,由客户端读取。一个响应由状态行、响应头部、空行与响应数据四部分组成。
  4. 释放TCP连接:两个状态:close状态,服务器主动关闭连接,客户端被动关闭连接。Keepalive状态,连接会保持一段时间,在时间内可以继续接受请求。
  5. 客户端浏览器解析HTML内容:首先解析状态行,查看表明请求是否成功的状态代码。随后解析响应头,响应头告知HTML文档,随后读取响应数据HTML并对其格式化并显示。

(二)无连接

服务器处理完客户请求并收到客户应答后即断开连接。但是HTTP1.1对其进行了优化,在处理完客户请求后,会等待几秒后再关闭连接,这几秒等待的作用是在客户还有后续请求时可以复用该连接,提高效率。

(三)请求方法(区分大小写,这部分需要注意)

  1. GET:向指定的资源发出“显示”请求。
  2. HEAD:与GET方法一样,向服务器发出指定资源的请求。不过服务器不传回资源的本文部分。
  3. POST:向指定资源提交数据,请求服务器进行处理。数据被包含在请求本文中。这个请求可能会创建新的资源或修改现有资源。
  4. PUT:向指定资源位置上传其最新内容。
  5. DELETE:请求服务器删除request-url所标识的资源。
  6. TRACE:回显服务器收到的请求,主要用于测试或诊断。
  7. OPTIONS:使服务器传回该资源所支持的所有HTTP请求方法,用‘*’代替资源名称向web服务器发送OPTION请求,可以测试该服务器功能是否正常运作。
  8. CONNECT:HTTP1.1协议中预留给能将连接改为管道方式的代理服务器,通常用于SSL加密服务器的链接。

注意:

  1. 服务器不支持对应请求时会发送405,不认识对应请求时会发送501。
  2. GET与HEAD方法是必须实现的,其他方法可选。
  3. GET提交的数据会放在URL之后,也就是请求行里面,用?分割URL与传输数据,参数之间以&相连。
  4. GET提交的数据大小有限制,而POST提交的数据没有限制

(四)状态码

所有HTTP响应的第一行都是状态行,依次是当前HTTP版本号,3位数字组成的状态代码,以及描述状态的短语,彼此由空格分隔。

状态代码的第一个数组代表当前响应的类型。

(五)URL

超文本传输协议(HTTP)的统一资源定位符将从因特网获取信息的五个基本元素包括在一个简单的地址中:

  • 传送协议。 层级URL标记符号(为[//],固定不变) 访问资源需要的凭证信息(可省略) 服务器。(通常为域名,有时为IP地址)

  • 端口号。(以数字方式表示,若为HTTP的默认值“:80”可省略) 路径。(以“/”字符区别路径中的每一个目录名称)

  • 查询。(GET模式的窗体参数,以“?”字符为起点,每个参数以“&”隔开,再以“=”分开参数名称与数据,通常以UTF8的URL编码,避开字符冲突的问题)

  • 片段。以“#”字符为起点

http://www.luffycity.com:80/news/index.html?id=250&page=1 为例, 其中:

  • http,是协议;
  • http://www.luffycity.com,是服务器;
  • 80,是服务器上的默认网络端口号,默认不显示;
  • /news/index.html,是路径(URI:直接定位到对应的资源);
  • ?id=250&page=1,是查询。

大多数网页浏览器不要求用户输入网页中“http://”的部分,因为绝大多数网页内容是超文本传输协议文件。同样,“80”是超文本传输协议文件的常用端口号,因此一般也不必写明。一般来说用户只要键入统一资源定位符的一部分就可以了。

由于超文本传输协议允许服务器将浏览器重定向到另一个网页地址,因此许多服务器允许用户省略网页地址中的部分,比如 www。从技术上来说这样省略后的网页地址实际上是一个不同的网页地址,浏览器本身无法决定这个新地址是否通,服务器必须完成重定向的任务。

数据序列化与反序列化

由于项目主要使用的是Proto3,这里针对Proto3进行学习。主要信息来源:Proto官方指南

(一)定义消息类型

如果要用proto3来编写,syntax=”proto3”;必须是文件的第一个非空、非注释行,如果没有指定则默认proto2。

message 消息名{}:花括号内定义字段的名称与类型

例:string name=1;这里1是编号,name是名称,string是类型。

(二)指定字段类型

字段类型除了标量类型还有枚举及其他消息类型等复合类型。

(三)分配字段编号

  1. 给定编号在该消息的所有字段中必须唯一。
  2. 字段编号1900到1999保留用于Protocol Buffers实现。
  3. 不能使用任何以前保留的字段编号,也不能使用分配给扩展的任何字段编号。
  4. 消息类型一旦投入使用,编号不能更改
  5. 字段编号不能被重用。
  6. 常设置的字段编号应设置在1-15之间,1-15占用1个字节,16到2047占用2个字节。

(四)指定字段基数

  1. Singular:推荐使用optional类型的单一字段,它具有两种可能的状态:1.字段已设置,它将会被序列化到线格式中。2.字段未设置,将返回默认值,不会被序列化到线格式中。
  2. repeated:该字段类型在格式良好的信息中可以重复零次或多次。重复值的顺序将得到保留。在proto3中,标量数值类型的repeated字段默认使用packed编码。
  3. Map:这是一种成对的键/值字段类型。

(五)格式良好的消息

单一字段可以在线格式字节中出现多次。解析器接受输入,但只有该字段的最后一个实例可以通过生成的绑定访问。

(六)更多消息类型

单个.proto文件中可以定义多个消息类型,但是当大量具有不同依赖关系的消息定义在同一个文件中时,这也会导致依赖膨胀。

(七)删除字段

当需要在客户端代码中删除所有引用时,必须保留已删除的字段编号,否则开发者会很有可能重用该编号,并且应该保留字段名称,以便JSON编码与TextFormant编码能继续解析。将保留的字段编号与字段名称写入reserved列表中,一个reserved语句中不能混合使用字段名称与字段编号。

(八)文件生成

对于C++,每个.proto文件会生成一个.h文件和.cc文件,文件中描述的每种消息类型都会有一个对应的类。

(九)标量值类型

类型较多,详情请见官方文档。

(十)默认字段值

  1. 对于字符串,默认值为空字符串
  2. 对于bytes,默认值为空bytes
  3. 对于bools,默认值为false
  4. 对于数值类型默认值为0
  5. 对于枚举,默认值是定义的第一个枚举值,该值必须为0
  6. 重复字段默认值为空
  7. map字段默认值为空。

消息类型到此为止,以下为服务定义部分:

1
2
3
4
5
6
7
service CalculatorService {

rpc Add (AddRequest) returns (AddResponse);

rpc Subtract (SubtractRequest) returns (SubtractResponse);

}

这是一个服务定义的示例,里面一行的内容:

Add、SubStract:远程方法名

AddRequest、SubtractRequest:输入参数类型,必须是消息类型

AddResponse、SubtractResponse:返回参数类型,必须是消息类型

Proto3的基础使用语法部分到此为止,后续为C++侧需要实现的部分。

Muduo网络库:

这部分作者陈硕出了一本书叫《Linux多线程服务端编程》,建议直接找书吃透,可以淘宝上买也可以在网上找到电子书资源。

代码正式学习(这里主要提出来设计细节):

user.proto:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
syntax="proto3";//使用proto3语法

package Kuser;//命名空间为Kuser

option cc_generic_services=true;//生成C++服务接口

message ResultCode{
int32 errcode=1;//错误码
bytes errmsg=2;//错误信息,C++中对应std::string
}
message LoginRequest{
bytes name=1;//用户名
bytes pwd=2;//密码
}
message LoginResponse{
ResultCode result=1;//操作结果
bool success=2;//登录是否成功
}
message RegisterRequest{
uint32 id=1;//用户id
bytes name=2;//用户名
bytes pwd=3;//密码
}
message RegisterResponse{
ResultCode result=1;//操作结果
bool success=2;//注册是否成功
}
service UserServiceRpc{//服务接口定义
rpc Login(LoginRequest) returns(LoginResponse);
rpc Register(RegisterRequest) returns(RegisterResponse);
}

设计要点:

  1. 错误处理标准化:resultcode作为统一错误返回结构,所有响应均包含操作结果与操作状态
  2. 服务扩展性:新增RPC方法只需在service中添加新rpc定义,新字段向后兼容。
  3. 跨语言:proto文件可生成Java/Python等客户端。

Krpcheader.proto:

1
2
3
4
5
6
7
8
syntax="proto3";
package Krpc;

message RpcHeader{
bytes service_name=1;//服务名
bytes method_name=2;//方法名
uint32 args_size=3;//参数数据长度
}

Krpcconfig(配置加载模块):

1
2
3
4
5
6
7
std::unique_ptr<FILE, decltype(&fclose)> pf(
fopen(config_file, "r"),
&fclose
);
if (pf == nullptr) { // 如果文件打开失败
exit(EXIT_FAILURE); // 退出程序
}

使用unique_ptr确保文件在任何情况下都会关闭,自定义删除器&fclose保证资源安全

配置解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
char buf[1024];  // 用于存储从文件中读取的每一行内容
// 使用pf.get()方法获取原始指针,逐行读取文件内容
while (fgets(buf, 1024, pf.get()) != nullptr) {
std::string read_buf(buf); // 将读取的内容转换为字符串
Trim(read_buf); // 去掉字符串前后的空格

// 忽略注释行(以#开头)和空行
if (read_buf[0] == '#' || read_buf.empty()) continue;

// 查找键值对的分隔符'='
int index = read_buf.find('=');
if (index == -1) continue; // 如果没有找到'=',跳过该行

// 提取键(key)
std::string key = read_buf.substr(0, index);
Trim(key); // 去掉key前后的空格

// 查找行尾的换行符
int endindex = read_buf.find('\n', index);
// 提取值(value),并去掉换行符
std::string value = read_buf.substr(index + 1, endindex - index - 1);
Trim(value); // 去掉value前后的空格

// 将键值对存入配置map中
config_map.insert({key, value});
}

根据key值查找对应的value:

1
2
3
4
5
6
7
8
// 根据key查找对应的value
std::string Krpcconfig::Load(const std::string &key) {
auto it = config_map.find(key); // 在map中查找key
if (it == config_map.end()) { // 如果未找到
return ""; // 返回空字符串
}
return it->second; // 返回对应的value
}

字符串处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 去掉字符串前后的空格
void Krpcconfig::Trim(std::string &read_buf) {
// 去掉字符串前面的空格
int index = read_buf.find_first_not_of(' ');
if (index != -1) { // 如果找到非空格字符
read_buf = read_buf.substr(index, read_buf.size() - index); // 截取字符串
}

// 去掉字符串后面的空格
index = read_buf.find_last_not_of(' ');
if (index != -1) { // 如果找到非空格字符
read_buf = read_buf.substr(0, index + 1); // 截取字符串
}

使用find_first_not_of与find_last_not_of除去空格,避免不必要的内存拷贝。

可优化点:

  1. 支持更多数据类型
  2. 添加热更新支持
  3. 支持多级配置

Krpclogger(日志系统):

RALL资源管理:

1
2
3
4
5
6
7
8
9
explicit KrpcLogger(const char *argv0)
{
google::InitGoogleLogging(argv0);
FLAGS_colorlogtostderr=true;//启用彩色日志
FLAGS_logtostderr=true;//默认输出标准错误
}
~KrpcLogger(){
google::ShutdownGoogleLogging();
}

构造函数初始化Glog系统,禁用拷贝构造与赋值保证单例性。

日志接口设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//提供静态日志方法
static void Info(const std::string &message)
{
LOG(INFO)<<message;
}
static void Warning(const std::string &message){
LOG(WARNING)<<message;
}
static void ERROR(const std::string &message){
LOG(ERROR)<<message;
}
static void Fatal(const std::string& message) {
LOG(FATAL) << message;
}

禁用拷贝构造函数与赋值重载函数:

1
2
3
4
//禁用拷贝构造函数和重载赋值函数
private:
KrpcLogger(const KrpcLogger&)=delete;
KrpcLogger& operator=(const KrpcLogger&)=delete;

可优化点:

  1. 增加文件输出
  2. 支持日志分级控制
  3. 添加日志轮转

Krpcapplication(框架入口与系统管理):

全局配置对象与互斥锁准备:

1
2
3
Krpcconfig KrpcApplication::m_config;  // 全局配置对象
std::mutex KrpcApplication::m_mutex; // 用于线程安全的互斥锁
KrpcApplication* KrpcApplication::m_application = nullptr; // 单例对象指针,初始为空

创建单例对象:

1
2
3
4
5
6
7
8
KrpcApplication &KrpcApplication::GetInstance() {
std::lock_guard<std::mutex> lock(m_mutex); // 加锁,保证线程安全
if (m_application == nullptr) { // 如果单例对象还未创建
m_application = new KrpcApplication(); // 创建单例对象
atexit(deleteInstance); // 注册atexit函数,程序退出时自动销毁单例对象
}
return *m_application; // 返回单例对象的引用
}

加锁保证线程安全,atexit确保资源释放。

命令行参数解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 初始化函数,用于解析命令行参数并加载配置文件
void KrpcApplication::Init(int argc, char **argv) {
if (argc < 2) { // 如果命令行参数少于2个,说明没有指定配置文件
std::cout << "格式: command -i <配置文件路径>" << std::endl;
exit(EXIT_FAILURE); // 退出程序
}

int o;
std::string config_file;
// 使用getopt解析命令行参数,-i表示指定配置文件
while (-1 != (o = getopt(argc, argv, "i:"))) {
switch (o) {
case 'i': // 如果参数是-i,后面的值就是配置文件的路径
config_file = optarg; // 将配置文件路径保存到config_file
break;
case '?': // 如果出现未知参数(不是-i),提示正确格式并退出
std::cout << "格式: command -i <配置文件路径>" << std::endl;
exit(EXIT_FAILURE);
break;
case ':': // 如果-i后面没有跟参数,提示正确格式并退出
std::cout << "格式: command -i <配置文件路径>" << std::endl;
exit(EXIT_FAILURE);
break;
default:
break;
}
}

// 加载配置文件
m_config.LoadConfigFile(config_file.c_str());
}

安全的资源管理:加载失败后会终止程序,与退出自动释放一同保证了资源不泄漏。

1
2
3
4
KrpcApplication(){}
~KrpcApplication(){}
KrpcApplication(const KrpcApplication&)=delete;
KrpcApplication(KrpcApplication&&)=delete;

禁用了所有构造方式,保证了严格的单例控制。

可优化点:

  1. 增强配置验证
  2. 支持动态重载

KrpcChannel:

该部分负责实现protobuf的rpcchannel接口。

RPC头部组装:

组装过程结合KrpcHeader来理解:设置service_name/method_name,计算参数序列化长度args_size。序列化rpcheader,写入头部长度,拼接头部+参数。

该部分作用:

  1. 标识这是RPC请求而非普通数据包。
  2. 指明调用的服务和方法。
  3. 校验数据,确保参数完整传输。

该部分包含的模块:

  1. 初始化客户端socket:
1
2
3
4
5
if (-1 == m_clientfd) {  // 如果客户端socket未初始化
// 获取服务对象名和方法名
const google::protobuf::ServiceDescriptor *sd = method->service();
service_name = sd->name(); // 服务名
method_name = method->name(); // 方法名
  1. 找到服务器地址:
1
2
3
4
5
6
7
8
// 客户端需要查询ZooKeeper,找到提供该服务的服务器地址
ZkClient zkCli;
zkCli.Start(); // 连接ZooKeeper服务器
std::string host_data = QueryServiceHost(&zkCli, service_name, method_name, m_idx); // 查询服务地址
m_ip = host_data.substr(0, m_idx); // 从查询结果中提取IP地址
std::cout << "ip: " << m_ip << std::endl;
m_port = atoi(host_data.substr(m_idx + 1, host_data.size() - m_idx).c_str()); // 从查询结果中提取端口号
std::cout << "port: " << m_port << std::endl;
  1. 连接服务器:
1
2
3
4
5
6
7
8
// 尝试连接服务器
auto rt = newConnect(m_ip.c_str(), m_port);
if (!rt) {
LOG(ERROR) << "connect server error"; // 连接失败,记录错误日志
return;
} else {
LOG(INFO) << "connect server success"; // 连接成功,记录日志
}
  1. 序列化请求参数:
1
2
3
4
5
6
7
8
9
10
// 将请求参数序列化为字符串,并计算其长度
uint32_t args_size{};
std::string args_str;
if (request->SerializeToString(&args_str)) { // 序列化请求参数
args_size = args_str.size(); // 获取序列化后的长度
} else {
controller->SetFailed("serialize request fail"); // 序列化失败,设置错误信息
return;
}

  1. 定义请求头部信息:
1
2
3
4
5
// 定义RPC请求的头部信息
Krpc::RpcHeader krpcheader;
krpcheader.set_service_name(service_name); // 设置服务名
krpcheader.set_method_name(method_name); // 设置方法名
krpcheader.set_args_size(args_size); // 设置参数长度
  1. 序列化头部信息并计算长度:
1
2
3
4
5
6
7
8
9
// 将RPC头部信息序列化为字符串,并计算其长度
uint32_t header_size = 0;
std::string rpc_header_str;
if (krpcheader.SerializeToString(&rpc_header_str)) { // 序列化头部信息
header_size = rpc_header_str.size(); // 获取序列化后的长度
} else {
controller->SetFailed("serialize rpc header error!"); // 序列化失败,设置错误信息
return;
}
  1. 拼接RPC请求报文:
1
2
3
4
5
6
7
8
std::string send_rpc_str;
{
google::protobuf::io::StringOutputStream string_output(&send_rpc_str);
google::protobuf::io::CodedOutputStream coded_output(&string_output);
coded_output.WriteVarint32(static_cast<uint32_t>(header_size)); // 写入头部长度
coded_output.WriteString(rpc_header_str); // 写入头部信息
}
send_rpc_str += args_str; // 拼接请求参数
  1. 发送请求到服务器:
1
2
3
4
5
6
7
8
// 发送RPC请求到服务器
if (-1 == send(m_clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0)) {
close(m_clientfd); // 发送失败,关闭socket
char errtxt[512] = {};
std::cout << "send error: " << strerror_r(errno, errtxt, sizeof(errtxt)) << std::endl; // 打印错误信息
controller->SetFailed(errtxt); // 设置错误信息
return;
}

这里及后续代码为什么仅明显的写出了错误处理:在执行if判断的同时就会执行发送请求,等待响应,反序列化等操作,如果成功,则不会执行if内的语句,这体现了UNIX网络编程的典型模式“失败即异常,成功即默认”。

IO模型:阻塞

  1. 接收服务器响应:
1
2
3
4
5
6
7
8
9
// 接收服务器的响应
char recv_buf[1024] = {0};
int recv_size = 0;
if (-1 == (recv_size = recv(m_clientfd, recv_buf, 1024, 0))) {
char errtxt[512] = {};
std::cout << "recv error" << strerror_r(errno, errtxt, sizeof(errtxt)) << std::endl; // 打印错误信息
controller->SetFailed(errtxt); // 设置错误信息
return;
}

!! 粘包处理:通过长度前缀明确消息边界。

  1. 反序列化:
1
2
3
4
5
6
7
8
// 将接收到的响应数据反序列化为response对象
if (!response->ParseFromArray(recv_buf, recv_size)) {
close(m_clientfd); // 反序列化失败,关闭socket
char errtxt[512] = {};
std::cout << "parse error" << strerror_r(errno, errtxt, sizeof(errtxt)) << std::endl; // 打印错误信息
controller->SetFailed(errtxt); // 设置错误信息
return;
}
  1. 创建新的socket连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bool KrpcChannel::newConnect(const char *ip, uint16_t port) {
// 创建socket
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd) {
char errtxt[512] = {0};
std::cout << "socket error" << strerror_r(errno, errtxt, sizeof(errtxt)) << std::endl; // 打印错误信息
LOG(ERROR) << "socket error:" << errtxt; // 记录错误日志
return false;
}

// 设置服务器地址信息
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET; // IPv4地址族
server_addr.sin_port = htons(port); // 端口号
server_addr.sin_addr.s_addr = inet_addr(ip); // IP地址

// 尝试连接服务器
if (-1 == connect(clientfd, (struct sockaddr *)&server_addr, sizeof(server_addr))) {
close(clientfd); // 连接失败,关闭socket
char errtxt[512] = {0};
std::cout << "connect error" << strerror_r(errno, errtxt, sizeof(errtxt)) << std::endl; // 打印错误信息
LOG(ERROR) << "connect server error" << errtxt; // 记录错误日志
return false;
}

m_clientfd = clientfd; // 保存socket文件描述符
return true;
}

客户端发现:

节点注册规范:

服务端注册路径:/$ {service_name}/ ${method_name}

节点数据格式:IP:Port

临时节点:利用ZooKeeper的临时节点特性实现服务下线自动清除

  1. 加分布式锁获取数据
1
2
3
4
5
6
7
std::string KrpcChannel::QueryServiceHost(ZkClient *zkclient, std::string service_name, std::string method_name, int &idx) {
std::string method_path = "/" + service_name + "/" + method_name; // 构造ZooKeeper路径
std::cout << "method_path: " << method_path << std::endl;

std::unique_lock<std::mutex> lock(g_data_mutx); // 加锁,保证线程安全
std::string host_data_1 = zkclient->GetData(method_path.c_str()); // 从ZooKeeper获取数据
lock.unlock(); // 解锁
  1. 错误处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
if (host_data_1 == "") {  // 如果未找到服务地址
LOG(ERROR) << method_path + " is not exist!"; // 记录错误日志
return " ";
}

idx = host_data_1.find(":"); // 查找IP和端口的分隔符
if (idx == -1) { // 如果分隔符不存在
LOG(ERROR) << method_path + " address is invalid!"; // 记录错误日志
return " ";
}

return host_data_1; // 返回服务地址
}
  1. 延迟连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
// 构造函数,支持延迟连接
KrpcChannel::KrpcChannel(bool connectNow) : m_clientfd(-1), m_idx(0) {
if (!connectNow) { // 如果不需要立即连接
return;
}

// 尝试连接服务器,最多重试3次
auto rt = newConnect(m_ip.c_str(), m_port);
int count = 3; // 重试次数
while (!rt && count--) {
rt = newConnect(m_ip.c_str(), m_port);
}
}

查询失败时会进行重试,此处重试次数设为3次。

可优化点:

  1. 增加连接池管理
  2. 添加超时控制

Krpcprovider:

1
void KrpcProvider::NotifyService(google::protobuf::Service *service) {

NotifyService函数:

功能:将Protobuf生成的服务类注册到RPC框架

调用:在Run()之前调用

服务描述符获取:

1
2
3
// 通过动态多态调用 service->GetDescriptor(),
// GetDescriptor() 方法会返回 protobuf 生成的服务类的描述信息(ServiceDescriptor)。
const google::protobuf::ServiceDescriptor *psd = service->GetDescriptor();

通过protobuf反射机制获取服务的元信息。

ServiceDsecipter包含服务名,方法列表,各方法的输入输出类型。

1
2
3
4
// 获取服务的名字
std::string service_name = psd->name();
// 获取服务端对象service的方法数量
int method_count = psd->method_count();

获取服务名及方法数量。

方法遍历注册:

1
2
3
4
5
6
7
8
// 遍历服务中的所有方法,并注册到服务信息中
for (int i = 0; i < method_count; ++i) {
// 获取服务中的方法描述
const google::protobuf::MethodDescriptor *pmd = psd->method(i);
std::string method_name = pmd->name();
std::cout << "method_name=" << method_name << std::endl;
service_info.method_map.emplace(method_name, pmd); // 将方法名和方法描述符存入map
}

服务信息存储:

1
2
3
 service_info.service = service;  // 保存服务对象
service_map.emplace(service_name, service_info); // 将服务信息存入服务map
}

建立服务名->方法名->方法实现的二级映射。

设计细节:

  1. 利用Protobuf原生反射API,避免手动维护服务列表。
  2. 标准化接口:所有服务统一通过google::protobuf::Service基类操作。
  3. 在服务启动前完成方法存在性检查。

Run函数:

函数功能:

  1. 启动RPC服务端网络监听
  2. 注册服务到ZooKeeper
  3. 进入事件循环

配置读取:

1
2
3
4
5
6
// 读取配置文件中的RPC服务器IP和端口
std::string ip = KrpcApplication::GetInstance().GetConfig().Load("rpcserverip");
int port = atoi(KrpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());

// 使用muduo网络库,创建地址对象
muduo::net::InetAddress address(ip, port);

依赖Krpcapplication部分从全局配置读取IP/端口。

网络服务初始化:

1
2
3
4
5
6
7
8
std::shared_ptr<muduo::net::TcpServer> server = std::make_shared<muduo::net::TcpServer>(&event_loop, address, "KrpcProvider");

// 绑定连接回调和消息回调,分离网络连接业务和消息处理业务
server->setConnectionCallback(std::bind(&KrpcProvider::OnConnection, this, std::placeholders::_1));
server->setMessageCallback(std::bind(&KrpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

// 设置muduo库的线程数量
server->setThreadNum(4);

使用shared_ptr管理TCP服务对象生命周期

shared_ptr优势:

  1. 多线程环境中自动引用计数保证生命周期。
  2. 异步回调场景中通过shared_from_this延长生命周期。
  3. 异常安全场景中RALL自动释放。

回调绑定分离连接处理与消息处理。

ZooKeeper服务注册:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 将当前RPC节点上要发布的服务全部注册到ZooKeeper上,让RPC客户端可以在ZooKeeper上发现服务
ZkClient zkclient;
zkclient.Start(); // 连接ZooKeeper服务器
// service_name为永久节点,method_name为临时节点
for (auto &sp : service_map) {
// service_name 在ZooKeeper中的目录是"/"+service_name
std::string service_path = "/" + sp.first;
zkclient.Create(service_path.c_str(), nullptr, 0); // 创建服务节点
for (auto &mp : sp.second.method_map) {
std::string method_path = service_path + "/" + mp.first;
char method_path_data[128] = {0};
sprintf(method_path_data, "%s:%d", ip.c_str(), port); // 将IP和端口信息存入节点数据
// ZOO_EPHEMERAL表示这个节点是临时节点,在客户端断开连接后,ZooKeeper会自动删除这个节点
zkclient.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}

这里服务名称节点为永久节点,而方法节点为临时节点,随服务进程退出自动删除。

事件循环:

1
2
3
4
5
std::cout << "RpcProvider start service at ip:" << ip << " port:" << port << std::endl;

// 启动网络服务
server->start();
event_loop.loop(); // 进入事件循环

start()立即返回,为非阻塞启动服务。loop()进入事件循环。

OnMessage()函数:

函数功能:

  1. 解析RPC请求报文。
  2. 路由到对应的服务方法。
  3. 动态调用并返回响应。

数据接收与初步处理:

1
std::string recv_buf = buffer->retrieveAllAsString();//清空缓冲区并获取数据。

协议头解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 使用protobuf的CodedInputStream反序列化RPC请求
google::protobuf::io::ArrayInputStream raw_input(recv_buf.data(), recv_buf.size());
google::protobuf::io::CodedInputStream coded_input(&raw_input);

uint32_t header_size{};
coded_input.ReadVarint32(&header_size); // 解析header_size

// 根据header_size读取数据头的原始字符流,反序列化数据,得到RPC请求的详细信息
std::string rpc_header_str;
Krpc::RpcHeader krpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size{};

// 设置读取限制
google::protobuf::io::CodedInputStream::Limit msg_limit = coded_input.PushLimit(header_size);
coded_input.ReadString(&rpc_header_str, header_size);
// 恢复之前的限制,以便安全地继续读取其他数据
coded_input.PopLimit(msg_limit);

结构:接收数据 $\rightarrow$ varint32头部长度 $\rightarrow$ 头部数据 $\rightarrow$ 业务参数

安全设置:PushLimit确保不会读取超过header_size的数据,防止恶意构造的超大长度导致OOM。

服务方法路由:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
auto it = service_map.find(service_name);
if (it == service_map.end()) {
std::cout << service_name << " is not exist!" << std::endl;
return;
}
auto mit = it->second.method_map.find(method_name);
if (mit == it->second.method_map.end()) {
std::cout << service_name << "." << method_name << " is not exist!" << std::endl;
return;
}

google::protobuf::Service *service = it->second.service; // 获取服务对象
const google::protobuf::MethodDescriptor *method = mit->second; // 获取方法对象

通过service_name查找服务对象,通过method_name查找方法描述符。

动态调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
google::protobuf::Service *service = it->second.service;  // 获取服务对象
const google::protobuf::MethodDescriptor *method = mit->second; // 获取方法对象

// 生成RPC方法调用请求的request和响应的response参数
google::protobuf::Message *request = service->GetRequestPrototype(method).New(); // 动态创建请求对象
if (!request->ParseFromString(args_str)) {
std::cout << service_name << "." << method_name << " parse error!" << std::endl;
return;
}
google::protobuf::Message *response = service->GetResponsePrototype(method).New(); // 动态创建响应对象

// 绑定回调函数,用于在方法调用完成后发送响应
google::protobuf::Closure *done = google::protobuf::NewCallback<KrpcProvider,
const muduo::net::TcpConnectionPtr &,
google::protobuf::Message *>(this,
&KrpcProvider::SendRpcResponse,
conn, response);

// 在框架上根据远端RPC请求,调用当前RPC节点上发布的方法
service->CallMethod(method, nullptr, request, response, done); // 调用服务方法
}
  1. GetRequestPrototype动态创建参数对象

  2. CallMethod通过方法描述符触发实际调用

  3. 使用NewCallback()确保响应发送时对象存活。

松耦合设计保证业务逻辑与网络层完全隔离。

SendRpcResponse()函数:

1
2
3
4
5
6
7
8
9
10
11
// 发送RPC响应给客户端
void KrpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response) {
std::string response_str;
if (response->SerializeToString(&response_str)) {
// 序列化成功,通过网络把RPC方法执行的结果返回给RPC调用方
conn->send(response_str);
} else {
std::cout << "serialize error!" << std::endl;
}
// conn->shutdown(); // 模拟HTTP短链接,由RpcProvider主动断开连接
}

函数功能:将RPC调用的结果序列化并发送回客户端。

调用时机:在服务方法执行完成后,通过Protobuf的Closure机制触发。

参数 类型 作用
conn muduo::net::TcpConnectionPtr 代表客户端连接的智能指针
response google::protobuf::Message* 动态生成的响应消息对象

可优化点:

  1. 使用writev合并头部与参数的发送。
  2. 双缓冲:预分配内存避免频繁申请释放。
  3. 批量发送

zookeeperutil:

函数功能:

  1. 将RPC服务节点信息写入ZooKeeper
  2. 查询可用服务节点地址
  3. 维护与ZooKeeper集群的会话

global_watcher函数:

1
void global_watcher(zhandle_t *zh, int type, int status, const char *path, void *watcherCtx);
参数 类型 作用
zh zhandle_t* ZooKeeper客户端句柄,标识触发事件的连接
type int 事件类型(如会话事件、节点变更事件等)
status int 事件状态(如连接成功、认证失败等)
path const char* 触发事件的节点路径(对会话事件为NULL)
watcherCtx void* 用户自定义上下文,初始化时通过zookeeper_init传入
1
2
3
4
5
6
7
8
9
void global_watcher(zhandle_t *zh, int type, int status, const char *path, void *watcherCtx) {
if (type == ZOO_SESSION_EVENT) { // 回调消息类型和会话相关的事件
if (status == ZOO_CONNECTED_STATE) { // ZooKeeper客户端和服务器连接成功
std::lock_guard<std::mutex> lock(cv_mutex); // 加锁保护
is_connected = true; // 标记连接成功
}
}
cv.notify_all(); // 通知所有等待的线程
}
  1. 仅关注连接成功事件,忽略其他事件。
  2. 通过互斥锁保护is_connected标志。
  3. 使用条件变量实现异步回调转同步等待。

与ZooKeeper客户端的关系

  1. 每个zhandle_t实例只能有一个全局watcher。
  2. 生命周期与客户端句柄绑定,在zookeeper_close时失效。

Start函数:

函数功能:主要完成ZooKeeper客户端连接建立和会话管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 从配置文件中读取ZooKeeper服务器的IP和端口
std::string host = KrpcApplication::GetInstance().GetConfig().Load("zookeeperip");
std::string port = KrpcApplication::GetInstance().GetConfig().Load("zookeeperport");
std::string connstr = host + ":" + port; // 拼接连接字符串

/*
zookeeper_mt:多线程版本
ZooKeeper的API客户端程序提供了三个线程:
1. API调用线程
2. 网络I/O线程(使用pthread_create和poll)
3. watcher回调线程(使用pthread_create)
*/

// 使用zookeeper_init初始化一个ZooKeeper客户端对象,异步建立与服务器的连接
m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 6000, nullptr, nullptr, 0);
if (nullptr == m_zhandle) { // 初始化失败
LOG(ERROR) << "zookeeper_init error";
exit(EXIT_FAILURE); // 退出程序
}

// 等待连接成功
std::unique_lock<std::mutex> lock(cv_mutex);
cv.wait(lock, [] { return is_connected; }); // 阻塞等待,直到连接成功
LOG(INFO) << "zookeeper_init success"; // 记录日志,表示连接成功

这里注释写的比较全面,不过多解释。

关键设计:

  1. 异步转同步机制。
  2. 多线程安全实现:
    1. is_connected变量通过mutex保护。
    2. cv.wait()保证仅在连接成功后继续执行。

Create函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 创建ZooKeeper节点
void ZkClient::Create(const char *path, const char *data, int datalen, int state) {
char path_buffer[128]; // 用于存储创建的节点路径
int bufferlen = sizeof(path_buffer);

// 检查节点是否已经存在
int flag = zoo_exists(m_zhandle, path, 0, nullptr);
if (flag == ZNONODE) { // 如果节点不存在
// 创建指定的ZooKeeper节点
flag = zoo_create(m_zhandle, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
if (flag == ZOK) { // 创建成功
LOG(INFO) << "znode create success... path:" << path;
} else { // 创建失败
LOG(ERROR) << "znode create failed... path:" << path;
exit(EXIT_FAILURE); // 退出程序
}
}
}

部分要点:

  1. zoo_exists是同步阻塞调用
  2. 服务注册使用ZOO_EPHEMERAL,客户端断开自动清除。

GetData函数:

函数功能:从ZooKeeper节点读取数据,实现服务发现能力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::string ZkClient::GetData(const char *path) {
char buf[64]; // 用于存储节点数据
int bufferlen = sizeof(buf);

// 获取指定节点的数据
int flag = zoo_get(m_zhandle, path, 0, buf, &bufferlen, nullptr);
if (flag != ZOK) { // 获取失败
LOG(ERROR) << "zoo_get error";
return ""; // 返回空字符串
} else { // 获取成功
return buf; // 返回节点数据
}
return ""; // 默认返回空字符串
}

可优化点:

  1. 为service_map加锁,保证多线程注册服务不冲突
  2. 检查service指针的有效性
  3. 对象池复用
  4. 零拷贝分析
  5. 异步日志队列

KrpcControler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 构造函数,初始化控制器状态
Krpccontroller::Krpccontroller() {
m_failed = false; // 初始状态为未失败
m_errText = ""; // 错误信息初始为空
}

// 重置控制器状态,将失败标志和错误信息清空
void Krpccontroller::Reset() {
m_failed = false; // 重置失败标志
m_errText = ""; // 清空错误信息
}

// 判断当前RPC调用是否失败
bool Krpccontroller::Failed() const {
return m_failed; // 返回失败标志
}

// 获取错误信息
std::string Krpccontroller::ErrorText() const {
return m_errText; // 返回错误信息
}

// 设置RPC调用失败,并记录失败原因
void Krpccontroller::SetFailed(const std::string &reason) {
m_failed = true; // 设置失败标志
m_errText = reason; // 记录失败原因
}

项目设计思路与结构总结:

分层架构:

1
2
3
4
5
6
7
8
9
+-----------------------+
| 业务逻辑层 | (用户定义的Service实现)
+-----------------------+
| RPC框架层 | (KrpcProvider/KrpcChannel)
+-----------------------+
| 网络通信与序列化层 | (Muduo + Protobuf)
+-----------------------+
| 服务发现与协调层 | (ZooKeeper)
+-----------------------+

核心组件交互:

1
2
3
4
5
Client端:
[业务调用] → [KrpcChannel] → [序列化] → [网络传输] → [ZooKeeper服务发现]

Server端: ↓
[网络接收] → [反序列化] → [KrpcProvider] → [业务实现] ← [ZooKeeper服务注册]
类名 主要职责
KrpcApplication 框架入口,配置管理,单例模式保证全局访问
KrpcProvider 服务端核心,注册服务,处理RPC请求路由
KrpcChannel 客户端核心,管理连接,序列化请求,发送RPC调用
ZkClient ZooKeeper客户端封装,处理服务注册与发现
KrpcController RPC调用控制,错误处理
Krpcconfig 配置文件解析,支持key-value格式配置
KrpcLogger 基于Glog的日志系统,提供不同级别日志接口

关键流程设计

1. 服务启动流程

  1. 加载配置(IP/Port/ZK地址等)
  2. 注册服务到ServiceMap
  3. 连接ZooKeeper集群
  4. 将服务方法注册为ZK节点
  5. 启动Muduo网络服务

2. RPC调用流程

客户端:

  1. 通过ZK查询服务地址
  2. 建立TCP连接
  3. 序列化请求(header+args)
  4. 发送请求并等待响应
  5. 反序列化响应

服务端:

  1. 接收并解析请求头
  2. 从ServiceMap查找对应服务方法
  3. 反序列化请求参数
  4. 通过CallMethod动态调用
  5. 序列化响应并返回

3. 错误处理流程

  • 通过KrpcController记录错误状态
  • 错误类型包括:
    • 序列化/反序列化失败
    • 服务/方法不存在
    • 网络通信错误
    • ZooKeeper操作失败