11.4响应式地消费REST API

11.4响应式地消费REST API

在第7章中,使用ResTemplateTaco Cloud API发出客户端请求。RestTemplate是一个旧的计时器,已经在Spring3.0版本中引入。在它的时代,它被用来代表使用它的应用程序发出无数的请求。

但是ResTemplate提供的所有方法都处理非响应式域类型和集合。这意味着如果你想以一种响应式的方式处理一个响应的数据,你需要用Flux或者Mono来包装它。如果你已经有了一个Flux或者Mono,并且你想在POST或者PUT请求中发送它,那么你需要在发出请求之前将数据提取成一个非响应式类型。

如果有一种方法可以将RestTemplate原生地用于响应式类型,那就太好了。不要害,Spring 5提供了WebClient作为RestTemplate的一个响应式替代品。WebClient允许在向外部API发出请求时发送和接收响应类型。

使用WebClient与使用RestTemplate有很大不同。WebClient没有几个方法来处理不同类型的请求,而是有一个流畅的构件式接口,用来描述和发送请求。使用WebClient的一般使用模式是:

  • 创建一个WebClient实例(或是注入一个WebClient bean
  • 指定用于发送请求的HTTP方法
  • 指定URI和应该存在于请求中Header
  • 提交请求
  • 获取响应

让我们看看WebClient的几个实际例子,从如何使用WebClient发送HTTP GET请求开始。

11.4.1 GET资源

作为WebClient使用的一个例子,假设需要从Taco Cloud API中获取一个Ingredient对象的ID。可以使用RestTemplategetForObject()方法。但是使用WebClient,你需要构建请求、检索响应,然后提取一个发布Ingredient对象的Mono

Mono<Ingredient> ingredient = WebClient.create().get()
    .uri("http://localhost:8080/ingredients/{id}", ingredientId)
    .retrieve()
    .bodyToMono(Ingredient.class);
ingredient.subscribe(i -> { ... })

在这里,将使用create()创建一个新的WebClient实例。然后,使用get()uri()定义对 http://localhost:8080/ingresents/{id}GET请求,其中 {id} 占位符将替换为ingredentId中的值。retrieve()方法执行请求。最后,对bodyToMono()的调用将响应的body有效负载提取到Mono中,可以继续对其应用addition Mono操作。

要对bodyToMono()返回的Mono应用其他操作,在发送请求之前订阅它是很重要的。发出可以返回值集合的请求同样容易。例如,以下代码片段获取所有成分:

Flux<Ingredient> ingredients = WebClient.create()
    .get()
    .uri("http://localhost:8080/ingredients")
    .retrieve()
    .bodyToFlux(Ingredient.class);
ingredients.subscribe(i -> { ... })

对于大多数情况,获取多条数据与请求单个数据一样。最大的区别在于,你不用bodyToMono()来将响应体提取到Mono中,而是用bodyToFlux()来将其提取到Flux中。

bodyToMono()一样,bodyToFlux()返回的Flux还没有被订阅。这允许附加操作(过滤器、映射等)在数据开始流经Flux之前应用于Flux。因此,订阅结果的Flux很重要,否则请求将永远不会被发送。

使用一个基本URI发出请求

你可能会发现,自己对许多不同的请求都使用一个通用的基本URI。在这种情况下,创建一个带有基本URIWebClientbean注入到任何需要的地方是很有用的。这样的bean如下所示:

@Bean
public WebClient webClient() {
    return WebClient.create("http://localhost:8080");
}

然后,在需要使用该基本URI发出请求的任何地方,都可以这样注入和使用WebClient bean

@Autowired
WebClient webClient;

public Mono<Ingredient> getIngredientById(String ingredientId) {
    Mono<Ingredient> ingredient = webClient
        .get()
        .uri("/ingredients/{id}", ingredientId)
        .retrieve()
        .bodyToMono(Ingredient.class);
    ingredient.subscribe(i -> { ... })
}

因为WebClient已经创建,所以可以通过调用get()获得使用权限。对于URI,在调用uri()时,只需要指定相对于基URI的路径。

对长期运行的请求的定时处理

你可以指望的是,网络并不总是像你期望的那样可靠和快速。或者,远程服务器在处理请求时可能很慢。理想情况下,对远程服务的请求将在合理的时间内返回。如果不是,如果客户没有长时间等待响应,那已经非常值得庆幸了。

为了避免客户端请求被缓慢的网络或服务所阻塞,可以使用FluxMono中的timeout()方法来限制等待发布数据的时间。作为示例,考虑在获取成分数据时如何使用timeout()

Flux<Ingredient> ingredients = WebClient.create()
    .get()
    .uri("http://localhost:8080/ingredients")
    .retrieve()
    .bodyToFlux(Ingredient.class);

ingredients
    .timeout(Duration.ofSeconds(1))
    .subscribe(
        i -> { ... },
        e -> {
            // handle timeout error
        })

在订阅Flux之前,调用timeout(),指定了1s的持续时间。如果请求能在1秒内完成,那么就没问题。但是,如果请求花费的时间超过1秒,那么它将超时,并调用作为第二个参数传递给subscribe()的错误处理程序。

11.4.2发送资源

使用WebClient发送数据与接收数据没有太大区别。例如,假设有一个Mono,并且希望向URI发送一个POST请求,其中包含由Mono发布的成分以及/ingredients的相对路径。

只需使用post()方法而不是get()方法,并指定使用Mono来调用body()来填充请求体:

Mono<Ingredient> ingredientMono = ...;

Mono<Ingredient> result = webClient
    .post()
    .uri("/ingredients")
    .body(ingredientMono, Ingredient.class)
    .retrieve()
    .bodyToMono(Ingredient.class);

result.subscribe(i -> { ... })

如果没有MonoFlux要发送,而手头有原始域对象,那么可以使用syncBody()。例如,假设有一个要在请求体中发送的Ingredient,而不是Mono

Ingedient ingredient = ...;

Mono<Ingredient> result = webClient
    .post()
    .uri("/ingredients")
    .syncBody(ingredient)
    .retrieve()
    .bodyToMono(Ingredient.class);

result.subscribe(i -> { ... })

如果你想要用PUT请求更新一个Ingredient而不是POST请求,那么就调用put()来代替post(),并相应地调整URI路径:

Mono<Void> result = webClient
    .put()
    .uri("/ingredients/{id}", ingredient.getId())
    .syncBody(ingredient)
    .retrieve()
    .bodyToMono(Void.class)
    .subscribe();

PUT请求通常具有空的响应有效负载,因此必须请求bodyToMono()返回一个Void类型的Mono。订阅Mono后,请求将被发送。

11.4.3删除资源

WebClient还允许通过它的delete()方法删除资源。例如,以下代码删除了给定IDIngredient

Mono<Void> result = webClient
    .delete()
    .uri("/ingredients/{id}", ingredientId)
    .retrieve()
    .bodyToMono(Void.class)
    .subscribe();

PUT请求一样,DELETE请求通常没有有效负载。同样,你需要返回并订阅一个Mono来发送请求。

11.4.4处理错误

到目前为止,所有的WebClient示例都假定有一个圆满的结局;没有包含400500状态码的响应。如果返回这两种错误状态,WebClient将记录失败日志;否则,WebClient将默认忽略该错误。

如果你需要处理此类错误,那么可以使用对onStatus()的调用来指定应该如何处理各种HTTP状态码。onStatus()接受两个函数:一个谓词函数是用于匹配HTTP状态;另一个函数是给定的ClientResponse对象,返回Mono

为了演示如何使用onStatus()创建自定义错误处理程序,请考虑使用以下WebClient,以获取给定IDIngredient

Mono<Ingredient> ingredientMono = webClient
    .get()
    .uri("http://localhost:8080/ingredients/{id}", ingredientId)
    .retrieve()
    .bodyToMono(Ingredient.class);

只要ingredientId中的值与已知的成分资源匹配,那么当Mono订阅了Ingredient对象时,它就会发布该Ingredient对象。但是如果没有匹配的成分,会发生什么情况呢?

当订阅一个可能以错误结束的MonoFlux时,在调用subscribe()时注册一个错误消费者与注册一个数据消费者同样重要:

ingredientMono.subscribe(
    ingredient -> {
    // handle the ingredient data
    ...
    },
    error-> {
    // deal with the error
    ...
    });

如果找到该成分资源,那么将调用给予subscribe()的第一个lambda(数据使用者)和匹配的Ingredient对象。但是,如果没有找到它,那么请求将使用HTTP 404(NOTFOUND)响应,这将导致第二个lambda(错误消费者)在默认情况下被给予一个WebClientResponseException

WebClientResponseException的最大问题是,它没有明确指出导致Mono失败的原因。它的名称表明在WebClient发出的请求的响应中有错误,但是你需要深入查看WebClientResponseException以了解出错的原因。无论如何,如果提供给错误使用者的异常是域特定的,而不是WebClient,那就更好了。

通过添加自定义错误处理程序,可以提供将状态代码转换为自己选择的Throwable的代码。假设你想让一个对成分资源的失败请求导致MonoUnknownIngredientException错误中完成。在调用retrieve()方法后,在onStatus()方法中添加调用,可以实现:

Mono<Ingredient> ingredientMono = webClient
    .get()
    .uri("http://localhost:8080/ingredients/{id}", ingredientId)
    .retrieve()
    .onStatus(HttpStatus::is4xxClientError,
        response -> Mono.just(new UnknownIngredientException()))
    .bodyToMono(Ingredient.class);

onStatus()调用中的第一个参数是一个谓词,它给定一个HttpStatus,如果状态码是你想要处理的,则返回true。如果状态码匹配,那么响应将返回到第二个参数中的函数,由它根据自身需要进行处理,最终返回一个类型为ThrowableMono

在示例中,如果状态码是400级别的状态码(例如,客户端错误,那么Mono将返回一个UnknownIngredientException。这导致该ingredientMono失败,并抛出异常。

注意HttpStatus::is4xxClientErrorHttpStatusis4xxClientError方法的一个方法引用。该方法将在给定的HttpStatus对象上调用。如果你愿意,可以在HttpStatus上使用另一个方法作为方法引用;或者可以以lambda或方法引用的形式提供返回布尔值的自己的函数。

例如,可以通过将调用更改为onStatus()来检查HTTP 404(NOT FOUND)状态,从而在错误处理方面获得更精确的结果:

Mono<Ingredient> ingredientMono = webClient
    .get()
    .uri("http://localhost:8080/ingredients/{id}", ingredientId)
    .retrieve()
    .onStatus(status -> status == HttpStatus.NOT_FOUND,
        response -> Mono.just(new UnknownIngredientException()))
    .bodyToMono(Ingredient.class);

同样值得一提的是,在处理响应中可能返回的任何HTTP状态码时,都可以调用onStatus()

11.4.5交换请求

到目前为止,在使用WebClient时,已经使用retrieve()方法表示发送请求。在这些情况下,retrieve()方法返回一个ResponseSpec类型的对象,通过该方法,可以通过调用onStatus()bodyToFlux()bodyToMono()等方法来处理响应。使用ResponseSpec对于简单的案例来说是好的,但是在某些方面它有局限性。例如,如果需要访问响应头或cookie值,那么ResponseSpec就不合适了。

ResponseSpec出现短缺时,可以尝试调用exchange()而不是retrieve()exchange()方法返回类型为ClientResponseMono,可以在该方法上应用响应式操作来检查和使用来自整个响应的数据,包括有效负载、报头和Cookie

在我们研究exchange()retrieve()的区别之前,让我们先看看它们之间的相似程度。下面的代码片段使用WebClientexchange()通过成分ID获取单个成分:

Mono<Ingredient> ingredientMono = webClient
    .get()
    .uri("http://localhost:8080/ingredients/{id}", ingredientId)
    .exchange()
    .flatMap(cr -> cr.bodyToMono(Ingredient.class));

这大致相当于下面的使用retrieve()的例子:

Mono<Ingredient> ingredientMono = webClient
    .get()
    .uri("http://localhost:8080/ingredients/{id}", ingredientId)
    .retrieve()
    .bodyToMono(Ingredient.class);

exchange()示例中,不使用ResponseSpecbodyToMono()来获取一个Mono,而是得到一个Mono,在这个基础上,你可以应用一个平面映射函数来将ClientResponse映射到一个Mono,这个映射函数被扁平化为最终的Mono

现在让我们来看看exchanger()有什么不同。我们假设来自请求的响应可能包含一个名为X_UNAVAILABLEheader,其值为true,以指示(由于某种原因)所述成分不可用。为了便于讨论,假设该header存在,你希望得到的Mono是空的,以便不返回任何内容。可以通过向flatMap()添加另一个调用来实现这个场景,这样整个WebClient调用看起来就像这样:

Mono<Ingredient> ingredientMono = webClient
    .get()
    .uri("http://localhost:8080/ingredients/{id}", ingredientId)
    .exchange()
    .flatMap(cr -> {
        if (cr.headers().header("X_UNAVAILABLE").contains("true")) {
            return Mono.empty();
        }
        return Mono.just(cr);
    })
    .flatMap(cr -> cr.bodyToMono(Ingredient.class));

新的flatMap()调用检查给定的ClientRequest对象头,寻找一个名为X_UNAVAILABLE,值为trueheader。如果找到,它返回一个空的Mono。否则,它将返回包含ClientResponseMono。无论哪种情况,返回的Mono都将平铺成Mono,以便下一个flatMap()的调用操作。

上一页
下一页