- Published on
面试 Web 后端
- Authors
- Name
- Shelton Ma
web后端
1. 使用redis做缓存的功能
Redis 缓存的工作原理:
- 缓存命中(Cache Hit):当请求的数据存在于 Redis 中时,就直接返回 Redis 的缓存数据,这样就能避免去数据库查询,提高响应速度。
- 缓存未命中(Cache Miss):当请求的数据不在 Redis 中时,系统会查询数据库获取数据,并将数据存入 Redis,以便下次访问时能够命中缓存。
- 缓存失效(Cache Expiration):缓存中的数据设置了过期时间,到了过期时间后,数据就会从 Redis 中删除。这个机制可以有效避免缓存的“脏数据”问题。
关键点
- 缓存过期:通过 setex 方法可以为缓存设置过期时间,防止缓存数据长期存在导致不一致。
- 缓存穿透:如果查询的内容非常频繁,缓存的数据被清除后,可能会导致数据库压力增大。可以通过布隆过滤器等方式来避免缓存穿透。
- 缓存雪崩:如果大量缓存数据在同一时刻过期,可能导致数据库压力增大。可以通过随机设置过期时间来分散过期时间,减少瞬间压力。
- 缓存击穿: 热点数据实效, 导致同一请求高并发访问数据库. 预热缓存, 定期更新
2. 消息队列
Redis 和 RabbitMQ 都是常见的消息中间件,但它们的设计目标、功能和适用场景有所不同。下面是它们的对比分析:
- redis 主要用于高速读写、数据缓存、实时数据处理、临时存储等场景。
- RabbitMQ 主要用于高可靠性、消息队列、事件驱动架构、任务调度等场景。
特性 | Redis | RabbitMQ |
---|---|---|
设计目标 | 高速缓存、临时数据存储、Pub/Sub | 消息队列、任务调度、异步消息传递 |
消息持久化 | 不持久化(默认),可以持久化数据 | 支持消息持久化,确保消息不丢失 |
消息确认 | 不支持消息确认 | 支持消息确认(ACK) |
性能 | 极高的性能,低延迟,高吞吐量 | 性能较低,但高可靠性 |
可靠性 | 消息丢失风险大 | 消息可靠性高,支持消息重试和确认 |
适用场景 | 高并发、实时数据、缓存、Pub/Sub | 任务调度、异步处理、事件驱动架构 |
扩展性 | 支持水平扩展(Redis Cluster) | 支持集群模式和高可用队列 |
3. 关系型数据库与非关系型数据库
非关系型数据库: 数据结构灵活(如爬虫数据), 简单数据查询, 不涉及复杂查询和连接操作
4. JWT(JSON Web Token)
JWT(JSON Web Token)包含三部分:
头部(Header)
头部通常由两部分组成:
alg:签名算法,常见的有HS256、RS256等。
typ:令牌类型,通常是JWT。
{ "alg": "HS256", "typ": "JWT" }
载荷(Payload)
签名(Signature)签名部分用于验证令牌的完整性以及身份验证。签名是使用头部中的alg指定的算法,通过对头部和载荷进行编码,并与密钥一起计算得到的。其目的是确保JWT在传输过程中未被篡改。
5. JWT 和 Session 存储的方式和机制
简而言之:
- JWT 存储在客户端,是无状态的,适合于分布式、无状态的应用。
- Session 存储在服务器端,是有状态的,适合于单一应用或需要维护会话的场景。
6. 缓存设置策略, 避免缓存击穿
- 使用合适的缓存键
- 缓存空数据(缓存空结果)
- 使用 Bloom Filter(布隆过滤器)
- 限制查询请求频率
- 缓存数据的预处理和策略: 按需缓存, 定时缓存, 强制刷新缓存
- 设置适当的缓存过期时间
- 缓存更新机制:数据变更时,主动清除或更新相关缓存。
- 异常请求监控和报警
7. 对比prisma/drizzle优势, 及便利之处
Prisma 和 Drizzle 都是非常流行的数据库工具,它们各自在数据库操作方面有不同的优势和便利之处。
- Prisma
- 优势
- 类型安全
- 生成数据库客户端
- 强大的数据库迁移功能
- 简洁且一致的 API
- 广泛的数据库支持
- 强大的文档和社区支持
- 不足
- 性能开销
- 复杂查询的局限性
- 优势
- Drizzle
- 优势
- 轻量级且快速
- TypeScript
- 极简的查询接口
- 灵活的 SQL 生成
- 更精细的控制
- 不足
- 功能较少
- 社区较小
- 优势
8. 接口幂等性
接口的 幂等性(Idempotency)是指无论一个操作被执行多少次,产生的副作用都相同。
- 为什么幂等性很重要?
- 防止重复操作
- 提高系统稳定性
- 增强用户体验
- 幂等性的常见示例, GET/DELETE/PUT设计为幂等性,
- POST确保幂等
- 客户端生成唯一标识符并附带在请求头中, 服务端检查X-Idempotency-Key, 如果存在则返回原来的结果
- 数据库唯一约束
- 状态管理
9. 消息队列
- 消息队列的使用场景
- 异步任务处理(airflow任务启动/aigc内容获取)
- 流量削峰(避免恶意攻击)
- 系统可靠性(确保任务不丢失)
- 使用场景
RabbitMQ
const amqp = require('amqplib'); async function sendMessage() { const conn = await amqp.connect('amqp://localhost'); const channel = await conn.createChannel(); const queue = 'task_queue'; // 发送消息到队列 const msg = 'Hello, RabbitMQ!'; await channel.assertQueue(queue, { durable: true }); channel.sendToQueue(queue, Buffer.from(msg), { persistent: true }); console.log(" [x] Sent %s", msg); // 关闭连接 setTimeout(() => { channel.close(); conn.close(); }, 500); } sendMessage().catch(console.error); // 消费者 const amqp = require('amqplib'); async function receiveMessage() { const conn = await amqp.connect('amqp://localhost'); const channel = await conn.createChannel(); const queue = 'task_queue'; await channel.assertQueue(queue, { durable: true }); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C"); channel.consume(queue, (msg) => { if (msg !== null) { console.log(" [x] Received %s", msg.content.toString()); channel.ack(msg); // 确认消息 } }, { noAck: false }); } receiveMessage().catch(console.error);
kafka
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'], }); const producer = kafka.producer(); async function sendMessage() { await producer.connect(); await producer.send({ topic: 'my-topic', messages: [{ value: 'Hello Kafka!' }], }); console.log('Message sent to Kafka'); await producer.disconnect(); } sendMessage().catch(console.error); // 消费者 const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'], }); const consumer = kafka.consumer({ groupId: 'my-group' }); async function consumeMessage() { await consumer.connect(); await consumer.subscribe({ topic: 'my-topic', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log(`Received message: ${message.value.toString()}`); }, }); } consumeMessage().catch(console.error);
10. 数据库事务概述
数据库事务(Database Transaction)是一组操作的集合,这些操作要么全部执行成功,要么全部撤销。事务确保了数据的一致性、完整性、隔离性和持久性(ACID原则)
。
使用 Prisma 实现事务控制
const { PrismaClient } = require('@prisma/client'); const prisma = new PrismaClient(); async function runTransaction() { const transaction = await prisma.$transaction(async (prisma) => { try { // 执行多个数据库操作 const user = await prisma.user.create({ data: { name: 'Alice', email: 'alice@example.com', }, }); const post = await prisma.post.create({ data: { title: 'Alice Post', content: 'This is a post by Alice.', authorId: user.id, }, }); // 如果某个操作失败,则手动抛出错误,回滚所有操作 if (!user || !post) { throw new Error('Transaction failed!'); } return post; // 成功返回结果 } catch (error) { console.error(error); throw error; // 抛出错误触发回滚 } }); console.log('Transaction successful:', transaction); } runTransaction().catch((error) => { console.log('Transaction failed and rolled back:', error); });
11. Redis集群
- 分片
- 主从, 主写, 从读
- 集群的高可用, 主故障, 从切换
12. Redis 分布式锁
Redis 分布式锁的工作原理
Redis 分布式锁通常是基于 Redis 的 SETNX 命令(set if not exists)来实现的。这个命令会在键不存在时设置键值,并返回设置成功的结果。利用这一特性,可以创建一个锁机制:
- 客户端尝试向 Redis 设置一个唯一的锁标识(通常是一个带有过期时间的键)。
- 如果该键不存在,表示锁没有被占用,客户端成功获取锁并开始执行关键业务逻辑。
- 如果该键已经存在,表示锁已被占用,客户端无法获得锁,通常会进行重试或返回失败。
- 为了防止锁失效后未能及时释放造成死锁,需要设置锁的 过期时间,使得如果客户端在持有锁时崩溃或超时,锁会自动释放。
实现分布式锁的基本步骤
锁的获取(SETNX + EXPIRE)
使用 SETNX 命令确保只在锁键不存在时设置锁值,同时可以通过 EXPIRE 命令设置锁的过期时间,防止死锁。
锁的释放
释放锁的操作需要确保只有持有锁的客户端才能释放锁。通常可以通过对比 lock_key 的值来确保这一点
锁的获取失败处理
当客户端无法获取到锁时,需要根据业务需求选择合适的重试策略:
- 立即失败:如果不能获取到锁,则直接返回失败信息。
- 重试机制:客户端在获取不到锁时可以设置重试间隔,并循环尝试一段时间后再返回失败。
- 队列等待:在一些场景下,可以将请求放入等待队列中,让客户端按顺序等待锁的释放。
锁的过期处理
为了避免客户端崩溃或长时间占用锁导致死锁,必须设置锁的过期时间。Redis 允许使用 EXPIRE 或者在 SET 命令中使用 PX 参数来设置锁的超时时间。
Redis 分布式锁的优化和改进
红锁(RedLock): Redis 官方文档和许多分布式锁实现库(如 Redisson)推荐使用 红锁算法,它能在多个 Redis 实例之间提供更高的可靠性。红锁的基本思想是,在多个独立的 Redis 实例上分别尝试获得锁,只有当多数 Redis 实例都获得锁时,才认为客户端成功获取锁。这能够有效避免 Redis 单点故障的影响。
算法:假设有 N 个 Redis 实例,客户端需要在大多数实例(例如 N/2+1 个实例)上获得锁才能认为获取成功。如果其中有一个实例不可用,仍然能通过其他实例保证锁的正确性。
锁的可重入性: 默认的 Redis 锁是不支持可重入的,这意味着如果同一个客户端在持有锁的情况下再次请求获取锁,可能会导致死锁。在某些场景下,你可以使用可重入的锁实现,或者通过扩展 Redis 锁的逻辑来实现。
锁的有效期与延续: 锁的有效期可以设置得比较长,但如果某些操作超出了锁的有效期,锁会被意外释放,导致并发问题。为了防止这种情况,通常可以通过定期刷新锁的过期时间(续期)来保证锁不会意外过期。
性能考虑: Redis 的分布式锁通过网络请求实现,所以在高并发的情况下可能会带来一定的性能压力。可以通过减少锁的持有时间、增加 Redis 集群的负载均衡、优化客户端的请求频率等方式来缓解性能问题。
令牌桶设计
令牌桶(Token Bucket)是一种常用的流量控制算法,广泛用于限制访问频率或速率控制。在限制访问频率时,令牌桶算法的设计可以根据不同的需求进行调整,但它的基本原理和设计理念是相对固定的。
令牌桶算法的基本概念
- 令牌桶:一个桶,里面包含若干个“令牌”,每个令牌表示一个允许通过的请求。当桶中有令牌时,请求就可以通过,否则就被拒绝或延迟。
- 令牌产生速率:系统以固定速率将令牌放入桶中,令牌的生成速度通常是以“令牌/秒”来表示。即每单位时间(比如每秒)会放入一定数量的令牌。
- 桶的容量:桶有最大容量限制,一旦桶满,新的令牌会被丢弃。这样可以防止过多的请求积压,避免令牌超负荷。
- 请求与令牌消耗:每当有一个请求到达时,它需要从桶中取出一个令牌。如果桶中有令牌,令牌被消耗,请求允许通过。如果没有令牌,则请求被拒绝或被延迟。
令牌桶的工作原理
- 令牌生成:系统以恒定的速率(例如每秒生成 10 个令牌)生成令牌,并放入桶中。如果桶已满,多余的令牌会丢弃。
- 请求处理:每当有请求到来时,系统检查桶中是否有令牌。如果有令牌,令牌被取出,允许请求通过。如果没有令牌,取决于设计要求,可以选择拒绝请求、延迟请求或返回错误。
设计令牌桶
令牌生成速率
桶的容量
令牌消耗
令牌桶算法的实现
class TokenBucket { constructor(tokenGenerationRate, bucketCapacity) { this.tokenGenerationRate = tokenGenerationRate; // 每秒生成的令牌数量 this.bucketCapacity = bucketCapacity; // 桶的最大容量 this.tokens = 0; // 当前令牌数 this.lastCheck = Date.now(); // 上次生成令牌的时间 } // 更新桶中的令牌数 updateTokens() { const now = Date.now(); const timeElapsed = (now - this.lastCheck) / 1000; // 计算时间间隔,单位为秒 // 根据时间间隔生成新的令牌 this.tokens = Math.min(this.bucketCapacity, this.tokens + timeElapsed * this.tokenGenerationRate); this.lastCheck = now; } // 尝试获取一个令牌,返回是否可以请求通过 tryConsume() { this.updateTokens(); if (this.tokens >= 1) { this.tokens -= 1; // 消耗一个令牌 return true; // 允许通过 } else { return false; // 拒绝请求 } } } // 创建一个令牌桶,令牌生成速率为每秒 10 个,桶容量为 100 个 const tokenBucket = new TokenBucket(10, 100); // 模拟请求 function handleRequest() { if (tokenBucket.tryConsume()) { console.log("请求被允许通过"); } else { console.log("请求被拒绝,令牌不足"); } } // 模拟每秒 5 次请求 setInterval(handleRequest, 200);
桶的溢出处理 当令牌桶满时,如果系统继续生成令牌,你需要决定如何处理溢出的令牌。一种常见的做法是丢弃多余的令牌,而另一种做法是让其进入一个“待处理”的队列(如果有实现延迟请求的需求)。
并发控制 在高并发环境中,可能会有多个请求同时到达,因此你需要确保令牌的操作是线程安全的。在 Node.js 中,你可以通过适当的锁机制或队列来处理并发问题。
配置缓存或会话管理
为了在多个请求之间保留令牌桶的状态,你可能需要考虑将令牌桶的状态存储在缓存中,或者与用户会话相关联。对于高并发或分布式应用,使用 Redis 或其他缓存技术是常见的做法,这样令牌桶状态能够跨多个请求和服务器实例共享。
使用 Redis 存储令牌桶状态
import Redis from 'ioredis'; const redis = new Redis(); // 连接到本地的 Redis 实例 class TokenBucket { constructor(tokenGenerationRate, bucketCapacity, userId) { this.tokenGenerationRate = tokenGenerationRate; this.bucketCapacity = bucketCapacity; this.userId = userId; // 假设每个用户都有唯一的 userId this.redisKey = `token_bucket:${userId}`; } async updateTokens() { const lastCheck = await redis.get(`${this.redisKey}:lastCheck`); const now = Date.now(); const timeElapsed = (now - lastCheck) / 1000; const currentTokens = await redis.get(`${this.redisKey}:tokens`); const tokens = Math.min(this.bucketCapacity, parseInt(currentTokens || '0') + timeElapsed * this.tokenGenerationRate); await redis.set(`${this.redisKey}:tokens`, tokens); await redis.set(`${this.redisKey}:lastCheck`, now); } async tryConsume() { await this.updateTokens(); const tokens = await redis.get(`${this.redisKey}:tokens`); if (parseInt(tokens || '0') >= 1) { await redis.decr(`${this.redisKey}:tokens`); return true; } else { return false; } } } export default TokenBucket;
在 Node.js 中直接运行 TypeScript 项目
使用 ts-node 直接运行 TypeScript 项目
安装
# ts-node 是一个可以直接在 Node.js 中执行 TypeScript 代码的工具。它会自动处理 TypeScript 的编译过程,使你无需手动运行 tsc 编译器来转换代码。 npm install ts-node typescript
配置
// tsconfig.json { "compilerOptions": { "target": "ES6", // 设置编译的目标 JavaScript 版本 "module": "CommonJS", // 设置模块解析方式 "outDir": "./dist", // 输出目录(编译后的 JavaScript 文件将保存在此目录中) "rootDir": "./src", // 输入文件目录 "strict": true, // 启用严格模式 "esModuleInterop": true, // 允许默认导入非 ECMAScript 模块 "skipLibCheck": true, // 跳过库文件检查 "forceConsistentCasingInFileNames": true }, "include": ["src/**/*.ts"], // 指定编译的 TypeScript 文件路径 "exclude": ["node_modules"] // 排除 node_modules 目录 }
直接运行
npx ts-node src/index.ts ts-node src/index.ts
使用 tsc 编译并运行
npx tsc node dist/index.js
调试 TypeScript
// launch.json { "version": "0.2.0", "configurations": [ { "type": "node", "request": "launch", "name": "Launch Program", "skipFiles": ["<node_internals>/**"], "program": "${workspaceFolder}/src/index.ts", // 指定 TypeScript 文件路径 "outFiles": ["${workspaceFolder}/dist/**/*.js"] // 指定编译后的 JS 文件路径 } ] }
TypeScript 项目的构建过程
项目结构
/my-typescript-project /src 存放 TypeScript 源代码文件 index.ts app.ts /dist 存放编译后的 JavaScript 文件 tsconfig.json TypeScript 编译配置文件 package.json npm 配置文件,定义项目的依赖和构建脚本
安装依赖
创建 tsconfig.json 配置文件
编译 TypeScript
npx tsc
打包
- 使用 Webpack(适用于浏览器端应用)
- 使用 Rollup(适用于库和小型项目)
生产构建和优化
- 压缩和优化 JavaScript:通过 Webpack、Rollup 或 Babel 对 JavaScript 文件进行压缩和优化,减少文件体积。
- 代码拆分:使用 Webpack 或 Rollup 等工具将代码拆分成多个较小的文件,按需加载,提高应用的加载速度。
- Source Maps:使用 Source Map 来在生产环境中调试 TypeScript 代码。你可以在 tsconfig.json 中启用 sourceMap 选项来生成对应的 .map 文件。
- 环境变量:在构建过程中,通过设置环境变量来调整应用的行为,例如区分开发环境和生产环境的配置。
使用 npm 脚本来自动化构建过程
TypeScript 项目的日志处理
morgan 是一个常用的 HTTP 请求日志中间件,默认情况下,它会将日志输出到控制台。为了写入到外部文件,我们可以结合 fs.createWriteStream 来存储日志到文件中,或者将其集成到更强大的日志管理工具(如 Winston 或 ELK)。
1. 基本写入日志到文件
// npm install morgan
const express = require("express");
const fs = require("fs");
const path = require("path");
const morgan = require("morgan");
const app = express();
// 创建一个写入流(append 模式)
const logStream = fs.createWriteStream(path.join(__dirname, "access.log"), {
flags: "a", // 'a' 追加模式,不会覆盖已有日志
});
// 使用 morgan 记录日志到文件
app.use(morgan("combined", { stream: logStream }));
app.get("/", (req, res) => {
res.send("Hello World!");
});
app.listen(3000, () => {
console.log("Server is running on port 3000");
});
2. 自定义日志格式
app.use(
morgan(":method :url :status :response-time ms - :res[content-length]", {
stream: logStream,
})
);
3. 按日期自动生成日志文件
// npm install rotating-file-stream
const rfs = require("rotating-file-stream");
const accessLogStream = rfs.createStream("access.log", {
interval: "1d", // 每天生成一个新日志
path: path.join(__dirname, "logs"),
});
app.use(morgan("combined", { stream: accessLogStream }));
4. 结合 Winston 进行日志管理
如果想要更高级的日志管理(比如存储到数据库或 JSON 格式存储),可以结合 winston。
// npm install winston
const winston = require("winston");
const logger = winston.createLogger({
level: "info",
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: "logs/error.log", level: "error" }),
new winston.transports.File({ filename: "logs/combined.log" }),
],
});
app.use(
morgan("combined", {
stream: { write: (message) => logger.info(message.trim()) },
})
);
5. 发送日志到远程存储
app.use(
morgan("combined", {
stream: {
write: (message) => {
// 发送日志到远程服务(假设有日志收集 API)
fetch("http://log-server.example.com/api/logs", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ log: message.trim() }),
});
},
},
})
);
RPAC & RBAC
RPAC(Resource-based Policy Access Control) 是一种基于资源的访问控制策略,它允许资源本身定义访问权限,而不是通过角色或用户分配权限。RPAC 常用于云存储(如 AWS S3)或微服务架构中,以确保资源访问的灵活性和安全性。
1. RPAC vs RBAC
对比项 | RPAC(基于资源的访问控制) | RBAC(基于角色的访问控制) |
---|---|---|
控制点 | 资源(Resource) | 角色(Role) |
权限存储 | 资源本身存储访问策略 | 角色关联用户和权限 |
适用场景 | 云存储(AWS S3)、微服务 API 权限 | 企业内部系统,权限基于角色 |
动态性 | 更灵活,资源可自行定义访问控制 | 需要角色层级调整,较固定 |
2. RPAC 在 Node.js/NestJS 中的实现
源权限定义(模拟数据库)
// resource-policies.ts export const resourcePolicies = { 'file-123': { owner: 'user1', allowedUsers: ['user2'] }, 'file-456': { owner: 'user2', allowedUsers: ['user3'] }, };
创建 RpacGuard
import { CanActivate, ExecutionContext, Injectable, ForbiddenException } from '@nestjs/common'; import { resourcePolicies } from './resource-policies'; @Injectable() export class RpacGuard implements CanActivate { canActivate(context: ExecutionContext): boolean { const request = context.switchToHttp().getRequest(); const user = request.user; // 假设 AuthMiddleware 解析了用户身份 const fileId = request.params.id; const policy = resourcePolicies[fileId]; if (!policy) { throw new ForbiddenException('Resource not found'); } // 判断用户是否有权限访问 if (policy.owner === user.id || policy.allowedUsers.includes(user.id)) { return true; } else { throw new ForbiddenException('Access Denied'); } } }
在 Controller 里应用 RPAC
import { Controller, Get, Param, UseGuards, Req } from '@nestjs/common'; import { RpacGuard } from './rpac.guard'; @Controller('files') export class FileController { @Get(':id') @UseGuards(RpacGuard) // 使用 RPAC 保护 API getFile(@Param('id') id: string, @Req() req) { return { message: `User ${req.user.id} can access file ${id}` }; } }
用户访问示例
curl -H "Authorization: Bearer user1-token" http://localhost:3000/files/file-123
OIDC(OpenID Connect) & Oauth 2.0
OIDC(OpenID Connect) 是基于 OAuth 2.0 的身份认证协议,允许应用安全地验证用户身份,并获取用户信息。
1. OIDC vs OAuth 2.0 的区别
- OAuth 2.0 主要用于 授权,应用可以获取 access_token,用于访问 API,但无法确认用户身份。
- OIDC 主要用于 身份认证,在 OAuth 2.0 的基础上增加了 id_token,可以确认用户身份。
2. OIDC 认证流程
- 用户访问应用,应用需要确认用户身份。
- 应用将用户重定向到身份提供者(IdP),例如 Authing、Google、GitHub 等。
- 用户登录后,IdP 生成 id_token 和 access_token,并返回给应用。
- 应用解析 id_token,获取用户信息(如 sub,即用户唯一 ID)。
- 用户成功登录,应用存储 session 或 JWT,允许后续访问。
3. OIDC Token 类型
- id_token(核心):JWT 格式,包含用户身份信息,如 sub(用户 ID)、email、name 等。
- access_token:用于访问 API 资源。
- refresh_token(可选):用于获取新的 access_token,避免频繁登录。
4. OIDC 适用于哪些场景?
- 单点登录(SSO)(例如:Google 登录)
- 第三方应用认证(如 GitHub 登录到 Notion)
- 多端一致身份认证(Web、App、API 统一认证)
总结来说,OIDC 解决的是“你是谁?”的问题,而 OAuth 2.0 解决的是“你能访问什么?”的问题。
5. OAuth 2.0 授权流程(授权码模式)
用户授权:
- 用户在 Notion 中点击 “用 GitHub 登录”,Notion 重定向到 GitHub 授权页面。
- 用户输入 GitHub 账号密码,并同意授权 Notion 访问 GitHub 资料
GitHub 颁发 authorization_code(授权码), 重定向回 Notion,并携带授权码: https://notion.so/callback?code=123456
Notion 用授权码换取 access_token
POST https://github.com/login/oauth/access_token Content-Type: application/json { "client_id": "NOTION_CLIENT_ID", "client_secret": "NOTION_SECRET", "code": "123456", "redirect_uri": "https://notion.so/callback" } { "access_token": "abcd1234", "token_type": "Bearer", "expires_in": 3600 }
Notion 使用 access_token 访问 GitHub API, GitHub 返回用户信息
GET https://api.github.com/user Authorization: Bearer abcd1234
用户成功登录 Notion, Notion 获取 GitHub 账号信息,创建用户,完成 OAuth 登录。
6. OAuth 2.0 安全风险 & 解决方案
- 令牌泄露(Token Leakage): 使用 HTTPS,短时效 access_token + Refresh Token,HttpOnly Cookie 存储 Token
- CSRF(跨站请求伪造): 使用 state 参数校验授权请求 state参数是一种用于防止CSRF攻击的机制。它是一个随机生成的字符串,用于确保请求来自于客户端本身,而不是来自于其他网站。state参数通常与请求一起发送,并在服务器端进行验证。如果state参数不匹配,则表示请求可能是恶意的,服务器可以拒绝处理该请求。
- 重放攻击(Replay Attack): 使用 nonce,在 id_token 中加入唯一随机数
- XSS(跨站脚本攻击) 窃取 Token: 使用 HttpOnly + Secure Cookie,避免 localStorage 存储 Token
7. OAuth 2.0 + OIDC 的最佳实践
- 使用 authorization_code 模式(更安全,避免 Token 泄露)。
- access_token 用于 API 访问,id_token 用于用户认证。
- 存储 Token 时使用 HttpOnly + Secure Cookie,避免 XSS 窃取。
- 启用 PKCE(Proof Key for Code Exchange)防止授权码被劫持。
- 在 id_token 中使用 nonce 防止重放攻击。
- 使用 scope=openid profile email 以获取 OIDC 认证信息。
8. 多个微服务如何共享 Authing 认证信息?
- JWT + Gateway(API Gateway 负责解析 JWT,微服务不需要重复验证)
- Authing Webhook(同步用户状态,如封禁、权限变更)
- Redis 共享 Session(存储 Token 解析结果,减少重复请求)
9. 如何使用 Redis + Authing 优化身份认证?
- 存储 Token 解析结果,减少对 Authing 的 API 请求
- 限制 Token 请求频率,防止 DDoS 攻击
- 缓存用户权限信息,减少数据库查询
10. 如果 Authing 宕机了,你的系统如何保证部分功能可用?
- 本地缓存 Token 解析结果(Redis + JWT)
- 降级策略:允许已登录用户继续使用系统,但限制高敏感操作(如支付)
- 预请求机制:提前获取 Authing 用户数据,避免实时依赖
令牌泄露和重放攻击
令牌泄露是指攻击者窃取了用户的 access_token 或 id_token,然后冒充合法用户访问受保护的 API 或资源。
如何防止令牌泄露?
- 使用 HTTPS 传输所有令牌数据,防止中间人攻击(MITM)。
- 使用 HttpOnly + Secure Cookie 存储 access_token,避免 XSS 窃取。
- 使用短时效 access_token(如 15 分钟)+ Refresh Token 机制,即使令牌被盗,攻击者可利用的时间也有限。
- 避免将 access_token 放在 URL 参数中,改用 Authorization: Bearer token 方式传输。
- 限制 Token 作用范围(Scope),比如用户只允许访问自己的数据,避免被滥用。
- 启用 Token 绑定 IP / 设备,如果 access_token 在新的 IP 或设备上使用,强制重新认证。
重放攻击(Replay Attack)是指攻击者截获用户的请求(包括 access_token),然后在稍后重复发送相同的请求,以冒充用户。
如何防止重放攻击?
- 使用一次性 Token(Nonce):在请求中加入一个随机数 nonce,服务器检查是否被重复使用。
- 使用 iat(签发时间)+ 过期时间检查:如果 Token 过期,则拒绝请求。
- Token 绑定 IP 或设备:如果令牌突然在新的 IP 地址或设备上使用,要求用户重新登录。
- 使用签名请求(HMAC):在每个请求中加入 HMAC 签名,确保请求未被篡改。
- 使用 Webhooks / 事件监听:检测用户在多个 IP 地址上短时间内使用相同 access_token,自动吊销令牌。
Webhook & 使用 Webhook 触发 Redis 缓存更新的场景
Webhook 是一种基于 HTTP 回调的机制,当某个事件发生时,服务器会主动向指定的 URL 发送请求,通知外部系统,而不是外部系统定期轮询
使用 Webhook 触发 Redis 缓存更新的场景
- 用户数据变更(Authing/Clerk)
- 当用户资料(如头像、用户名、权限)发生变更时,Authing/Clerk 触发 Webhook,通知你的服务器更新缓存。
- 你的服务器收到 Webhook 后,更新 Redis 中的用户信息。
- 订单状态变更(支付系统)
- 用户付款成功后,支付平台(如 Stripe)会发送 Webhook,通知你的服务器订单已支付。
- 你的服务器收到 Webhook 后,更新 Redis 中的订单状态,避免重复查询数据库。
- 库存更新(电商系统)
- 商品库存变更时,供应链管理系统发送 Webhook,通知你的服务器更新缓存。
- 服务器更新 Redis,确保用户看到最新的库存数据。
如何结合 Webhook + Redis 缓存更新?
注册 Webhook, 在 Authing/Clerk/Stripe 等服务中,注册一个 Webhook 监听 URL
https://yourapp.com/webhook
后端处理 Webhook 并更新 Redis
在你的 Node.js 服务器上,使用 Express 或 Hono 处理 Webhook。
import { Hono } from 'hono' import { Redis } from 'ioredis' const app = new Hono() const redis = new Redis('redis://localhost:6379') app.post('/webhook', async (c) => { const body = await c.req.json() // 确保 Webhook 事件有效 if (!body.event) { return c.json({ error: 'Invalid Webhook' }, 400) } if (body.event === 'user.updated') { const userId = body.data.id const newUserData = body.data // 更新 Redis 缓存 await redis.set(`user:${userId}`, JSON.stringify(newUserData)) console.log(`用户 ${userId} 数据已更新到 Redis`) } return c.json({ message: 'Webhook received' }) }) app.fire()
设置缓存过期时间 & 失效策略
await redis.set(`user:${userId}`, JSON.stringify(newUserData), 'EX', 3600) // 1 小时后过期
- 缓存预热 + Webhook 缓存不存在时,查询数据库,同时监听 Webhook 进行主动更新。
设计一个高并发商品抢购系统(如秒杀)
架构设计
组件 | 作用 |
---|---|
Redis | 作为缓存,存储库存、用户抢购状态,减少数据库压力 |
MySQL | 持久化存储订单、用户购买记录 |
消息队列(RabbitMQ/Kafka) | 处理高并发下的订单异步写入 MySQL,削峰 |
分布式锁(Redlock/ETCD/ZooKeeper) | 防止超卖,确保库存正确更新 |
CDN + API 网关 | 缓解请求压力,过滤非法流量 |
限流(Redis+令牌桶算法) | 防止恶意刷单,提高系统稳定性 |
日志 & 监控(ELK、Prometheus) | 监控抢购过程,发现异常 |
系统流程
- 预热阶段
- 商品信息提前存入 Redis
- 库存信息存入 Redis(key: stock:product_id)
- 限流策略(如令牌桶) 控制请求速率
- 用户请求抢购
请求流量控制
- CDN + API 网关拦截
- Redis 限流(防止恶意请求)
- 用户鉴权(防止重复下单)
库存检查
const stock = await redis.decr('stock:1001') // 先减库存 if (stock < 0) { await redis.incr('stock:1001') // 还原库存 return { error: '商品已售罄' } }
订单处理
- 异步队列(防止超卖)
- Redis 成功扣减库存后,不直接写入 MySQL
- 订单请求发送到消息队列(Kafka/RabbitMQ)
- 订单服务消费队列消息,异步写入 MySQL
订单写入 MySQL
消息队列消费者处理订单
数据库事务:确保库存 & 订单一致
BEGIN; UPDATE product_stock SET stock = stock - 1 WHERE product_id = 1001 AND stock > 0; INSERT INTO orders (user_id, product_id) VALUES (123, 1001); COMMIT;
防止超卖的 SQL 条件
UPDATE product_stock SET stock = stock - 1 WHERE product_id = 1001 AND stock > 0;
支付 & 订单确认
- 订单状态变更后,发送 Webhook 更新 Redis
- 订单超时未支付(设置 Redis 过期时间),超时后自动释放库存
关键优化
防止超卖
Redis 预减库存
MySQL 更新时加 WHERE stock > 0 条件
分布式锁(Redlock)
const lock = await redlock.lock('lock:1001', 1000) // 1000ms 过期 if (!lock) return { error: '请稍后重试' }
限流
令牌桶算法(Redis + Lua)
用户防刷
const userKey = `limit:user:${userId}` const count = await redis.incr(userKey) if (count > 5) return { error: '请求过于频繁' }
日志 & 监控
- ELK(ElasticSearch + Kibana)
- Prometheus 监控请求 & 延迟
代码细节
生产者
// producer.ts import { Kafka } from 'kafkajs' import { Redis } from 'ioredis' const kafka = new Kafka({ clientId: 'seckill', brokers: ['localhost:9092'], // Kafka 服务器地址 }) const producer = kafka.producer() const redis = new Redis('redis://localhost:6379') async function createOrder(userId: string, productId: string) { await producer.connect() // 预扣 Redis 库存 const stock = await redis.decr(`stock:${productId}`) if (stock < 0) { await redis.incr(`stock:${productId}`) // 还原库存 return { error: '商品已售罄' } } // 将订单消息推送到 Kafka await producer.send({ topic: 'order', messages: [{ key: userId, value: JSON.stringify({ userId, productId }) }], }) console.log(`订单请求已推送 Kafka:用户 ${userId} 购买商品 ${productId}`) await producer.disconnect() return { success: '抢购成功,订单处理中' } } // 示例调用 createOrder('user_123', 'product_1001')
消费者
// consumer.ts import { Kafka } from 'kafkajs' import mysql from 'mysql2/promise' const kafka = new Kafka({ clientId: 'seckill', brokers: ['localhost:9092'], }) const consumer = kafka.consumer({ groupId: 'order-group' }) // 连接 MySQL const db = await mysql.createConnection({ host: 'localhost', user: 'root', password: 'password', database: 'seckill', }) async function processOrder() { await consumer.connect() await consumer.subscribe({ topic: 'order', fromBeginning: false }) await consumer.run({ eachMessage: async ({ message }) => { const { userId, productId } = JSON.parse(message.value.toString()) console.log(`处理订单:用户 ${userId} 购买商品 ${productId}`) try { // 使用 MySQL 事务确保订单 & 库存一致 await db.beginTransaction() const [rows] = await db.execute( 'UPDATE product_stock SET stock = stock - 1 WHERE product_id = ? AND stock > 0', [productId] ) if (rows.affectedRows === 0) { console.log(`库存不足,回滚订单:用户 ${userId} 购买 ${productId}`) await db.rollback() return } await db.execute( 'INSERT INTO orders (user_id, product_id) VALUES (?, ?)', [userId, productId] ) await db.commit() console.log(`✅ 订单成功写入 MySQL:用户 ${userId} 购买 ${productId}`) } catch (error) { console.error('❌ 订单写入失败:', error) await db.rollback() } }, }) } // 启动消费者 processOrder()
index.ts整合
// index.ts import Redlock from 'redlock' import { Kafka } from 'kafkajs' import { Redis } from 'ioredis' import mysql from 'mysql2/promise' // Kafka 配置 const kafka = new Kafka({ clientId: 'seckill', brokers: ['localhost:9092'], // Kafka 服务器地址 }) const producer = kafka.producer() const consumer = kafka.consumer({ groupId: 'order-group' }) const redis = new Redis('redis://localhost:6379') // 初始化 Redlock(支持多个 Redis 实例,提高可靠性) const redlock = new Redlock([redis], { driftFactor: 0.01, // 时钟漂移因子 retryCount: 3, // 最大重试次数 retryDelay: 200, // 重试间隔 retryJitter: 50 // 随机抖动 }) // 连接 MySQL const dbConfig = { host: 'localhost', user: 'root', password: 'password', database: 'seckill', } async function start() { try { console.log('🚀 启动秒杀系统...') // 1️⃣ 连接 Kafka 生产者 await producer.connect() console.log('✅ Kafka 生产者已连接') // 2️⃣ 连接 Kafka 消费者 await consumer.connect() await consumer.subscribe({ topic: 'order', fromBeginning: false }) console.log('✅ Kafka 消费者已连接') // 3️⃣ 连接 Redis await redis.set('stock:1001', 500, 'EX', 3600) // 预热库存 console.log('✅ Redis 连接成功,库存初始化完成') // 4️⃣ 连接 MySQL const db = await mysql.createConnection(dbConfig) console.log('✅ MySQL 连接成功') // 5️⃣ 监听订单消息(消费者) consumer.run({ eachMessage: async ({ message }) => { const { userId, productId } = JSON.parse(message.value.toString()) console.log(`📦 处理订单:用户 ${userId} 购买商品 ${productId}`) try { await db.beginTransaction() const [rows] = await db.execute( 'UPDATE product_stock SET stock = stock - 1 WHERE product_id = ? AND stock > 0', [productId] ) if (rows.affectedRows === 0) { console.log(`⚠️ 库存不足,回滚订单:用户 ${userId} 购买 ${productId}`) await db.rollback() return } await db.execute( 'INSERT INTO orders (user_id, product_id) VALUES (?, ?)', [userId, productId] ) await db.commit() console.log(`✅ 订单成功写入 MySQL:用户 ${userId} 购买 ${productId}`) } catch (error) { console.error('❌ 订单写入失败:', error) await db.rollback() } }, }) // 6️⃣ 启动 HTTP 接口(用于模拟抢购请求) const express = require('express') const app = express() app.use(express.json()) app.post('/seckill', async (req, res) => { const { userId, productId } = req.body const lockKey = `lock:stock:${productId}` try{ const lock = await redlock.acquire([lockKey], 3000) console.log(`🔒 用户 ${userId} 获取锁成功`) // 预扣 Redis 库存 const stock = await redis.decr(`stock:${productId}`) if (stock < 0) { await redis.incr(`stock:${productId}`) return res.status(400).json({ error: '商品已售罄' }) } // 推送订单到 Kafka await producer.send({ topic: 'order', messages: [{ key: userId, value: JSON.stringify({ userId, productId }) }], }) // 释放锁 await lock.release() console.log(`🔓 用户 ${userId} 释放锁`) res.json({ success: '抢购成功,订单处理中' }) }catch (error) { console.error('❌ 获取锁失败:', error) return { error: '系统繁忙,请稍后重试' } } }) app.listen(3000, () => console.log('✅ 秒杀系统 API 运行在 http://localhost:3000')) } catch (error) { console.error('❌ 系统启动失败:', error) } } // 启动系统 start()
Express 常见中间件及示例
Express 中间件用于拦截 HTTP 请求,进行日志记录、身份验证、错误处理等操作。以下是常见的中间件类型
内置
- express.json() 解析 JSON
- express.urlencoded() 解析 URL 编码数据
- express.static() 静态文件托管
第三方
- cors 解决跨域
- morgan 记录日志
- helmet 安全防护
- compression 启用 Gzip 压缩
自定义中间件
- 记录请求时间
- 认证中间件
- 错误处理
微服务
1. 创建 Express 项目
require("dotenv").config();
const express = require("express");
const cors = require("cors");
const { auth } = require("./auth");
const app = express();
app.use(cors());
app.use(express.json());
// 认证中间件
app.use(auth);
app.get("/", (req, res) => res.send("Hello World"));
app.post("/process", async (req, res) => {
const lock = await lockResource("process-lock");
try {
// 业务逻辑
res.json({ message: "Processing completed" });
} finally {
await lock.unlock();
}
});
app.listen(3000, () => console.log("Server running on port 3000"));
2. 集成 NextAuth 进行认证
配置NextAuth, 支持accessToken, refreshToken
// auth.js const { NextAuth } = require("next-auth"); const Providers = require("next-auth/providers"); const options = { providers: [ Providers.Credentials({ name: "Credentials", authorize: async (credentials) => { if (credentials.username === "admin" && credentials.password === "password") { return { id: 1, name: "Admin" }; } throw new Error("Invalid credentials"); }, }), ], callbacks: { async jwt({ token, user }) { if (user) { token.accessToken = jwt.sign( { sub: user.id, name: user.name }, process.env.JWT_SECRET!, { expiresIn: "15m" } ); token.refreshToken = jwt.sign( { sub: user.id, name: user.name }, process.env.REFRESH_SECRET!, { expiresIn: "30d" } ); token.accessTokenExpires = Date.now() + 15 * 60 * 1000; } // 如果 accessToken 过期,则刷新 if (Date.now() > token.accessTokenExpires) { console.log("Refreshing access token..."); try { const newAccessToken = jwt.sign( { sub: token.sub }, process.env.JWT_SECRET!, { expiresIn: "15m" } ); return { ...token, accessToken: newAccessToken, accessTokenExpires: Date.now() + 15 * 60 * 1000, }; } catch (error) { console.error("Refresh token failed", error); return { ...token, error: "RefreshTokenError" }; } } return token; }, async session({ session, token }) { session.accessToken = token.accessToken; session.refreshToken = token.refreshToken; return session; }, }, secret: process.env.AUTH_SECRET, }; const auth = (req, res, next) => { NextAuth(req, res, options) .then((session) => { req.user = session?.user; next(); }) .catch(() => res.status(401).json({ error: "Unauthorized" })); }; module.exports = { auth };
微服务验证
中间件验证 access_token
// authMiddleware.js const jwt = require("jsonwebtoken"); const authMiddleware = (req, res, next) => { const authHeader = req.headers.authorization; if (!authHeader || !authHeader.startsWith("Bearer ")) { return res.status(401).json({ error: "Unauthorized" }); } const token = authHeader.split(" ")[1]; try { const decoded = jwt.verify(token, process.env.JWT_SECRET); req.user = decoded; next(); } catch (err) { if (err.name === "TokenExpiredError") { return res.status(401).json({ error: "Access token expired" }); } return res.status(403).json({ error: "Invalid or expired token" }); } }; module.exports = authMiddleware;
微服务 refresh-token API
app.post("/refresh-token", (req, res) => { const { refreshToken } = req.body; if (!refreshToken) { return res.status(401).json({ error: "No refresh token provided" }); } try { const decoded = jwt.verify(refreshToken, process.env.REFRESH_SECRET); const newAccessToken = jwt.sign( { sub: decoded.sub }, process.env.JWT_SECRET, { expiresIn: "15m" } ); return res.json({ accessToken: newAccessToken }); } catch (err) { return res.status(403).json({ error: "Invalid or expired refresh token" }); } });
使用
前端使用accessToken
const session = await getSession(); const accessToken = session?.accessToken;
后端使用
// server.js const authMiddleware = require("./authMiddleware"); app.get("/protected", authMiddleware, (req, res) => { res.json({ message: "This is a protected route", user: req.user }); });
前端刷新accessToken
async function fetchWithAuth(url: string, options = {}) { let session = await getSession(); const response = await fetch(url, { ...options, headers: { ...options.headers, Authorization: `Bearer ${session?.accessToken}`, }, }); if (response.status === 401) { console.log("Access token expired, refreshing..."); const refreshResponse = await fetch("/api/auth/refresh-token", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ refreshToken: session?.refreshToken }), }); if (refreshResponse.ok) { const { accessToken } = await refreshResponse.json(); session = await updateSessionToken(accessToken); return fetch(url, { ...options, headers: { ...options.headers, Authorization: `Bearer ${accessToken}`, }, }); } } return response; } async function updateSessionToken(newAccessToken: string) { const session = await getSession(); session.accessToken = newAccessToken; return session; }
3. 使用 Redis 缓存
//redis.js
const Redis = require("ioredis");
const redis = new Redis(process.env.REDIS_URL);
const cacheMiddleware = async (req, res, next) => {
const key = `cache:${req.originalUrl}`;
const cached = await redis.get(key);
if (cached) {
return res.json(JSON.parse(cached));
}
res.sendResponse = res.json;
res.json = (body) => {
redis.setex(key, 3600, JSON.stringify(body)); // 1-hour cache
res.sendResponse(body);
};
next();
};
module.exports = { redis, cacheMiddleware };
4. 实现 Redis 红锁
//redis.js
const Redlock = require("redlock");
const redlock = new Redlock([redis], {
driftFactor: 0.01,
retryCount: 3,
retryDelay: 200,
retryJitter: 200,
});
const lockResource = async (key, ttl = 5000) => {
return await redlock.lock(key, ttl);
};
module.exports = { redis, cacheMiddleware, lockResource };
5. ELK 记录日志
安装 pino
npm install pino pino-http
在 server.js 配置日志
// server.js const pino = require("pino"); const pinoHttp = require("pino-http"); const logger = pino(); app.use(pinoHttp({ logger })); module.exports = { logger };
docker中添加elk
logstash配置
// logstash.conf input { tcp { port => 5044 codec => json } } output { elasticsearch { hosts => ["http://elasticsearch:9200"] index => "express-logs" } stdout { codec => rubydebug } }
6. Docker 部署
创建 Dockerfile
FROM node:18 WORKDIR /app COPY package*.json ./ RUN npm install COPY . . EXPOSE 3000 CMD ["node", "server.js"]
创建 docker-compose.yml
version: "3" services: app: build: . ports: - "3000:3000" environment: - REDIS_URL=redis://redis:6379 depends_on: - redis redis: image: redis:latest ports: - "6379:6379" elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:8.5.1 environment: - discovery.type=single-node ports: - "9200:9200" logstash: image: docker.elastic.co/logstash/logstash:8.5.1 volumes: - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf depends_on: - elasticsearch ports: - "5044:5044" kibana: image: docker.elastic.co/kibana/kibana:8.5.1 ports: - "5601:5601" depends_on: - elasticsearch
7. 设立请求阈值报警
使用 express-rate-limit
npm install express-rate-limit
设置
// server.js const rateLimit = require("express-rate-limit"); const limiter = rateLimit({ windowMs: 60 * 1000, // 1 minute max: 100, // Limit each IP to 100 requests per minute message: "Too many requests, please try again later", }); app.use(limiter);
配合ELK报警
if (req.rateLimit.remaining === 0) { logger.warn("High request traffic detected!"); }
8. 运行项目
docker-compose up -d
9. 其他讨论
- 在哪里实现 refresh_token 逻辑
- 前端 + NextAuth 服务器管理 Refresh Token(推荐)
- 你已经有一个 NextAuth 服务器,它负责用户认证,而微服务主要是数据 API,不希望额外存储 refresh_token
- 微服务管理 Refresh Token(适用于多客户端)
- 前端 + NextAuth 服务器管理 Refresh Token(推荐)