gRPC 客户端连接的建立
这篇分析是基于 grpc-go 1.48 这个版本来的
客户端的基本结构
首先根据 Example 目录下简单的创建一个客户端
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
//建立一个链接 insecure.NewCredentials() 参数是在链接 https 服务端时不用检查服务端的证书
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
从最开始的建立连接开始:
位于 clientconn.go 里面的 DialContext,下面逐行理解
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error)
为什么需要连接管理?
为了让连接变得更可靠和高效,gRPC 需要对连接进行管理。
考虑这样的一种情景,由于公司规模的扩大、流量的增加,gRPC 的服务端由单机扩展成了一个集群。这个时候,我们的客户端需要调用服务端中的某一个方法,那么这个客户端需要向哪台机器建立连接,发送数据呢?
如果我们把这个问题划分的更具体,那么可以需要解决的问题如下:
- 假设现在这个集群里面有很多台机器,那么我们该怎么告知客户端每台服务端机器的
ip:port呢? - 假设我们新增或减 少了一些 gRPC 的服务端,客户端该怎么更新它所维护的
ip:port列表呢? - 假设客户端当前请求的服务端,存在了多个
ip:port,那么这个客户端该向哪个连接发送数据呢?
这几个问题可以归结为,gRPC如何解决服务注册、服务发现、负载均衡的问题。
然而,gRPC 并没有提供诸如 Spring Cloud、Dubbo 等框架的服务注册、服务发现的功能。gRPC 这么做的原因大概是为了能够提供更灵活的服务发现和负载均衡功能。
ClientConn 结构体
首先创建一个 ClientConn 的结构体
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
target string
// 负载均衡选择
parsedTarget resolver.Target
authority string
// 初始化可设置选项,在每一次请求会带上,看 call.go 中的 combine 方法
dopts dialOptions
csMgr *connectivityStateManager // 连接状态维护
balancerBuildOpts balancer.BuildOptions // 忽略
// 负载均衡设置
blockingpicker *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector // 忽略
mu sync.RWMutex
// 实现了resolver.ClientConn,位于./resolver/resolver.go中,ClientConn的上层包装器
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
// 存放连接的地方
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
balancerWrapper *ccBalancerWrapper // 负载均衡器上的包装器
retryThrottler atomic.Value
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number
czData *channelzData
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
}
开始初始化的时候
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
picker 选择器
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
picker balancer.Picker
}
这个 pickerWrapper 是对 balancer.Picker 的一层封装,balancer.Picker 其实是一个负载均衡器,它里面只有一个 Pick 方法,它返回一个 PickResult 结构,包含 SubConn 连接。
// PickResult 包含与为 RPC 选择的连接相关的信息。
type PickResult struct {
// SubConn 表示到 gRPC 后端服务的单个连接。
SubConn SubConn
Done func(DoneInfo)
}
client 发起一个 rpc 调用之前,需要通过 balancer 去找到一个 server 的 address,balancer 的 Picker 类返回一个 SubConn
SubConn 里面包含了多个 server 的 address,假如返回的 SubConn 是 READY 状态,grpc 会发送 RPC 请求,否则则会阻塞,等待 UpdateState 这个方法更新连接的状态并且通过 picker 获取一个新的 SubConn 连接。
连接状态管理器
其中这个 connectivityStateManager 表示连接的状态
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
}
连接的状态管理器,每个连接具有 IDLE、CONNECTING、READY、TRANSIENT_FAILURE、SHUTDOWN、Invalid-State 这几种状态。