Published on

面试 Web 后端

Authors
  • avatar
    Name
    Shelton Ma
    Twitter

web后端

1. 使用redis做缓存的功能

Redis 缓存的工作原理:

  1. 缓存命中(Cache Hit):当请求的数据存在于 Redis 中时,就直接返回 Redis 的缓存数据,这样就能避免去数据库查询,提高响应速度。
  2. 缓存未命中(Cache Miss):当请求的数据不在 Redis 中时,系统会查询数据库获取数据,并将数据存入 Redis,以便下次访问时能够命中缓存。
  3. 缓存失效(Cache Expiration):缓存中的数据设置了过期时间,到了过期时间后,数据就会从 Redis 中删除。这个机制可以有效避免缓存的“脏数据”问题。

关键点

  • 缓存过期:通过 setex 方法可以为缓存设置过期时间,防止缓存数据长期存在导致不一致。
  • 缓存穿透:如果查询的内容非常频繁,缓存的数据被清除后,可能会导致数据库压力增大。可以通过布隆过滤器等方式来避免缓存穿透。
  • 缓存雪崩:如果大量缓存数据在同一时刻过期,可能导致数据库压力增大。可以通过随机设置过期时间来分散过期时间,减少瞬间压力。
  • 缓存击穿: 热点数据实效, 导致同一请求高并发访问数据库. 预热缓存, 定期更新

2. 消息队列

Redis 和 RabbitMQ 都是常见的消息中间件,但它们的设计目标、功能和适用场景有所不同。下面是它们的对比分析:

  1. redis 主要用于高速读写、数据缓存、实时数据处理、临时存储等场景。
  2. RabbitMQ 主要用于高可靠性、消息队列、事件驱动架构、任务调度等场景。
特性RedisRabbitMQ
设计目标高速缓存、临时数据存储、Pub/Sub消息队列、任务调度、异步消息传递
消息持久化不持久化(默认),可以持久化数据支持消息持久化,确保消息不丢失
消息确认不支持消息确认支持消息确认(ACK)
性能极高的性能,低延迟,高吞吐量性能较低,但高可靠性
可靠性消息丢失风险大消息可靠性高,支持消息重试和确认
适用场景高并发、实时数据、缓存、Pub/Sub任务调度、异步处理、事件驱动架构
扩展性支持水平扩展(Redis Cluster)支持集群模式和高可用队列

3. 关系型数据库与非关系型数据库

非关系型数据库: 数据结构灵活(如爬虫数据), 简单数据查询, 不涉及复杂查询和连接操作

4. JWT(JSON Web Token)

JWT(JSON Web Token)包含三部分:

  1. 头部(Header)

    • 头部通常由两部分组成:

    • alg:签名算法,常见的有HS256、RS256等。

    • typ:令牌类型,通常是JWT。

        { 
          "alg": "HS256",
          "typ": "JWT"
        }
      
  2. 载荷(Payload)

  3. 签名(Signature)签名部分用于验证令牌的完整性以及身份验证。签名是使用头部中的alg指定的算法,通过对头部和载荷进行编码,并与密钥一起计算得到的。其目的是确保JWT在传输过程中未被篡改。

5. JWT 和 Session 存储的方式和机制

简而言之:

  • JWT 存储在客户端,是无状态的,适合于分布式、无状态的应用。
  • Session 存储在服务器端,是有状态的,适合于单一应用或需要维护会话的场景。

6. 缓存设置策略, 避免缓存击穿

  1. 使用合适的缓存键
  2. 缓存空数据(缓存空结果)
  3. 使用 Bloom Filter(布隆过滤器)
  4. 限制查询请求频率
  5. 缓存数据的预处理和策略: 按需缓存, 定时缓存, 强制刷新缓存
  6. 设置适当的缓存过期时间
  7. 缓存更新机制:数据变更时,主动清除或更新相关缓存。
  8. 异常请求监控和报警

7. 对比prisma/drizzle优势, 及便利之处

Prisma 和 Drizzle 都是非常流行的数据库工具,它们各自在数据库操作方面有不同的优势和便利之处。

  1. Prisma
    1. 优势
      • 类型安全
      • 生成数据库客户端
      • 强大的数据库迁移功能
      • 简洁且一致的 API
      • 广泛的数据库支持
      • 强大的文档和社区支持
    2. 不足
      • 性能开销
      • 复杂查询的局限性
  2. Drizzle
    1. 优势
      • 轻量级且快速
      • TypeScript
      • 极简的查询接口
      • 灵活的 SQL 生成
      • 更精细的控制
    2. 不足
      • 功能较少
      • 社区较小

8. 接口幂等性

接口的 幂等性(Idempotency)是指无论一个操作被执行多少次,产生的副作用都相同。

  1. 为什么幂等性很重要?
    • 防止重复操作
    • 提高系统稳定性
    • 增强用户体验
  2. 幂等性的常见示例, GET/DELETE/PUT设计为幂等性,
  3. POST确保幂等
    • 客户端生成唯一标识符并附带在请求头中, 服务端检查X-Idempotency-Key, 如果存在则返回原来的结果
    • 数据库唯一约束
    • 状态管理

9. 消息队列

  1. 消息队列的使用场景
    1. 异步任务处理(airflow任务启动/aigc内容获取)
    2. 流量削峰(避免恶意攻击)
    3. 系统可靠性(确保任务不丢失)
  2. 使用场景
    1. RabbitMQ

      const amqp = require('amqplib');
      
      async function sendMessage() {
        const conn = await amqp.connect('amqp://localhost');
        const channel = await conn.createChannel();
        const queue = 'task_queue';
      
        // 发送消息到队列
        const msg = 'Hello, RabbitMQ!';
        await channel.assertQueue(queue, { durable: true });
        channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });
      
        console.log(" [x] Sent %s", msg);
      
        // 关闭连接
        setTimeout(() => { 
          channel.close();
          conn.close();
        }, 500);
      }
      
      sendMessage().catch(console.error);
      
      // 消费者
      const amqp = require('amqplib');
      
      async function receiveMessage() {
        const conn = await amqp.connect('amqp://localhost');
        const channel = await conn.createChannel();
        const queue = 'task_queue';
      
        await channel.assertQueue(queue, { durable: true });
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
      
        channel.consume(queue, (msg) => {
          if (msg !== null) {
            console.log(" [x] Received %s", msg.content.toString());
            channel.ack(msg);  // 确认消息
          }
        }, { noAck: false });
      }
      
      receiveMessage().catch(console.error);
      
    2. kafka

      const { Kafka } = require('kafkajs');
      
      const kafka = new Kafka({
        clientId: 'my-app',
        brokers: ['localhost:9092'],
      });
      
      const producer = kafka.producer();
      
      async function sendMessage() {
        await producer.connect();
        await producer.send({
          topic: 'my-topic',
          messages: [{ value: 'Hello Kafka!' }],
        });
        console.log('Message sent to Kafka');
        await producer.disconnect();
      }
      
      sendMessage().catch(console.error);
      // 消费者
      const { Kafka } = require('kafkajs');
      
      const kafka = new Kafka({
        clientId: 'my-app',
        brokers: ['localhost:9092'],
      });
      
      const consumer = kafka.consumer({ groupId: 'my-group' });
      
      async function consumeMessage() {
        await consumer.connect();
        await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
      
        await consumer.run({
          eachMessage: async ({ topic, partition, message }) => {
            console.log(`Received message: ${message.value.toString()}`);
          },
        });
      }
      
      consumeMessage().catch(console.error);
      

