跳到主要内容

Etcd 是什么

基础概念类面试题

1. 什么是 Etcd?请描述它的核心特性和应用场景

参考答案: Etcd 是一个分布式键值存储系统,由 CoreOS 开发,现在是 CNCF 项目。它被设计用于可靠地存储和检索关键数据,支持分布式系统的协调和一致性。

核心特性:

  1. 分布式一致性:使用 Raft 算法保证数据一致性
  2. 高可用性:通过多节点复制提供容错能力
  3. 快速响应:基于内存的数据结构,提供高性能读写
  4. 监听与通知:提供 Watch 机制,实时监听数据变化
  5. 强一致性:保证线性化读写操作

应用场景:

  • Kubernetes 集群状态存储
  • 服务发现与注册
  • 分布式锁
  • 配置管理
  • 领导选举
  • 消息发布与订阅

2. Etcd 与 Redis、Zookeeper 的区别是什么?

参考答案:

特性EtcdRedisZookeeper
一致性算法Raft主从复制ZAB
数据模型键值存储多数据结构树形节点
持久化WAL + SnapshotRDB + AOF事务日志
性能中等,强一致性高,最终一致性中等,强一致性
使用场景配置存储,服务发现缓存,会话存储配置管理,协调服务

Raft 算法与一致性面试题

3. 详细解释 Etcd 中的 Raft 算法实现机制

参考答案:

Raft 算法是 Etcd 实现分布式一致性的核心。它将一致性问题分解为:领导选举、日志复制、安全性保证。

节点状态转换:

  • Follower(跟随者):被动接收 Leader 的日志条目和心跳
  • Candidate(候选人):发起选举,争取成为 Leader
  • Leader(领导者):处理客户端请求,向 Follower 复制日志

关键机制:

  1. Term(任期):逻辑时钟,每次选举递增
  2. 心跳机制:Leader 定期发送心跳维持权威
  3. 日志复制:Leader 将操作复制到大多数节点
  4. 选举限制:只有拥有最新日志的节点才能成为 Leader

4. 描述 Etcd 的领导选举过程,包括异常情况处理

任期是怎么来的?

  • 初始值:所有节点在集群启动时 term = 0。
  • 递增方式:当 Follower 超时发起选举时,就会 local term + 1,并带着这个 term 向别的节点请求投票。
  • 更新规则:
    • 节点收到的 RPC 中的 term 更大,则立刻更新自己的 term,并转为 Follower。
    • 如果收到比自己 term 小的请求,则拒绝。

下面是完整的选举流程图

各个 Follower 的 选举超时 (Election Timeout) 是独立计时的,并且设置为一个范围内的随机值(例如 150ms–300ms)。

投票机制详解

投票规则(RequestVote RPC)

候选人发送的投票请求包含:

type RequestVoteRequest struct {
Term int64 // 候选人的任期号
CandidateID string // 候选人ID
LastLogIndex int64 // 候选人最后日志条目的索引
LastLogTerm int64 // 候选人最后日志条目的任期号
}

投票者的决策逻辑:

func (node *Node) handleVoteRequest(req RequestVoteRequest) VoteResponse {
// 1. 任期检查
if req.Term < node.currentTerm {
return VoteResponse{Term: node.currentTerm,
VoteGranted: false}
}

// 2. 更新任期
if req.Term > node.currentTerm {
node.currentTerm = req.Term
node.votedFor = ""
node.state = FOLLOWER
}

// 3. 投票限制检查
if (node.votedFor == "" || node.votedFor == req.CandidateID) &&
isLogUpToDate(req.LastLogIndex, req.LastLogTerm) {
node.votedFor = req.CandidateID
return VoteResponse{Term: node.currentTerm, VoteGranted: true}
}

return VoteResponse{Term: node.currentTerm, VoteGranted: false}
}

投票约束条件:

  1. 每个任期只能投一票:防止选出多个Leader
  2. 日志完整性检查:候选人的日志必须至少和投票者一样新
  3. 任期单调性:只接受更高或相等任期的请求

超时机制详解,为什么会有超时?

1. 选举超时(Election Timeout)

// 选举超时配置
const (
ElectionTimeoutMin = 150 * time.Millisecond
ElectionTimeoutMax = 300 * time.Millisecond
)

