跳到主要内容

Eino 框架的使用

Eino 框架概述

Eino 是一个基于 Go 语言构建的现代化、高性能的 AI 应用开发框架。它专为大模型应用开发而设计,提供了完整的工具链和组件生态,让开发者能够快速构建智能化应用。Eino 的设计理念是简洁、高效、可扩展,特别适合构建对话系统、智能助手、内容生成等 AI 驱动的应用。

框架核心特性

Eino 框架提供了丰富的特性来支持现代 AI 应用开发:

多模型支持与统一接口

基础使用示例

package main

import (
"context"
"fmt"
"log"

"github.com/cloudwego/eino/chat"
"github.com/cloudwego/eino/providers/openai"
)

func basicChatExample() {
// 初始化 OpenAI 提供商
provider, err := openai.NewProvider(openai.Config{
APIKey: "your-api-key",
Model: "gpt-4",
})
if err != nil {
log.Fatal(err)
}

// 创建聊天会话
chatModel := provider.ChatModel()

// 构建消息
messages := []chat.Message{
{
Role: chat.RoleUser,
Content: "你好,请介绍一下 Go 语言的特性",
},
}

// 发送请求
ctx := context.Background()
response, err := chatModel.Chat(ctx, messages)
if err != nil {
log.Fatal(err)
}

fmt.Printf("AI 回复: %s\n", response.Content)
}

工具系统与函数调用

Eino 提供了强大的工具系统,支持 Function Calling 和外部 API 集成:

// 定义工具函数
type WeatherTool struct{}

func (w *WeatherTool) Name() string {
return "get_weather"
}

func (w *WeatherTool) Description() string {
return "获取指定城市的天气信息"
}

func (w *WeatherTool) Parameters() interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"city": map[string]interface{}{
"type": "string",
"description": "城市名称",
},
},
"required": []string{"city"},
}
}

func (w *WeatherTool) Execute(ctx context.Context, args map[string]interface{}) (interface{}, error) {
city := args["city"].(string)

// 模拟天气 API 调用
weather := fmt.Sprintf("%s今天晴朗,温度 25°C", city)
return weather, nil
}

func toolSystemExample() {
// 注册工具
toolRegistry := tool.NewRegistry()
toolRegistry.Register(&WeatherTool{})

// 创建带工具的聊天模型
provider, _ := openai.NewProvider(openai.Config{
APIKey: "your-api-key",
Model: "gpt-4",
})

chatModel := provider.ChatModel().WithTools(toolRegistry)

messages := []chat.Message{
{
Role: chat.RoleUser,
Content: "北京的天气怎么样?",
},
}

ctx := context.Background()
response, err := chatModel.Chat(ctx, messages)
if err != nil {
log.Fatal(err)
}

fmt.Printf("AI 回复: %s\n", response.Content)
}

流程控制与管道处理

Eino 提供了灵活的流程控制机制,支持复杂的处理流水线:

流程控制实现

type Pipeline struct {
steps []PipelineStep
}

type PipelineStep interface {
Process(ctx context.Context, input interface{}) (interface{}, error)
}

// 预处理步骤
type PreprocessStep struct{}

func (p *PreprocessStep) Process(ctx context.Context, input interface{}) (interface{}, error) {
text := input.(string)

// 文本清理和格式化
cleaned := strings.TrimSpace(text)
cleaned = strings.ReplaceAll(cleaned, "\n\n", "\n")

return cleaned, nil
}

// LLM 处理步骤
type LLMStep struct {
model chat.ChatModel
}

func (l *LLMStep) Process(ctx context.Context, input interface{}) (interface{}, error) {
text := input.(string)

messages := []chat.Message{
{Role: chat.RoleUser, Content: text},
}

response, err := l.model.Chat(ctx, messages)
if err != nil {
return nil, err
}

return response.Content, nil
}

// 后处理步骤
type PostprocessStep struct{}

func (p *PostprocessStep) Process(ctx context.Context, input interface{}) (interface{}, error) {
text := input.(string)

// 内容过滤和格式化
if len(text) > 1000 {
text = text[:1000] + "..."
}

return text, nil
}