10. 数据库事务概述

数据库事务(Database Transaction)是一组操作的集合,这些操作要么全部执行成功,要么全部撤销。事务确保了数据的一致性、完整性、隔离性和持久性(ACID原则)

  1. 使用 Prisma 实现事务控制

    const { PrismaClient } = require('@prisma/client');
    const prisma = new PrismaClient();
    
    async function runTransaction() {
      const transaction = await prisma.$transaction(async (prisma) => {
        try {
          // 执行多个数据库操作
          const user = await prisma.user.create({
            data: {
              name: 'Alice',
              email: 'alice@example.com',
            },
          });
    
          const post = await prisma.post.create({
            data: {
              title: 'Alice Post',
              content: 'This is a post by Alice.',
              authorId: user.id,
            },
          });
    
          // 如果某个操作失败,则手动抛出错误,回滚所有操作
          if (!user || !post) {
            throw new Error('Transaction failed!');
          }
    
          return post; // 成功返回结果
        } catch (error) {
          console.error(error);
          throw error; // 抛出错误触发回滚
        }
      });
    
      console.log('Transaction successful:', transaction);
    }
    
    runTransaction().catch((error) => {
      console.log('Transaction failed and rolled back:', error);
    });
    

11. Redis集群

  1. 分片
  2. 主从, 主写, 从读
  3. 集群的高可用, 主故障, 从切换

12. Redis 分布式锁

Redis 分布式锁的工作原理

Redis 分布式锁通常是基于 Redis 的 SETNX 命令(set if not exists)来实现的。这个命令会在键不存在时设置键值,并返回设置成功的结果。利用这一特性,可以创建一个锁机制:

  1. 客户端尝试向 Redis 设置一个唯一的锁标识(通常是一个带有过期时间的键)。
  2. 如果该键不存在,表示锁没有被占用,客户端成功获取锁并开始执行关键业务逻辑。
  3. 如果该键已经存在,表示锁已被占用,客户端无法获得锁,通常会进行重试或返回失败。
  4. 为了防止锁失效后未能及时释放造成死锁,需要设置锁的 过期时间,使得如果客户端在持有锁时崩溃或超时,锁会自动释放。

实现分布式锁的基本步骤

  1. 锁的获取(SETNX + EXPIRE)

    使用 SETNX 命令确保只在锁键不存在时设置锁值,同时可以通过 EXPIRE 命令设置锁的过期时间,防止死锁。

  2. 锁的释放

    释放锁的操作需要确保只有持有锁的客户端才能释放锁。通常可以通过对比 lock_key 的值来确保这一点

  3. 锁的获取失败处理

    当客户端无法获取到锁时,需要根据业务需求选择合适的重试策略:

    • 立即失败:如果不能获取到锁,则直接返回失败信息。
    • 重试机制:客户端在获取不到锁时可以设置重试间隔,并循环尝试一段时间后再返回失败。
    • 队列等待:在一些场景下,可以将请求放入等待队列中,让客户端按顺序等待锁的释放。
  4. 锁的过期处理

    为了避免客户端崩溃或长时间占用锁导致死锁,必须设置锁的过期时间。Redis 允许使用 EXPIRE 或者在 SET 命令中使用 PX 参数来设置锁的超时时间。

Redis 分布式锁的优化和改进

  1. 红锁(RedLock): Redis 官方文档和许多分布式锁实现库(如 Redisson)推荐使用 红锁算法,它能在多个 Redis 实例之间提供更高的可靠性。红锁的基本思想是,在多个独立的 Redis 实例上分别尝试获得锁,只有当多数 Redis 实例都获得锁时,才认为客户端成功获取锁。这能够有效避免 Redis 单点故障的影响。

    算法:假设有 N 个 Redis 实例,客户端需要在大多数实例(例如 N/2+1 个实例)上获得锁才能认为获取成功。如果其中有一个实例不可用,仍然能通过其他实例保证锁的正确性。

  2. 锁的可重入性: 默认的 Redis 锁是不支持可重入的,这意味着如果同一个客户端在持有锁的情况下再次请求获取锁,可能会导致死锁。在某些场景下,你可以使用可重入的锁实现,或者通过扩展 Redis 锁的逻辑来实现。

  3. 锁的有效期与延续: 锁的有效期可以设置得比较长,但如果某些操作超出了锁的有效期,锁会被意外释放,导致并发问题。为了防止这种情况,通常可以通过定期刷新锁的过期时间(续期)来保证锁不会意外过期。

  4. 性能考虑: Redis 的分布式锁通过网络请求实现,所以在高并发的情况下可能会带来一定的性能压力。可以通过减少锁的持有时间、增加 Redis 集群的负载均衡、优化客户端的请求频率等方式来缓解性能问题。

令牌桶设计

令牌桶(Token Bucket)是一种常用的流量控制算法,广泛用于限制访问频率或速率控制。在限制访问频率时,令牌桶算法的设计可以根据不同的需求进行调整,但它的基本原理和设计理念是相对固定的。

令牌桶算法的基本概念

  1. 令牌桶:一个桶,里面包含若干个“令牌”,每个令牌表示一个允许通过的请求。当桶中有令牌时,请求就可以通过,否则就被拒绝或延迟。
  2. 令牌产生速率:系统以固定速率将令牌放入桶中,令牌的生成速度通常是以“令牌/秒”来表示。即每单位时间(比如每秒)会放入一定数量的令牌。
  3. 桶的容量:桶有最大容量限制,一旦桶满,新的令牌会被丢弃。这样可以防止过多的请求积压,避免令牌超负荷。
  4. 请求与令牌消耗:每当有一个请求到达时,它需要从桶中取出一个令牌。如果桶中有令牌,令牌被消耗,请求允许通过。如果没有令牌,则请求被拒绝或被延迟。

