Skip to content

# 22.3 多插件协作

## 22.3.1 插件间通信机制

### 事件总线通信

// src/communication/event-bus.ts

/**
 * Plugin event bus.
 *
 * A publish/subscribe hub: plugins register listeners per event type and
 * publish events without referencing each other directly. Every published
 * event is also appended to a bounded in-memory history.
 */
export class PluginEventBus {
  private listeners: Map<string, EventListener[]> = new Map();
  // Typed as PluginEvent (not the DOM `Event`) so it matches what publish()
  // stores and what getHistory() returns.
  private history: PluginEvent[] = [];
  private maxHistorySize: number = 100;

  /**
   * @param maxHistorySize Maximum number of events retained in the history.
   */
  constructor(maxHistorySize: number = 100) {
    this.maxHistorySize = maxHistorySize;
  }

  /**
   * Subscribes a listener to an event type.
   *
   * @param eventType Event type to listen for.
   * @param listener Callback invoked for every published event of that type.
   */
  subscribe(eventType: string, listener: EventListener): void {
    const registered = this.listeners.get(eventType);

    if (registered) {
      registered.push(listener);
    } else {
      this.listeners.set(eventType, [listener]);
    }
  }

  /**
   * Removes a previously subscribed listener.
   *
   * Only the first matching registration is removed; unknown event types or
   * listeners are ignored.
   *
   * @param eventType Event type the listener was subscribed to.
   * @param listener The same function reference that was passed to subscribe().
   */
  unsubscribe(eventType: string, listener: EventListener): void {
    const listeners = this.listeners.get(eventType);

    if (listeners) {
      const index = listeners.indexOf(listener);
      if (index > -1) {
        listeners.splice(index, 1);
      }
    }
  }

  /**
   * Publishes an event: records it in the history, then notifies every
   * listener subscribed to `event.type`.
   *
   * The returned promise settles once all listeners have finished. A listener
   * that throws or rejects is logged and does not affect the other listeners.
   *
   * @param event The event to publish.
   */
  async publish(event: PluginEvent): Promise<void> {
    // Record the event even when nobody is listening.
    this.addToHistory(event);

    const listeners = this.listeners.get(event.type);

    if (!listeners || listeners.length === 0) {
      return;
    }

    // Dispatch over a snapshot: a listener may unsubscribe (itself or another
    // listener) while being notified, and splicing the live array during
    // map() would make the following listener be skipped.
    const promises = [...listeners].map((listener) =>
      this.safeNotify(listener, event)
    );

    await Promise.all(promises);
  }

  /**
   * Invokes one listener and logs, rather than propagates, any error it
   * throws synchronously or rejects with.
   */
  private async safeNotify(listener: EventListener, event: PluginEvent): Promise<void> {
    try {
      await listener(event);
    } catch (error) {
      console.error(`Error in event listener for ${event.type}:`, error);
    }
  }

  /**
   * Appends an event to the history and drops the oldest entry once the
   * configured maximum size is exceeded.
   */
  private addToHistory(event: PluginEvent): void {
    this.history.push(event);

    if (this.history.length > this.maxHistorySize) {
      this.history.shift();
    }
  }

  /**
   * Returns recorded events, oldest first.
   *
   * @param eventType When given, only events of this type are returned.
   * @returns A new array; mutating it does not affect the stored history.
   */
  getHistory(eventType?: string): PluginEvent[] {
    if (eventType) {
      return this.history.filter((event) => event.type === eventType);
    }

    return [...this.history];
  }

  /**
   * Discards all recorded events.
   */
  clearHistory(): void {
    this.history = [];
  }
}

/**
 * Event listener: a callback that receives a published event. It may finish
 * synchronously or return a promise.
 */
type EventListener = (event: PluginEvent) => Promise<void> | void;

/**
 * An event exchanged between plugins through the event bus.
 */
interface PluginEvent {
  /** Event type; the bus uses it to look up the listeners to notify. */
  type: string;
  /** Origin of the event (the usage example passes the publishing plugin's id). */
  source: string;
  /** Optional addressee; the event bus shown here does not read it. */
  target?: string;
  /** Event payload. */
  data: any;
  /** Time associated with the event (the usage example passes the publish time). */
  timestamp: Date;
}

// Usage example
const eventBus = new PluginEventBus();