// 随机化选举超时
func (node *Node) resetElectionTimeout() {
timeout := ElectionTimeoutMin +
time.Duration(rand.Intn(int(ElectionTimeoutMax-ElectionTimeoutMin)))
node.electionTimer.Reset(timeout)
}

超时原因和处理:

  • Leader故障:Follower未收到心跳 → 发起选举
  • 网络延迟:消息传输慢 → 重新选举
  • 候选人冲突:多个候选人分票 → 超时后重试

2. 投票收集超时

func (candidate *Node) collectVotes() {
votes := 1 // 投票给自己
responses := make(chan VoteResponse, len(candidate.peers))

// 发送投票请求
for _, peer := range candidate.peers {
go func(peer Peer) {
resp := peer.RequestVote(candidate.buildVoteRequest())
responses <- resp
}(peer)
}

// 收集投票,设置超时
timeout := time.After(ElectionTimeoutMin)
for votes <= len(candidate.peers)/2 {
select {
case resp := <-responses:
if resp.VoteGranted {
votes++
}
case <-timeout:
// 投票收集超时,重新发起选举
return false
}
}
return true
}

网络分区场景

处理机制:

// 多数派要求
func (node *Node) hasMajority(votes int) bool {
return votes > len(node.cluster)/2
}

// 分区A:3个节点,可以选出Leader
// 分区B:2个节点,无法选出Leader(2 ≤ 5/2)

5. Etcd 如何处理脑裂问题?为什么需要奇数个节点?

参考答案:

脑裂解决机制:

  1. 多数派原则:只有获得超过半数节点同意才能成为 Leader
  2. 任期机制:更高的 term 会使低 term 的 Leader 自动退位
  3. 写操作保护:写操作必须复制到多数节点才能提交

为什么需要奇数节点:

节点数最大容错数最小可用数备注
312推荐最小配置
413与3节点容错能力相同,浪费资源
523推荐生产配置
624与5节点容错能力相同,浪费资源

奇数节点优势:

  • 避免平票情况
  • 最大化资源利用率
  • 简化多数派判断逻辑

5.1 不会同时发起选举吗?

确实 有可能多个节点同时发起选举。这是 Raft 算法里必须解决的一种情况。

为什么可能会同时发起选举?

  • 各个 Follower 的 选举超时 (Election Timeout) 是独立计时的,并且设置为一个范围内的随机值(例如 150ms–300ms)。
  • 如果两个或多个节点的超时时间刚好差不多同时到,就会“几乎同时”都转为 Candidate,并各自递增 term,向其他节点发起投票。

所以下面是脑裂预防的方法:

1. 多数派原则

type Cluster struct {
nodes []Node
quorumSize int // (n/2) + 1
}

func (c *Cluster) canElectLeader(availableNodes int) bool {
return availableNodes >= c.quorumSize
}

2. 任期单调递增

func (node *Node) handleAppendEntries(req AppendEntriesRequest) {
if req.Term < node.currentTerm {
// 拒绝来自旧任期Leader的请求
return AppendEntriesResponse{Success: false, Term: node.currentTerm}
}

if req.Term > node.currentTerm {
// 发现更高任期,立即转为Follower
node.currentTerm = req.Term
node.state = FOLLOWER
node.votedFor = ""
}
}

选举冲突解决

1. 随机化超时

// 避免同时发起选举
func randomElectionTimeout() time.Duration {
base := 150 * time.Millisecond
jitter := time.Duration(rand.Intn(150)) * time.Millisecond
return base + jitter
}

2. 快速失败和重试

func (node *Node) startElection() {
node.currentTerm++
node.state = CANDIDATE
node.votedFor = node.id

if !node.collectVotes() {
// 选举失败,快速转回Follower
node.state = FOLLOWER
node.resetElectionTimeout()
}
}

关键时间参数

const (
HeartbeatInterval = 50 * time.Millisecond // 心跳间隔
ElectionTimeoutMin = 150 * time.Millisecond // 最小选举超时
ElectionTimeoutMax = 300 * time.Millisecond // 最大选举超时
RequestTimeout = 100 * time.Millisecond // RPC请求超时
)

// 关系:HeartbeatInterval << ElectionTimeout
// 确保正常情况下不会触发不必要的选举

这样的设计确保了 Etcd 在各种网络异常情况下都能正确处理领导选举,维护集群的一致性和可用性。

高可用性与故障恢复面试题

