跳到主要内容

Go 分布式事务的底层原理与实践

分布式事务核心概念

分布式事务是指跨越多个网络节点的事务操作,需要保证所有参与节点的数据一致性。随着微服务架构的普及,分布式事务已成为现代分布式系统中的关键技术。

产生原因与挑战

  • 数据分片: 单表数据超过千万级需要分库分表
  • 服务拆分: 微服务独立数据库带来的数据一致性问题
  • 网络复杂性: 网络延迟、分区、节点故障等不确定因素

典型业务场景分析

电商下单场景

这是分布式事务最经典的应用场景,涉及多个服务的协调:

数据流转分析

  1. 库存锁定: 预留商品,防止超卖
  2. 优惠券核销: 确保优惠券只被使用一次
  3. 资金扣减: 从用户账户扣除相应金额
  4. 订单创建: 生成最终的订单记录

如果任一步骤失败,需要回滚所有已执行的操作,保证数据一致性。

转账场景

银行转账是分布式事务的另一个典型应用:

关键挑战

  • 原子性: 要么全部成功,要么全部失败
  • 一致性: 总金额保持不变
  • 隔离性: 并发转账不能相互影响
  • 持久性: 提交后的结果必须持久化

两阶段提交协议 (2PC)

2PC 是最经典的分布式事务解决方案,通过协调器统一管理所有参与者:

Go 实现示例

type TwoPhaseCommitCoordinator struct {
participants []Participant
timeout time.Duration
}

func (tpc *TwoPhaseCommitCoordinator) Execute(ctx context.Context, txID string) error {
// 第一阶段:Prepare
if err := tpc.preparePhase(ctx, txID); err != nil {
tpc.abortPhase(ctx, txID)
return err
}

// 第二阶段:Commit
return tpc.commitPhase(ctx, txID)
}

func (tpc *TwoPhaseCommitCoordinator) preparePhase(ctx context.Context, txID string) error {
var wg sync.WaitGroup
errChan := make(chan error, len(tpc.participants))

for _, participant := range tpc.participants {
wg.Add(1)
go func(p Participant) {
defer wg.Done()
if err := p.Prepare(ctx, txID); err != nil {
errChan <- err
}
}(participant)
}

wg.Wait()
close(errChan)

// 检查是否有失败
for err := range errChan {
if err != nil {
return err
}
}

return nil
}

2PC 的核心问题

  • 同步阻塞: 参与者在等待协调器决策期间被阻塞
  • 单点故障: 协调器故障导致整个系统无法工作
  • 数据不一致: 网络分区可能导致部分节点提交,部分节点回滚

TCC 补偿事务模式

TCC (Try-Confirm-Cancel) 是一种应用层的分布式事务解决方案,通过业务逻辑来保证数据一致性:

TCC 在转账场景中的应用

数据流转过程

Go 实现示例

type AccountTCCService struct {
db *sql.DB
}

// Try: 冻结账户资金
func (s *AccountTCCService) TryTransfer(txID string, fromAccount, toAccount string, amount decimal.Decimal) error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()

// 检查并冻结转出账户资金
if err := s.freezeAmount(tx, fromAccount, amount, txID); err != nil {
return err
}

// 为转入账户创建预增记录
if err := s.prepareIncrease(tx, toAccount, amount, txID); err != nil {
return err
}

// 记录TCC事务状态
if err := s.recordTCCTransaction(tx, txID, "TRY"); err != nil {
return err
}

return tx.Commit()
}

// Confirm: 确认转账
func (s *AccountTCCService) ConfirmTransfer(txID string) error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()

// 获取事务信息
tccTx, err := s.getTCCTransaction(txID)
if err != nil {
return err
}

// 真实扣减转出账户
if err := s.deductAmount(tx, tccTx.FromAccount, tccTx.Amount); err != nil {
return err
}

// 真实增加转入账户
if err := s.increaseAmount(tx, tccTx.ToAccount, tccTx.Amount); err != nil {
return err
}

// 更新事务状态
if err := s.updateTCCTransactionStatus(tx, txID, "CONFIRMED"); err != nil {
return err
}

return tx.Commit()
}

TCC 的优势与挑战

优势:

  • 无锁设计: 不会长时间占用数据库锁
  • 高性能: 异步处理,响应快速
  • 灵活性: 可以根据业务需求定制补偿逻辑

挑战:

  • 业务侵入性强: 需要为每个操作实现三个接口
  • 幂等性要求: 所有操作必须支持重复调用
  • 补偿复杂度: 复杂业务场景下补偿逻辑难以设计

Saga 长事务模式

Saga 模式将长事务分解为一系列短事务,每个事务都有对应的补偿操作:

旅行预订场景

业务流程设计

Go 实现框架