// Plugin A subscribes to the event
eventBus.subscribe('user.created', async (event) => {
  console.log(`Plugin A received user.created:`, event.data);
});

// Plugin B subscribes to the same event
eventBus.subscribe('user.created', async (event) => {
  console.log(`Plugin B received user.created:`, event.data);
});

// Plugin C publishes the event; both subscribers above are notified
await eventBus.publish({
  type: 'user.created',
  source: 'plugin-c',
  data: { userId: 1, name: 'John' },
  timestamp: new Date()
});

// Inspect the recorded history for this event type
const history = eventBus.getHistory('user.created');
console.log('History:', history);

### 消息队列通信

    bash


    typescript

    // src/communication/message-queue.ts

    /**
     * Plugin message queue.
     *
     * Holds one FIFO queue per queue name and delivers each message to every
     * consumer registered for that name. Messages of one queue are handled one
     * at a time, in the order they were sent; different queues are drained
     * independently of each other.
     */
    export class PluginMessageQueue {
      private queues: Map<string, Message[]> = new Map();
      private consumers: Map<string, MessageConsumer[]> = new Map();
      // Names of the queues that currently have a drain loop running.
      private draining: Set<string> = new Set();

      /**
       * Appends a message to a queue and starts draining that queue unless a
       * drain is already in progress for it.
       *
       * This method does not wait for consumers to finish. If the queue has no
       * consumers, the message stays queued; it is picked up by a later send()
       * to the same queue once a consumer has been registered.
       *
       * @param queueName Name of the target queue (created on first use).
       * @param message Message to enqueue.
       */
      send(queueName: string, message: Message): void {
        let queue = this.queues.get(queueName);
        if (!queue) {
          queue = [];
          this.queues.set(queueName, queue);
        }

        queue.push(message);

        // Fire-and-forget: consumer errors are caught in safeConsume(), so the
        // drain promise does not reject because of a failing consumer.
        void this.processQueue(queueName);
      }

      /**
       * Registers a consumer for a queue. Every message taken from the queue is
       * passed to all consumers registered for it.
       *
       * @param queueName Name of the queue to consume from.
       * @param consumer Callback invoked with each message.
       */
      receive(queueName: string, consumer: MessageConsumer): void {
        const registered = this.consumers.get(queueName);

        if (registered) {
          registered.push(consumer);
        } else {
          this.consumers.set(queueName, [consumer]);
        }
      }

      /**
       * Drains one queue: repeatedly takes the oldest message and waits until
       * all consumers have handled it, until the queue is empty or has no
       * consumers.
       *
       * The in-progress marker is tracked per queue, so a busy queue never
       * blocks another one, and the loop itself picks up messages that were
       * sent while an earlier message was still being handled.
       */
      private async processQueue(queueName: string): Promise<void> {
        if (this.draining.has(queueName)) {
          // The running loop for this queue will reach the new message.
          return;
        }

        this.draining.add(queueName);

        try {
          for (;;) {
            // Re-read both on every pass: clearQueue() replaces the array and
            // receive() may add consumers while a message is being handled.
            const queue = this.queues.get(queueName);
            const consumers = this.consumers.get(queueName);

            if (!queue || queue.length === 0 || !consumers || consumers.length === 0) {
              return;
            }

            const message = queue.shift();
            if (message === undefined) {
              return;
            }

            // Snapshot the consumer list so registrations made during delivery
            // only apply to subsequent messages.
            await Promise.all(
              [...consumers].map((consumer) => this.safeConsume(consumer, message))
            );
          }
        } finally {
          this.draining.delete(queueName);
        }
      }

      /**
       * Invokes one consumer and logs, rather than propagates, any error it
       * throws synchronously or rejects with.
       */
      private async safeConsume(consumer: MessageConsumer, message: Message): Promise<void> {
        try {
          await consumer(message);
        } catch (error) {
          console.error(`Error in message consumer:`, error);
        }
      }

      /**
       * Returns the number of messages waiting in a queue.
       *
       * @param queueName Name of the queue.
       * @returns The pending message count, or 0 for an unknown queue.
       */
      getQueueSize(queueName: string): number {
        const queue = this.queues.get(queueName);
        return queue ? queue.length : 0;
      }

      /**
       * Discards all messages waiting in a queue.
       *
       * @param queueName Name of the queue to empty.
       */
      clearQueue(queueName: string): void {
        this.queues.set(queueName, []);
      }
    }

    /**
     * A message carried by a plugin message queue.
     */
    interface Message {
      /** Message identifier (the usage example passes 'msg-1'). */
      id: string;
      /** Origin of the message (the usage example passes the sending plugin's id). */
      source: string;
      /** Optional addressee; the queue implementation shown here does not read it. */
      target?: string;
      /** Message payload. */
      data: any;
      /** Time associated with the message (the usage example passes the send time). */
      timestamp: Date;
    }

    /**
     * Message consumer: a callback that handles one message. It may finish
     * synchronously or return a promise.
     */
    type MessageConsumer = (message: Message) => Promise<void> | void;

    // Usage example
    const messageQueue = new PluginMessageQueue();

    // Plugin A registers a consumer on the shared queue
    messageQueue.receive('user-queue', async ({ data }) => {
      console.log(`Plugin A received message:`, data);
    });

    // Plugin B registers a second consumer on the same queue
    messageQueue.receive('user-queue', async ({ data }) => {
      console.log(`Plugin B received message:`, data);
    });

    // Plugin C sends a message; both consumers above receive it
    const userMessage = {
      id: 'msg-1',
      source: 'plugin-c',
      data: { userId: 1, name: 'John' },
      timestamp: new Date()
    };
    messageQueue.send('user-queue', userMessage);

    // Report how many messages are still waiting in the queue
    const size = messageQueue.getQueueSize('user-queue');
    console.log('Queue size:', size);