令牌桶的工作原理

  • 令牌生成:系统以恒定的速率(例如每秒生成 10 个令牌)生成令牌,并放入桶中。如果桶已满,多余的令牌会丢弃。
  • 请求处理:每当有请求到来时,系统检查桶中是否有令牌。如果有令牌,令牌被取出,允许请求通过。如果没有令牌,取决于设计要求,可以选择拒绝请求、延迟请求或返回错误。

设计令牌桶

  1. 令牌生成速率

  2. 桶的容量

  3. 令牌消耗

  4. 令牌桶算法的实现

    class TokenBucket {
      constructor(tokenGenerationRate, bucketCapacity) {
        this.tokenGenerationRate = tokenGenerationRate; // 每秒生成的令牌数量
        this.bucketCapacity = bucketCapacity; // 桶的最大容量
        this.tokens = 0; // 当前令牌数
        this.lastCheck = Date.now(); // 上次生成令牌的时间
      }
    
      // 更新桶中的令牌数
      updateTokens() {
        const now = Date.now();
        const timeElapsed = (now - this.lastCheck) / 1000; // 计算时间间隔,单位为秒
    
        // 根据时间间隔生成新的令牌
        this.tokens = Math.min(this.bucketCapacity, this.tokens + timeElapsed * this.tokenGenerationRate);
        this.lastCheck = now;
      }
    
      // 尝试获取一个令牌,返回是否可以请求通过
      tryConsume() {
        this.updateTokens();
        
        if (this.tokens >= 1) {
          this.tokens -= 1; // 消耗一个令牌
          return true; // 允许通过
        } else {
          return false; // 拒绝请求
        }
      }
    }
    
    // 创建一个令牌桶,令牌生成速率为每秒 10 个,桶容量为 100 个
    const tokenBucket = new TokenBucket(10, 100);
    
    // 模拟请求
    function handleRequest() {
      if (tokenBucket.tryConsume()) {
        console.log("请求被允许通过");
      } else {
        console.log("请求被拒绝,令牌不足");
      }
    }
    
    // 模拟每秒 5 次请求
    setInterval(handleRequest, 200);
    
  5. 桶的溢出处理 当令牌桶满时,如果系统继续生成令牌,你需要决定如何处理溢出的令牌。一种常见的做法是丢弃多余的令牌,而另一种做法是让其进入一个“待处理”的队列(如果有实现延迟请求的需求)。

  6. 并发控制 在高并发环境中,可能会有多个请求同时到达,因此你需要确保令牌的操作是线程安全的。在 Node.js 中,你可以通过适当的锁机制或队列来处理并发问题。

配置缓存或会话管理

为了在多个请求之间保留令牌桶的状态,你可能需要考虑将令牌桶的状态存储在缓存中,或者与用户会话相关联。对于高并发或分布式应用,使用 Redis 或其他缓存技术是常见的做法,这样令牌桶状态能够跨多个请求和服务器实例共享。

  1. 使用 Redis 存储令牌桶状态

    import Redis from 'ioredis';
    
    const redis = new Redis(); // 连接到本地的 Redis 实例
    
    class TokenBucket {
      constructor(tokenGenerationRate, bucketCapacity, userId) {
        this.tokenGenerationRate = tokenGenerationRate;
        this.bucketCapacity = bucketCapacity;
        this.userId = userId; // 假设每个用户都有唯一的 userId
        this.redisKey = `token_bucket:${userId}`;
      }
    
      async updateTokens() {
        const lastCheck = await redis.get(`${this.redisKey}:lastCheck`);
        const now = Date.now();
        const timeElapsed = (now - lastCheck) / 1000;
    
        const currentTokens = await redis.get(`${this.redisKey}:tokens`);
        const tokens = Math.min(this.bucketCapacity, parseInt(currentTokens || '0') + timeElapsed * this.tokenGenerationRate);
    
        await redis.set(`${this.redisKey}:tokens`, tokens);
        await redis.set(`${this.redisKey}:lastCheck`, now);
      }
    
      async tryConsume() {
        await this.updateTokens();
    
        const tokens = await redis.get(`${this.redisKey}:tokens`);
    
        if (parseInt(tokens || '0') >= 1) {
          await redis.decr(`${this.redisKey}:tokens`);
          return true;
        } else {
          return false;
        }
      }
    }
    
    export default TokenBucket;
    

在 Node.js 中直接运行 TypeScript 项目

  1. 使用 ts-node 直接运行 TypeScript 项目

    • 安装

      # ts-node 是一个可以直接在 Node.js 中执行 TypeScript 代码的工具。它会自动处理 TypeScript 的编译过程,使你无需手动运行 tsc 编译器来转换代码。
      npm install ts-node typescript
      
    • 配置

      // tsconfig.json
      {
        "compilerOptions": {
          "target": "ES6",                // 设置编译的目标 JavaScript 版本
          "module": "CommonJS",           // 设置模块解析方式
          "outDir": "./dist",             // 输出目录(编译后的 JavaScript 文件将保存在此目录中)
          "rootDir": "./src",             // 输入文件目录
          "strict": true,                 // 启用严格模式
          "esModuleInterop": true,        // 允许默认导入非 ECMAScript 模块
          "skipLibCheck": true,           // 跳过库文件检查
          "forceConsistentCasingInFileNames": true
        },
        "include": ["src/**/*.ts"],       // 指定编译的 TypeScript 文件路径
        "exclude": ["node_modules"]       // 排除 node_modules 目录
      }
      
    • 直接运行

      npx ts-node src/index.ts
      ts-node src/index.ts
      
  2. 使用 tsc 编译并运行

    npx tsc
    
    node dist/index.js
    
  3. 调试 TypeScript

    // launch.json
    {
      "version": "0.2.0",
      "configurations": [
        {
          "type": "node",
          "request": "launch",
          "name": "Launch Program",
          "skipFiles": ["<node_internals>/**"],
          "program": "${workspaceFolder}/src/index.ts", // 指定 TypeScript 文件路径
          "outFiles": ["${workspaceFolder}/dist/**/*.js"] // 指定编译后的 JS 文件路径
        }
      ]
    }
    

TypeScript 项目的构建过程

  1. 项目结构

    /my-typescript-project
      /src                    存放 TypeScript 源代码文件
        index.ts
        app.ts
      /dist                   存放编译后的 JavaScript 文件
      tsconfig.json           TypeScript 编译配置文件
      package.json            npm 配置文件,定义项目的依赖和构建脚本
    
  2. 安装依赖

  3. 创建 tsconfig.json 配置文件

  4. 编译 TypeScript npx tsc

  5. 打包

    • 使用 Webpack(适用于浏览器端应用)
    • 使用 Rollup(适用于库和小型项目)
  6. 生产构建和优化

    1. 压缩和优化 JavaScript:通过 Webpack、Rollup 或 Babel 对 JavaScript 文件进行压缩和优化,减少文件体积。
    2. 代码拆分:使用 Webpack 或 Rollup 等工具将代码拆分成多个较小的文件,按需加载,提高应用的加载速度。
    3. Source Maps:使用 Source Map 来在生产环境中调试 TypeScript 代码。你可以在 tsconfig.json 中启用 sourceMap 选项来生成对应的 .map 文件。
    4. 环境变量:在构建过程中,通过设置环境变量来调整应用的行为,例如区分开发环境和生产环境的配置。
  7. 使用 npm 脚本来自动化构建过程

