WebFlux 应用程序中的 WebFilter

2024-05-07

我有一个使用 Spring Boot 2.0.0.M5/2.0.0.BUILD-SNAPSHOT 的 Spring Boot WebFlux 应用程序。 我需要将跟踪 ID 添加到所有日志中。

为了让它在 WebFlux 应用程序中工作,我尝试使用描述的 WebFilter 方法here https://jira.spring.io/browse/SPR-15680 and here https://github.com/rstoyanchev/context-holder/blob/master/src/main/java/com/example/context/RequestContextFilter.java

@Component
public class TraceIdFilter implements WebFilter {

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    return chain.filter(exchange).subscriberContext((Context context) ->
        context.put(AuditContext.class, getAuditContext(exchange.getRequest().getHeaders()))
    );
}

我的控制器

@GetMapping(value = "/some_mapping")
public Mono<ResponseEntity<WrappedResponse>> getResource(@PathVariable("resourceId") String id) {
    Mono.subscriberContext().flatMap(context -> {
        AuditContext auditContext = context.get(AuditContext.class);
        ...
    });

我遇到的问题是过滤器方法永远不会被执行,并且上下文也没有设置。我已经确认 Webfilter 在启动时已加载。 还需要其他什么才能使过滤器正常工作吗?


我在解决这个问题时遇到了很多问题,希望它能对某人有所帮助。我的用例是验证请求的签名。这需要我解析 PUT/POST 的请求正文。我看到的另一个主要用例是日志记录,因此下面的内容也会有所帮助。

中间件Authenticator.java

@Component
public class MiddlewareAuthenticator implements WebFilter { 

    @Autowired private RequestValidationService requestValidationService;

@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain chain) {
  return HEALTH_ENDPOINTS
      .matches(serverWebExchange)
      .flatMap(
          matches -> {
            if (matches.isMatch()) {
              return chain.filter(serverWebExchange);
            } else {
              return requestValidationService
                  .validate(serverWebExchange, 
                       new BiPredicate<ServerWebExchange, String> { 
                         @Override
                         public boolean test(ServerWebExchange e, String body) {
                             /** application logic can go here. few points:
                              1. I used a BiPredicate because I just need a true or false if the request should be passed to the controller. 
                              2. If you want todo other mutations you could swap the predicate to a normal function and return a mutated ServerWebExchange. 
                              3. I pass body separately here to ensure safety of accessing the request body and not having to rewrap the ServerWebExchange. A side affect of this though is any mutations to the String body do not affect downstream.
                              **/
                              return true;
                            }

                      })
                 .flatMap((ServerWebExchange r) -> chain.filter(r));
            }});
}

请求验证服务.java

@Service
public class RequestValidationService {
private DataBuffer stringBuffer(String value) {
  byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

  NettyDataBufferFactory nettyDataBufferFactory =
      new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
  DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
  buffer.write(bytes);
  return buffer;
}

private String bodyToString(InputStream bodyBytes) {
  byte[] currArr = null;
  try {
    currArr = bodyBytes.readAllBytes();
    bodyBytes.read(currArr);
  } catch (IOException ioe) {
    throw new RuntimeException("could not parse body");
  }

  if (currArr.length == 0) {
    return null;
  }

  return new String(currArr, StandardCharsets.UTF_8);
}

private ServerHttpRequestDecorator requestWrapper(ServerHttpRequest request, String bodyStr) {
  URI uri = request.getURI();
  ServerHttpRequest newRequest = request.mutate().uri(uri).build();
  final DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
  Flux<DataBuffer> newBodyFlux = Flux.just(bodyDataBuffer);
  ServerHttpRequestDecorator requestDecorator =
      new ServerHttpRequestDecorator(newRequest) {
        @Override
        public Flux<DataBuffer> getBody() {
          return newBodyFlux;
        }
      };

  return requestDecorator;
}

private InputStream newInputStream() {
  return new InputStream() {
    public int read() {
      return -1;
    }
  };
}

private InputStream processRequestBody(InputStream s, DataBuffer d) {
  SequenceInputStream seq = new SequenceInputStream(s, d.asInputStream());
  return seq;
}

private Mono<ServerWebExchange> processInputStream(
    InputStream aggregatedBodyBytes,
    ServerWebExchange exchange,
    BiPredicate<ServerHttpRequest, String> predicate) {

  ServerHttpRequest request = exchange.getRequest();
  HttpHeaders headers = request.getHeaders();

  String bodyStr = bodyToString(aggregatedBodyBytes);

  ServerWebExchange mutatedExchange = exchange;

  // if the body exists on the request we need to mutate the ServerWebExchange to not
  // reparse the body because DataBuffers can only be read once;
  if (bodyStr != null) {
    mutatedExchange = exchange.mutate().request(requestWrapper(request, bodyStr)).build();
  }

  ServerHttpRequest mutatedRequest = mutatedExchange.getRequest();

  if (predicate.test(mutatedRequest, bodyStr)) {
    return Mono.just(mutatedExchange);
  }

  return Mono.error(new RuntimeException("invalid signature"));
}

/*
 * Because the DataBuffer is in a Flux we must reduce it to a Mono type via Flux.reduce
 * This covers large payloads or requests bodies that get sent in multiple byte chunks
 * and need to be concatentated.
 *
 * 1. The reduce is initialized with a newInputStream
 * 2. processRequestBody is called on each step of the Flux where a step is a body byte
 *    chunk. The method processRequestBody casts the Inbound DataBuffer to a InputStream
 *    and concats the new InputStream with the existing one
 * 3. Once the Flux is complete flatMap is executed with the resulting InputStream which is
 *    passed with the ServerWebExchange to processInputStream which will do the request validation
 */
public Mono<ServerWebExchange> validate(
    ServerWebExchange exchange, BiPredicate<ServerHttpRequest, String> p) {
  Flux<DataBuffer> body = exchange.getRequest().getBody();

  return body.reduce(newInputStream(), this::processRequestBody)
      .flatMap((InputStream b) -> processInputStream(b, exchange, p));
}

}

双谓词文档:https://docs.oracle.com/javase/8/docs/api/java/util/function/BiPredicate.html https://docs.oracle.com/javase/8/docs/api/java/util/function/BiPredicate.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

WebFlux 应用程序中的 WebFilter 的相关文章

随机推荐

  • 在 jsTree 上下文菜单中创建自定义项目

    我在 asp net mvc3 中使用 jsTree 和 contextmenu 创建一个树视图 div ul li a href class usr Model Name a Html Partial Childrens Model li
  • 在 C# 中使用 VB6 字符串数组

    我有 旧的 VB6 代码 我想从 C 代码中使用它 这有点类似于这个问题 https stackoverflow com questions 23507416 passing string array from vb6 to c net 但
  • WatiN pressTab 不按 Tab

    有没有人找到了在 Internet Explorer 中使用 watiN 按 Tab 键的方法 您的意思是要按 Tab 键本身 还是只是单击看起来像 Tab 的 HTML 元素 对于后者 请对适当的元素 Div Span 等 使用 Clic
  • mysql 如何比较 dd-mon-yy 格式的日期

    如何比较格式中的日期dd mon yy 例如 2014 年 11 月 10 日 gt 2013 年 10 月 7 日 select expiration date from grocery where expiration date lt
  • 将文件名存储在变量中并在 SSIS 中使用该文件名创建表

    我在 SSIS 的一个文件夹中只有几个 excel 源文件 我想从这些 Excel 文件中提取数据并加载到 SQL 表中 我的问题是我想一一保存所有文件名 并想创建与文件名完全相同的 SQL 表 然后要加载相应表中的每个Excel文件 请帮
  • Oozie SSH 操作

    Oozie SSH 操作问题 Issue 我们正在尝试在集群的特定主机上运行一些命令 我们为此选择了 SSH Action 我们面对这个 SSH 问题已经有一段时间了 这里真正的问题可能是什么 请指出解决方案 logs AUTH FAILE
  • LibGDX - 着色器适用于桌面但不适用于 Android

    我编写了一个简单的程序 可以在 3D 环境中渲染球体 并根据球体周围的四个光源为其着色 当我在桌面上运行该程序时 它工作得很好 但在 Android 设备上 球体只是纯色的 下面是一些图片来说明我正在谈论的内容 gt Desktop gt
  • “马来半岛标准时间”的时区问题

    我有一个在 C 上运行以下代码的程序 TimeZoneInfo localZone TimeZoneInfo Local string timeZone TimeZoneInfo FindSystemTimeZoneById localZo
  • $.each([集合]) 与 $([集合]).each()

    两种方法似乎产生相同的结果 http jsbin com owedo 但我一直很难真正说服人们第二种方法有效 因为它显然并不为人所知 Create some data var foo vals id foo id bar Common Me
  • 将 PHP 错误处理程序限制为特定命名空间

    PHP 有没有办法只为特定的命名空间设置错误处理程序 我正在构建一个小型框架 我希望能够通过设置自定义错误处理程序并抛出异常来尝试捕获其名称空间内的所有错误 警告 通知消息 在此特定名称空间之外触发的错误应该以常规方式表现 用PHP可以完成
  • 数据透视和运行时错误 1004:应用程序定义或对象定义的错误

    我对宏和 VBA 编码很陌生 我正在尝试创建一个非常简单的宏 它从包含 33 列的表中获取数据并将其转换为数据透视表 只有最后 3 列 31 32 33 包含数字 我需要将其显示在枢轴上 因为我想比较当前月份 上个月以及逐月的变动 到目前为
  • 将图像文件存储在猫鼬模式的二进制数据中并以html形式显示图像

    我正在使用 Express Node js 和 Mongodb 创建上传和显示图像文件的网页 我使用 schema 将图像的二进制文件保存在 mongodb 中 这是我在index js和db js中的一点代码 var Post mongo
  • 对 Dictionary 的键使用锁定

    我有一个Dictionary
  • 强制动态链接库中静态变量的预初始化

    C 11 标准第 3 6 2 条 第 4 条规定 具有静态存储持续时间的非局部变量的动态初始化是否在 main 的第一个语句之前完成是由实现定义的 C 标准将静态初始化与动态初始化区分开来 静态初始化仅需要计算编译时常量 我相信急切静态初始
  • 使用 Node.js 将对象写入文件

    我已经在 stackoverflow google 上搜索过这个 但似乎无法弄清楚 我正在抓取给定 URL 页面的社交媒体链接 该函数返回一个包含 URL 列表的对象 当我尝试将此数据写入不同的文件时 它会输出到该文件 object Obj
  • Python 请求:使用 Multipart/form-data 在 Facebook 上发布图像

    我正在使用 facebook API 在页面上发布图像 我可以使用以下命令从网络发布图像 import requests data url url caption caption access token token status requ
  • 获取小部件的背景颜色 - 真的

    我无法获取小部件的实际背景颜色 在我的特殊情况下 我在使用 QTabWidget 中的小部件时遇到问题 这是在Windows7上 因此 经典的小部件有一些灰色背景 而选项卡内的小部件通常用白色背景绘制 I tried def bgcolor
  • python seaborn:按色调显示 alpha

    在seaborn中 色调为组设置不同的颜色 我可以设置吗alpha取决于组中的JointGrid 或者甚至在单个数据点上 sns set theme jg sns JointGrid data df sns x x y y hue hue
  • C# 是否可以中断 ThreadPool 内的特定线程?

    假设我已将一个工作项排入队列ThreadPool 但是如果没有要处理的数据 从BlockingQueue 如果队列为空并且队列中不再有工作 那么我必须调用Thread Interrupt方法 如果我想中断阻塞任务 但是如何用 a 做同样的事
  • WebFlux 应用程序中的 WebFilter

    我有一个使用 Spring Boot 2 0 0 M5 2 0 0 BUILD SNAPSHOT 的 Spring Boot WebFlux 应用程序 我需要将跟踪 ID 添加到所有日志中 为了让它在 WebFlux 应用程序中工作 我尝试