Skip to content

27.2 复杂系统设计与实现

复杂系统设计概述

Claude Code 可以帮助开发者设计和实现复杂的软件系统。通过系统化的设计方法和分阶段的实现策略,开发者可以利用 Claude Code 构建可扩展、可维护的复杂系统。

系统架构设计

1. 微服务架构

示例:设计微服务架构

用户请求: "设计一个电商系统的微服务架构"

Claude Code 生成的架构设计:

python
## 系统架构

    bash


    ┌─────────────────────────────────────────────────────────────┐
    │                        API Gateway                          │
    │  (负载均衡、认证、路由、限流、监控)                           │
    └─────────────────────────────────────────────────────────────┘

    ┌─────────────────────┼─────────────────────┐
    │                     │                     │
    ┌───────▼────────┐  ┌────────▼────────┐  ┌────────▼────────┐
    │  User Service  │  │ Product Service │  │ Order Service   │
    │  - 用户管理     │  │  - 商品管理      │  │  - 订单管理      │
    │  - 认证授权     │  │  - 库存管理      │  │  - 支付处理      │
    │  - 个人资料     │  │  - 分类管理      │  │  - 物流跟踪      │
    └────────────────┘  └─────────────────┘  └─────────────────┘
    │                     │                     │
    └─────────────────────┼─────────────────────┘

    ┌─────────────────────┼─────────────────────┐
    │                     │                     │
    ┌───────▼────────┐  ┌────────▼────────┐  ┌────────▼────────┐
    │ Payment Service│  │ Inventory      │  │ Notification    │
    │  - 支付处理     │  │ Service        │  │ Service         │
    │  - 退款处理     │  │  - 库存同步     │  │  - 邮件通知      │
    │  - 交易记录     │  │  - 预警管理     │  │  - 短信通知      │
    └────────────────┘  └─────────────────┘  └─────────────────┘

    ┌─────────▼─────────┐
    │   Message Broker  │
    │  (Kafka/RabbitMQ) │
    └───────────────────┘

    ```

    ## 服务定义

```python
    ### User Service (用户服务)

    ```python

    python

    # services/user_service/app.py

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from flask_jwt_extended import JWTManager, create_access_token
    import bcrypt

    # Flask application object for the user service.
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/users_db'
    # NOTE(review): placeholder signing secret committed in source; presumably
    # meant to be replaced by deployment configuration — confirm.
    app.config['JWT_SECRET_KEY'] = 'your-secret-key'

    # Extension instances bound to the app above.
    db = SQLAlchemy(app)
    jwt = JWTManager(app)

    class User(db.Model):
        """Account record persisted in the ``users`` table."""
        __tablename__ = 'users'

        id = db.Column(db.Integer, primary_key=True)
        email = db.Column(db.String(120), unique=True, nullable=False)
        password_hash = db.Column(db.String(255), nullable=False)
        name = db.Column(db.String(100), nullable=False)
        phone = db.Column(db.String(20))
        created_at = db.Column(db.DateTime, server_default=db.func.now())

        def set_password(self, password):
            """Hash ``password`` with a fresh bcrypt salt and store it as text."""
            raw = password.encode('utf-8')
            hashed = bcrypt.hashpw(raw, bcrypt.gensalt())
            self.password_hash = hashed.decode('utf-8')

        def check_password(self, password):
            """Return whether ``password`` matches the stored bcrypt hash."""
            candidate = password.encode('utf-8')
            stored = self.password_hash.encode('utf-8')
            return bcrypt.checkpw(candidate, stored)

    @app.route('/register', methods=['POST'])
    def register():
        """Register a new user from a JSON request body.

        The body must be a JSON object with string fields ``email``, ``name``
        and ``password``; ``phone`` is optional.

        Returns:
            201 with ``{'user_id': ...}`` on success, 400 when the body is not
            a JSON object or a required field is missing/empty, 409 when the
            e-mail address is already registered.
        """
        # silent=True yields None for a missing/malformed body so the request
        # can be rejected with a JSON error instead of an unhandled exception.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        invalid = [
            key for key in ('email', 'name', 'password')
            if not isinstance(data.get(key), str) or not data[key]
        ]
        if invalid:
            return jsonify({
                'error': f"Missing or invalid fields: {', '.join(invalid)}"
            }), 400

        # Reject duplicates up front so the unique constraint on ``email`` does
        # not surface as an unhandled database error on commit.
        if User.query.filter_by(email=data['email']).first() is not None:
            return jsonify({'error': 'Email already registered'}), 409

        user = User(
            email=data['email'],
            name=data['name'],
            phone=data.get('phone')
        )
        user.set_password(data['password'])

        db.session.add(user)
        db.session.commit()

        return jsonify({'user_id': user.id}), 201

    @app.route('/login', methods=['POST'])
    def login():
        """Authenticate a user and issue an access token.

        The body must be a JSON object with string fields ``email`` and
        ``password``.

        Returns:
            200 with the access token and basic user info on success, 400 when
            the body is malformed or a field is missing, 401 when the
            credentials do not match.
        """
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        email = data.get('email')
        password = data.get('password')
        if not isinstance(email, str) or not isinstance(password, str):
            return jsonify({'error': 'Email and password are required'}), 400

        user = User.query.filter_by(email=email).first()

        # Same response for unknown e-mail and wrong password, so the endpoint
        # does not reveal which accounts exist.
        if not user or not user.check_password(password):
            return jsonify({'error': 'Invalid credentials'}), 401

        access_token = create_access_token(identity=user.id)

        return jsonify({
            'access_token': access_token,
            'user': {
                'id': user.id,
                'email': user.email,
                'name': user.name
            }
        })

    @app.route('/users/<int:user_id>', methods=['GET'])
    def get_user(user_id):
        """Return the stored profile fields of one user; 404 if the id is unknown."""
        record = User.query.get_or_404(user_id)
        profile = {
            field: getattr(record, field)
            for field in ('id', 'email', 'name', 'phone')
        }
        return jsonify(profile)

    ### Product Service (商品服务)

    ```python

    # services/product_service/app.py
    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from flask_migrate import Migrate
    from datetime import datetime

    # Flask application object for the product service.
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/products_db'

    # SQLAlchemy handle plus Flask-Migrate wiring for the same app.
    db = SQLAlchemy(app)
    migrate = Migrate(app, db)

    class Product(db.Model):
        """Catalogue item stored in the ``products`` table."""
        __tablename__ = 'products'

        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(200), nullable=False)
        description = db.Column(db.Text)
        # NOTE(review): price is stored as a float; confirm that binary
        # floating point is acceptable for monetary values here.
        price = db.Column(db.Float, nullable=False)
        stock = db.Column(db.Integer, default=0)
        category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))
        created_at = db.Column(db.DateTime, server_default=db.func.now())
        updated_at = db.Column(
            db.DateTime,
            server_default=db.func.now(),
            onupdate=db.func.now()
        )

        # Adds ``Category.products`` as the reverse collection.
        category = db.relationship('Category', backref='products')

    class Category(db.Model):
        """Product category stored in ``categories``; may reference a parent category."""
        __tablename__ = 'categories'

        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(100), nullable=False)
        parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'))

        # Self-referential relationship: ``remote_side=[id]`` marks ``id`` as the
        # parent side; the backref adds ``children`` on the parent.
        parent = db.relationship('Category', remote_side=[id], backref='children')

    @app.route('/products', methods=['POST'])
    def create_product():
        """Create a product from a JSON request body.

        Requires ``name`` and ``price``; ``description``, ``stock`` (default 0)
        and ``category_id`` are optional.

        Returns:
            201 with ``{'product_id': ...}`` on success, 400 when the body is
            not a JSON object or a required field is missing.
        """
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'name' not in data or 'price' not in data:
            return jsonify({'error': 'name and price are required'}), 400

        product = Product(
            name=data['name'],
            description=data.get('description'),
            price=data['price'],
            stock=data.get('stock', 0),
            category_id=data.get('category_id')
        )

        db.session.add(product)
        db.session.commit()

        return jsonify({'product_id': product.id}), 201

    @app.route('/products/<int:product_id>', methods=['GET'])
    def get_product(product_id):
        """Return one product as JSON; 404 if the id is unknown.

        ``category`` is the category name, or ``None`` when the product has no
        category.
        """
        product = Product.query.get_or_404(product_id)
        return jsonify({
            'id': product.id,
            'name': product.name,
            'description': product.description,
            'price': product.price,
            'stock': product.stock,
            'category': product.category.name if product.category else None
        })

    @app.route('/products/<int:product_id>/stock', methods=['PUT'])
    def update_stock(product_id):
        """Overwrite a product's stock level with the ``stock`` value in the body.

        Returns:
            200 with ``{'stock': ...}`` on success, 400 when ``stock`` is
            missing or is not a non-negative integer, 404 for an unknown id.
        """
        data = request.get_json(silent=True)
        stock = data.get('stock') if isinstance(data, dict) else None
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
            return jsonify({'error': 'stock must be a non-negative integer'}), 400

        product = Product.query.get_or_404(product_id)
        product.stock = stock

        db.session.commit()

        return jsonify({'stock': product.stock})

    ### Order Service (订单服务)

    ```python

    python

    # services/order_service/app.py

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from datetime import datetime
    import requests

    # Flask application object for the order service.
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/orders_db'

    # SQLAlchemy handle bound to the app above.
    db = SQLAlchemy(app)

    class Order(db.Model):
        """Order header row in the ``orders`` table; new rows default to 'pending'."""
        __tablename__ = 'orders'

        id = db.Column(db.Integer, primary_key=True)
        user_id = db.Column(db.Integer, nullable=False)
        total_amount = db.Column(db.Float, nullable=False)
        status = db.Column(db.String(20), default='pending')
        created_at = db.Column(db.DateTime, server_default=db.func.now())
        updated_at = db.Column(
            db.DateTime,
            server_default=db.func.now(),
            onupdate=db.func.now(),
        )

    class OrderItem(db.Model):
        """One line of an order, stored in the ``order_items`` table."""
        __tablename__ = 'order_items'

        id = db.Column(db.Integer, primary_key=True)
        order_id = db.Column(
            db.Integer,
            db.ForeignKey('orders.id'),
            nullable=False,
        )
        product_id = db.Column(db.Integer, nullable=False)
        quantity = db.Column(db.Integer, nullable=False)
        price = db.Column(db.Float, nullable=False)

        # Adds ``Order.items`` as the reverse collection.
        order = db.relationship('Order', backref='items')

    @app.route('/orders', methods=['POST'])
    def create_order():
        """Create an order after checking stock with the product service.

        The body must be a JSON object with ``user_id`` and a non-empty
        ``items`` list; every item needs ``product_id``, a positive integer
        ``quantity`` and a non-negative numeric ``price``.

        Returns:
            201 with ``{'order_id': ...}`` on success, 400 when the body is
            invalid, a product cannot be fetched, or stock is insufficient.
        """
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'user_id' not in data:
            return jsonify({'error': 'user_id is required'}), 400

        items = data.get('items')
        if not isinstance(items, list) or not items:
            return jsonify({'error': 'items must be a non-empty list'}), 400

        for item in items:
            if not isinstance(item, dict) or not {'product_id', 'quantity', 'price'} <= item.keys():
                return jsonify({
                    'error': 'Each item needs product_id, quantity and price'
                }), 400
            quantity, price = item['quantity'], item['price']
            # bool is a subclass of int, so exclude it explicitly.
            if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity <= 0:
                return jsonify({'error': 'quantity must be a positive integer'}), 400
            if isinstance(price, bool) or not isinstance(price, (int, float)) or price < 0:
                return jsonify({'error': 'price must be a non-negative number'}), 400

        product_base = 'http://product-service:5001/products'

        # Check stock for every line before anything is written.
        for item in items:
            response = requests.get(f"{product_base}/{item['product_id']}", timeout=5)
            if response.status_code != 200:
                return jsonify({
                    'error': f'Product {item["product_id"]} not available'
                }), 400
            product = response.json()

            if product['stock'] < item['quantity']:
                return jsonify({
                    'error': f'Insufficient stock for product {item["product_id"]}'
                }), 400

        # Create the order header; flush so order.id is available for the items.
        order = Order(
            user_id=data['user_id'],
            total_amount=0
        )
        db.session.add(order)
        db.session.flush()

        # NOTE(review): the line price comes from the request body, not from the
        # product service — confirm that trusting the client price is intended.
        total_amount = 0
        for item in items:
            order_item = OrderItem(
                order_id=order.id,
                product_id=item['product_id'],
                quantity=item['quantity'],
                price=item['price']
            )
            db.session.add(order_item)
            total_amount += item['price'] * item['quantity']

        order.total_amount = total_amount
        db.session.commit()

        # Deduct stock. This is a read-then-overwrite against another service
        # and is not atomic: concurrent orders can overwrite each other's update.
        for item in items:
            product_url = f"{product_base}/{item['product_id']}"
            response = requests.get(product_url, timeout=5)
            if response.status_code != 200:
                # The order is already committed; log instead of failing the request.
                app.logger.error(
                    'Stock not deducted for product %s (order %s): upstream status %s',
                    item['product_id'], order.id, response.status_code
                )
                continue
            new_stock = response.json()['stock'] - item['quantity']

            requests.put(
                f"{product_url}/stock",
                json={'stock': new_stock},
                timeout=5
            )

        return jsonify({'order_id': order.id}), 201

    @app.route('/orders/<int:order_id>', methods=['GET'])
    def get_order(order_id):
        """Return one order with its line items as JSON; 404 if the id is unknown."""
        order = Order.query.get_or_404(order_id)

        line_items = []
        for entry in order.items:
            line_items.append({
                'product_id': entry.product_id,
                'quantity': entry.quantity,
                'price': entry.price
            })

        body = {
            'id': order.id,
            'user_id': order.user_id,
            'total_amount': order.total_amount,
            'status': order.status,
            'items': line_items
        }
        return jsonify(body)

    ## API Gateway 实现

    ```python
    python

    # api_gateway/app.py
    from flask import Flask, request, jsonify
    import requests
    import jwt
    from functools import wraps

    # Flask application object for the API gateway.
    app = Flask(__name__)

    # Maps the first URL path segment to the base URL of the backing service.
    SERVICES = {
        'user': 'http://user-service:5000',
        'product': 'http://product-service:5001',
        'order': 'http://order-service:5002'
    }

    def authenticate(f):
        """Decorator that requires a valid HS256 JWT in the ``Authorization`` header.

        The header is expected as ``<scheme> <token>``. On success the caller's
        identity is stored on ``request.user_id`` and the wrapped view runs;
        otherwise a 401 JSON response is returned.
        """
        @wraps(f)
        def decorated_function(*args, **kwargs):
            header = request.headers.get('Authorization')

            if not header:
                return jsonify({'error': 'Missing token'}), 401

            # A header without a scheme prefix has no second part; reject it
            # instead of raising IndexError.
            parts = header.split()
            if len(parts) < 2:
                return jsonify({'error': 'Invalid token'}), 401

            try:
                payload = jwt.decode(parts[1], 'your-secret-key', algorithms=['HS256'])
            except jwt.InvalidTokenError:
                return jsonify({'error': 'Invalid token'}), 401

            # Tokens issued with flask_jwt_extended.create_access_token carry the
            # identity under 'sub'; 'user_id' is still honoured when present.
            user_id = payload.get('user_id', payload.get('sub'))
            if user_id is None:
                return jsonify({'error': 'Invalid token'}), 401
            request.user_id = user_id

            return f(*args, **kwargs)

        return decorated_function

    @app.route('/<service>/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
    def proxy(service, path):
        """Forward the incoming request to the service named by the first path segment.

        Args:
            service: Key into ``SERVICES``.
            path: Remainder of the URL, appended to the service base URL.

        Returns:
            The upstream response with its status code (re-encoded as JSON when
            the upstream body is JSON, passed through unchanged otherwise),
            404 for an unknown service, 502 when the upstream call fails.
        """
        if service not in SERVICES:
            return jsonify({'error': 'Service not found'}), 404

        service_url = SERVICES[service]
        url = f"{service_url}/{path}"

        headers = {key: value for key, value in request.headers if key != 'Host'}

        try:
            response = requests.request(
                method=request.method,
                url=url,
                headers=headers,
                # silent=True: requests without a JSON body (e.g. GET) forward
                # no body instead of being rejected while parsing.
                json=request.get_json(silent=True),
                params=request.args,
                timeout=10
            )
        except requests.RequestException:
            return jsonify({'error': 'Upstream service unavailable'}), 502

        try:
            payload = response.json()
        except ValueError:
            # Upstream body is not JSON (e.g. an HTML error page); pass it through.
            content_type = response.headers.get('Content-Type', 'application/octet-stream')
            return response.content, response.status_code, {'Content-Type': content_type}

        return jsonify(payload), response.status_code

    @app.route('/health')
    def health():
        """Health-check endpoint; always reports ``healthy``."""
        return jsonify(status='healthy')

    ```

    ### 2. 事件驱动架构

    ````python
    `python

    # 示例:设计事件驱动架构

    用户请求:
    "设计一个基于事件驱动的订单处理系统"

    Claude Code 生成的架构设计:

    ## 系统架构

    ```
    ┌──────────────┐
    │   订单服务    │
    │  (Order      │
    │   Service)   │
    └──────┬───────┘
    │ 发布事件

    ┌─────────────────────────────────────────────────────────────┐
    │                    Message Broker                           │
    │                    (Kafka/RabbitMQ)                         │
    │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
    │  │ 订单创建  │  │ 订单支付  │  │ 订单发货  │  │ 订单完成  │  │
    │  │ 事件     │  │ 事件     │  │ 事件     │  │ 事件     │  │
    │  └──────────┘  └──────────┘  └──────────┘  └──────────┘  │
    └─────────────────────────────────────────────────────────────┘
    │              │              │              │
    ▼              ▼              ▼              ▼
    ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
    │ 库存服务  │   │ 支付服务  │   │ 物流服务  │   │ 通知服务  │
    │ Inventory│   │ Payment  │   │ Shipping │   │ Notify   │
    │ Service  │   │ Service  │   │ Service  │   │ Service  │
    └──────────┘   └──────────┘   └──────────┘   └──────────┘

    ```
    ## 事件定义

    # events/order_events.py
    from dataclasses import dataclass
    from datetime import datetime
    from typing import Dict, Any
    import json
    @dataclass
    class OrderEvent:
        """Base class for order events.

        Attributes are the event type string, the order id, the event
        timestamp, and an event-specific payload dictionary.
        """
        event_type: str
        order_id: int
        timestamp: datetime
        data: Dict[str, Any]

        def to_dict(self) -> dict:
            """Return the event as a plain dict with an ISO-8601 timestamp string."""
            return {
                'event_type': self.event_type,
                'order_id': self.order_id,
                'timestamp': self.timestamp.isoformat(),
                'data': self.data
            }

        def to_json(self) -> str:
            """Serialize the ``to_dict()`` representation to a JSON string."""
            return json.dumps(self.to_dict())
    @dataclass
    class OrderCreatedEvent(OrderEvent):
        """Event with type ``order.created`` carrying the user, total and items."""

        def __init__(self, order_id: int, user_id: int, total_amount: float, items: list):
            # The hand-written __init__ is kept by @dataclass; it fills the base
            # fields and stamps the event with the current (naive) UTC time.
            super().__init__(
                event_type='order.created',
                order_id=order_id,
                timestamp=datetime.utcnow(),
                data={
                    'user_id': user_id,
                    'total_amount': total_amount,
                    'items': items
                }
            )
    @dataclass
    class OrderPaidEvent(OrderEvent):
        """Event with type ``order.paid`` carrying payment method and transaction id."""

        def __init__(self, order_id: int, payment_method: str, transaction_id: str):
            # Fills the base fields and stamps the event with the current
            # (naive) UTC time.
            super().__init__(
                event_type='order.paid',
                order_id=order_id,
                timestamp=datetime.utcnow(),
                data={
                    'payment_method': payment_method,
                    'transaction_id': transaction_id
                }
            )
    @dataclass
    class OrderShippedEvent(OrderEvent):
        """Event with type ``order.shipped`` carrying tracking number and address."""

        def __init__(self, order_id: int, tracking_number: str, shipping_address: dict):
            # Fills the base fields and stamps the event with the current
            # (naive) UTC time.
            super().__init__(
                event_type='order.shipped',
                order_id=order_id,
                timestamp=datetime.utcnow(),
                data={
                    'tracking_number': tracking_number,
                    'shipping_address': shipping_address
                }
            )
    @dataclass
    class OrderCompletedEvent(OrderEvent):
        """Event with type ``order.completed`` carrying the delivery time."""

        def __init__(self, order_id: int, delivery_time: datetime):
            # The delivery time is stored as an ISO-8601 string so the payload
            # stays JSON-serializable.
            super().__init__(
                event_type='order.completed',
                order_id=order_id,
                timestamp=datetime.utcnow(),
                data={
                    'delivery_time': delivery_time.isoformat()
                }
            )


    ## 事件发布者

    python

    # services/order_service/event_publisher.py
    import pika
    import json
    from events.order_events import OrderEvent

    class EventPublisher:
        """Publishes order events to the ``order_events`` topic exchange."""

        def __init__(self, rabbitmq_url: str = 'amqp://localhost'):
            """Open a blocking connection and channel, then declare the exchange."""
            params = pika.URLParameters(rabbitmq_url)
            self.connection = pika.BlockingConnection(params)
            self.channel = self.connection.channel()

            # Declare the topic exchange the events are published to.
            self.channel.exchange_declare(exchange='order_events', exchange_type='topic')

        def publish(self, event: OrderEvent, routing_key: str):
            """Publish ``event`` as JSON under ``routing_key``."""
            payload = event.to_json()
            # delivery_mode=2 marks the message as persistent.
            persistent = pika.BasicProperties(delivery_mode=2)
            self.channel.basic_publish(
                exchange='order_events',
                routing_key=routing_key,
                body=payload,
                properties=persistent,
            )

        def close(self):
            """Close the underlying connection."""
            self.connection.close()

    ## 事件消费者

    # services/inventory_service/event_consumer.py
    import pika
    import json
    import requests
    class InventoryEventConsumer:
        """Consumes ``order.created`` events and deducts stock via the product service."""

        def __init__(self, rabbitmq_url: str = 'amqp://localhost'):
            """Connect, declare the exchange and durable queue, and bind them."""
            self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
            self.channel = self.connection.channel()

            # Declare the exchange and the queue, then bind on 'order.created'.
            self.channel.exchange_declare(
                exchange='order_events',
                exchange_type='topic'
            )
            self.channel.queue_declare(queue='inventory_queue', durable=True)
            self.channel.queue_bind(
                exchange='order_events',
                queue='inventory_queue',
                routing_key='order.created'
            )

        def handle_order_created(self, ch, method, properties, body):
            """Handle one ``order.created`` message.

            For every item in ``event['data']['items']`` the current stock is
            fetched and overwritten with ``stock - quantity``; the message is
            acknowledged after all items have been processed.
            """
            event = json.loads(body)
            print(f"Received order.created event: {event}")

            # Deduct stock. NOTE(review): read-then-overwrite is not atomic, and
            # a redelivered message would deduct twice — confirm this is acceptable.
            for item in event['data']['items']:
                response = requests.get(
                    f"http://product-service:5001/products/{item['product_id']}"
                )
                product = response.json()
                new_stock = product['stock'] - item['quantity']
                requests.put(
                    f"http://product-service:5001/products/{item['product_id']}/stock",
                    json={'stock': new_stock}
                )

            # Acknowledge only after every item has been handled.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def start_consuming(self):
            """Start the blocking consume loop, one unacknowledged message at a time."""
            self.channel.basic_qos(prefetch_count=1)
            self.channel.basic_consume(
                queue='inventory_queue',
                on_message_callback=self.handle_order_created
            )
            print('Inventory service started consuming messages...')
            self.channel.start_consuming()

        def close(self):
            """Close the underlying connection."""
            self.connection.close()


    ## 事件驱动订单服务

    python

    # services/order_service/app.py
    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from datetime import datetime
    from event_publisher import EventPublisher
    from events.order_events import OrderCreatedEvent

    # Flask application object for the event-driven order service.
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/orders_db'

    db = SQLAlchemy(app)
    # Module-level publisher, constructed with its default broker URL when this
    # module is imported.
    event_publisher = EventPublisher()

    class Order(db.Model):
        """Order header row in the ``orders`` table; new rows default to 'pending'."""
        __tablename__ = 'orders'

        id = db.Column(db.Integer, primary_key=True)
        user_id = db.Column(db.Integer, nullable=False)
        total_amount = db.Column(db.Float, nullable=False)
        status = db.Column(db.String(20), default='pending')
        created_at = db.Column(db.DateTime, server_default=db.func.now())
        updated_at = db.Column(
            db.DateTime,
            server_default=db.func.now(),
            onupdate=db.func.now(),
        )

    class OrderItem(db.Model):
        """One line of an order, stored in the ``order_items`` table."""
        __tablename__ = 'order_items'

        id = db.Column(db.Integer, primary_key=True)
        order_id = db.Column(
            db.Integer,
            db.ForeignKey('orders.id'),
            nullable=False,
        )
        product_id = db.Column(db.Integer, nullable=False)
        quantity = db.Column(db.Integer, nullable=False)
        price = db.Column(db.Float, nullable=False)

        # Adds ``Order.items`` as the reverse collection.
        order = db.relationship('Order', backref='items')

    @app.route('/orders', methods=['POST'])
    def create_order():
        """Persist an order and publish an ``order.created`` event.

        The body must be a JSON object with ``user_id`` and a non-empty
        ``items`` list; every item needs ``product_id``, a positive integer
        ``quantity`` and a non-negative numeric ``price``.

        Returns:
            201 with ``{'order_id': ...}`` on success, 400 when the body is
            invalid.
        """
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'user_id' not in data:
            return jsonify({'error': 'user_id is required'}), 400

        requested = data.get('items')
        if not isinstance(requested, list) or not requested:
            return jsonify({'error': 'items must be a non-empty list'}), 400

        for item in requested:
            if not isinstance(item, dict) or not {'product_id', 'quantity', 'price'} <= item.keys():
                return jsonify({
                    'error': 'Each item needs product_id, quantity and price'
                }), 400
            quantity, price = item['quantity'], item['price']
            # bool is a subclass of int, so exclude it explicitly.
            if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity <= 0:
                return jsonify({'error': 'quantity must be a positive integer'}), 400
            if isinstance(price, bool) or not isinstance(price, (int, float)) or price < 0:
                return jsonify({'error': 'price must be a non-negative number'}), 400

        # Create the order header; flush so order.id is available for the items.
        order = Order(
            user_id=data['user_id'],
            total_amount=0
        )
        db.session.add(order)
        db.session.flush()

        # NOTE(review): the line price comes from the request body — confirm
        # that trusting the client price is intended.
        total_amount = 0
        items = []
        for item in requested:
            order_item = OrderItem(
                order_id=order.id,
                product_id=item['product_id'],
                quantity=item['quantity'],
                price=item['price']
            )
            db.session.add(order_item)
            total_amount += item['price'] * item['quantity']
            items.append({
                'product_id': item['product_id'],
                'quantity': item['quantity'],
                'price': item['price']
            })

        order.total_amount = total_amount
        db.session.commit()

        # Publish after the commit. NOTE(review): if publishing fails the order
        # is already stored but no event is emitted — confirm whether a retry
        # or outbox mechanism is needed.
        event = OrderCreatedEvent(
            order_id=order.id,
            user_id=data['user_id'],
            total_amount=total_amount,
            items=items
        )
        event_publisher.publish(event, 'order.created')

        return jsonify({'order_id': order.id}), 201

    # When run as a script, serve on all interfaces on port 5002.
    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=5002)

    ```

    ```

基于 MIT 许可发布 | 永久导航