6. Etcd 如何实现高可用性?描述故障检测和恢复机制

参考答案:

高可用性实现机制:

  1. 数据复制

    • 同步复制到多数节点
    • WAL(Write-Ahead Log)确保持久性
    • 快照机制压缩日志
  2. 故障检测

    • 心跳超时检测(通常150ms-300ms)
    • 网络分区检测
    • 节点健康检查
  3. 自动故障恢复

    • 自动重新选举 Leader
    • 数据自动同步
    • 集群成员变更
  4. 数据一致性保证

    • 只有提交到多数节点的数据才对外可见
    • Leader 故障时未提交的数据会被丢弃
    • 新 Leader 拥有所有已提交的数据

7. 在 Go 中如何正确使用 Etcd 客户端处理连接失败和重试?

参考答案:

package main

import (
"context"
"fmt"
"time"

"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

type EtcdClient struct {
client *clientv3.Client
config clientv3.Config
}

func NewEtcdClient(endpoints []string) (*EtcdClient, error) {
config := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
// 自动重试配置
AutoSyncInterval: 30 * time.Second,
// 健康检查
PermitWithoutStream: true,
// 日志配置
LogConfig: &zap.Config{
Level: zap.NewAtomicLevelAt(zap.ErrorLevel),
},
}

client, err := clientv3.New(config)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %v", err)
}

return &EtcdClient{
client: client,
config: config,
}, nil
}

// 带重试的写操作
func (ec *EtcdClient) PutWithRetry(ctx context.Context, key, value string, maxRetries int) error {
for i := 0; i < maxRetries; i++ {
// 设置操作超时
opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)

_, err := ec.client.Put(opCtx, key, value)
cancel()

if err == nil {
return nil
}

// 检查错误类型
if isRetriableError(err) && i < maxRetries-1 {
// 指数退避
backoff := time.Duration(1<<uint(i)) * time.Second
time.Sleep(backoff)
continue
}

return fmt.Errorf("put operation failed after %d retries: %v", i+1, err)
}

return fmt.Errorf("max retries exceeded")
}

// 带重试的读操作
func (ec *EtcdClient) GetWithRetry(ctx context.Context, key string, maxRetries int) (*clientv3.GetResponse, error) {
for i := 0; i < maxRetries; i++ {
opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)

resp, err := ec.client.Get(opCtx, key)
cancel()

if err == nil {
return resp, nil
}

if isRetriableError(err) && i < maxRetries-1 {
backoff := time.Duration(1<<uint(i)) * time.Second
time.Sleep(backoff)
continue
}

return nil, fmt.Errorf("get operation failed after %d retries: %v", i+1, err)
}

return nil, fmt.Errorf("max retries exceeded")
}

// 判断是否为可重试的错误
func isRetriableError(err error) bool {
// 网络错误、超时错误等通常是可重试的
if err == context.DeadlineExceeded ||
err == context.Canceled {
return true
}

// 可以根据具体错误信息进行判断
errStr := err.Error()
return contains(errStr, "connection refused") ||
contains(errStr, "no available endpoints") ||
contains(errStr, "context deadline exceeded")
}

// 连接健康检查
func (ec *EtcdClient) HealthCheck(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

_, err := ec.client.Status(ctx, ec.config.Endpoints[0])
return err
}

// 优雅关闭
func (ec *EtcdClient) Close() error {
return ec.client.Close()
}

最佳实践:

  1. 连接配置

    • 设置合理的超时时间
    • 配置多个 endpoints
    • 启用自动同步
  2. 错误处理

    • 区分可重试和不可重试错误
    • 实现指数退避算法
    • 设置最大重试次数
  3. 上下文管理

    • 为每个操作设置超时
    • 正确处理取消信号
    • 避免 goroutine 泄露

性能优化与最佳实践面试题

8. Etcd 的性能瓶颈在哪里?如何进行性能优化?

参考答案:

主要性能瓶颈:

  1. 磁盘 I/O

    • WAL 写入是同步操作
    • 快照生成影响性能
    • 磁盘延迟直接影响写入性能
  2. 网络延迟

    • Raft 共识需要网络通信
    • 跨数据中心部署延迟高
    • 大量小请求的网络开销
  3. 内存使用

    • 大量 key 占用内存
    • Watch 连接消耗内存
    • 历史版本数据积累

性能优化策略:

