- Published on
Mongodb 优化实践 | 并行查询
- Authors
- Name
- Shelton Ma
1. Mongodb 并发查询优化
1. 将彼此独立的查询并行查询
假如有一个接口需要多个collection的聚合查询结果, 且查询条件彼此独立, 可以通过并行查询提升响应速度
实现
console.time("all") const queries = [ ['query1', Query1.aggregate().match(queryMatch1).group(queryGroup1).exec()], ['query2', Query2.aggregate().match(queryMatch2).group(queryGroup2).exec()], ... ] const results: Array<any[]> = await Promise.all( queries.map(([_name, p]) => { console.time(_name) return p.then((res:any[]) => { console.timeEnd(_name) return res }) }) ) console.timeEnd("all")
注意事项
- 根据版本不同, 不加
.exec()
可能不能实现完全并行, 可以通过调试每个请求的查询时间来确认并行查询生效 - 在使用
$in
时, 如果数组过大会影响效率, 甚至全表扫描, 所以需要检查执行计划来判断索引是否生效 - 如果并发任务过多, 需要考虑分组, 以及检查连接池的大小, 确保查询效率
- 根据版本不同, 不加
2. 进一步优化, 将查询条件也拆分并行查询
使用场景: 查询使用$in
, 而查询又特别的大(比如1w), 查询如果耗时很久, 可以将数组拆分后并行查询再合并查询结果
控制并发速度
// 当前的项目是基于 CommonJS (CJS) 的,而 p-limit 是 ESM-only 模块 export async function runWithConcurrencyLimit(tasks: any[], limit = 5) { const results = [] const executing: Promise<any>[] = [] for (const task of tasks) { const p = task() results.push(p) const e = p.then(() => executing.splice(executing.indexOf(e), 1)) executing.push(e) if (executing.length >= limit) { await Promise.race(executing) } } return Promise.all(results) }
构造任务控制并发
const BATCH_SIZE = 3000 console.log(`BATCH_SIZE: ${BATCH_SIZE}`) const idChunks = [] for (let i = 0; i < proIds.length; i += BATCH_SIZE) { idChunks.push(proIds.slice(i, i + BATCH_SIZE)) } const queryMatch = { createdAt: { $gte: startDate, $lt: endDate } } const queryGroup = { _id: "$projectId", count: { $sum: 1 }, } const queryTasks = idChunks.map((ids) => async () => { const match = { ...queryMatch, projectId: { $in: ids } } return await QueryRepository.aggregate().match(match).group(queryGroup).exec() } ) let [ queryResult, ... ]: Array<any[]> = await Promise.all([ runWithConcurrencyLimit(queryTasks, 5), runWithConcurrencyLimit(..., 5), runWithConcurrencyLimit(..., 5), runWithConcurrencyLimit(..., 5) ] ) // 合并查询结果 queryResult = queryResult.reduce((acc, curr) => acc.concat(curr), [])
使用plimit
// npm install p-limit import pLimit from 'p-limit'; // 创建并发限制器,最多 5 个任务并发执行 const limit = pLimit(5); // 任务列表 const tasks = items.map((item) => limit(() => fetchData(item)) ); // 使用 Promise.all 启动所有任务,但实际并发被限制 const results = await Promise.all(tasks);