### RPC 通信

    // src/communication/rpc.ts
    /**
     * Plugin RPC service.
     *
     * A registry of named services. Each service is a single handler function
     * that receives the method name and its parameters; clients bound to one
     * service name can be created and looked up here.
     */
    export class PluginRPCService {
      private services: Map<string, RPCHandler> = new Map();
      private clients: Map<string, RPCClient> = new Map();

      /**
       * Registers (or replaces) the handler for a service name.
       *
       * @param serviceName Name under which the service is reachable.
       * @param handler Function that executes the service's methods.
       */
      registerService(serviceName: string, handler: RPCHandler): void {
        this.services.set(serviceName, handler);
      }

      /**
       * Removes the handler for a service name; unknown names are ignored.
       *
       * @param serviceName Name of the service to remove.
       */
      unregisterService(serviceName: string): void {
        this.services.delete(serviceName);
      }

      /**
       * Invokes a method of a registered service.
       *
       * @param serviceName Name of the service to call.
       * @param method Method name, passed through to the handler.
       * @param params Parameters, passed through to the handler.
       * @returns Whatever the handler resolves with.
       * @throws Error (as a rejected promise) when no handler is registered
       *   under `serviceName`; handler errors are propagated unchanged.
       */
      async call(serviceName: string, method: string, params: any): Promise<any> {
        const target = this.services.get(serviceName);

        if (target) {
          return target(method, params);
        }

        throw new Error(`Service not found: ${serviceName}`);
      }

      /**
       * Creates a client bound to a service name and remembers it, replacing
       * any client previously stored for that name. The service does not have
       * to be registered yet.
       *
       * @param serviceName Name of the service the client will call.
       * @returns The newly created client.
       */
      createClient(serviceName: string): RPCClient {
        const created = new RPCClient(this, serviceName);
        this.clients.set(serviceName, created);
        return created;
      }

      /**
       * Looks up the client most recently created for a service name.
       *
       * @param serviceName Name of the service.
       * @returns The stored client, or `undefined` if none was created.
       */
      getClient(serviceName: string): RPCClient | undefined {
        return this.clients.get(serviceName);
      }
    }
    /**
    * RPC handler: implements one service. It receives the name of the method
    * being called and its parameters, and resolves with the method's result.
    */
    type RPCHandler = (method: string, params: any) => Promise<any>;
    /**
     * RPC client: a thin handle bound to one service name that forwards calls
     * to the RPC service it was created from.
     */
    export class RPCClient {
      /**
       * @param rpcService RPC service that executes the calls.
       * @param serviceName Name of the service every call is sent to.
       */
      constructor(
        private rpcService: PluginRPCService,
        private serviceName: string
      ) {}

      /**
       * Calls a method of the bound service.
       *
       * @param method Name of the method to call.
       * @param params Parameters for the method. When omitted (or `null`), an
       *   empty object is sent instead.
       * @returns Whatever the service resolves with; errors from the service
       *   are propagated unchanged.
       */
      async call(method: string, params?: any): Promise<any> {
        // `??` rather than `||`: legitimate falsy parameters such as 0, false
        // or '' must reach the handler unchanged.
        return this.rpcService.call(this.serviceName, method, params ?? {});
      }
    }
    // Usage example
    const rpcService = new PluginRPCService();

    // Plugin A exposes the user service
    rpcService.registerService('user-service', async (method, params) => {
      if (method === 'getUser') {
        return { id: params.id, name: 'John Doe' };
      }
      if (method === 'createUser') {
        return { id: Date.now(), ...params };
      }
      throw new Error(`Unknown method: ${method}`);
    });

    // Plugin B exposes the order service
    rpcService.registerService('order-service', async (method, params) => {
      if (method === 'getOrder') {
        return { id: params.id, userId: 1, total: 100 };
      }
      if (method === 'createOrder') {
        return { id: Date.now(), ...params };
      }
      throw new Error(`Unknown method: ${method}`);
    });

    // Plugin C creates one client per service and calls them
    const userClient = rpcService.createClient('user-service');
    const orderClient = rpcService.createClient('order-service');

    const user = await userClient.call('getUser', { id: 1 });
    console.log('User:', user);

    const order = await orderClient.call('getOrder', { id: 1 });
    console.log('Order:', order);