TypeScript 项目的日志处理

morgan 是一个常用的 HTTP 请求日志中间件,默认情况下,它会将日志输出到控制台。为了写入到外部文件,我们可以结合 fs.createWriteStream 来存储日志到文件中,或者将其集成到更强大的日志管理工具(如 Winston 或 ELK)。

1. 基本写入日志到文件

// npm install morgan
const express = require("express");
const fs = require("fs");
const path = require("path");
const morgan = require("morgan");

const app = express();

// 创建一个写入流(append 模式)
const logStream = fs.createWriteStream(path.join(__dirname, "access.log"), {
  flags: "a", // 'a' 追加模式,不会覆盖已有日志
});

// 使用 morgan 记录日志到文件
app.use(morgan("combined", { stream: logStream }));

app.get("/", (req, res) => {
  res.send("Hello World!");
});

app.listen(3000, () => {
  console.log("Server is running on port 3000");
});

2. 自定义日志格式

app.use(
  morgan(":method :url :status :response-time ms - :res[content-length]", {
    stream: logStream,
  })
);

3. 按日期自动生成日志文件

// npm install rotating-file-stream
const rfs = require("rotating-file-stream");

const accessLogStream = rfs.createStream("access.log", {
  interval: "1d", // 每天生成一个新日志
  path: path.join(__dirname, "logs"),
});

app.use(morgan("combined", { stream: accessLogStream }));

4. 结合 Winston 进行日志管理

如果想要更高级的日志管理(比如存储到数据库或 JSON 格式存储),可以结合 winston。

// npm install winston
const winston = require("winston");

const logger = winston.createLogger({
  level: "info",
  format: winston.format.json(),
  transports: [
    new winston.transports.File({ filename: "logs/error.log", level: "error" }),
    new winston.transports.File({ filename: "logs/combined.log" }),
  ],
});

app.use(
  morgan("combined", {
    stream: { write: (message) => logger.info(message.trim()) },
  })
);

5. 发送日志到远程存储

app.use(
  morgan("combined", {
    stream: {
      write: (message) => {
        // 发送日志到远程服务(假设有日志收集 API)
        fetch("http://log-server.example.com/api/logs", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ log: message.trim() }),
        });
      },
    },
  })
);

RPAC & RBAC

RPAC(Resource-based Policy Access Control) 是一种基于资源的访问控制策略,它允许资源本身定义访问权限,而不是通过角色或用户分配权限。RPAC 常用于云存储(如 AWS S3)或微服务架构中,以确保资源访问的灵活性和安全性。

1. RPAC vs RBAC

对比项RPAC(基于资源的访问控制)RBAC(基于角色的访问控制)
控制点资源(Resource)角色(Role)
权限存储资源本身存储访问策略角色关联用户和权限
适用场景云存储(AWS S3)、微服务 API 权限企业内部系统,权限基于角色
动态性更灵活,资源可自行定义访问控制需要角色层级调整,较固定

2. RPAC 在 Node.js/NestJS 中的实现

  1. 源权限定义(模拟数据库)

    // resource-policies.ts
    export const resourcePolicies = {
      'file-123': { owner: 'user1', allowedUsers: ['user2'] },
      'file-456': { owner: 'user2', allowedUsers: ['user3'] },
    };
    
  2. 创建 RpacGuard

    import { CanActivate, ExecutionContext, Injectable, ForbiddenException } from '@nestjs/common';
    import { resourcePolicies } from './resource-policies';
    
    @Injectable()
    export class RpacGuard implements CanActivate {
      canActivate(context: ExecutionContext): boolean {
        const request = context.switchToHttp().getRequest();
        const user = request.user; // 假设 AuthMiddleware 解析了用户身份
        const fileId = request.params.id;
    
        const policy = resourcePolicies[fileId];
        if (!policy) {
          throw new ForbiddenException('Resource not found');
        }
    
        // 判断用户是否有权限访问
        if (policy.owner === user.id || policy.allowedUsers.includes(user.id)) {
          return true;
        } else {
          throw new ForbiddenException('Access Denied');
        }
      }
    }
    
  3. 在 Controller 里应用 RPAC

    import { Controller, Get, Param, UseGuards, Req } from '@nestjs/common';
    import { RpacGuard } from './rpac.guard';
    
    @Controller('files')
    export class FileController {
      @Get(':id')
      @UseGuards(RpacGuard) // 使用 RPAC 保护 API
      getFile(@Param('id') id: string, @Req() req) {
        return { message: `User ${req.user.id} can access file ${id}` };
      }
    }
    
  4. 用户访问示例

    curl -H "Authorization: Bearer user1-token" http://localhost:3000/files/file-123
    

OIDC(OpenID Connect) & Oauth 2.0

OIDC(OpenID Connect) 是基于 OAuth 2.0 的身份认证协议,允许应用安全地验证用户身份,并获取用户信息。

1. OIDC vs OAuth 2.0 的区别

  • OAuth 2.0 主要用于 授权,应用可以获取 access_token,用于访问 API,但无法确认用户身份。
  • OIDC 主要用于 身份认证,在 OAuth 2.0 的基础上增加了 id_token,可以确认用户身份。

2. OIDC 认证流程

  1. 用户访问应用,应用需要确认用户身份。
  2. 应用将用户重定向到身份提供者(IdP),例如 Authing、Google、GitHub 等。
  3. 用户登录后,IdP 生成 id_token 和 access_token,并返回给应用。
  4. 应用解析 id_token,获取用户信息(如 sub,即用户唯一 ID)。
  5. 用户成功登录,应用存储 session 或 JWT,允许后续访问。

3. OIDC Token 类型

  • id_token(核心):JWT 格式,包含用户身份信息,如 sub(用户 ID)、email、name 等。
  • access_token:用于访问 API 资源。
  • refresh_token(可选):用于获取新的 access_token,避免频繁登录。

4. OIDC 适用于哪些场景?

  • 单点登录(SSO)(例如:Google 登录)
  • 第三方应用认证(如 GitHub 登录到 Notion)
  • 多端一致身份认证(Web、App、API 统一认证)