// 1. 批量操作优化
func (ec *EtcdClient) BatchPut(ctx context.Context, kvs map[string]string) error {
// 使用事务批量写入
txn := ec.client.Txn(ctx)

ops := make([]clientv3.Op, 0, len(kvs))
for k, v := range kvs {
ops = append(ops, clientv3.OpPut(k, v))
}

_, err := txn.Then(ops...).Commit()
return err
}

// 2. 连接池管理
type EtcdPool struct {
clients []*clientv3.Client
current int64
mu sync.RWMutex
}

func (p *EtcdPool) GetClient() *clientv3.Client {
p.mu.RLock()
defer p.mu.RUnlock()

idx := atomic.AddInt64(&p.current, 1) % int64(len(p.clients))
return p.clients[idx]
}

// 3. 预取和缓存
type EtcdCache struct {
client *clientv3.Client
cache sync.Map
ttl time.Duration
}

type CacheItem struct {
Value string
ExpiresAt time.Time
}

func (c *EtcdCache) Get(ctx context.Context, key string) (string, error) {
// 先查缓存
if item, ok := c.cache.Load(key); ok {
cacheItem := item.(*CacheItem)
if time.Now().Before(cacheItem.ExpiresAt) {
return cacheItem.Value, nil
}
c.cache.Delete(key)
}

// 缓存未命中,从 etcd 读取
resp, err := c.client.Get(ctx, key)
if err != nil {
return "", err
}

if len(resp.Kvs) == 0 {
return "", nil
}

value := string(resp.Kvs[0].Value)

// 更新缓存
c.cache.Store(key, &CacheItem{
Value: value,
ExpiresAt: time.Now().Add(c.ttl),
})

return value, nil
}

系统级优化:

  1. 硬件优化

    • 使用 SSD 磁盘
    • 独立的 WAL 磁盘
    • 充足的内存
  2. 网络优化

    • 低延迟网络
    • 就近部署
    • 减少网络跳数
  3. 配置优化

# etcd 配置示例
# 心跳间隔
heartbeat-interval: 100
# 选举超时
election-timeout: 1000
# 快照计数
snapshot-count: 100000
# 最大请求大小
max-request-bytes: 1572864
# 配额后端大小
quota-backend-bytes: 8589934592

9. 如何设计一个基于 Etcd 的服务发现系统?

参考答案:

完整实现代码:

package main

import (
"context"
"encoding/json"
"fmt"
"path"
"strings"
"sync"
"time"

"go.etcd.io/etcd/clientv3"
)

// ServiceInstance 服务实例信息
type ServiceInstance struct {
ID string `json:"id"`
Name string `json:"name"`
Address string `json:"address"`
Port int `json:"port"`
Metadata map[string]string `json:"metadata"`
Health bool `json:"health"`
}

// ServiceDiscovery 服务发现接口
type ServiceDiscovery interface {
Register(ctx context.Context, instance *ServiceInstance) error
Deregister(ctx context.Context, serviceID string) error
Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error)
}

// EtcdServiceDiscovery 基于 Etcd 的服务发现实现
type EtcdServiceDiscovery struct {
client *clientv3.Client
keyPrefix string
leaseTTL int64
leases map[string]clientv3.LeaseID
leaseMutex sync.RWMutex
}

func NewEtcdServiceDiscovery(client *clientv3.Client, keyPrefix string, ttl int64) *EtcdServiceDiscovery {
return &EtcdServiceDiscovery{
client: client,
keyPrefix: keyPrefix,
leaseTTL: ttl,
leases: make(map[string]clientv3.LeaseID),
}
}

// Register 注册服务实例
func (esd *EtcdServiceDiscovery) Register(ctx context.Context, instance *ServiceInstance) error {
// 创建租约
lease, err := esd.client.Grant(ctx, esd.leaseTTL)
if err != nil {
return fmt.Errorf("failed to create lease: %v", err)
}

// 序列化服务实例信息
data, err := json.Marshal(instance)
if err != nil {
return fmt.Errorf("failed to marshal instance: %v", err)
}

// 构造 key
key := esd.buildServiceKey(instance.Name, instance.ID)

// 注册服务实例
_, err = esd.client.Put(ctx, key, string(data), clientv3.WithLease(lease.ID))
if err != nil {
// 如果注册失败,撤销租约
esd.client.Revoke(ctx, lease.ID)
return fmt.Errorf("failed to register service: %v", err)
}

// 保存租约信息
esd.leaseMutex.Lock()
esd.leases[instance.ID] = lease.ID
esd.leaseMutex.Unlock()

// 启动租约续期
go esd.keepAlive(ctx, lease.ID, instance.ID)

return nil
}