## 22.3.2 插件依赖管理

### 依赖解析

    bash


    typescript

    // src/dependencies/dependency-resolver.ts

    /**
     * Plugin dependency resolver.
     *
     * Keeps a registry of plugins with their declared dependencies and derives
     * a load order in which every plugin comes after the plugins it depends on.
     */
    export class PluginDependencyResolver {
      private plugins: Map<string, PluginInfo> = new Map();

      /**
       * Registers a plugin, replacing any plugin already registered under the
       * same name.
       *
       * @param pluginInfo Plugin name, version and dependency list.
       */
      addPlugin(pluginInfo: PluginInfo): void {
        this.plugins.set(pluginInfo.name, pluginInfo);
      }

      /**
       * Removes a plugin from the registry; unknown names are ignored.
       *
       * @param pluginName Name of the plugin to remove.
       */
      removePlugin(pluginName: string): void {
        this.plugins.delete(pluginName);
      }

      /**
       * Computes the load order with a depth-first traversal of the
       * dependency graph.
       *
       * @returns Names of the registered plugins, dependencies first.
       *   Dependencies that are not registered are left out of the result;
       *   use checkDependencies() to find them.
       * @throws Error when the dependencies form a cycle.
       */
      resolve(): string[] {
        const visited = new Set<string>();
        const visiting = new Set<string>();
        const order: string[] = [];

        for (const pluginName of this.plugins.keys()) {
          if (!visited.has(pluginName)) {
            this.visit(pluginName, visited, visiting, order);
          }
        }

        return order;
      }

      /**
       * Visits one plugin: first all of its dependencies, then the plugin
       * itself, which is appended to `order`.
       *
       * @param pluginName Plugin (or dependency name) to visit.
       * @param visited Plugins that are already part of `order`.
       * @param visiting Plugins on the current traversal path.
       * @param order Load order built so far; appended to in place.
       * @throws Error when `pluginName` is already on the current path.
       */
      private visit(
        pluginName: string,
        visited: Set<string>,
        visiting: Set<string>,
        order: string[]
      ): void {
        if (visiting.has(pluginName)) {
          // A Set iterates in insertion order and entries are removed when a
          // visit finishes, so it lists the current path from root to here.
          const path = [...visiting];
          const cycle = [...path.slice(path.indexOf(pluginName)), pluginName].join(' -> ');
          throw new Error(`Circular dependency detected: ${cycle}`);
        }

        if (visited.has(pluginName)) {
          return;
        }

        const plugin = this.plugins.get(pluginName);
        if (!plugin) {
          // A dependency that was never registered cannot be loaded, so it
          // must not appear in the load order.
          return;
        }

        visiting.add(pluginName);

        for (const dep of plugin.dependencies ?? []) {
          this.visit(dep, visited, visiting, order);
        }

        visiting.delete(pluginName);
        visited.add(pluginName);
        order.push(pluginName);
      }

      /**
       * Checks that every declared dependency refers to a registered plugin.
       *
       * @returns `errors` holds one message per missing dependency;
       *   `warnings` is currently always empty.
       */
      checkDependencies(): DependencyCheckResult {
        const errors: string[] = [];
        const warnings: string[] = [];

        for (const [name, plugin] of this.plugins.entries()) {
          for (const dep of plugin.dependencies ?? []) {
            if (!this.plugins.has(dep)) {
              errors.push(`Plugin ${name} depends on missing plugin: ${dep}`);
            }
          }
        }

        return { errors, warnings };
      }
    }

    /**
     * Plugin description used by the dependency resolver.
     */
    interface PluginInfo {
      /** Plugin name; the resolver uses it as the registry key and in dependency lists. */
      name: string;
      /** Plugin version string; the resolver shown here stores it but does not read it. */
      version: string;
      /** Names of the plugins this plugin depends on; treated as empty when omitted. */
      dependencies?: string[];
    }

    /**
     * Result of a dependency check.
     */
    interface DependencyCheckResult {
      /** One message per declared dependency that is not a registered plugin. */
      errors: string[];
      /** Non-fatal findings; the resolver shown here never adds any. */
      warnings: string[];
    }

    // Usage example
    const resolver = new PluginDependencyResolver();

    // Register three plugins: B needs A, and C needs both A and B
    for (const info of [
      { name: 'plugin-a', version: '1.0.0', dependencies: [] as string[] },
      { name: 'plugin-b', version: '1.0.0', dependencies: ['plugin-a'] },
      { name: 'plugin-c', version: '1.0.0', dependencies: ['plugin-a', 'plugin-b'] }
    ]) {
      resolver.addPlugin(info);
    }

    // Verify that every declared dependency is registered
    const checkResult = resolver.checkDependencies();
    console.log('Dependency check:', checkResult);

    // Derive the order in which the plugins have to be loaded
    const order = resolver.resolve();
    console.log('Load order:', order);
    // ['plugin-a', 'plugin-b', 'plugin-c']

