# Microservices Patterns
| 如果你有... | 那么... |
|---|---|
| 小团队(<5名开发者),简单领域 | 从单体开始 |
| 需要独立部署/扩展 |
经验法则:如果你无法定义清晰的服务边界,说明你还没准备好使用微服务。
围绕业务功能而非技术层组织服务。
电商示例:
├── 订单服务 # 订单生命周期
├── 支付服务 # 支付处理
├── 库存服务 # 库存管理
├── 物流服务 # 订单履行
└── 通知服务 # 邮件、短信
逐步从单体中抽取,无需大规模重写。
python
用于:查询,需要即时响应时。
python
import httpx
from tenacity import retry, stopafterattempt, wait_exponential
class ServiceClient:
def init(self, base_url: str):
self.baseurl = baseurl
self.client = httpx.AsyncClient(timeout=5.0)
@retry(stop=stopafterattempt(3), wait=wait_exponential(min=1, max=10))
async def get(self, path: str):
带自动重试的GET请求。
response = await self.client.get(f{self.base_url}{path})
response.raiseforstatus()
return response.json()
用于:命令,可接受最终一致性时。
python
from aiokafka import AIOKafkaProducer
import json
@dataclass
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
occurred_at: datetime
data: dict
class EventBus:
def init(self, bootstrap_servers: List[str]):
self.producer = AIOKafkaProducer(
bootstrapservers=bootstrapservers,
value_serializer=lambda v: json.dumps(v).encode()
)
async def publish(self, event: DomainEvent):
await self.producer.sendandwait(
event.event_type, # 主题 = 事件类型
value=asdict(event),
key=event.aggregate_id.encode()
)
| 同步 | 异步 |
|---|---|
| 需要即时响应 | 即发即忘 |
| 简单查询/响应 |
每个服务拥有自己的数据。没有共享数据库。
订单服务 → orders_db (PostgreSQL)
支付服务 → payments_db (PostgreSQL)
产品服务 → products_db (MongoDB)
分析服务 → analytics_db (ClickHouse)
用于需要跨多个服务且需要回滚能力的操作。
python
class SagaStep:
def init(self, name: str, action: Callable, compensation: Callable):
self.name = name
self.action = action
self.compensation = compensation
class OrderFulfillmentSaga:
def init(self):
self.steps = [
SagaStep(createorder, self.createorder, self.cancel_order),
SagaStep(reserveinventory, self.reserveinventory, self.release_inventory),
SagaStep(processpayment, self.processpayment, self.refund_payment),
SagaStep(confirmorder, self.confirmorder, self.cancel_confirmation),
]
async def execute(self, order_data: dict) -> SagaResult:
completed_steps = []
context = {orderdata: orderdata}
for step in self.steps:
try:
result = await step.action(context)
if not result.success:
await self.compensate(completed_steps, context)
return SagaResult(status=failed, error=result.error)
completed_steps.append(step)
context.update(result.data)
except Exception as e:
await self.compensate(completed_steps, context)
return SagaResult(status=failed, error=str(e))
return SagaResult(status=completed, data=context)
async def compensate(self, completed_steps: List[SagaStep], context: dict):
按相反顺序执行补偿操作。
for step in reversed(completed_steps):
try:
await step.compensation(context)
except Exception as e:
# 记录日志但继续补偿
logger.error(fCompensation failed for {step.name}: {e})
当服务宕机时快速失败。防止级联故障。
python
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = closed # 正常运行
OPEN = open # 失败,拒绝请求
HALFOPEN = halfopen # 测试恢复
class CircuitBreaker:
def init(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
success_threshold: int = 2
):
self.failurethreshold = failurethreshold
self.recoverytimeout = recoverytimeout
self.successthreshold = successthreshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.opened_at = None
async def call(self, func: Callable, args, *kwargs):
if self.state == CircuitState.OPEN:
if self.shouldattempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpen(Service unavailable)
try:
result = await func(args, *kwargs)
self.onsuccess()
return result
except Exception as e:
self.onfailure()
raise
def onsuccess(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.successcount >= self.successthreshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def onfailure(self):
self.failure_count += 1
if self.failurecount >= self.failurethreshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
def shouldattempt_reset(self) -> bool:
return datetime.now() - self.openedat > timedelta(seconds=self.recoverytimeout)
async def callpaymentservice(data: dict):
return await breaker.call(payment_client.post, /payments, json=data)
用于临时故障。
python
from tenacity import retry, stopafterattempt,
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 microservice-patterns-1776420082 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 microservice-patterns-1776420082 技能
skillhub install microservice-patterns-1776420082
文件大小: 5.78 KB | 发布时间: 2026-4-17 18:19