// keepAlive 租约续期
func (esd *EtcdServiceDiscovery) keepAlive(ctx context.Context, leaseID clientv3.LeaseID, serviceID string) {
ch, kaerr := esd.client.KeepAlive(ctx, leaseID)
if kaerr != nil {
fmt.Printf("Failed to keep alive lease for service %s: %v\n", serviceID, kaerr)
return
}

for {
select {
case ka, ok := <-ch:
if !ok {
fmt.Printf("Keep alive channel closed for service %s\n", serviceID)
return
}
// 可以在这里记录续期成功的日志
_ = ka
case <-ctx.Done():
return
}
}
}

// Deregister 注销服务实例
func (esd *EtcdServiceDiscovery) Deregister(ctx context.Context, serviceID string) error {
esd.leaseMutex.RLock()
leaseID, exists := esd.leases[serviceID]
esd.leaseMutex.RUnlock()

if !exists {
return fmt.Errorf("service %s not found", serviceID)
}

// 撤销租约,这会自动删除相关的 key
_, err := esd.client.Revoke(ctx, leaseID)
if err != nil {
return fmt.Errorf("failed to revoke lease: %v", err)
}

// 清理本地租约记录
esd.leaseMutex.Lock()
delete(esd.leases, serviceID)
esd.leaseMutex.Unlock()

return nil
}

// Discover 发现服务实例
func (esd *EtcdServiceDiscovery) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
prefix := esd.buildServicePrefix(serviceName)

resp, err := esd.client.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("failed to discover services: %v", err)
}

var instances []*ServiceInstance
for _, kv := range resp.Kvs {
var instance ServiceInstance
if err := json.Unmarshal(kv.Value, &instance); err != nil {
continue // 跳过无法解析的数据
}
instances = append(instances, &instance)
}

return instances, nil
}

// Watch 监听服务变化
func (esd *EtcdServiceDiscovery) Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error) {
prefix := esd.buildServicePrefix(serviceName)
resultChan := make(chan []*ServiceInstance, 1)

// 先获取当前的服务列表
instances, err := esd.Discover(ctx, serviceName)
if err != nil {
close(resultChan)
return nil, err
}

// 发送初始数据
select {
case resultChan <- instances:
case <-ctx.Done():
close(resultChan)
return nil, ctx.Err()
}

// 启动监听
go func() {
defer close(resultChan)

watchChan := esd.client.Watch(ctx, prefix, clientv3.WithPrefix())
for {
select {
case watchResp, ok := <-watchChan:
if !ok {
return
}

if watchResp.Err() != nil {
fmt.Printf("Watch error: %v\n", watchResp.Err())
return
}

// 重新获取服务列表
currentInstances, err := esd.Discover(ctx, serviceName)
if err != nil {
fmt.Printf("Failed to discover services during watch: %v\n", err)
continue
}

select {
case resultChan <- currentInstances:
case <-ctx.Done():
return
}

case <-ctx.Done():
return
}
}
}()

return resultChan, nil
}

// 构建服务 key
func (esd *EtcdServiceDiscovery) buildServiceKey(serviceName, serviceID string) string {
return path.Join(esd.keyPrefix, serviceName, serviceID)
}

// 构建服务前缀
func (esd *EtcdServiceDiscovery) buildServicePrefix(serviceName string) string {
return path.Join(esd.keyPrefix, serviceName) + "/"
}

// 负载均衡器
type LoadBalancer interface {
Select(instances []*ServiceInstance) *ServiceInstance
}

// 轮询负载均衡
type RoundRobinLoadBalancer struct {
counter int64
}

func (lb *RoundRobinLoadBalancer) Select(instances []*ServiceInstance) *ServiceInstance {
if len(instances) == 0 {
return nil
}

// 过滤健康的实例
healthyInstances := make([]*ServiceInstance, 0, len(instances))
for _, instance := range instances {
if instance.Health {
healthyInstances = append(healthyInstances, instance)
}
}

if len(healthyInstances) == 0 {
return nil
}

idx := atomic.AddInt64(&lb.counter, 1) % int64(len(healthyInstances))
return healthyInstances[idx]
}