### 依赖注入

    // src/dependencies/dependency-injection.ts
    /**
     * Plugin dependency-injection container.
     *
     * Services are registered by name together with a factory and the names
     * of the services the factory needs. Resolving a service first resolves
     * its dependencies and passes them to the factory in declaration order.
     */
    export class PluginDIContainer {
      private services: Map<string, ServiceDefinition> = new Map();
      private instances: Map<string, any> = new Map();
      // Names of the services currently being resolved, outermost first;
      // used to detect circular dependencies.
      private resolving: Set<string> = new Set();

      /**
       * Registers (or replaces) a service definition.
       *
       * @param name Name under which the service can be resolved.
       * @param definition Factory, dependency names and singleton flag.
       */
      registerService(name: string, definition: ServiceDefinition): void {
        this.services.set(name, definition);
        // A cached singleton was built by the previous definition; drop it so
        // the next resolve() uses the definition registered here.
        this.instances.delete(name);
      }

      /**
       * Resolves a service, creating it (and its dependencies) when needed.
       *
       * @param name Name of the service to resolve.
       * @returns The cached instance for singletons that were already
       *   created, otherwise a newly created instance.
       * @throws Error when the service (or one of its dependencies) is not
       *   registered, or when the dependencies form a cycle.
       */
      resolve(name: string): any {
        // Singletons that were created before are served from the cache.
        if (this.instances.has(name)) {
          return this.instances.get(name);
        }

        const definition = this.services.get(name);
        if (!definition) {
          throw new Error(`Service not found: ${name}`);
        }

        // Without this guard a dependency cycle recurses until the call
        // stack overflows.
        if (this.resolving.has(name)) {
          const chain = [...this.resolving, name].join(' -> ');
          throw new Error(`Circular service dependency detected: ${chain}`);
        }

        this.resolving.add(name);
        try {
          const dependencies = (definition.dependencies ?? []).map((dep) =>
            this.resolve(dep)
          );

          const instance = definition.factory(...dependencies);

          // Only singletons are cached; other services are rebuilt on every
          // resolve().
          if (definition.singleton) {
            this.instances.set(name, instance);
          }

          return instance;
        } finally {
          // Always unwind, so a failed resolution does not poison later ones.
          this.resolving.delete(name);
        }
      }

      /**
       * Drops all cached singleton instances; definitions stay registered.
       */
      clear(): void {
        this.instances.clear();
      }
    }
    /**
     * Describes how the container builds a service.
     */
    interface ServiceDefinition {
      /** Creates the service; receives the resolved dependencies in declaration order. */
      factory: (...args: any[]) => any;
      /** Names of the services to resolve and pass to the factory; none when omitted. */
      dependencies?: string[];
      /** When truthy, the first created instance is cached and reused. */
      singleton?: boolean;
    }
    // Usage example
    const container = new PluginDIContainer();

    // Logger: shared instance, no dependencies
    container.registerService('logger', {
      singleton: true,
      factory: () => {
        return {
          log: (message: string) => console.log(`[LOG] ${message}`)
        };
      }
    });

    // Database: shared instance that logs every query through the logger
    container.registerService('database', {
      singleton: true,
      dependencies: ['logger'],
      factory: (logger: any) => {
        return {
          query: (sql: string) => {
            logger.log(`Executing query: ${sql}`);
            return [];
          }
        };
      }
    });

    // User service: a new instance per resolve, built from database and logger
    container.registerService('userService', {
      singleton: false,
      dependencies: ['database', 'logger'],
      factory: (database: any, logger: any) => {
        return {
          getUser: (id: number) => {
            logger.log(`Getting user ${id}`);
            return database.query(`SELECT * FROM users WHERE id = ${id}`);
          }
        };
      }
    });

    // Resolve the user service (its dependencies are created on demand) and use it
    const userService = container.resolve('userService');
    const user = userService.getUser(1);
    console.log('User:', user);