func pipelineExample() {
// 构建处理管道
provider, _ := openai.NewProvider(openai.Config{
APIKey: "your-api-key",
Model: "gpt-4",
})

pipeline := &Pipeline{
steps: []PipelineStep{
&PreprocessStep{},
&LLMStep{model: provider.ChatModel()},
&PostprocessStep{},
},
}

// 执行管道
ctx := context.Background()
input := " 请详细介绍一下 Eino 框架的特性 \n\n"

var result interface{} = input
for _, step := range pipeline.steps {
var err error
result, err = step.Process(ctx, result)
if err != nil {
log.Fatal(err)
}
}

fmt.Printf("处理结果: %s\n", result)
}

记忆管理与上下文存储

对于需要维护对话历史的应用,Eino 提供了完善的记忆管理系统:

记忆管理实现

type ConversationMemory struct {
sessionID string
messages []chat.Message
maxSize int
storage MemoryStorage
}

type MemoryStorage interface {
Save(sessionID string, messages []chat.Message) error
Load(sessionID string) ([]chat.Message, error)
Clear(sessionID string) error
}

// Redis 存储实现
type RedisMemoryStorage struct {
client *redis.Client
}

func (r *RedisMemoryStorage) Save(sessionID string, messages []chat.Message) error {
data, err := json.Marshal(messages)
if err != nil {
return err
}

key := fmt.Sprintf("conversation:%s", sessionID)
return r.client.Set(context.Background(), key, data, time.Hour*24).Err()
}

func (r *RedisMemoryStorage) Load(sessionID string) ([]chat.Message, error) {
key := fmt.Sprintf("conversation:%s", sessionID)
data, err := r.client.Get(context.Background(), key).Result()
if err != nil {
return nil, err
}

var messages []chat.Message
err = json.Unmarshal([]byte(data), &messages)
return messages, err
}

// 带记忆的聊天会话
type ChatSession struct {
model chat.ChatModel
memory *ConversationMemory
}

func (c *ChatSession) Chat(ctx context.Context, userInput string) (string, error) {
// 加载历史消息
history, _ := c.memory.Load()

// 添加用户消息
newMessage := chat.Message{
Role: chat.RoleUser,
Content: userInput,
}

// 构建完整的消息列表
messages := append(history, newMessage)

// 发送到 LLM
response, err := c.model.Chat(ctx, messages)
if err != nil {
return "", err
}

// 保存对话历史
messages = append(messages, chat.Message{
Role: chat.RoleAssistant,
Content: response.Content,
})

c.memory.Save(messages)

return response.Content, nil
}

func memoryExample() {
// 创建带记忆的聊天会话
provider, _ := openai.NewProvider(openai.Config{
APIKey: "your-api-key",
Model: "gpt-4",
})

storage := &RedisMemoryStorage{
client: redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}),
}

memory := &ConversationMemory{
sessionID: "user-123",
maxSize: 50, // 最多保存50条消息
storage: storage,
}

session := &ChatSession{
model: provider.ChatModel(),
memory: memory,
}

// 多轮对话
ctx := context.Background()

response1, _ := session.Chat(ctx, "我叫张三,来自北京")
fmt.Printf("回复1: %s\n", response1)

response2, _ := session.Chat(ctx, "你还记得我的名字吗?")
fmt.Printf("回复2: %s\n", response2)
}

高级特性与扩展

流式响应处理

对于需要实时反馈的应用,Eino 支持流式响应:

func streamingChatExample() {
provider, _ := openai.NewProvider(openai.Config{
APIKey: "your-api-key",
Model: "gpt-4",
})

chatModel := provider.ChatModel()

messages := []chat.Message{
{
Role: chat.RoleUser,
Content: "请写一首关于 Go 语言的诗",
},
}

ctx := context.Background()
stream, err := chatModel.ChatStream(ctx, messages)
if err != nil {
log.Fatal(err)
}
defer stream.Close()

fmt.Print("AI 正在创作: ")
for {
chunk, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}

fmt.Print(chunk.Content) // 实时输出
}
fmt.Println()
}

插件系统

Eino 支持插件化扩展,方便集成第三方服务:

type Plugin interface {
Name() string
Initialize(config map[string]interface{}) error
Execute(ctx context.Context, input interface{}) (interface{}, error)
}

// 翻译插件示例
type TranslationPlugin struct {
apiKey string
client *http.Client
}

func (t *TranslationPlugin) Name() string {
return "translation"
}

