高效解析响应式缓冲区流

工程 | Arjen Poutsma | 2021年9月14日 | ...

Spring Framework 5.3 发布已有一段时间了。该版本的一个特性是对我们的响应式 Multipart 支持进行了重大改进。在这篇博文中,我们分享了在开发此功能时学到的一些知识。具体来说,我们重点讨论了在字节缓冲区流中查找标记的方法。

Multipart Form Data

每当您上传文件时,您的浏览器会将其以及表单中的其他字段作为 multipart/form-data 消息发送到服务器。这些消息的确切格式在 RFC 7578 中有所描述。如果您提交一个包含一个名为 foo 的简单文本字段和一个名为 file 的文件选择器的简单表单,那么 multipart/form-data 消息看起来像这样

POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary" (1)

--boundary (2)
Content-Disposition: form-data; name="foo" (3)

bar
--boundary (4)
Content-Disposition: form-data; name="file"; filename="lorum.txt" (5)
Content-Type: text/plain

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.

--boundary-- (6)
  1. 消息的 Content-Type 头包含 boundary 参数。

  2. 边界用于开始第一部分。它前面是 --

  3. 第一部分包含文本字段 foo 的值,如部分头中所示。字段的值是 bar

  4. 边界用于分隔第一部分和第二部分。同样,它前面是 --

  5. 第二部分包含提交文件 lorum.txt 的内容。

  6. 消息的结尾由边界指示。它前面和后面都是 --

查找边界

multipart/form-data 消息中的边界非常重要。它被指定为 Content-Type 头的一个参数。当前面有两个连字符 (--) 时,边界表示新部分的开始。当后面也跟着 -- 时,边界表示消息的结束。

在传入字节缓冲区流中查找边界是解析多部分消息的关键。这样做看起来足够简单

private int indexOf(DataBuffer source, byte[] target) {
  int max = source.readableByteCount() - target.length + 1;
  for (int i = 0; i < max; i++) {
    boolean found = true;
    for (int j = 0; j < target.length; j++) {
      if (source.getByte(i + j) != target[j]) {
        found = false;
        break;
      }
    }
    if (found) {
      return i;
    }
  }
  return -1;
}

然而,有一个复杂之处:边界可以跨越两个缓冲区,这在 Reactive 环境中可能不会同时到达。例如,给定前面显示的多部分消息示例,第一个缓冲区可能包含以下内容

POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary"

--boundary
Content-Disposition: form-data; name="foo"

bar
--bou

而下一个缓冲区包含剩余部分

ndary
Content-Disposition: form-data; name="file"; filename="lorum.txt"
Content-Type: text/plain

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.

--boundary--

如果我们一次检查一个缓冲区,我们就无法找到这样的分割边界。相反,我们需要跨多个缓冲区查找边界。

解决这个问题的一种方法是等到所有缓冲区都已接收,将它们合并,然后定位边界。以下示例使用示例流和前面定义的 indexOf 方法来完成此操作

Flux<DataBuffer> stream = Flux.just("foo", "bar", "--boun", "dary", "baz")
  .map(s -> factory.wrap(s.getBytes(UTF_8)));
byte[] boundary = "--boundary".getBytes(UTF_8);

Mono<Integer> result = DataBufferUtils.join(stream)
  .map(joined -> indexOf(joined, boundary));

StepVerifier.create(result)
  .expectNext(6)
  .verifyComplete();

使用 Reactor 的 StepVerifier,我们看到边界从索引 6 开始。

这种方法有一个主要缺点:将多个缓冲区合并为一个,实际上会将整个多部分消息存储在内存中。多部分消息主要用于上传(大)文件,因此这不是一个可行的选择。相反,我们需要一种更智能的方法来定位边界。

Knuth 算法来帮忙!

幸运的是,这种方法以 Knuth-Morris-Pratt 算法的形式存在。该算法的主要思想是,如果我们已经匹配了边界的几个字节,但下一个字节不匹配,我们就不需要从头开始。为此,该算法维护状态,其形式是预计算表中包含不匹配后可以跳过的字节数的某个位置。

在 Spring Framework 中,我们已经在 Matcher 接口中实现了 Knuth-Morris-Pratt 算法,您可以通过 DataBufferUtils::matcher 获取其实例。您还可以查看源代码

这里,我们使用 Matcher 来获取 streamboundary 的结束索引,使用与前面相同的示例输入

Flux<DataBuffer> stream = Flux.just("foo", "bar", "--boun", "dary", "baz")
  .map(s -> factory.wrap(s.getBytes(UTF_8)));
byte[] boundary = "--boundary".getBytes(UTF_8);

DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(boundary);
Flux<Integer> result = stream.map(matcher::match);

StepVerifier.create(result)
  .expectNext(-1)
  .expectNext(-1)
  .expectNext(-1)
  .expectNext(3)
  .expectNext(-1)
  .verifyComplete();

请注意,Knuth-Morris-Pratt 算法给出边界的**结束**索引,这解释了测试结果:边界直到倒数第二个缓冲区中的索引 3 才结束。

正如所料,Spring Framework 的 MultipartParser 大量使用了 Matcher,用于

如果您需要在字节缓冲区流中查找一系列字节,请尝试使用 Matcher

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有