## 22.3.3 插件生命周期协调

### 生命周期管理器

    bash


    typescript

    // src/lifecycle/lifecycle-manager.ts

    /**
     * Plugin lifecycle manager.
     *
     * Drives the initialize / start / stop / cleanup hooks of all registered
     * plugins. Initialize and start run in dependency order (dependencies
     * first); stop and cleanup run in the reverse order.
     *
     * Hooks are awaited one plugin at a time. If a hook rejects, or the
     * dependency order cannot be resolved, the error propagates to the caller
     * and the state keeps the value it had when the error occurred.
     */
    export class PluginLifecycleManager {
      private plugins: Map<string, ManagedPlugin> = new Map();
      private state: LifecycleState = LifecycleState.IDLE;

      /**
       * Registers a plugin, replacing any plugin already registered under the
       * same name.
       *
       * @param plugin Plugin to manage.
       */
      addPlugin(plugin: ManagedPlugin): void {
        this.plugins.set(plugin.name, plugin);
      }

      /**
       * Removes a plugin; unknown names are ignored. No lifecycle hook is
       * called by the removal itself.
       *
       * @param pluginName Name of the plugin to remove.
       */
      removePlugin(pluginName: string): void {
        this.plugins.delete(pluginName);
      }

      /**
       * Initializes all plugins, dependencies first.
       * State: INITIALIZING while running, INITIALIZED afterwards.
       */
      async initializeAll(): Promise<void> {
        this.state = LifecycleState.INITIALIZING;
        await this.runPhase(this.resolveLoadOrder(), (plugin) => plugin.initialize());
        this.state = LifecycleState.INITIALIZED;
      }

      /**
       * Starts all plugins, dependencies first.
       * State: STARTING while running, RUNNING afterwards.
       */
      async startAll(): Promise<void> {
        this.state = LifecycleState.STARTING;
        await this.runPhase(this.resolveLoadOrder(), (plugin) => plugin.start());
        this.state = LifecycleState.RUNNING;
      }

      /**
       * Stops all plugins in reverse dependency order, so a plugin is stopped
       * before the plugins it depends on.
       * State: STOPPING while running, STOPPED afterwards.
       */
      async stopAll(): Promise<void> {
        this.state = LifecycleState.STOPPING;
        await this.runPhase(this.resolveLoadOrder().reverse(), (plugin) => plugin.stop());
        this.state = LifecycleState.STOPPED;
      }

      /**
       * Cleans up all plugins in reverse dependency order.
       * State: unchanged while running, IDLE afterwards.
       */
      async cleanupAll(): Promise<void> {
        await this.runPhase(this.resolveLoadOrder().reverse(), (plugin) => plugin.cleanup());
        this.state = LifecycleState.IDLE;
      }

      /**
       * Returns the current lifecycle state.
       */
      getState(): LifecycleState {
        return this.state;
      }

      /**
       * Builds a dependency resolver from the registered plugins and returns
       * the resulting load order (dependencies first).
       */
      private resolveLoadOrder(): string[] {
        const resolver = new PluginDependencyResolver();

        for (const plugin of this.plugins.values()) {
          resolver.addPlugin({
            name: plugin.name,
            version: plugin.version,
            dependencies: plugin.dependencies
          });
        }

        return resolver.resolve();
      }

      /**
       * Runs one lifecycle hook for every plugin named in `order`, awaiting
       * each before moving on. Names without a registered plugin are skipped.
       */
      private async runPhase(
        order: readonly string[],
        hook: (plugin: ManagedPlugin) => Promise<void>
      ): Promise<void> {
        for (const pluginName of order) {
          const plugin = this.plugins.get(pluginName);
          if (plugin) {
            await hook(plugin);
          }
        }
      }
    }

    /**
     * A plugin as seen by the lifecycle manager: identity, optional
     * dependency list and the four asynchronous lifecycle hooks.
     */
    interface ManagedPlugin {
      /** Plugin name; the manager uses it as the key under which the plugin is stored. */
      name: string;
      /** Plugin version string; passed through to the dependency resolver. */
      version: string;
      /** Names of the plugins this plugin depends on; passed to the dependency resolver to order the hooks. */
      dependencies?: string[];
      /** Hook awaited by initializeAll(). */
      initialize: () => Promise<void>;
      /** Hook awaited by startAll(). */
      start: () => Promise<void>;
      /** Hook awaited by stopAll(). */
      stop: () => Promise<void>;
      /** Hook awaited by cleanupAll(). */
      cleanup: () => Promise<void>;
    }

    /**
     * Lifecycle state reported by the lifecycle manager.
     */
    enum LifecycleState {
      /** Initial state; also set after cleanupAll() completes. */
      IDLE = 'IDLE',
      /** Set when initializeAll() begins. */
      INITIALIZING = 'INITIALIZING',
      /** Set after initializeAll() completes. */
      INITIALIZED = 'INITIALIZED',
      /** Set when startAll() begins. */
      STARTING = 'STARTING',
      /** Set after startAll() completes. */
      RUNNING = 'RUNNING',
      /** Set when stopAll() begins. */
      STOPPING = 'STOPPING',
      /** Set after stopAll() completes. */
      STOPPED = 'STOPPED'
    }

    // Usage example
    const manager = new PluginLifecycleManager();

    // Three plugins: B depends on A, and C depends on both A and B
    const examplePlugins: ManagedPlugin[] = [
      {
        name: 'plugin-a',
        version: '1.0.0',
        dependencies: [],
        initialize: async () => console.log('Plugin A initialized'),
        start: async () => console.log('Plugin A started'),
        stop: async () => console.log('Plugin A stopped'),
        cleanup: async () => console.log('Plugin A cleaned up')
      },
      {
        name: 'plugin-b',
        version: '1.0.0',
        dependencies: ['plugin-a'],
        initialize: async () => console.log('Plugin B initialized'),
        start: async () => console.log('Plugin B started'),
        stop: async () => console.log('Plugin B stopped'),
        cleanup: async () => console.log('Plugin B cleaned up')
      },
      {
        name: 'plugin-c',
        version: '1.0.0',
        dependencies: ['plugin-a', 'plugin-b'],
        initialize: async () => console.log('Plugin C initialized'),
        start: async () => console.log('Plugin C started'),
        stop: async () => console.log('Plugin C stopped'),
        cleanup: async () => console.log('Plugin C cleaned up')
      }
    ];

    // Register the plugins with the manager
    for (const plugin of examplePlugins) {
      manager.addPlugin(plugin);
    }

    // Walk through the whole lifecycle: initialize and start run dependencies
    // first, stop and cleanup run in the reverse order
    await manager.initializeAll();
    await manager.startAll();
    await manager.stopAll();
    await manager.cleanupAll();