func (t *TranslationPlugin) Initialize(config map[string]interface{}) error {
t.apiKey = config["api_key"].(string)
t.client = &http.Client{Timeout: 30 * time.Second}
return nil
}

func (t *TranslationPlugin) Execute(ctx context.Context, input interface{}) (interface{}, error) {
text := input.(string)

// 调用翻译 API
result := fmt.Sprintf("翻译结果: %s", text)
return result, nil
}

// 插件管理器
type PluginManager struct {
plugins map[string]Plugin
}

func (p *PluginManager) Register(plugin Plugin) {
p.plugins[plugin.Name()] = plugin
}

func (p *PluginManager) Execute(name string, ctx context.Context, input interface{}) (interface{}, error) {
plugin, exists := p.plugins[name]
if !exists {
return nil, fmt.Errorf("plugin %s not found", name)
}

return plugin.Execute(ctx, input)
}

性能优化与监控

请求池化与缓存

性能优化实现

// 请求缓存
type ResponseCache struct {
cache map[string]CacheEntry
mutex sync.RWMutex
ttl time.Duration
}

type CacheEntry struct {
Response string
Timestamp time.Time
}

func (c *ResponseCache) Get(key string) (string, bool) {
c.mutex.RLock()
defer c.mutex.RUnlock()

entry, exists := c.cache[key]
if !exists {
return "", false
}

// 检查是否过期
if time.Since(entry.Timestamp) > c.ttl {
delete(c.cache, key)
return "", false
}

return entry.Response, true
}

func (c *ResponseCache) Set(key, response string) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.cache[key] = CacheEntry{
Response: response,
Timestamp: time.Now(),
}
}

// 性能监控
type PerformanceMonitor struct {
metrics map[string]*Metric
mutex sync.RWMutex
}

type Metric struct {
Count int64
TotalTime time.Duration
Errors int64
}

func (p *PerformanceMonitor) Record(operation string, duration time.Duration, err error) {
p.mutex.Lock()
defer p.mutex.Unlock()

metric, exists := p.metrics[operation]
if !exists {
metric = &Metric{}
p.metrics[operation] = metric
}

metric.Count++
metric.TotalTime += duration
if err != nil {
metric.Errors++
}
}

func (p *PerformanceMonitor) GetStats(operation string) (avgTime time.Duration, successRate float64) {
p.mutex.RLock()
defer p.mutex.RUnlock()

metric := p.metrics[operation]
if metric.Count == 0 {
return 0, 0
}

avgTime = metric.TotalTime / time.Duration(metric.Count)
successRate = float64(metric.Count-metric.Errors) / float64(metric.Count)
return
}

最佳实践与部署

生产环境配置

type ProductionConfig struct {
// API 配置
OpenAI struct {
APIKey string
Model string
MaxTokens int
Temperature float64
}

// 缓存配置
Cache struct {
Redis struct {
Addr string
Password string
DB int
}
TTL time.Duration
}

// 监控配置
Monitoring struct {
Enabled bool
MetricsURL string
LogLevel string
}

// 限流配置
RateLimit struct {
RequestsPerMinute int
BurstSize int
}
}

func initProductionApp() *EinoApp {
config := loadConfig() // 从环境变量或配置文件加载

// 初始化应用
app := &EinoApp{
config: config,
cache: initCache(config.Cache),
monitor: initMonitor(config.Monitoring),
limiter: initRateLimiter(config.RateLimit),
}

return app
}

// 优雅关闭
func (app *EinoApp) Shutdown(ctx context.Context) error {
// 停止接收新请求
app.server.Shutdown(ctx)

// 等待现有请求完成
app.waitGroup.Wait()

// 清理资源
app.cache.Close()
app.monitor.Flush()

return nil
}

Docker 部署示例

FROM golang:1.21-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN go build -o eino-app main.go

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/

COPY --from=builder /app/eino-app .
COPY --from=builder /app/config.yaml .

EXPOSE 8080
CMD ["./eino-app"]

Eino 框架为 AI 应用开发提供了完整的解决方案,从基础的模型调用到复杂的工作流编排,都有相应的支持。其模块化的设计和丰富的扩展性使得开发者可以根据具体需求灵活构建应用,无论是简单的聊天机器人还是复杂的智能助手系统。