总结来说,OIDC 解决的是“你是谁?”的问题,而 OAuth 2.0 解决的是“你能访问什么?”的问题。

5. OAuth 2.0 授权流程(授权码模式)

  1. 用户授权:

    • 用户在 Notion 中点击 “用 GitHub 登录”,Notion 重定向到 GitHub 授权页面。
    • 用户输入 GitHub 账号密码,并同意授权 Notion 访问 GitHub 资料
  2. GitHub 颁发 authorization_code(授权码), 重定向回 Notion,并携带授权码: https://notion.so/callback?code=123456

  3. Notion 用授权码换取 access_token

    POST https://github.com/login/oauth/access_token
    Content-Type: application/json
    
    {
      "client_id": "NOTION_CLIENT_ID",
      "client_secret": "NOTION_SECRET",
      "code": "123456",
      "redirect_uri": "https://notion.so/callback"
    }
    
    {
      "access_token": "abcd1234",
      "token_type": "Bearer",
      "expires_in": 3600
    }
    
  4. Notion 使用 access_token 访问 GitHub API, GitHub 返回用户信息

    GET https://api.github.com/user
    Authorization: Bearer abcd1234
    
  5. 用户成功登录 Notion, Notion 获取 GitHub 账号信息,创建用户,完成 OAuth 登录。

6. OAuth 2.0 安全风险 & 解决方案

  1. 令牌泄露(Token Leakage): 使用 HTTPS,短时效 access_token + Refresh Token,HttpOnly Cookie 存储 Token
  2. CSRF(跨站请求伪造): 使用 state 参数校验授权请求 state参数是一种用于防止CSRF攻击的机制。它是一个随机生成的字符串,用于确保请求来自于客户端本身,而不是来自于其他网站。state参数通常与请求一起发送,并在服务器端进行验证。如果state参数不匹配,则表示请求可能是恶意的,服务器可以拒绝处理该请求。
  3. 重放攻击(Replay Attack): 使用 nonce,在 id_token 中加入唯一随机数
  4. XSS(跨站脚本攻击) 窃取 Token: 使用 HttpOnly + Secure Cookie,避免 localStorage 存储 Token

7. OAuth 2.0 + OIDC 的最佳实践

  • 使用 authorization_code 模式(更安全,避免 Token 泄露)。
  • access_token 用于 API 访问,id_token 用于用户认证。
  • 存储 Token 时使用 HttpOnly + Secure Cookie,避免 XSS 窃取。
  • 启用 PKCE(Proof Key for Code Exchange)防止授权码被劫持。
  • 在 id_token 中使用 nonce 防止重放攻击。
  • 使用 scope=openid profile email 以获取 OIDC 认证信息。

8. 多个微服务如何共享 Authing 认证信息?

  • JWT + Gateway(API Gateway 负责解析 JWT,微服务不需要重复验证)
  • Authing Webhook(同步用户状态,如封禁、权限变更)
  • Redis 共享 Session(存储 Token 解析结果,减少重复请求)

9. 如何使用 Redis + Authing 优化身份认证?

  • 存储 Token 解析结果,减少对 Authing 的 API 请求
  • 限制 Token 请求频率,防止 DDoS 攻击
  • 缓存用户权限信息,减少数据库查询

10. 如果 Authing 宕机了,你的系统如何保证部分功能可用?

  • 本地缓存 Token 解析结果(Redis + JWT)
  • 降级策略:允许已登录用户继续使用系统,但限制高敏感操作(如支付)
  • 预请求机制:提前获取 Authing 用户数据,避免实时依赖

令牌泄露和重放攻击

令牌泄露是指攻击者窃取了用户的 access_token 或 id_token,然后冒充合法用户访问受保护的 API 或资源。

如何防止令牌泄露?

  • 使用 HTTPS 传输所有令牌数据,防止中间人攻击(MITM)。
  • 使用 HttpOnly + Secure Cookie 存储 access_token,避免 XSS 窃取。
  • 使用短时效 access_token(如 15 分钟)+ Refresh Token 机制,即使令牌被盗,攻击者可利用的时间也有限。
  • 避免将 access_token 放在 URL 参数中,改用 Authorization: Bearer token 方式传输。
  • 限制 Token 作用范围(Scope),比如用户只允许访问自己的数据,避免被滥用。
  • 启用 Token 绑定 IP / 设备,如果 access_token 在新的 IP 或设备上使用,强制重新认证。

重放攻击(Replay Attack)是指攻击者截获用户的请求(包括 access_token),然后在稍后重复发送相同的请求,以冒充用户。

如何防止重放攻击?

  • 使用一次性 Token(Nonce):在请求中加入一个随机数 nonce,服务器检查是否被重复使用。
  • 使用 iat(签发时间)+ 过期时间检查:如果 Token 过期,则拒绝请求。
  • Token 绑定 IP 或设备:如果令牌突然在新的 IP 地址或设备上使用,要求用户重新登录。
  • 使用签名请求(HMAC):在每个请求中加入 HMAC 签名,确保请求未被篡改。
  • 使用 Webhooks / 事件监听:检测用户在多个 IP 地址上短时间内使用相同 access_token,自动吊销令牌。

Webhook & 使用 Webhook 触发 Redis 缓存更新的场景

Webhook 是一种基于 HTTP 回调的机制,当某个事件发生时,服务器会主动向指定的 URL 发送请求,通知外部系统,而不是外部系统定期轮询

使用 Webhook 触发 Redis 缓存更新的场景

  1. 用户数据变更(Authing/Clerk)
    • 当用户资料(如头像、用户名、权限)发生变更时,Authing/Clerk 触发 Webhook,通知你的服务器更新缓存。
    • 你的服务器收到 Webhook 后,更新 Redis 中的用户信息。
  2. 订单状态变更(支付系统)
    • 用户付款成功后,支付平台(如 Stripe)会发送 Webhook,通知你的服务器订单已支付。
    • 你的服务器收到 Webhook 后,更新 Redis 中的订单状态,避免重复查询数据库。
  3. 库存更新(电商系统)
    • 商品库存变更时,供应链管理系统发送 Webhook,通知你的服务器更新缓存。
    • 服务器更新 Redis,确保用户看到最新的库存数据。