// 使用示例
func main() {
// 创建 etcd 客户端
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
defer client.Close()

// 创建服务发现实例
sd := NewEtcdServiceDiscovery(client, "/services", 30)

ctx := context.Background()

// 注册服务
instance := &ServiceInstance{
ID: "user-service-1",
Name: "user-service",
Address: "192.168.1.100",
Port: 8080,
Health: true,
Metadata: map[string]string{
"version": "1.0.0",
"env": "production",
},
}

if err := sd.Register(ctx, instance); err != nil {
panic(err)
}

// 发现服务
instances, err := sd.Discover(ctx, "user-service")
if err != nil {
panic(err)
}

fmt.Printf("Discovered %d instances\n", len(instances))

// 监听服务变化
watchCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

watchChan, err := sd.Watch(watchCtx, "user-service")
if err != nil {
panic(err)
}

go func() {
for instances := range watchChan {
fmt.Printf("Service instances updated: %d instances\n", len(instances))
}
}()

// 模拟运行一段时间
time.Sleep(5 * time.Minute)

// 注销服务
if err := sd.Deregister(ctx, "user-service-1"); err != nil {
panic(err)
}
}

设计要点:

  1. 服务注册

    • 使用租约机制实现自动过期
    • 定期续期保持服务在线状态
    • 存储完整的服务元数据
  2. 服务发现

    • 支持前缀查询获取同类服务
    • 实时监听服务变化
    • 提供负载均衡能力
  3. 高可用设计

    • 租约续期失败自动重试
    • 客户端缓存减少查询延迟
    • 健康检查机制

10. 在微服务架构中,如何使用 Etcd 实现分布式锁?

参考答案:

package main

import (
"context"
"fmt"
"sync"
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
)

// DistributedLock 分布式锁接口
type DistributedLock interface {
Lock(ctx context.Context) error
TryLock(ctx context.Context) (bool, error)
Unlock(ctx context.Context) error
IsLocked() bool
}

// EtcdMutex 基于 Etcd 的分布式锁实现
type EtcdMutex struct {
client *clientv3.Client
session *concurrency.Session
mutex *concurrency.Mutex
key string
ttl int
locked bool
lockMutex sync.RWMutex
}

func NewEtcdMutex(client *clientv3.Client, key string, ttl int) (*EtcdMutex, error) {
// 创建会话
session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl))
if err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
}

// 创建互斥锁
mutex := concurrency.NewMutex(session, key)

return &EtcdMutex{
client: client,
session: session,
mutex: mutex,
key: key,
ttl: ttl,
locked: false,
}, nil
}

// Lock 加锁(阻塞)
func (em *EtcdMutex) Lock(ctx context.Context) error {
err := em.mutex.Lock(ctx)
if err != nil {
return fmt.Errorf("failed to acquire lock: %v", err)
}

em.lockMutex.Lock()
em.locked = true
em.lockMutex.Unlock()

return nil
}

// TryLock 尝试加锁(非阻塞)
func (em *EtcdMutex) TryLock(ctx context.Context) (bool, error) {
// 设置短超时时间实现非阻塞
tryCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()

err := em.mutex.Lock(tryCtx)
if err != nil {
if err == context.DeadlineExceeded {
return false, nil // 获取锁失败,但不是错误
}
return false, fmt.Errorf("failed to try lock: %v", err)
}

em.lockMutex.Lock()
em.locked = true
em.lockMutex.Unlock()

return true, nil
}

// Unlock 释放锁
func (em *EtcdMutex) Unlock(ctx context.Context) error {
em.lockMutex.Lock()
defer em.lockMutex.Unlock()

if !em.locked {
return fmt.Errorf("lock is not held")
}

err := em.mutex.Unlock(ctx)
if err != nil {
return fmt.Errorf("failed to release lock: %v", err)
}

em.locked = false
return nil
}

// IsLocked 检查是否已加锁
func (em *EtcdMutex) IsLocked() bool {
em.lockMutex.RLock()
defer em.lockMutex.RUnlock()
return em.locked
}

// Close 关闭锁和会话
func (em *EtcdMutex) Close() error {
if em.locked {
if err := em.Unlock(context.Background()); err != nil {
return err
}
}
return em.session.Close()
}