### 生命周期事件

    // src/lifecycle/lifecycle-events.ts
    /**
     * Plugin lifecycle events.
     *
     * A small publish/subscribe hub for lifecycle notifications: listeners
     * subscribe per lifecycle event type and are notified when an event of
     * that type is emitted.
     */
    export class PluginLifecycleEvents {
      private listeners: Map<string, LifecycleEventListener[]> = new Map();

      /**
       * Subscribes a listener to a lifecycle event type.
       *
       * @param eventType Event type to listen for.
       * @param listener Callback invoked for every emitted event of that type.
       */
      subscribe(eventType: LifecycleEventType, listener: LifecycleEventListener): void {
        const registered = this.listeners.get(eventType);

        if (registered) {
          registered.push(listener);
        } else {
          this.listeners.set(eventType, [listener]);
        }
      }

      /**
       * Removes a previously subscribed listener.
       *
       * Only the first matching registration is removed; unknown event types
       * or listeners are ignored.
       *
       * @param eventType Event type the listener was subscribed to.
       * @param listener The same function reference that was passed to subscribe().
       */
      unsubscribe(eventType: LifecycleEventType, listener: LifecycleEventListener): void {
        const listeners = this.listeners.get(eventType);

        if (listeners) {
          const index = listeners.indexOf(listener);
          if (index > -1) {
            listeners.splice(index, 1);
          }
        }
      }

      /**
       * Emits an event to every listener subscribed to `event.type`.
       *
       * The returned promise settles once all listeners have finished. A
       * listener that throws or rejects is logged and does not affect the
       * other listeners.
       *
       * @param event The lifecycle event to emit.
       */
      async emit(event: LifecycleEvent): Promise<void> {
        const listeners = this.listeners.get(event.type);

        if (!listeners || listeners.length === 0) {
          return;
        }

        // Dispatch over a snapshot: a listener may unsubscribe (itself or
        // another listener) while being notified, and splicing the live array
        // during map() would make the following listener be skipped.
        const promises = [...listeners].map((listener) =>
          this.safeNotify(listener, event)
        );

        await Promise.all(promises);
      }

      /**
       * Invokes one listener and logs, rather than propagates, any error it
       * throws synchronously or rejects with.
       */
      private async safeNotify(listener: LifecycleEventListener, event: LifecycleEvent): Promise<void> {
        try {
          await listener(event);
        } catch (error) {
          console.error(`Error in lifecycle event listener:`, error);
        }
      }
    }
    /**
     * Lifecycle event types: one BEFORE_ / AFTER_ pair for each of the
     * initialize, start, stop and cleanup phases.
     */
    enum LifecycleEventType {
      BEFORE_INITIALIZE = 'BEFORE_INITIALIZE',
      AFTER_INITIALIZE = 'AFTER_INITIALIZE',
      BEFORE_START = 'BEFORE_START',
      AFTER_START = 'AFTER_START',
      BEFORE_STOP = 'BEFORE_STOP',
      AFTER_STOP = 'AFTER_STOP',
      BEFORE_CLEANUP = 'BEFORE_CLEANUP',
      AFTER_CLEANUP = 'AFTER_CLEANUP'
    }
    /**
     * A lifecycle notification about one plugin.
     */
    interface LifecycleEvent {
      /** Event type; used to look up the listeners to notify. */
      type: LifecycleEventType;
      /** Name of the plugin the event refers to. */
      pluginName: string;
      /** Time associated with the event (the usage example passes the emit time). */
      timestamp: Date;
    }
    /**
    * Lifecycle event listener: a callback that receives an emitted lifecycle
    * event. It may finish synchronously or return a promise.
    */
    type LifecycleEventListener = (event: LifecycleEvent) => Promise<void> | void;
    // Usage example
    const events = new PluginLifecycleEvents();

    // Listen for the moments right before and right after a plugin starts
    events.subscribe(LifecycleEventType.BEFORE_START, async ({ pluginName }) => {
      console.log(`Before starting plugin: ${pluginName}`);
    });

    events.subscribe(LifecycleEventType.AFTER_START, async ({ pluginName }) => {
      console.log(`After starting plugin: ${pluginName}`);
    });

    // Emit both events for plugin A, one after the other
    await events.emit({
      type: LifecycleEventType.BEFORE_START,
      pluginName: 'plugin-a',
      timestamp: new Date()
    });

    await events.emit({
      type: LifecycleEventType.AFTER_START,
      pluginName: 'plugin-a',
      timestamp: new Date()
    });

基于 MIT 许可发布 | 永久导航