如何结合 Webhook + Redis 缓存更新?

  1. 注册 Webhook, 在 Authing/Clerk/Stripe 等服务中,注册一个 Webhook 监听 URL https://yourapp.com/webhook

  2. 后端处理 Webhook 并更新 Redis

    • 在你的 Node.js 服务器上,使用 Express 或 Hono 处理 Webhook。

      import { Hono } from 'hono'
      import { Redis } from 'ioredis'
      
      const app = new Hono()
      const redis = new Redis('redis://localhost:6379')
      
      app.post('/webhook', async (c) => {
        const body = await c.req.json()
      
        // 确保 Webhook 事件有效
        if (!body.event) {
          return c.json({ error: 'Invalid Webhook' }, 400)
        }
      
        if (body.event === 'user.updated') {
          const userId = body.data.id
          const newUserData = body.data
      
          // 更新 Redis 缓存
          await redis.set(`user:${userId}`, JSON.stringify(newUserData))
      
          console.log(`用户 ${userId} 数据已更新到 Redis`)
        }
      
        return c.json({ message: 'Webhook received' })
      })
      
      app.fire()
      
  3. 设置缓存过期时间 & 失效策略

    • await redis.set(`user:${userId}`, JSON.stringify(newUserData), 'EX', 3600) // 1 小时后过期
    • 缓存预热 + Webhook 缓存不存在时,查询数据库,同时监听 Webhook 进行主动更新。

设计一个高并发商品抢购系统(如秒杀)

架构设计

组件作用
Redis作为缓存,存储库存、用户抢购状态,减少数据库压力
MySQL持久化存储订单、用户购买记录
消息队列(RabbitMQ/Kafka)处理高并发下的订单异步写入 MySQL,削峰
分布式锁(Redlock/ETCD/ZooKeeper)防止超卖,确保库存正确更新
CDN + API 网关缓解请求压力,过滤非法流量
限流(Redis+令牌桶算法)防止恶意刷单,提高系统稳定性
日志 & 监控(ELK、Prometheus)监控抢购过程,发现异常

系统流程

  1. 预热阶段
    • 商品信息提前存入 Redis
    • 库存信息存入 Redis(key: stock:product_id)
    • 限流策略(如令牌桶) 控制请求速率
  2. 用户请求抢购
    1. 请求流量控制

      • CDN + API 网关拦截
      • Redis 限流(防止恶意请求)
      • 用户鉴权(防止重复下单)
    2. 库存检查

      const stock = await redis.decr('stock:1001') // 先减库存
      if (stock < 0) {
        await redis.incr('stock:1001') // 还原库存
        return { error: '商品已售罄' }
      }
      
    3. 订单处理

      • 异步队列(防止超卖)
      • Redis 成功扣减库存后,不直接写入 MySQL
      • 订单请求发送到消息队列(Kafka/RabbitMQ)
      • 订单服务消费队列消息,异步写入 MySQL
    4. 订单写入 MySQL

      • 消息队列消费者处理订单

      • 数据库事务:确保库存 & 订单一致

        BEGIN;
        UPDATE product_stock SET stock = stock - 1 WHERE product_id = 1001 AND stock > 0;
        INSERT INTO orders (user_id, product_id) VALUES (123, 1001);
        COMMIT;
        
      • 防止超卖的 SQL 条件

        UPDATE product_stock SET stock = stock - 1 WHERE product_id = 1001 AND stock > 0; 
        
    5. 支付 & 订单确认

      • 订单状态变更后,发送 Webhook 更新 Redis
      • 订单超时未支付(设置 Redis 过期时间),超时后自动释放库存

关键优化

  1. 防止超卖

    • Redis 预减库存

    • MySQL 更新时加 WHERE stock > 0 条件

    • 分布式锁(Redlock)

      const lock = await redlock.lock('lock:1001', 1000) // 1000ms 过期
      if (!lock) return { error: '请稍后重试' }
      
  2. 限流

    • 令牌桶算法(Redis + Lua)

    • 用户防刷

      const userKey = `limit:user:${userId}`
      const count = await redis.incr(userKey)
      if (count > 5) return { error: '请求过于频繁' }
      
  3. 日志 & 监控

    • ELK(ElasticSearch + Kibana)
    • Prometheus 监控请求 & 延迟