// 自定义分布式锁实现(基于原生 etcd 操作)
type CustomEtcdLock struct {
client *clientv3.Client
key string
leaseID clientv3.LeaseID
locked bool
lockMutex sync.RWMutex
}

func NewCustomEtcdLock(client *clientv3.Client, key string, ttl int64) *CustomEtcdLock {
return &CustomEtcdLock{
client: client,
key: key,
locked: false,
}
}

// Lock 加锁实现
func (cel *CustomEtcdLock) Lock(ctx context.Context, ttl int64) error {
// 创建租约
lease, err := cel.client.Grant(ctx, ttl)
if err != nil {
return fmt.Errorf("failed to create lease: %v", err)
}

cel.leaseID = lease.ID

// 启动租约续期
ch, kaerr := cel.client.KeepAlive(ctx, lease.ID)
if kaerr != nil {
cel.client.Revoke(ctx, lease.ID)
return fmt.Errorf("failed to keep alive lease: %v", kaerr)
}

// 消费续期响应,防止缓冲区满
go func() {
for range ch {
// 处理续期响应
}
}()

// 尝试获取锁
for {
// 使用事务确保原子性
txn := cel.client.Txn(ctx)

// 如果 key 不存在(锁未被持有),则创建 key
txnResp, err := txn.
If(clientv3.Compare(clientv3.CreateRevision(cel.key), "=", 0)).
Then(clientv3.OpPut(cel.key, "locked", clientv3.WithLease(lease.ID))).
Commit()

if err != nil {
cel.client.Revoke(ctx, lease.ID)
return fmt.Errorf("failed to execute transaction: %v", err)
}

if txnResp.Succeeded {
// 成功获取锁
cel.lockMutex.Lock()
cel.locked = true
cel.lockMutex.Unlock()
return nil
}

// 锁被其他客户端持有,等待锁释放
watchChan := cel.client.Watch(ctx, cel.key)
select {
case watchResp := <-watchChan:
if watchResp.Err() != nil {
cel.client.Revoke(ctx, lease.ID)
return fmt.Errorf("watch failed: %v", watchResp.Err())
}

// 检查是否有删除事件(锁释放)
for _, event := range watchResp.Events {
if event.Type == clientv3.EventTypeDelete {
break // 跳出 select,重新尝试获取锁
}
}

case <-ctx.Done():
cel.client.Revoke(ctx, lease.ID)
return ctx.Err()
}
}
}

// TryLock 非阻塞加锁
func (cel *CustomEtcdLock) TryLock(ctx context.Context, ttl int64) (bool, error) {
// 创建租约
lease, err := cel.client.Grant(ctx, ttl)
if err != nil {
return false, fmt.Errorf("failed to create lease: %v", err)
}

cel.leaseID = lease.ID

// 尝试获取锁
txn := cel.client.Txn(ctx)
txnResp, err := txn.
If(clientv3.Compare(clientv3.CreateRevision(cel.key), "=", 0)).
Then(clientv3.OpPut(cel.key, "locked", clientv3.WithLease(lease.ID))).
Commit()

if err != nil {
cel.client.Revoke(ctx, lease.ID)
return false, fmt.Errorf("failed to execute transaction: %v", err)
}

if txnResp.Succeeded {
// 成功获取锁
cel.lockMutex.Lock()
cel.locked = true
cel.lockMutex.Unlock()

// 启动租约续期
ch, kaerr := cel.client.KeepAlive(ctx, lease.ID)
if kaerr != nil {
cel.Unlock(ctx)
return false, fmt.Errorf("failed to keep alive lease: %v", kaerr)
}

go func() {
for range ch {
// 处理续期响应
}
}()

return true, nil
}

// 获取锁失败
cel.client.Revoke(ctx, lease.ID)
return false, nil
}

// Unlock 释放锁
func (cel *CustomEtcdLock) Unlock(ctx context.Context) error {
cel.lockMutex.Lock()
defer cel.lockMutex.Unlock()

if !cel.locked {
return fmt.Errorf("lock is not held")
}

// 撤销租约,自动删除 key
_, err := cel.client.Revoke(ctx, cel.leaseID)
if err != nil {
return fmt.Errorf("failed to revoke lease: %v", err)
}

cel.locked = false
cel.leaseID = 0

return nil
}

// IsLocked 检查是否已加锁
func (cel *CustomEtcdLock) IsLocked() bool {
cel.lockMutex.RLock()
defer cel.lockMutex.RUnlock()
return cel.locked
}

