Published on

Mongodb 优化实践 | 并行查询

Authors
  • avatar
    Name
    Shelton Ma
    Twitter

1. Mongodb 并发查询优化

1. 将彼此独立的查询并行查询

假如有一个接口需要多个collection的聚合查询结果, 且查询条件彼此独立, 可以通过并行查询提升响应速度

  1. 实现

    console.time("all")
    const queries = [
      ['query1', Query1.aggregate().match(queryMatch1).group(queryGroup1).exec()],
      ['query2', Query2.aggregate().match(queryMatch2).group(queryGroup2).exec()],
      ...
    ]
    
    const results: Array<any[]> =
      await Promise.all(
        queries.map(([_name, p]) => {
          console.time(_name)
          return p.then((res:any[]) => {
            console.timeEnd(_name)
            return res
          })
        })
      )
    console.timeEnd("all")
    
  2. 注意事项

    1. 根据版本不同, 不加.exec()可能不能实现完全并行, 可以通过调试每个请求的查询时间来确认并行查询生效
    2. 在使用$in时, 如果数组过大会影响效率, 甚至全表扫描, 所以需要检查执行计划来判断索引是否生效
    3. 如果并发任务过多, 需要考虑分组, 以及检查连接池的大小, 确保查询效率

2. 进一步优化, 将查询条件也拆分并行查询

使用场景: 查询使用$in, 而查询又特别的大(比如1w), 查询如果耗时很久, 可以将数组拆分后并行查询再合并查询结果

  1. 控制并发速度

    // 当前的项目是基于 CommonJS (CJS) 的,而 p-limit 是 ESM-only 模块
    export async function runWithConcurrencyLimit(tasks: any[], limit = 5) {
      const results = []
      const executing: Promise<any>[] = []
    
      for (const task of tasks) {
        const p = task()
        results.push(p)
    
        const e = p.then(() => executing.splice(executing.indexOf(e), 1))
        executing.push(e)
    
        if (executing.length >= limit) {
          await Promise.race(executing)
        }
      }
    
      return Promise.all(results)
    }
    
  2. 构造任务控制并发

    const BATCH_SIZE = 3000
    console.log(`BATCH_SIZE: ${BATCH_SIZE}`)
    const idChunks = []
    for (let i = 0; i < proIds.length; i += BATCH_SIZE) {
      idChunks.push(proIds.slice(i, i + BATCH_SIZE))
    }
    const queryMatch = { createdAt: { $gte: startDate, $lt: endDate } }
    const queryGroup = {
      _id: "$projectId",
      count: { $sum: 1 },
    }
    const queryTasks = idChunks.map((ids) =>
      async () => {
        const match = { ...queryMatch, projectId: { $in: ids } }
        return await QueryRepository.aggregate().match(match).group(queryGroup).exec()
      }
    )
    let [
      queryResult,
      ...
    ]: Array<any[]> =
    await Promise.all([
      runWithConcurrencyLimit(queryTasks, 5),
      runWithConcurrencyLimit(..., 5),
      runWithConcurrencyLimit(..., 5),
      runWithConcurrencyLimit(..., 5)
    ]
    )
    // 合并查询结果
    queryResult = queryResult.reduce((acc, curr) => acc.concat(curr), [])
    
  3. 使用plimit

    // npm install p-limit
    import pLimit from 'p-limit';
    
    // 创建并发限制器,最多 5 个任务并发执行
    const limit = pLimit(5);
    
    // 任务列表
    const tasks = items.map((item) =>
      limit(() => fetchData(item))
    );
    
    // 使用 Promise.all 启动所有任务,但实际并发被限制
    const results = await Promise.all(tasks);