代码细节

  1. 生产者

    // producer.ts
    import { Kafka } from 'kafkajs'
    import { Redis } from 'ioredis'
    
    const kafka = new Kafka({
      clientId: 'seckill',
      brokers: ['localhost:9092'], // Kafka 服务器地址
    })
    
    const producer = kafka.producer()
    const redis = new Redis('redis://localhost:6379')
    
    async function createOrder(userId: string, productId: string) {
      await producer.connect()
    
      // 预扣 Redis 库存
      const stock = await redis.decr(`stock:${productId}`)
      if (stock < 0) {
        await redis.incr(`stock:${productId}`) // 还原库存
        return { error: '商品已售罄' }
      }
    
      // 将订单消息推送到 Kafka
      await producer.send({
        topic: 'order',
        messages: [{ key: userId, value: JSON.stringify({ userId, productId }) }],
      })
    
      console.log(`订单请求已推送 Kafka:用户 ${userId} 购买商品 ${productId}`)
    
      await producer.disconnect()
      return { success: '抢购成功,订单处理中' }
    }
    
    // 示例调用
    createOrder('user_123', 'product_1001')
    
  2. 消费者

    // consumer.ts
    import { Kafka } from 'kafkajs'
    import mysql from 'mysql2/promise'
    
    const kafka = new Kafka({
      clientId: 'seckill',
      brokers: ['localhost:9092'],
    })
    
    const consumer = kafka.consumer({ groupId: 'order-group' })
    
    // 连接 MySQL
    const db = await mysql.createConnection({
      host: 'localhost',
      user: 'root',
      password: 'password',
      database: 'seckill',
    })
    
    async function processOrder() {
      await consumer.connect()
      await consumer.subscribe({ topic: 'order', fromBeginning: false })
    
      await consumer.run({
        eachMessage: async ({ message }) => {
          const { userId, productId } = JSON.parse(message.value.toString())
    
          console.log(`处理订单:用户 ${userId} 购买商品 ${productId}`)
    
          try {
            // 使用 MySQL 事务确保订单 & 库存一致
            await db.beginTransaction()
            const [rows] = await db.execute(
              'UPDATE product_stock SET stock = stock - 1 WHERE product_id = ? AND stock > 0',
              [productId]
            )
    
            if (rows.affectedRows === 0) {
              console.log(`库存不足,回滚订单:用户 ${userId} 购买 ${productId}`)
              await db.rollback()
              return
            }
    
            await db.execute(
              'INSERT INTO orders (user_id, product_id) VALUES (?, ?)',
              [userId, productId]
            )
            await db.commit()
    
            console.log(`✅ 订单成功写入 MySQL:用户 ${userId} 购买 ${productId}`)
          } catch (error) {
            console.error('❌ 订单写入失败:', error)
            await db.rollback()
          }
        },
      })
    }
    
    // 启动消费者
    processOrder()
    
  3. index.ts整合

    // index.ts
    import Redlock from 'redlock'
    import { Kafka } from 'kafkajs'
    import { Redis } from 'ioredis'
    import mysql from 'mysql2/promise'
    
    // Kafka 配置
    const kafka = new Kafka({
      clientId: 'seckill',
      brokers: ['localhost:9092'], // Kafka 服务器地址
    })
    
    const producer = kafka.producer()
    const consumer = kafka.consumer({ groupId: 'order-group' })
    const redis = new Redis('redis://localhost:6379')
    
    
    // 初始化 Redlock(支持多个 Redis 实例,提高可靠性)
    const redlock = new Redlock([redis], {
      driftFactor: 0.01, // 时钟漂移因子
      retryCount: 3,     // 最大重试次数
      retryDelay: 200,   // 重试间隔
      retryJitter: 50    // 随机抖动
    })
    
    // 连接 MySQL
    const dbConfig = {
      host: 'localhost',
      user: 'root',
      password: 'password',
      database: 'seckill',
    }
    
    async function start() {
      try {
        console.log('🚀 启动秒杀系统...')
    
        // 1️⃣ 连接 Kafka 生产者
        await producer.connect()
        console.log('✅ Kafka 生产者已连接')
    
        // 2️⃣ 连接 Kafka 消费者
        await consumer.connect()
        await consumer.subscribe({ topic: 'order', fromBeginning: false })
        console.log('✅ Kafka 消费者已连接')
    
        // 3️⃣ 连接 Redis
        await redis.set('stock:1001', 500, 'EX', 3600) // 预热库存
        console.log('✅ Redis 连接成功,库存初始化完成')
    
        // 4️⃣ 连接 MySQL
        const db = await mysql.createConnection(dbConfig)
        console.log('✅ MySQL 连接成功')
    
        // 5️⃣ 监听订单消息(消费者)
        consumer.run({
          eachMessage: async ({ message }) => {
            const { userId, productId } = JSON.parse(message.value.toString())
    
            console.log(`📦 处理订单:用户 ${userId} 购买商品 ${productId}`)
    
            try {
              await db.beginTransaction()
              const [rows] = await db.execute(
                'UPDATE product_stock SET stock = stock - 1 WHERE product_id = ? AND stock > 0',
                [productId]
              )
    
              if (rows.affectedRows === 0) {
                console.log(`⚠️ 库存不足,回滚订单:用户 ${userId} 购买 ${productId}`)
                await db.rollback()
                return
              }
    
              await db.execute(
                'INSERT INTO orders (user_id, product_id) VALUES (?, ?)',
                [userId, productId]
              )
              await db.commit()
    
              console.log(`✅ 订单成功写入 MySQL:用户 ${userId} 购买 ${productId}`)
            } catch (error) {
              console.error('❌ 订单写入失败:', error)
              await db.rollback()
            }
          },
        })
    
        // 6️⃣ 启动 HTTP 接口(用于模拟抢购请求)
        const express = require('express')
        const app = express()
        app.use(express.json())
    
        app.post('/seckill', async (req, res) => {
          const { userId, productId } = req.body
    
          const lockKey = `lock:stock:${productId}`
    
          try{
            const lock = await redlock.acquire([lockKey], 3000)
            console.log(`🔒 用户 ${userId} 获取锁成功`)
    
            // 预扣 Redis 库存
            const stock = await redis.decr(`stock:${productId}`)
            if (stock < 0) {
              await redis.incr(`stock:${productId}`)
              return res.status(400).json({ error: '商品已售罄' })
            }
    
            // 推送订单到 Kafka
            await producer.send({
              topic: 'order',
              messages: [{ key: userId, value: JSON.stringify({ userId, productId }) }],
            })
    
            // 释放锁
            await lock.release()
            console.log(`🔓 用户 ${userId} 释放锁`)
            res.json({ success: '抢购成功,订单处理中' })
          }catch (error) {
            console.error('❌ 获取锁失败:', error)
            return { error: '系统繁忙,请稍后重试' }
          }
        })
    
        app.listen(3000, () => console.log('✅ 秒杀系统 API 运行在 http://localhost:3000'))
      } catch (error) {
        console.error('❌ 系统启动失败:', error)
      }
    }
    // 启动系统
    start()
    

Express 常见中间件及示例

Express 中间件用于拦截 HTTP 请求,进行日志记录、身份验证、错误处理等操作。以下是常见的中间件类型

内置

  1. express.json() 解析 JSON
  2. express.urlencoded() 解析 URL 编码数据
  3. express.static() 静态文件托管

第三方

  1. cors 解决跨域
  2. morgan 记录日志
  3. helmet 安全防护
  4. compression 启用 Gzip 压缩

自定义中间件

  1. 记录请求时间
  2. 认证中间件
  3. 错误处理

微服务

1. 创建 Express 项目

require("dotenv").config();
const express = require("express");
const cors = require("cors");
const { auth } = require("./auth");

const app = express();
app.use(cors());
app.use(express.json());

// 认证中间件
app.use(auth);

app.get("/", (req, res) => res.send("Hello World"));

app.post("/process", async (req, res) => {
  const lock = await lockResource("process-lock");
  try {
    // 业务逻辑
    res.json({ message: "Processing completed" });
  } finally {
    await lock.unlock();
  }
});

app.listen(3000, () => console.log("Server running on port 3000"));

