返回顶部
m

microservices-patterns微服务模式

# Microservices Patterns

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.0
安全检测
已通过
1,022
下载量
免费
免费
2
收藏
概述
安装方式
版本历史

microservices-patterns

微服务模式

是什么

构建分布式系统的模式:服务分解、服务间通信、数据管理和弹性。帮助你避免分布式单体反模式。

何时使用

  • - 将单体应用拆分为微服务
  • 设计服务边界和契约
  • 实现服务间通信
  • 管理分布式事务
  • 构建弹性分布式系统

关键词

微服务、服务网格、事件驱动、Saga、断路器、API网关、服务发现、分布式事务、最终一致性、CQRS

决策框架:何时使用微服务

如果你有...那么...
小团队(<5名开发者),简单领域从单体开始
需要独立部署/扩展
考虑微服务 | | 多个团队,清晰的领域边界 | 微服务效果良好 | | 紧迫的截止日期,未知需求 | 先单体,后抽取 |

经验法则:如果你无法定义清晰的服务边界,说明你还没准备好使用微服务。



服务分解模式

模式1:按业务能力划分

围绕业务功能而非技术层组织服务。

电商示例:
├── 订单服务 # 订单生命周期
├── 支付服务 # 支付处理
├── 库存服务 # 库存管理
├── 物流服务 # 订单履行
└── 通知服务 # 邮件、短信

模式2:绞杀者模式(单体迁移)

逐步从单体中抽取,无需大规模重写。

  1. 1. 识别要抽取的有界上下文
  2. 创建新的微服务
  3. 将新流量路由到微服务
  4. 逐步迁移现有功能
  5. 完成后从单体中移除

python

迁移期间的API网关路由


async def route_orders(request):
if request.path.startswith(/api/orders/v2):
return await neworderservice.forward(request)
else:
return await legacy_monolith.forward(request)


通信模式

模式1:同步(REST/gRPC)

用于:查询,需要即时响应时。

python
import httpx
from tenacity import retry, stopafterattempt, wait_exponential

class ServiceClient:
def init(self, base_url: str):
self.baseurl = baseurl
self.client = httpx.AsyncClient(timeout=5.0)

@retry(stop=stopafterattempt(3), wait=wait_exponential(min=1, max=10))
async def get(self, path: str):
带自动重试的GET请求。
response = await self.client.get(f{self.base_url}{path})
response.raiseforstatus()
return response.json()

使用示例

payment_client = ServiceClient(http://payment-service:8001) result = await paymentclient.get(f/payments/{paymentid})

模式2:异步(事件)

用于:命令,可接受最终一致性时。

python
from aiokafka import AIOKafkaProducer
import json

@dataclass
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
occurred_at: datetime
data: dict

class EventBus:
def init(self, bootstrap_servers: List[str]):
self.producer = AIOKafkaProducer(
bootstrapservers=bootstrapservers,
value_serializer=lambda v: json.dumps(v).encode()
)

async def publish(self, event: DomainEvent):
await self.producer.sendandwait(
event.event_type, # 主题 = 事件类型
value=asdict(event),
key=event.aggregate_id.encode()
)

订单服务发布事件

await event_bus.publish(DomainEvent( event_id=str(uuid.uuid4()), event_type=OrderCreated, aggregate_id=order.id, occurred_at=datetime.now(), data={orderid: order.id, customerid: order.customer_id} ))

库存服务订阅并响应

async def handleordercreated(event_data: dict): orderid = eventdata[data][order_id] items = event_data[data][items] await reserveinventory(orderid, items)

何时使用每种方式

同步异步
需要即时响应即发即忘
简单查询/响应
长时间运行的操作 | | 需要低延迟 | 解耦是优先考虑 | | 可接受紧耦合 | 最终一致性可接受 |

数据模式

每个服务独立数据库

每个服务拥有自己的数据。没有共享数据库。

订单服务 → orders_db (PostgreSQL)
支付服务 → payments_db (PostgreSQL)
产品服务 → products_db (MongoDB)
分析服务 → analytics_db (ClickHouse)

Saga模式(分布式事务)

用于需要跨多个服务且需要回滚能力的操作。

python
class SagaStep:
def init(self, name: str, action: Callable, compensation: Callable):
self.name = name
self.action = action
self.compensation = compensation

class OrderFulfillmentSaga:
def init(self):
self.steps = [
SagaStep(createorder, self.createorder, self.cancel_order),
SagaStep(reserveinventory, self.reserveinventory, self.release_inventory),
SagaStep(processpayment, self.processpayment, self.refund_payment),
SagaStep(confirmorder, self.confirmorder, self.cancel_confirmation),
]

async def execute(self, order_data: dict) -> SagaResult:
completed_steps = []
context = {orderdata: orderdata}

for step in self.steps:
try:
result = await step.action(context)
if not result.success:
await self.compensate(completed_steps, context)
return SagaResult(status=failed, error=result.error)
completed_steps.append(step)
context.update(result.data)
except Exception as e:
await self.compensate(completed_steps, context)
return SagaResult(status=failed, error=str(e))

return SagaResult(status=completed, data=context)

async def compensate(self, completed_steps: List[SagaStep], context: dict):
按相反顺序执行补偿操作。
for step in reversed(completed_steps):
try:
await step.compensation(context)
except Exception as e:
# 记录日志但继续补偿
logger.error(fCompensation failed for {step.name}: {e})



弹性模式

断路器

当服务宕机时快速失败。防止级联故障。

python
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
CLOSED = closed # 正常运行
OPEN = open # 失败,拒绝请求
HALFOPEN = halfopen # 测试恢复

class CircuitBreaker:
def init(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
success_threshold: int = 2
):
self.failurethreshold = failurethreshold
self.recoverytimeout = recoverytimeout
self.successthreshold = successthreshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.opened_at = None

async def call(self, func: Callable, args, *kwargs):
if self.state == CircuitState.OPEN:
if self.shouldattempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpen(Service unavailable)

try:
result = await func(args, *kwargs)
self.onsuccess()
return result
except Exception as e:
self.onfailure()
raise

def onsuccess(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.successcount >= self.successthreshold:
self.state = CircuitState.CLOSED
self.success_count = 0

def onfailure(self):
self.failure_count += 1
if self.failurecount >= self.failurethreshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()

def shouldattempt_reset(self) -> bool:
return datetime.now() - self.openedat > timedelta(seconds=self.recoverytimeout)

使用示例

breaker = CircuitBreaker(failurethreshold=5, recoverytimeout=30)

async def callpaymentservice(data: dict):
return await breaker.call(payment_client.post, /payments, json=data)

指数退避重试

用于临时故障。

python
from tenacity import retry, stopafterattempt,

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 microservice-patterns-1776420082 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 microservice-patterns-1776420082 技能

通过命令行安装

skillhub install microservice-patterns-1776420082

下载

⬇ 下载 microservices-patterns v1.0.0(免费)

文件大小: 5.78 KB | 发布时间: 2026-4-17 18:19

v1.0.0 最新 2026-4-17 18:19
microservices-patterns 1.0.0

- Initial release providing a comprehensive guide to microservice architecture patterns.
- Includes decision framework for when to use microservices vs. monolith.
- Documents key decomposition (by business capability, strangler fig), communication (sync/async), and data (database-per-service, saga) patterns.
- Offers practical code examples for common patterns (API gateway migration, event-driven communication, circuit breaker, saga orchestration).
- Covers resilience strategies to prevent distributed monolith anti-pattern.

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部