// 分布式锁管理器
type LockManager struct {
client *clientv3.Client
locks map[string]*EtcdMutex
mutex sync.RWMutex
}

func NewLockManager(client *clientv3.Client) *LockManager {
return &LockManager{
client: client,
locks: make(map[string]*EtcdMutex),
}
}

// GetLock 获取或创建锁
func (lm *LockManager) GetLock(key string, ttl int) (*EtcdMutex, error) {
lm.mutex.Lock()
defer lm.mutex.Unlock()

if lock, exists := lm.locks[key]; exists {
return lock, nil
}

lock, err := NewEtcdMutex(lm.client, key, ttl)
if err != nil {
return nil, err
}

lm.locks[key] = lock
return lock, nil
}

// ReleaseLock 释放锁
func (lm *LockManager) ReleaseLock(key string) error {
lm.mutex.Lock()
defer lm.mutex.Unlock()

if lock, exists := lm.locks[key]; exists {
err := lock.Close()
delete(lm.locks, key)
return err
}

return fmt.Errorf("lock %s not found", key)
}

// 使用示例
func main() {
// 创建 etcd 客户端
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
defer client.Close()

ctx := context.Background()

// 示例1:使用 concurrency 包的锁
lock1, err := NewEtcdMutex(client, "/locks/resource1", 30)
if err != nil {
panic(err)
}
defer lock1.Close()

fmt.Println("Trying to acquire lock...")
if err := lock1.Lock(ctx); err != nil {
panic(err)
}

fmt.Println("Lock acquired, doing critical work...")
time.Sleep(5 * time.Second)

if err := lock1.Unlock(ctx); err != nil {
panic(err)
}
fmt.Println("Lock released")

// 示例2:使用自定义锁
customLock := NewCustomEtcdLock(client, "/locks/resource2")

acquired, err := customLock.TryLock(ctx, 30)
if err != nil {
panic(err)
}

if acquired {
fmt.Println("Custom lock acquired")
time.Sleep(3 * time.Second)
customLock.Unlock(ctx)
fmt.Println("Custom lock released")
} else {
fmt.Println("Failed to acquire custom lock")
}

// 示例3:并发测试
testConcurrentLocks(client)
}

func testConcurrentLocks(client *clientv3.Client) {
const numGoroutines = 10
const lockKey = "/locks/test"

var wg sync.WaitGroup
counter := 0

for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

lock, err := NewEtcdMutex(client, lockKey, 30)
if err != nil {
fmt.Printf("Goroutine %d failed to create lock: %v\n", id, err)
return
}
defer lock.Close()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := lock.Lock(ctx); err != nil {
fmt.Printf("Goroutine %d failed to acquire lock: %v\n", id, err)
return
}

// 临界区
oldCounter := counter
time.Sleep(100 * time.Millisecond) // 模拟工作
counter = oldCounter + 1
fmt.Printf("Goroutine %d: counter = %d\n", id, counter)

if err := lock.Unlock(ctx); err != nil {
fmt.Printf("Goroutine %d failed to release lock: %v\n", id, err)
}
}(i)
}

wg.Wait()
fmt.Printf("Final counter value: %d (expected: %d)\n", counter, numGoroutines)
}

分布式锁时序图:

关键设计点:

  1. 基于租约的锁

    • 自动过期防止死锁
    • 租约续期保持锁活跃
    • 租约撤销立即释放锁
  2. 公平性保证

    • Watch 机制实现等待队列
    • 先到先得的获取顺序
    • 避免锁饥饿问题
  3. 高可用性

    • 支持 Etcd 集群故障转移
    • 网络分区时自动释放锁
    • 客户端重连后重新获取锁

总结

这些面试题涵盖了 Etcd 的核心概念、实现原理、最佳实践和实际应用场景。在面试中,面试官通常会从基础概念开始,逐步深入到具体的实现细节和实际应用中的问题解决。

重点掌握的知识点:

  1. Raft 算法原理:选举、日志复制、安全性
  2. 高可用性设计:故障检测、自动恢复、数据一致性
  3. 性能优化:批量操作、连接复用、缓存策略
  4. 实际应用:服务发现、分布式锁、配置管理
  5. Go 语言实践:正确使用 etcd 客户端、错误处理、并发控制

References