2. 集成 NextAuth 进行认证

  1. 配置NextAuth, 支持accessToken, refreshToken

    // auth.js
    const { NextAuth } = require("next-auth");
    const Providers = require("next-auth/providers");
    
    const options = {
      providers: [
        Providers.Credentials({
          name: "Credentials",
          authorize: async (credentials) => {
            if (credentials.username === "admin" && credentials.password === "password") {
              return { id: 1, name: "Admin" };
            }
            throw new Error("Invalid credentials");
          },
        }),
      ],
      callbacks: {
        async jwt({ token, user }) {
          if (user) {
            token.accessToken = jwt.sign(
              { sub: user.id, name: user.name },
              process.env.JWT_SECRET!,
              { expiresIn: "15m" }
            );
            token.refreshToken = jwt.sign(
              { sub: user.id, name: user.name },
              process.env.REFRESH_SECRET!,
              { expiresIn: "30d" }
            );
            token.accessTokenExpires = Date.now() + 15 * 60 * 1000;
          }
          // 如果 accessToken 过期,则刷新
          if (Date.now() > token.accessTokenExpires) {
            console.log("Refreshing access token...");
            try {
              const newAccessToken = jwt.sign(
                { sub: token.sub },
                process.env.JWT_SECRET!,
                { expiresIn: "15m" }
              );
    
              return {
                ...token,
                accessToken: newAccessToken,
                accessTokenExpires: Date.now() + 15 * 60 * 1000,
              };
            } catch (error) {
              console.error("Refresh token failed", error);
              return { ...token, error: "RefreshTokenError" };
            }
          }
          return token;
        },
        async session({ session, token }) {
          session.accessToken = token.accessToken;
          session.refreshToken = token.refreshToken;
          return session;
        },
      },
      secret: process.env.AUTH_SECRET,
    };
    
    const auth = (req, res, next) => {
      NextAuth(req, res, options)
        .then((session) => {
          req.user = session?.user;
          next();
        })
        .catch(() => res.status(401).json({ error: "Unauthorized" }));
    };
    
    module.exports = { auth };
    
  2. 微服务验证

    • 中间件验证 access_token

      // authMiddleware.js
      const jwt = require("jsonwebtoken");
      
      const authMiddleware = (req, res, next) => {
        const authHeader = req.headers.authorization;
      
        if (!authHeader || !authHeader.startsWith("Bearer ")) {
          return res.status(401).json({ error: "Unauthorized" });
        }
      
        const token = authHeader.split(" ")[1];
      
        try {
          const decoded = jwt.verify(token, process.env.JWT_SECRET);
          req.user = decoded;
          next();
        } catch (err) {
          if (err.name === "TokenExpiredError") {
            return res.status(401).json({ error: "Access token expired" });
          }
          return res.status(403).json({ error: "Invalid or expired token" });
        }
      };
      
      module.exports = authMiddleware;
      
    • 微服务 refresh-token API

      app.post("/refresh-token", (req, res) => {
        const { refreshToken } = req.body;
      
        if (!refreshToken) {
          return res.status(401).json({ error: "No refresh token provided" });
        }
      
        try {
          const decoded = jwt.verify(refreshToken, process.env.REFRESH_SECRET);
          const newAccessToken = jwt.sign(
            { sub: decoded.sub },
            process.env.JWT_SECRET,
            { expiresIn: "15m" }
          );
      
          return res.json({ accessToken: newAccessToken });
        } catch (err) {
          return res.status(403).json({ error: "Invalid or expired refresh token" });
        }
      });
      
  3. 使用

    1. 前端使用accessToken

      const session = await getSession();
      const accessToken = session?.accessToken;
      
    2. 后端使用

      // server.js
      const authMiddleware = require("./authMiddleware");
      
      app.get("/protected", authMiddleware, (req, res) => {
        res.json({ message: "This is a protected route", user: req.user });
      });
      
    3. 前端刷新accessToken

      async function fetchWithAuth(url: string, options = {}) {
        let session = await getSession();
      
        const response = await fetch(url, {
          ...options,
          headers: {
            ...options.headers,
            Authorization: `Bearer ${session?.accessToken}`,
          },
        });
      
        if (response.status === 401) {
          console.log("Access token expired, refreshing...");
          const refreshResponse = await fetch("/api/auth/refresh-token", {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({ refreshToken: session?.refreshToken }),
          });
      
          if (refreshResponse.ok) {
            const { accessToken } = await refreshResponse.json();
            session = await updateSessionToken(accessToken);
            return fetch(url, {
              ...options,
              headers: {
                ...options.headers,
                Authorization: `Bearer ${accessToken}`,
              },
            });
          }
        }
      
        return response;
      }
      
      async function updateSessionToken(newAccessToken: string) {
        const session = await getSession();
        session.accessToken = newAccessToken;
        return session;
      }
      

3. 使用 Redis 缓存

//redis.js
const Redis = require("ioredis");
const redis = new Redis(process.env.REDIS_URL);

const cacheMiddleware = async (req, res, next) => {
  const key = `cache:${req.originalUrl}`;
  const cached = await redis.get(key);
  if (cached) {
    return res.json(JSON.parse(cached));
  }
  res.sendResponse = res.json;
  res.json = (body) => {
    redis.setex(key, 3600, JSON.stringify(body)); // 1-hour cache
    res.sendResponse(body);
  };
  next();
};

module.exports = { redis, cacheMiddleware };

4. 实现 Redis 红锁

//redis.js
const Redlock = require("redlock");

const redlock = new Redlock([redis], {
  driftFactor: 0.01,
  retryCount: 3,
  retryDelay: 200,
  retryJitter: 200,
});

const lockResource = async (key, ttl = 5000) => {
  return await redlock.lock(key, ttl);
};

module.exports = { redis, cacheMiddleware, lockResource };

5. ELK 记录日志

  1. 安装 pino

    npm install pino pino-http
    
  2. 在 server.js 配置日志

    // server.js
    const pino = require("pino");
    const pinoHttp = require("pino-http");
    
    const logger = pino();
    app.use(pinoHttp({ logger }));
    
    module.exports = { logger };
    
  3. docker中添加elk

  4. logstash配置

    // logstash.conf
    input {
      tcp {
        port => 5044
        codec => json
      }
    }
    output {
      elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "express-logs"
      }
      stdout { codec => rubydebug }
    }
    

6. Docker 部署

  1. 创建 Dockerfile

    FROM node:18
    WORKDIR /app
    COPY package*.json ./
    RUN npm install
    COPY . .
    EXPOSE 3000
    CMD ["node", "server.js"]
    
  2. 创建 docker-compose.yml

    version: "3"
    services:
      app:
        build: .
        ports:
          - "3000:3000"
        environment:
          - REDIS_URL=redis://redis:6379
        depends_on:
          - redis
      redis:
        image: redis:latest
        ports:
          - "6379:6379"
    
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:8.5.1
        environment:
          - discovery.type=single-node
        ports:
          - "9200:9200"
    
      logstash:
        image: docker.elastic.co/logstash/logstash:8.5.1
        volumes:
          - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
        depends_on:
          - elasticsearch
        ports:
          - "5044:5044"
    
      kibana:
        image: docker.elastic.co/kibana/kibana:8.5.1
        ports:
          - "5601:5601"
        depends_on:
          - elasticsearch
    

7. 设立请求阈值报警

  1. 使用 express-rate-limit

    npm install express-rate-limit
    
  2. 设置

    // server.js
    const rateLimit = require("express-rate-limit");
    const limiter = rateLimit({
      windowMs: 60 * 1000, // 1 minute
      max: 100, // Limit each IP to 100 requests per minute
      message: "Too many requests, please try again later",
    });
    
    app.use(limiter);
    
  3. 配合ELK报警

    if (req.rateLimit.remaining === 0) {
      logger.warn("High request traffic detected!");
    }
    

8. 运行项目

docker-compose up -d

9. 其他讨论

  1. 在哪里实现 refresh_token 逻辑
    1. 前端 + NextAuth 服务器管理 Refresh Token(推荐)
      • 你已经有一个 NextAuth 服务器,它负责用户认证,而微服务主要是数据 API,不希望额外存储 refresh_token
    2. 微服务管理 Refresh Token(适用于多客户端)