type SagaStep struct {
Name string
Action func(ctx context.Context, data interface{}) error
Compensate func(ctx context.Context, data interface{}) error
}

type SagaCoordinator struct {
steps []SagaStep
}

func (s *SagaCoordinator) Execute(ctx context.Context, data interface{}) error {
completedSteps := make([]int, 0)

// 正向执行
for i, step := range s.steps {
if err := step.Action(ctx, data); err != nil {
// 执行失败,进行补偿
s.compensate(ctx, completedSteps, data)
return fmt.Errorf("step %s failed: %w", step.Name, err)
}
completedSteps = append(completedSteps, i)
}

return nil
}

func (s *SagaCoordinator) compensate(ctx context.Context, completedSteps []int, data interface{}) {
// 逆序执行补偿
for i := len(completedSteps) - 1; i >= 0; i-- {
stepIndex := completedSteps[i]
step := s.steps[stepIndex]

if err := step.Compensate(ctx, data); err != nil {
log.Errorf("compensate step %s failed: %v", step.Name, err)
// 补偿失败需要告警,可能需要人工介入
}
}
}

消息事务模式

基于消息队列的最终一致性方案,适用于对实时性要求不高的场景:

订单支付场景

数据流转机制

Go 实现示例

type OutboxMessage struct {
ID string `json:"id"`
Topic string `json:"topic"`
Payload []byte `json:"payload"`
Status string `json:"status"` // PENDING, SENT, CONFIRMED
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}

type OutboxService struct {
db *sql.DB
producer MessageProducer
}

// 在业务事务中插入消息
func (s *OutboxService) PublishWithTransaction(tx *sql.Tx, topic string, payload interface{}) error {
data, err := json.Marshal(payload)
if err != nil {
return err
}

message := &OutboxMessage{
ID: uuid.New().String(),
Topic: topic,
Payload: data,
Status: "PENDING",
CreatedAt: time.Now(),
}

return s.insertMessage(tx, message)
}

// 异步发送待发送的消息
func (s *OutboxService) ProcessPendingMessages(ctx context.Context) {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.sendPendingMessages()
}
}
}

func (s *OutboxService) sendPendingMessages() {
messages, err := s.getPendingMessages(100)
if err != nil {
log.Errorf("get pending messages failed: %v", err)
return
}

for _, msg := range messages {
if err := s.producer.Send(msg.Topic, msg.Payload); err != nil {
log.Errorf("send message %s failed: %v", msg.ID, err)
continue
}

// 更新消息状态
s.updateMessageStatus(msg.ID, "SENT")
}
}

方案选型指导

不同的业务场景适合不同的分布式事务方案:

选型决策矩阵

场景特征2PCTCCSaga消息事务
强一致性
高性能
复杂业务
开发成本
业务侵入

故障处理与监控

悬挂事务处理

TCC 模式中常见的悬挂事务问题:

防悬挂实现

func (s *TCCService) Try(txID string, amount decimal.Decimal) error {
// 检查是否存在Cancel记录
if s.hasCancelRecord(txID) {
return errors.New("transaction already cancelled, hanging detected")
}

// 幂等检查
if s.hasTryRecord(txID) {
return nil // 已经执行过Try
}

// 执行Try逻辑
return s.doTry(txID, amount)
}

func (s *TCCService) Cancel(txID string, amount decimal.Decimal) error {
// 空回滚:没有Try就Cancel
if !s.hasTryRecord(txID) {
// 记录Cancel状态,防止后续Try
return s.recordCancelOnly(txID)
}

// 幂等检查
if s.hasCancelRecord(txID) {
return nil // 已经执行过Cancel
}

// 执行Cancel逻辑
return s.doCancel(txID, amount)
}

监控与告警

关键监控指标

type TransactionMetrics struct {
// 成功率指标
TotalTransactions prometheus.Counter
SuccessTransactions prometheus.Counter
FailedTransactions prometheus.Counter

// 性能指标
TransactionDuration prometheus.Histogram

// 状态指标
PendingTransactions prometheus.Gauge
TimeoutTransactions prometheus.Counter

// 补偿指标
CompensationCount prometheus.Counter
CompensationFailure prometheus.Counter
}

func (m *TransactionMetrics) RecordTransaction(status string, duration time.Duration) {
m.TotalTransactions.Inc()
m.TransactionDuration.Observe(duration.Seconds())

switch status {
case "SUCCESS":
m.SuccessTransactions.Inc()
case "FAILED":
m.FailedTransactions.Inc()
case "TIMEOUT":
m.TimeoutTransactions.Inc()
}
}

分布式事务是分布式系统中的核心挑战之一,需要在一致性、性能、可用性之间做出权衡。通过深入理解各种方案的原理和适用场景,我们能够为不同的业务需求选择最合适的解决方案,构建可靠的分布式系统。