
Building a Knowledge Graph for Product Information

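Overview of Product Knowledge Graphs

In modern e-commerce, product information is growing in both complexity and interconnectedness. Traditional relational databases often struggle with multi-dimensional relationships between products, attribute inheritance, and complex queries. A knowledge graph organizes data as entities and relationships, so it expresses the connections between products naturally and gives an e-commerce platform a basis for smarter product management and recommendation.

A product knowledge graph does more than store structured product information: through relationship inference it can surface implicit connections between products and supply data that supports business decisions.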

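Product Domain Modeling and Data Structure Design

Core Entity Type Definitions

When building a product knowledge graph, we first need to identify and define the core entity types: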
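As one way to picture those entity types, here is a minimal Python sketch. The class names follow the node labels used elsewhere in this article (Category, Brand, Product); the exact fields and the `to_node_properties` helper are assumptions made for illustration, not a prescribed schema.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class Category:
    name: str
    level: int                     # 1 = top-level category
    parent: Optional[str] = None   # name of the parent category, if any


@dataclass(frozen=True)
class Brand:
    name: str
    country: str


@dataclass
class Product:
    sku: str
    name: str
    price: float
    brand: str                     # refers to Brand.name
    category: str                  # refers to Category.name
    attributes: Dict[str, str] = field(default_factory=dict)


def to_node_properties(product: Product) -> Dict[str, object]:
    """Flatten a Product into the property map of a graph node.

    Brand and category are left out because they are modeled as
    relationships (MANUFACTURED_BY, BELONGS_TO) rather than properties.
    """
    props: Dict[str, object] = {
        "sku": product.sku,
        "name": product.name,
        "price": product.price,
    }
    props.update(product.attributes)
    return props
```

Keeping brand and category out of the property map reflects the graph-first design: anything that other entities can share becomes a node of its own.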

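Attribute Hierarchy Design

Product attributes are usually hierarchical and inherited. For example, every electronic product has a "power specification", while a mobile phone additionally has attributes of its own such as "screen size":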
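The inheritance idea can be sketched in a few lines of Python. The category tree and attribute names below are hypothetical; the point is only that a category's effective attribute set is the union of its own attributes and those of all its ancestors.

```python
from typing import Dict, List, Optional

# Hypothetical category tree: child -> parent (None marks the root).
PARENT: Dict[str, Optional[str]] = {
    "Electronics": None,
    "Mobile Phones": "Electronics",
    "Smartphones": "Mobile Phones",
}

# Attributes declared directly on each category (illustrative names).
OWN_ATTRIBUTES: Dict[str, List[str]] = {
    "Electronics": ["power_spec"],
    "Mobile Phones": ["screen_size"],
    "Smartphones": ["operating_system"],
}


def inherited_attributes(category: str) -> List[str]:
    """Collect attributes from the root down to `category`, ancestors first."""
    # Walk up the tree to find the chain of ancestors.
    chain: List[str] = []
    node: Optional[str] = category
    while node is not None:
        chain.append(node)
        node = PARENT[node]

    # Walk back down so that inherited attributes come before specific ones.
    attributes: List[str] = []
    for name in reversed(chain):
        for attr in OWN_ATTRIBUTES.get(name, []):
            if attr not in attributes:
                attributes.append(attr)
    return attributes
```

In a graph database the same walk would follow the category hierarchy relationships upward instead of a dictionary lookup.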

Graph Model Implementation

// Create the category hierarchy
CREATE (electronics:Category {name: "Electronics", level: 1})
CREATE (mobile:Category {name: "Mobile Phones & Communication", level: 2})
CREATE (smartphone:Category {name: "Smartphones", level: 3})

CREATE (electronics)-[:HAS_SUBCATEGORY]->(mobile)
CREATE (mobile)-[:HAS_SUBCATEGORY]->(smartphone)

// Create a brand and a product
CREATE (apple:Brand {
  name: "Apple",
  country: "USA",
  founded: 1976,
  reputation_score: 9.5
})

CREATE (iphone15:Product {
  name: "iPhone 15 Pro",
  model: "A3102",
  sku: "IPH15P-256-NTL",
  price: 8999.00,
  release_date: "2023-09-15",
  weight: 187,
  dimensions: "146.6×70.6×8.25"
})

// Create the relationships
CREATE (iphone15)-[:BELONGS_TO]->(smartphone)
CREATE (iphone15)-[:MANUFACTURED_BY]->(apple)

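Data Acquisition and Preprocessing

Multi-Source Data Integration

Product data usually comes from several internal systems and external data sources, so a unified integration pipeline is needed: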
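A minimal sketch of such an integration step, assuming that every record carries a `source` and a `sku` field and that sources can be ranked by trust. The source names (`erp`, `supplier_feed`, `web_crawl`) are hypothetical, and a real pipeline would add validation and conflict logging.

```python
from typing import Dict, Iterable, List

# Hypothetical source names, ordered from most to least trusted.
SOURCE_PRIORITY: List[str] = ["erp", "supplier_feed", "web_crawl"]


def merge_product_records(records: Iterable[Dict]) -> Dict[str, Dict]:
    """Merge records that share a SKU; higher-priority sources win per field."""
    rank = {source: i for i, source in enumerate(SOURCE_PRIORITY)}
    merged: Dict[str, Dict] = {}

    # Apply the least trusted sources first so trusted ones overwrite them.
    ordered = sorted(records, key=lambda r: rank[r["source"]], reverse=True)
    for record in ordered:
        target = merged.setdefault(record["sku"], {})
        for key, value in record.items():
            # Missing values never overwrite a value from another source.
            if key != "source" and value is not None:
                target[key] = value
    return merged
```

Merging field by field, rather than picking one winning record, lets a trusted source fix a name while a less trusted one still contributes a price the trusted source lacks.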

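Data Cleaning and Standardization

import re
from typing import Dict, List, Optional

class ProductDataCleaner:
    def __init__(self):
        # Map Chinese brand names to their canonical English form
        self.brand_mapping = {
            "苹果": "Apple",
            "三星": "Samsung",
            "华为": "Huawei"
        }
        self.category_hierarchy = self._load_category_mapping()

    def _load_category_mapping(self) -> Dict[str, str]:
        """Placeholder: the original does not show how this mapping is loaded."""
        return {}

    def standardize_brand(self, brand_name: str) -> str:
        """Standardize a brand name."""
        cleaned = re.sub(r'[^\w\s]', '', brand_name).strip()
        return self.brand_mapping.get(cleaned, cleaned)

    def extract_specifications(self, description: str) -> Dict[str, str]:
        """Extract specifications from a product description."""
        specs = {}

        # Screen size, written either as 6.1" or as 6.1英寸 ("inch" in Chinese)
        size_pattern = r'(\d+\.?\d*)"|\d+\.?\d*英寸'
        size_match = re.search(size_pattern, description)
        if size_match:
            specs['screen_size'] = size_match.group()

        # Storage capacity: take the largest "<n>GB" value mentioned
        memory_pattern = r'(\d+)GB'
        memory_matches = re.findall(memory_pattern, description)
        if memory_matches:
            # Compare numerically; a plain max() on strings would rank "8" above "256"
            specs['storage'] = max(memory_matches, key=int) + "GB"

        return specs

    def normalize_price(self, price_str: str) -> Optional[float]:
        """Normalize a price string to a float."""
        if not price_str:
            return None

        # Strip currency symbols, thousands separators, and other characters
        cleaned = re.sub(r'[^\d.]', '', price_str)
        try:
            return float(cleaned)
        except ValueError:
            return None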

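Entity Linking and Deduplication

from difflib import SequenceMatcher
from typing import Dict, List

class EntityLinker:
    def __init__(self, neo4j_driver):
        self.driver = neo4j_driver
        self.similarity_threshold = 0.8

    def find_similar_products(self, product_data: Dict) -> List[Dict]:
        """Find candidate products that may match the given one."""
        query = """
        MATCH (p:Product)
        WHERE p.name CONTAINS $name_part
           OR p.brand = $brand
        RETURN p.id, p.name, p.brand, p.model
        """

        with self.driver.session() as session:
            result = session.run(query, {
                'name_part': product_data['name'][:10],
                'brand': product_data['brand']
            })
            return [record.data() for record in result]

    def _text_similarity(self, a: str, b: str) -> float:
        """String similarity in [0, 1]. The original leaves this helper
        undefined; difflib is used here as one simple choice."""
        if not a or not b:
            return 0.0
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()

    def calculate_similarity(self, product1: Dict, product2: Dict) -> float:
        """Compute the similarity between two products."""
        # Combine several dimensions
        name_sim = self._text_similarity(product1['name'], product2['name'])
        brand_sim = 1.0 if product1['brand'] == product2['brand'] else 0.0
        model_sim = self._text_similarity(
            product1.get('model', ''),
            product2.get('model', '')
        )

        # Weighted average
        return 0.5 * name_sim + 0.3 * brand_sim + 0.2 * model_sim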
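To show how the similarity score and the 0.8 threshold could work together for deduplication, here is a self-contained sketch. It reuses the 0.5 / 0.3 / 0.2 weights from the code above; the choice of `difflib.SequenceMatcher` for text similarity and the `find_duplicates` helper are assumptions for illustration.

```python
from difflib import SequenceMatcher
from typing import Dict, List


def text_similarity(a: str, b: str) -> float:
    """Case-insensitive similarity ratio in [0, 1]; empty strings score 0."""
    if not a or not b:
        return 0.0
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def product_similarity(p1: Dict, p2: Dict) -> float:
    """Weighted score over name, brand, and model."""
    name_sim = text_similarity(p1["name"], p2["name"])
    brand_sim = 1.0 if p1["brand"] == p2["brand"] else 0.0
    model_sim = text_similarity(p1.get("model", ""), p2.get("model", ""))
    return 0.5 * name_sim + 0.3 * brand_sim + 0.2 * model_sim


def find_duplicates(new_product: Dict, candidates: List[Dict],
                    threshold: float = 0.8) -> List[Dict]:
    """Return the candidates whose score reaches the threshold."""
    return [c for c in candidates
            if product_similarity(new_product, c) >= threshold]
```

A candidate that clears the threshold would typically be merged into the existing node rather than inserted as a new one. Borderline scores are a natural place for manual review.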

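Relationship Modeling and Semantic Enrichment

Defining Product Relationship Types

Products are connected by many kinds of relationships, which must be modeled precisely to support later queries and inference:

type ProductRelationships struct {
    // Hierarchical relationships
    BELONGS_TO string // belongs to a category
    HAS_VARIANT string // product variant
    SUPERSEDES string // replacement (newer version of an older product)

    // Commercial relationships
    MANUFACTURED_BY string // manufacturer
    DISTRIBUTED_BY string // distributor
    COMPETES_WITH string // competing product

    // Functional relationships
    COMPATIBLE_WITH string // compatibility
    REQUIRES string // dependency
    ENHANCES string // enhancement

    // Recommendation relationships
    FREQUENTLY_BOUGHT_TOGETHER string // often purchased together
    VIEWED_TOGETHER string // often viewed together
    SIMILAR_TO string // similar product
}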

Relationship Weights and Dynamic Updates

An implementation of the relationship updates:

// Update relationship weights from purchase behavior.
// p1.id < p2.id processes each unordered pair once.
MATCH (p1:Product)<-[:CONTAINS]-(o:Order)-[:CONTAINS]->(p2:Product)
WHERE p1.id < p2.id
WITH p1, p2, count(*) as co_purchase_count
MERGE (p1)-[r:FREQUENTLY_BOUGHT_TOGETHER]-(p2)
SET r.weight = co_purchase_count * 0.1,
    r.last_updated = datetime();

// Discover similar products from browsing behavior (run as a separate statement)
MATCH (u:User)-[:VIEWED]->(p1:Product)
MATCH (u)-[:VIEWED]->(p2:Product)
WHERE p1.id < p2.id
WITH p1, p2, count(DISTINCT u) as common_viewers
WHERE common_viewers > 10
MERGE (p1)-[r:SIMILAR_TO]-(p2)
SET r.similarity_score = common_viewers * 0.01,
    r.basis = "user_behavior"
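The co-purchase weighting can also be followed outside the database. The sketch below mirrors the first query in plain Python under the assumption that each order is available as a list of product IDs; the 0.1 scale factor comes from the query, while the function name and data layout are illustrative.

```python
from collections import Counter
from itertools import combinations
from typing import Dict, Iterable, List, Tuple


def co_purchase_weights(orders: Iterable[List[str]],
                        scale: float = 0.1) -> Dict[Tuple[str, str], float]:
    """Weight each unordered product pair by how many orders contain both."""
    counts: Counter = Counter()
    for product_ids in orders:
        # sorted(set(...)) yields each unordered pair exactly once per order
        for pair in combinations(sorted(set(product_ids)), 2):
            counts[pair] += 1
    return {pair: n * scale for pair, n in counts.items()}
```

Because the pairs are unordered, the result maps naturally onto an undirected FREQUENTLY_BOUGHT_TOGETHER relationship.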

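Attribute Inheritance and Inference

Intelligent Query and Recommendation

Semantic Search Implementation

Semantic search built on the knowledge graph can interpret user intent and return more precise results: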

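from typing import Dict, List, Optional, Tuple

class SemanticSearchEngine:
    # _extract_entities, _classify_intent, and _rerank_by_semantic_similarity
    # are not shown in the original and are assumed to be defined elsewhere.

    def __init__(self, neo4j_driver, embedding_model):
        self.driver = neo4j_driver
        self.embedding_model = embedding_model

    def semantic_search(self, query: str, filters: Optional[Dict] = None) -> List[Dict]:
        """Semantic product search."""
        # 1. Intent classification and entity extraction
        entities = self._extract_entities(query)
        intent = self._classify_intent(query)

        # 2. Build the graph query and its parameters
        cypher_query, params = self._build_semantic_query(entities, intent, filters)

        # 3. Run the query
        with self.driver.session() as session:
            results = session.run(cypher_query, params)
            products = [record.data() for record in results]

        # 4. Re-rank by semantic similarity
        return self._rerank_by_semantic_similarity(query, products)

    def _build_semantic_query(self, entities: Dict, intent: str,
                              filters: Optional[Dict]) -> Tuple[str, Dict]:
        """Build the Cypher query and its parameter map."""
        match_clause = "MATCH (p:Product)"
        return_clause = "RETURN p"

        # Adjust the query strategy to the intent
        if intent == "comparison":
            match_clause += "-[:SIMILAR_TO|COMPETES_WITH]-(similar:Product)"
            return_clause = (
                "RETURN p, similar, "
                "[(p)-[r:SIMILAR_TO]-(similar) | r.similarity_score][0] AS score"
            )
        elif intent == "recommendation":
            match_clause += "-[:FREQUENTLY_BOUGHT_TOGETHER]-(recommended:Product)"
            return_clause = (
                "RETURN p, recommended, "
                "[(p)-[r:FREQUENTLY_BOUGHT_TOGETHER]-(recommended) | r.weight][0] AS weight"
            )

        # Add one condition per extracted entity and bind its parameter
        conditions, params = [], {}
        if entities.get('brand'):
            conditions.append("(p)-[:MANUFACTURED_BY]->(:Brand {name: $brand})")
            params['brand'] = entities['brand']
        if entities.get('category'):
            conditions.append("(p)-[:BELONGS_TO]->(:Category {name: $category})")
            params['category'] = entities['category']

        # `filters` is accepted but, as in the original, not applied here.
        where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
        parts = [match_clause, where_clause, return_clause]
        return "\n".join(part for part in parts if part), params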

Personalized Recommendation Engine

A graph-query implementation of the recommendation algorithm:

// Collaborative-filtering product recommendation
MATCH (target_user:User {id: $userId})-[:PURCHASED]->(p:Product)
WITH target_user, collect(p) as user_products

MATCH (similar_user:User)-[:PURCHASED]->(shared:Product)
WHERE shared IN user_products AND similar_user <> target_user
WITH target_user, similar_user, count(shared) as shared_count
ORDER BY shared_count DESC
LIMIT 10

MATCH (similar_user)-[:PURCHASED]->(recommended:Product)
WHERE NOT (target_user)-[:PURCHASED]->(recommended)
WITH recommended, count(*) as recommendation_score
ORDER BY recommendation_score DESC
LIMIT 20

RETURN recommended.name, recommended.price, recommendation_score
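The same two-step logic can be traced in plain Python, which may help when reasoning about what the query returns. This is a sketch under the assumption that purchases are available as a mapping from user ID to a set of product IDs; the function name and the alphabetical tie-breaking are choices made here, not part of the query.

```python
from collections import Counter
from typing import Dict, List, Set, Tuple


def recommend(purchases: Dict[str, Set[str]], user_id: str,
              top_users: int = 10, top_items: int = 20) -> List[Tuple[str, int]]:
    """User-based collaborative filtering over purchase sets."""
    owned = purchases.get(user_id, set())

    # Step 1: rank other users by how many purchases they share with the target.
    overlap = {u: len(items & owned)
               for u, items in purchases.items()
               if u != user_id and items & owned}
    neighbours = sorted(overlap, key=lambda u: (-overlap[u], u))[:top_users]

    # Step 2: score each product the target lacks by the number of
    # neighbours who bought it.
    scores: Counter = Counter()
    for u in neighbours:
        for item in purchases[u] - owned:
            scores[item] += 1
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))[:top_items]
```

For example, with `{"u1": {"a", "b"}, "u2": {"a", "c"}, "u3": {"b", "c", "d"}}`, calling `recommend(purchases, "u1")` ranks `c` first because both neighbours bought it.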

Dynamic Pricing Strategy

A query for analyzing competitor prices:

// Analyze competitor pricing
MATCH (product:Product {id: $productId})-[:COMPETES_WITH]-(competitor:Product)
MATCH (competitor)-[:BELONGS_TO]->(category:Category)
RETURN competitor.name, competitor.price, competitor.brand,
       category.name as category,
       product.price - competitor.price as price_diff
ORDER BY abs(price_diff)

Performance Optimization and Scalability

Indexing Strategy and Query Optimization

Creating and maintaining indexes:

// Create the core indexes
CREATE CONSTRAINT product_id_unique FOR (p:Product) REQUIRE p.id IS UNIQUE;
// A TEXT index is the type that serves CONTAINS lookups on product names
CREATE TEXT INDEX product_name_text FOR (p:Product) ON (p.name);
CREATE INDEX product_price_range FOR (p:Product) ON (p.price);
CREATE INDEX product_category_brand FOR (p:Product) ON (p.category, p.brand);

// Create relationship indexes
CREATE INDEX rel_purchase_date FOR ()-[r:PURCHASED]-() ON (r.date);
CREATE INDEX rel_similarity_score FOR ()-[r:SIMILAR_TO]-() ON (r.similarity_score);

Distributed Architecture Design

Real-Time Data Synchronization

import json
from typing import Dict

class RealTimeDataSync:
    # _update_purchase_relationship and _update_view_relationship are not
    # shown in the original and are assumed to be defined elsewhere.

    def __init__(self, neo4j_driver, kafka_consumer):
        self.driver = neo4j_driver
        self.consumer = kafka_consumer

    def sync_product_updates(self):
        """Synchronize product data updates in real time."""
        for message in self.consumer:
            event_data = json.loads(message.value)

            if event_data['event_type'] == 'product_updated':
                self._update_product_node(event_data['product'])
            elif event_data['event_type'] == 'user_purchase':
                self._update_purchase_relationship(event_data)
            elif event_data['event_type'] == 'product_viewed':
                self._update_view_relationship(event_data)

    def _update_product_node(self, product_data: Dict):
        """Update a product node.

        product_data must contain the keys product_id, name, and price,
        which are bound to the query parameters of the same names.
        """
        query = """
        MERGE (p:Product {id: $product_id})
        SET p.name = $name,
            p.price = $price,
            p.last_updated = datetime()
        """

        with self.driver.session() as session:
            session.run(query, product_data)
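The event routing in `sync_product_updates` can be exercised without Kafka or a database. The sketch below is one possible variant, assuming messages arrive as JSON-encoded bytes with an `event_type` field as above; the `dispatch_events` function and its handling of malformed or unknown events are assumptions added for illustration.

```python
import json
from typing import Callable, Dict, Iterable, List


def dispatch_events(raw_messages: Iterable[bytes],
                    handlers: Dict[str, Callable[[Dict], None]]) -> List[str]:
    """Route each JSON event to its handler; return what was skipped."""
    skipped: List[str] = []
    for raw in raw_messages:
        try:
            event = json.loads(raw)
        except json.JSONDecodeError:
            skipped.append("<invalid json>")
            continue
        if not isinstance(event, dict):
            skipped.append("<not an object>")
            continue

        handler = handlers.get(event.get("event_type"))
        if handler is None:
            # Unknown event types are recorded instead of stopping the loop.
            skipped.append(str(event.get("event_type")))
            continue
        handler(event)
    return skipped
```

A lookup table of handlers keeps the consumer loop unchanged when new event types are added, and collecting skipped events makes it easier to notice a producer sending something unexpected.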

Extended Business Applications

Supply Chain Management

A knowledge graph can help optimize supply chain management by analyzing relationships between products together with market demand in order to forecast inventory needs:

// Supply chain risk analysis
MATCH (product:Product)-[:MANUFACTURED_BY]->(supplier:Supplier)
MATCH (product)-[:BELONGS_TO]->(category:Category)
WITH supplier, category, count(product) as product_count
WHERE product_count > 5

MATCH (supplier)-[:LOCATED_IN]->(region:Region)
WHERE region.risk_level > 3

RETURN supplier.name, category.name, product_count, region.risk_level
ORDER BY region.risk_level DESC, product_count DESC

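Marketing Campaign Optimization

Quality Control and Recalls

A knowledge graph makes it possible to quickly determine how far the impact of a defective product reaches:

// Recall impact analysis.
// Order-to-product edges use CONTAINS, as in the co-purchase query, and the
// order variable is named o to avoid clashing with the ORDER keyword.
MATCH (defective:Product {id: $defectiveProductId})
MATCH (defective)-[:HAS_COMPONENT|SHARES_COMPONENT*1..3]-(related:Product)
MATCH (related)<-[:CONTAINS]-(o:Order)<-[:PLACED]-(user:User)
WHERE o.date > date('2024-01-01')

RETURN user.id, user.email, related.name, o.id
ORDER BY o.date DESC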
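The variable-length pattern `*1..3` in the query amounts to a bounded breadth-first search. The sketch below shows that traversal in plain Python, assuming the component links are available as an undirected adjacency map; the function name and data layout are illustrative, and it approximates rather than reproduces Cypher's path semantics.

```python
from collections import deque
from typing import Dict, Set


def affected_products(links: Dict[str, Set[str]], defective_id: str,
                      max_hops: int = 3) -> Set[str]:
    """Products reachable from the defective one within max_hops links."""
    seen = {defective_id}
    queue = deque([(defective_id, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth == max_hops:
            continue  # do not expand beyond the hop limit
        for neighbour in links.get(node, set()):
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append((neighbour, depth + 1))
    return seen - {defective_id}
```

The hop limit matters in practice: without it, a densely connected component graph could pull most of the catalog into a single recall.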

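By building a complete product knowledge graph, an e-commerce business can:

  • Improve the search experience: semantic search that understands what users actually need
  • Improve recommendation quality: multi-dimensional relationship analysis that yields personalized recommendations
  • Increase operational efficiency: automated discovery and management of product relationships
  • Support decision analysis: complex graph-based analysis that informs business decisions
  • Raise user satisfaction: more precise product matching and a better service experience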