阅读(1499) (0)

Micronaut 使用低级 HTTP 客户端

2023-02-23 13:36:05 更新

HttpClient 接口构成了低级 API 的基础。此接口声明有助于简化执行 HTTP 请求和接收响应的方法。

HttpClient 接口中的大多数方法都返回 Reactive Streams Publisher 实例,这并不总是最有用的接口。

Micronaut 的 Reactor HTTP Client 依赖项附带一个名为 ReactorHttpClient 的子接口。它提供了返回 Project Reactor Flux 类型的 HttpClient 接口的变体。

发送您的第一个 HTTP 请求

获取 HttpClient

有几种方法可以获取对 HttpClient 的引用。最常见的是使用 Client 注释。例如:

注入 HTTP 客户端

@Client("https://api.twitter.com/1.1") @Inject HttpClient httpClient;

上面的示例注入了一个以 Twitter API 为目标的客户端。

@field:Client("\${myapp.api.twitter.url}") @Inject lateinit var httpClient: HttpClient

上面的 Kotlin 示例使用配置路径注入了一个以 Twitter API 为目标的客户端。请注意“\${path.to.config}”上所需的转义(反斜杠),这是由于 Kotlin 字符串插值所必需的。

Client 注释也是一个自定义范围,用于管理 HttpClient 实例的创建并确保它们在应用程序关闭时停止。

您传递给 Client 注释的值可以是以下之一:

  • 绝对 URI,例如https://api.twitter.com/1.1

  • 相对 URI,在这种情况下目标服务器将是当前服务器(用于测试)

  • 服务标识符。

另一种创建 HttpClient 的方法是使用 HttpClient 的静态创建方法,但是不推荐使用这种方法,因为您必须确保手动关闭客户端,当然创建的客户端不会发生依赖注入。

执行 HTTP GET

使用 HttpClient 时,通常有两种感兴趣的方法。第一个是 retrieve,它执行一个 HTTP 请求并以您作为 Publisher 请求的任何类型(默认为 String)返回正文。

retrieve 方法接受一个 HttpRequest 或一个字符串 URI 到您希望请求的端点。

以下示例显示如何使用 retrieve 执行 HTTP GET 并将响应主体作为字符串接收:

使用检索

 Java Groovy  Kotlin 
String uri = UriBuilder.of("/hello/{name}")
                       .expand(Collections.singletonMap("name", "John"))
                       .toString();
assertEquals("/hello/John", uri);

String result = client.toBlocking().retrieve(uri);

assertEquals("Hello John", result);
when:
String uri = UriBuilder.of("/hello/{name}")
                       .expand(name: "John")
then:
"/hello/John" == uri

when:
String result = client.toBlocking().retrieve(uri)

then:
"Hello John" == result
val uri = UriBuilder.of("/hello/{name}")
                    .expand(Collections.singletonMap("name", "John"))
                    .toString()
uri shouldBe "/hello/John"

val result = client.toBlocking().retrieve(uri)

result shouldBe "Hello John"

请注意,在此示例中,出于说明目的,我们调用 toBlocking() 以返回客户端的阻塞版本。但是,在生产代码中,您不应该这样做,而应该依赖 Micronaut HTTP 服务器的非阻塞特性。

例如,以下 @Controller 方法以非阻塞方式调用另一个端点:

不阻塞地使用 HTTP 客户端

 Java Groovy  Kotlin 
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Status;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import io.micronaut.core.async.annotation.SingleResult;
import static io.micronaut.http.HttpRequest.GET;
import static io.micronaut.http.HttpStatus.CREATED;
import static io.micronaut.http.MediaType.TEXT_PLAIN;

@Get("/hello/{name}")
@SingleResult
Publisher<String> hello(String name) { // (1)
    return Mono.from(httpClient.retrieve(GET("/hello/" + name))); // (2)
}
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Mono
import static io.micronaut.http.HttpRequest.GET
import static io.micronaut.http.HttpStatus.CREATED
import static io.micronaut.http.MediaType.TEXT_PLAIN

@Get("/hello/{name}")
@SingleResult
Publisher<String> hello(String name) { // (1)
    Mono.from(httpClient.retrieve( GET("/hello/" + name))) // (2)
}
import io.micronaut.http.HttpRequest.GET
import io.micronaut.http.HttpStatus.CREATED
import io.micronaut.http.MediaType.TEXT_PLAIN
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import io.micronaut.core.async.annotation.SingleResult

@Get("/hello/{name}")
@SingleResult
internal fun hello(name: String): Publisher<String> { // (1)
    return Flux.from(httpClient.retrieve(GET<Any>("/hello/$name")))
                     .next() // (2)
}
  1. hello 方法返回一个 Mono,它可能会或可能不会发出一个项目。如果未发出某个项目,则返回 404。

  2. 检索方法被调用,它返回一个 Flux。这有一个 firstElement 方法返回第一个发出的项目或什么都不返回

使用 Reactor(如果您愿意,也可以使用 RxJava),您可以轻松高效地编写多个 HTTP 客户端调用,而不会阻塞(这会限制您的应用程序的吞吐量和可扩展性)。

调试/跟踪 HTTP 客户端

要调试从 HTTP 客户端发送和接收的请求,您可以通过 logback.xml 文件启用跟踪日志记录:

logback.xml

<logger name="io.micronaut.http.client" level="TRACE"/>

客户端特定调试/跟踪

要启用特定于客户端的日志记录,您可以为所有 HTTP 客户端配置默认记录器。您还可以使用特定于客户端的配置为不同的客户端配置不同的记录器。例如,在您的配置文件(例如 application.yml)中:

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.client.logger-name=mylogger
micronaut.http.services.otherClient.logger-name=other.client
micronaut:
  http:
    client:
      logger-name: mylogger
    services:
      otherClient:
        logger-name: other.client
[micronaut]
  [micronaut.http]
    [micronaut.http.client]
      logger-name="mylogger"
    [micronaut.http.services]
      [micronaut.http.services.otherClient]
        logger-name="other.client"
micronaut {
  http {
    client {
      loggerName = "mylogger"
    }
    services {
      otherClient {
        loggerName = "other.client"
      }
    }
  }
}
{
  micronaut {
    http {
      client {
        logger-name = "mylogger"
      }
      services {
        otherClient {
          logger-name = "other.client"
        }
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "client": {
        "logger-name": "mylogger"
      },
      "services": {
        "otherClient": {
          "logger-name": "other.client"
        }
      }
    }
  }
}

然后在 logback.xml 中启用日志记录:

logback.xml

<logger name="mylogger" level="DEBUG"/>
<logger name="other.client" level="TRACE"/>

自定义 HTTP 请求

前面的示例演示了使用 HttpRequest 接口的静态方法来构造 MutableHttpRequest 实例。顾名思义,MutableHttpRequest 可以改变,包括添加标头、自定义请求正文等的能力。例如:

传递一个 HttpRequest 来检索

 Java Groovy  Kotlin 
Flux<String> response = Flux.from(client.retrieve(
        GET("/hello/John")
        .header("X-My-Header", "SomeValue")
));
Flux<String> response = Flux.from(client.retrieve(
        GET("/hello/John")
        .header("X-My-Header", "SomeValue")
))
val response = client.retrieve(
        GET<Any>("/hello/John")
                .header("X-My-Header", "SomeValue")
)

上面的示例在发送之前向响应添加一个标头(X-My-Header)。 MutableHttpRequest 接口有更多方便的方法,可以很容易地以常用的方式修改请求。

读取 JSON 响应

微服务通常使用 JSON 等消息编码格式。 Micronaut 的 HTTP 客户端利用 Jackson 进行 JSON 解析,因此 Jackson 可以解码的任何类型都可以作为第二个参数传递给检索。

例如,考虑以下返回 JSON 响应的 @Controller 方法:

从控制器返回 JSON

 Java Groovy  Kotlin 
@Get("/greet/{name}")
Message greet(String name) {
    return new Message("Hello " + name);
}
@Get("/greet/{name}")
Message greet(String name) {
    new Message("Hello $name")
}
@Get("/greet/{name}")
internal fun greet(name: String): Message {
    return Message("Hello $name")
}

上面的方法返回一个 Message 类型的 POJO,如下所示:

Message POJO

 Java Groovy  Kotlin 
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Message {

    private final String text;

    @JsonCreator
    public Message(@JsonProperty("text") String text) {
        this.text = text;
    }

    public String getText() {
        return text;
    }
}
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty

class Message {

    final String text

    @JsonCreator
    Message(@JsonProperty("text") String text) {
        this.text = text
    }
}
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty

class Message @JsonCreator
constructor(@param:JsonProperty("text") val text: String)

Jackson注解用于映射构造函数

在客户端,您可以调用此端点并使用 retrieve 方法将 JSON 解码为映射,如下所示:

将响应主体解码为 Map

 Java Groovy  Kotlin 
response = Flux.from(client.retrieve(
        GET("/greet/John"),
        Argument.of(Map.class, String.class, String.class) // (1)
));
response = Flux.from(client.retrieve(
        GET("/greet/John"),
        Argument.of(Map, String, String) // (1)
))
var response: Flux<Map<*, *>> = Flux.from(client.retrieve(
        GET<Any>("/greet/John"), Map::class.java
))
  1. Argument.of 方法返回一个 Map,其中键和值类型为 String

虽然检索 JSON 作为映射可能是可取的,但通常您希望将对象解码为 POJO。为此,请改为传递类型:

将响应主体解码为 POJO

 Java Groovy  Kotlin 
Flux<Message> response = Flux.from(client.retrieve(
        GET("/greet/John"), Message.class
));

assertEquals("Hello John", response.blockFirst().getText());
when:
Flux<Message> response = Flux.from(client.retrieve(
        GET("/greet/John"), Message
))

then:
"Hello John" == response.blockFirst().getText()
val response = Flux.from(client.retrieve(
        GET<Any>("/greet/John"), Message::class.java
))

response.blockFirst().text shouldBe "Hello John"

请注意如何在客户端和服务器上使用相同的 Java 类型。这意味着您通常会定义一个公共 API 项目,在该项目中定义用于定义 API 的接口和类型。

解码其他内容类型

如果您与之通信的服务器使用非 JSON 的自定义内容类型,默认情况下 Micronaut 的 HTTP 客户端将不知道如何解码这种类型。

要解决此问题,请将 MediaTypeCodec 注册为一个 bean,它会被自动拾取并用于解码(或编码)消息。

接收完整的 HTTP 响应

有时仅接收响应主体是不够的,您还需要响应中的其他信息,例如标头、cookie 等。在这种情况下,不要使用 retrieve 方法,而是使用 exchange 方法:

接收完整的 HTTP 响应

 Java Groovy  Kotlin 
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
        GET("/greet/John"), Message.class // (1)
));

HttpResponse<Message> response = call.blockFirst();
Optional<Message> message = response.getBody(Message.class); // (2)
// check the status
assertEquals(HttpStatus.OK, response.getStatus()); // (3)
// check the body
assertTrue(message.isPresent());
assertEquals("Hello John", message.get().getText());
when:
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
        GET("/greet/John"), Message // (1)
))

HttpResponse<Message> response = call.blockFirst();
Optional<Message> message = response.getBody(Message) // (2)
// check the status
then:
HttpStatus.OK == response.getStatus() // (3)
// check the body
message.isPresent()
"Hello John" == message.get().getText()
val call = client.exchange(
        GET<Any>("/greet/John"), Message::class.java // (1)
)

val response = Flux.from(call).blockFirst()
val message = response.getBody(Message::class.java) // (2)
// check the status
response.status shouldBe HttpStatus.OK // (3)
// check the body
message.isPresent shouldBe true
message.get().text shouldBe "Hello John"
  1. 交换方法接收 HttpResponse

  2. 使用响应的 getBody(..) 方法检索正文

  3. 可以检查响应的其他方面,例如 HttpStatus

上面的示例接收完整的 HttpResponse,您可以从中获取标头和其他有用信息。

发布请求正文

到目前为止,所有示例都使用了相同的 HTTP 方法,即 GET。 HttpRequest 接口具有适用于所有不同 HTTP 方法的工厂方法。下表总结了它们:

表 1. HttpRequest 工厂方法
方法 描述 Allows Body

HttpRequest.GET(java.lang.String)

构造一个 HTTP GET 请求

false

HttpRequest.OPTIONS(java.lang.String)

构造一个 HTTP OPTIONS 请求

false

HttpRequest.HEAD(java.lang.String)

构造一个 HTTP HEAD 请求

false

HttpRequest.POST(java.lang.String,T)

构造一个 HTTP POST 请求

true

HttpRequest.PUT(java.lang.String,T)

构造一个 HTTP PUT 请求

true

HttpRequest.PATCH(java.lang.String,T)

构造一个 HTTP PATCH 请求

true

HttpRequest.DELETE(java.lang.String)

构造一个 HTTP DELETE 请求

true

还存在一个创建方法来构造任何 HttpMethod 类型的请求。由于 POST、PUT 和 PATCH 方法需要主体,因此需要第二个参数,即主体对象。

以下示例演示了如何发送简单的 String 正文:

发送字符串正文

 Java Groovy  Kotlin 
Flux<HttpResponse<String>> call = Flux.from(client.exchange(
        POST("/hello", "Hello John") // (1)
            .contentType(MediaType.TEXT_PLAIN_TYPE)
            .accept(MediaType.TEXT_PLAIN_TYPE), // (2)
        String.class // (3)
));
Flux<HttpResponse<String>> call = Flux.from(client.exchange(
        POST("/hello", "Hello John") // (1)
            .contentType(MediaType.TEXT_PLAIN_TYPE)
            .accept(MediaType.TEXT_PLAIN_TYPE), // (2)
        String // (3)
))
val call = client.exchange(
        POST("/hello", "Hello John") // (1)
                .contentType(MediaType.TEXT_PLAIN_TYPE)
                .accept(MediaType.TEXT_PLAIN_TYPE), String::class.java // (3)
)
  1. 使用POST方法;第一个参数是 URI,第二个是主体

  2. 内容类型和接受类型设置为text/plain(默认为application/json)

  3. 预期的响应类型是 String

发送 JSON

前面的示例发送纯文本。要发送 JSON,将要编码的对象传递给 JSON(无论是 Map 还是 POJO),只要 Jackson 能够对其进行编码。

例如,您可以从上一节创建一个 Message 并将其传递给 POST 方法:

Sending a JSON body

 Java Groovy  Kotlin 
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
        POST("/greet", new Message("Hello John")), // (1)
        Message.class // (2)
));
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
        POST("/greet", new Message("Hello John")), // (1)
        Message // (2)
))
val call = client.exchange(
        POST("/greet", Message("Hello John")), Message::class.java // (2)
)
  1. 创建 Message 实例并将其传递给 POST 方法

  2. 同一个类解码响应

在上面的示例中,以下 JSON 作为请求的主体发送:

Resulting JSON

{"text":"Hello John"}

可以使用 Jackson Annotations 自定义 JSON。

使用 URI 模板

如果在 URI 中包含对象的某些属性,则可以使用 URI 模板。

例如,假设您有一个带有 title 属性的 Book 类。您可以在 URI 模板中包含标题,然后从 Book 的实例中填充它。例如:

Sending a JSON body with a URI template

 Java Groovy  Kotlin 
Flux<HttpResponse<Book>> call = Flux.from(client.exchange(
        POST("/amazon/book/{title}", new Book("The Stand")),
        Book.class
));
Flux<HttpResponse<Book>> call = client.exchange(
        POST("/amazon/book/{title}", new Book("The Stand")),
        Book
);
val call = client.exchange(
        POST("/amazon/book/{title}", Book("The Stand")),
        Book::class.java
)

在上述情况下,title 属性包含在 URI 中。

发送表单数据

您还可以将 POJO 或地图编码为表单数据而不是 JSON。只需在发布请求中将内容类型设置为 application/x-www-form-urlencoded:

Sending a Form Data

 Java Groovy  Kotlin 
Flux<HttpResponse<Book>> call = Flux.from(client.exchange(
        POST("/amazon/book/{title}", new Book("The Stand"))
        .contentType(MediaType.APPLICATION_FORM_URLENCODED),
        Book.class
));
Flux<HttpResponse<Book>> call = client.exchange(
        POST("/amazon/book/{title}", new Book("The Stand"))
        .contentType(MediaType.APPLICATION_FORM_URLENCODED),
        Book
)
val call = client.exchange(
        POST("/amazon/book/{title}", Book("The Stand"))
                .contentType(MediaType.APPLICATION_FORM_URLENCODED),
        Book::class.java
)

请注意,Jackson 也可以绑定表单数据,因此要自定义绑定过程,请使用 Jackson 注释。

多部分客户端上传

Micronaut HTTP 客户端支持多部分请求。要构建多部分请求,请将内容类型设置为 multipart/form-data 并将正文设置为 MultipartBody 的实例。

例如:

Creating the body

 Java Groovy  Kotlin 
import io.micronaut.http.client.multipart.MultipartBody;

String toWrite = "test file";
File file = File.createTempFile("data", ".txt");
FileWriter writer = new FileWriter(file);
writer.write(toWrite);
writer.close();

MultipartBody requestBody = MultipartBody.builder()     // (1)
        .addPart(                                       // (2)
            "data",
            file.getName(),
            MediaType.TEXT_PLAIN_TYPE,
            file
        ).build();                                      // (3)
import io.micronaut.http.multipart.CompletedFileUpload
import io.micronaut.http.multipart.StreamingFileUpload
import io.micronaut.http.client.multipart.MultipartBody
import org.reactivestreams.Publisher

File file = new File(uploadDir, "data.txt")
file.text = "test file"
file.createNewFile()


MultipartBody requestBody = MultipartBody.builder()     // (1)
        .addPart(                                       // (2)
            "data",
            file.name,
            MediaType.TEXT_PLAIN_TYPE,
            file
        ).build()                                       // (3)
import io.micronaut.http.client.multipart.MultipartBody

val toWrite = "test file"
val file = File.createTempFile("data", ".txt")
val writer = FileWriter(file)
writer.write(toWrite)
writer.close()

val requestBody = MultipartBody.builder()     // (1)
        .addPart(                             // (2)
                "data",
                file.name,
                MediaType.TEXT_PLAIN_TYPE,
                file
        ).build()                             // (3)
  1. 创建一个 MultipartBody 构建器,用于向主体添加部件。

  2. 将一个部分添加到正文中,在本例中是一个文件。此方法在 MultipartBody.Builder 中有不同的变体。

  3. build 方法将构建器中的所有部件组装成一个 MultipartBody。至少需要一个部分。

创建请求

 Java Groovy  Kotlin 
HttpRequest.POST("/multipart/upload", requestBody)    // (1)
           .contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
HttpRequest.POST("/multipart/upload", requestBody)      // (1)
           .contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
HttpRequest.POST("/multipart/upload", requestBody)    // (1)
           .contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
  1. 具有不同类型数据的多部分请求正文。

  2. 将请求的内容类型标头设置为 multipart/form-data。

通过 HTTP 流式传输 JSON

Micronaut 的 HTTP 客户端支持通过 ReactorStreamingHttpClient 接口通过 HTTP 流式传输数据,该接口包括特定于流式传输的方法,包括:

表 1. HTTP 流媒体方法
方法 描述

dataStream(HttpRequest<I> request)

将数据流作为 ByteBuffer 的 Flux 返回

exchangeStream(HttpRequest<I> request)

返回包装 ByteBuffer 的 Flux 的 HttpResponse

jsonStream(HttpRequest<I> request)

返回一个非阻塞的 JSON 对象流

要使用 JSON 流,请在服务器上声明一个控制器方法,该方法返回 JSON 对象的 application/x-json-stream。例如:

Streaming JSON on the Server

 Java Groovy  Kotlin 
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
Publisher<Headline> streamHeadlines() {
    return Mono.fromCallable(() -> {  // (2)
        Headline headline = new Headline();
        headline.setText("Latest Headline at " + ZonedDateTime.now());
        return headline;
    }).repeat(100) // (3)
      .delayElements(Duration.of(1, ChronoUnit.SECONDS)); // (4)
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit

@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
Flux<Headline> streamHeadlines() {
    Mono.fromCallable({ // (2)
        new Headline(text: "Latest Headline at ${ZonedDateTime.now()}")
    }).repeat(100) // (3)
            .delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
}
import io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit.SECONDS

@Get(value = "/headlines", processes = [APPLICATION_JSON_STREAM]) // (1)
internal fun streamHeadlines(): Flux<Headline> {
    return Mono.fromCallable { // (2)
        val headline = Headline()
        headline.text = "Latest Headline at ${ZonedDateTime.now()}"
        headline
    }.repeat(100) // (3)
     .delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
}
  1. streamHeadlines 方法产生 application/x-json-stream

  2. Flux 是从 Callable 函数创建的(注意函数内不会发生阻塞,所以这没问题,否则你应该订阅 I/O 线程池)。

  3. Flux 重复 100 次

  4. Flux 发射物品,每个物品之间有 1 秒的延迟

服务器不必用 Micronaut 编写,任何支持 JSON 流的服务器都可以。

然后在客户端上,使用 jsonStream 订阅流,每次服务器发出 JSON 对象时,客户端都会解码并使用它:

在客户端流式传输 JSON

 Java Groovy  Kotlin 
Flux<Headline> headlineStream = Flux.from(client.jsonStream(
        GET("/streaming/headlines"), Headline.class)); // (1)
CompletableFuture<Headline> future = new CompletableFuture<>(); // (2)
headlineStream.subscribe(new Subscriber<Headline>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(1); // (3)
    }

    @Override
    public void onNext(Headline headline) {
        System.out.println("Received Headline = " + headline.getText());
        future.complete(headline); // (4)
    }

    @Override
    public void onError(Throwable t) {
        future.completeExceptionally(t); // (5)
    }

    @Override
    public void onComplete() {
        // no-op // (6)
    }
});
Flux<Headline> headlineStream = Flux.from(client.jsonStream(
        GET("/streaming/headlines"), Headline)) // (1)
CompletableFuture<Headline> future = new CompletableFuture<>() // (2)
headlineStream.subscribe(new Subscriber<Headline>() {
    @Override
    void onSubscribe(Subscription s) {
        s.request(1) // (3)
    }

    @Override
    void onNext(Headline headline) {
        println "Received Headline = $headline.text"
        future.complete(headline) // (4)
    }

    @Override
    void onError(Throwable t) {
        future.completeExceptionally(t) // (5)
    }

    @Override
    void onComplete() {
        // no-op // (6)
    }
})
val headlineStream = client.jsonStream(
    GET<Any>("/streaming/headlines"), Headline::class.java) // (1)
val future = CompletableFuture<Headline>() // (2)
headlineStream.subscribe(object : Subscriber<Headline> {
    override fun onSubscribe(s: Subscription) {
        s.request(1) // (3)
    }

    override fun onNext(headline: Headline) {
        println("Received Headline = ${headline.text!!}")
        future.complete(headline) // (4)
    }

    override fun onError(t: Throwable) {
        future.completeExceptionally(t) // (5)
    }

    override fun onComplete() {
        // no-op // (6)
    }
})
  1. jsonStream 方法返回一个 Flux

  2. CompletableFuture 用于接收值,但是您对每个发出的项目执行的操作是特定于应用程序的

  3. 订阅请求单个项目。您可以使用订阅来调节背压和需求。

  4. onNext 方法在一个项目被发出时被调用

  5. 发生错误时调用 onError 方法

  6. onComplete 方法在所有 Headline 实例发出后被调用

请注意,上例中的服务器和客户端都不执行任何阻塞 I/O。

配置 HTTP 客户端

所有客户端的全局配置

默认的 HTTP 客户端配置是一个名为 DefaultHttpClientConfiguration 的配置属性,它允许为所有 HTTP 客户端配置默认行为。例如,在您的配置文件中(例如 application.yml):

更改默认 HTTP 客户端配置

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.client.read-timeout=5s
micronaut:
  http:
    client:
      read-timeout: 5s
[micronaut]
  [micronaut.http]
    [micronaut.http.client]
      read-timeout="5s"
micronaut {
  http {
    client {
      readTimeout = "5s"
    }
  }
}
{
  micronaut {
    http {
      client {
        read-timeout = "5s"
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "client": {
        "read-timeout": "5s"
      }
    }
  }
}

上面的示例设置了 HttpClientConfiguration 类的 readTimeout 属性。

客户端特定配置

要为每个客户端单独配置,有几个选项。您可以在配置文件(例如 application.yml)中手动配置服务发现并应用每个客户端配置:

手动配置 HTTP 服务

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.services.foo.urls[0]=http://foo1
micronaut.http.services.foo.urls[1]=http://foo2
micronaut.http.services.foo.read-timeout=5s
micronaut:
  http:
    services:
      foo:
        urls:
          - http://foo1
          - http://foo2
        read-timeout: 5s
[micronaut]
  [micronaut.http]
    [micronaut.http.services]
      [micronaut.http.services.foo]
        urls=[
          "http://foo1",
          "http://foo2"
        ]
        read-timeout="5s"
micronaut {
  http {
    services {
      foo {
        urls = ["http://foo1", "http://foo2"]
        readTimeout = "5s"
      }
    }
  }
}
{
  micronaut {
    http {
      services {
        foo {
          urls = ["http://foo1", "http://foo2"]
          read-timeout = "5s"
        }
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "services": {
        "foo": {
          "urls": ["http://foo1", "http://foo2"],
          "read-timeout": "5s"
        }
      }
    }
  }
}
  • 读取超时应用于 foo 客户端。

警告:此客户端配置可以与 @Client 注释结合使用,通过直接注入 HttpClient 或在客户端界面上使用。在任何情况下,注释上的所有其他属性都将被忽略,除了服务 id。

然后,注入指定的客户端配置:

注入 HTTP 客户端

@Client("foo") @Inject ReactorHttpClient httpClient;

您还可以定义一个从 HttpClientConfiguration 扩展的 bean,并确保 javax.inject.Named 注释适当地命名它:

定义 HTTP 客户端配置 bean

@Named("twitter")
@Singleton
class TwitterHttpClientConfiguration extends HttpClientConfiguration {
   public TwitterHttpClientConfiguration(ApplicationConfiguration configuration) {
        super(configuration);
    }
}

如果您使用服务发现使用 @Client 注入名为 twitter 的服务,则将选择此配置:

注入 HTTP 客户端

@Client("twitter") @Inject ReactorHttpClient httpClient;

或者,如果您不使用服务发现,则可以使用 @Client 的配置成员来引用特定类型:

注入 HTTP 客户端

@Client(value = "https://api.twitter.com/1.1",
        configuration = TwitterHttpClientConfiguration.class)
@Inject
ReactorHttpClient httpClient;

使用 HTTP 客户端连接池

处理大量请求的客户端将受益于启用 HTTP 客户端连接池。以下配置为 foo 客户端启用池化:

手动配置 HTTP 服务

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.services.foo.urls[0]=http://foo1
micronaut.http.services.foo.urls[1]=http://foo2
micronaut.http.services.foo.pool.enabled=true
micronaut.http.services.foo.pool.max-connections=50
micronaut:
  http:
    services:
      foo:
        urls:
          - http://foo1
          - http://foo2
        pool:
          enabled: true
          max-connections: 50
[micronaut]
  [micronaut.http]
    [micronaut.http.services]
      [micronaut.http.services.foo]
        urls=[
          "http://foo1",
          "http://foo2"
        ]
        [micronaut.http.services.foo.pool]
          enabled=true
          max-connections=50
micronaut {
  http {
    services {
      foo {
        urls = ["http://foo1", "http://foo2"]
        pool {
          enabled = true
          maxConnections = 50
        }
      }
    }
  }
}
{
  micronaut {
    http {
      services {
        foo {
          urls = ["http://foo1", "http://foo2"]
          pool {
            enabled = true
            max-connections = 50
          }
        }
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "services": {
        "foo": {
          "urls": ["http://foo1", "http://foo2"],
          "pool": {
            "enabled": true,
            "max-connections": 50
          }
        }
      }
    }
  }
}
  • pool 启用池并为其设置最大连接数

有关可用池配置选项的详细信息,请参阅 ConnectionPoolConfiguration 的 API。

配置事件循环组

默认情况下,Micronaut 为工作线程和所有 HTTP 客户端线程共享一个通用的 Netty EventLoopGroup。

这个 EventLoopGroup 可以通过 micronaut.netty.event-loops.default 属性进行配置:

配置默认事件循环

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.netty.event-loops.default.num-threads=10
micronaut.netty.event-loops.default.prefer-native-transport=true
micronaut:
  netty:
    event-loops:
      default:
        num-threads: 10
        prefer-native-transport: true
[micronaut]
  [micronaut.netty]
    [micronaut.netty.event-loops]
      [micronaut.netty.event-loops.default]
        num-threads=10
        prefer-native-transport=true
micronaut {
  netty {
    eventLoops {
      'default' {
        numThreads = 10
        preferNativeTransport = true
      }
    }
  }
}
{
  micronaut {
    netty {
      event-loops {
        default {
          num-threads = 10
          prefer-native-transport = true
        }
      }
    }
  }
}
{
  "micronaut": {
    "netty": {
      "event-loops": {
        "default": {
          "num-threads": 10,
          "prefer-native-transport": true
        }
      }
    }
  }
}

您还可以使用 micronaut.netty.event-loops 设置来配置一个或多个额外的事件循环。下表总结了属性:

表 1. DefaultEventLoopGroupConfiguration 的配置属性
属性 类型 描述

micronaut.netty.event-loops.*.num-threads

int

micronaut.netty.event-loops.*.io-ratio

java.lang.Integer

micronaut.netty.event-loops.*.prefer-native-transport

boolean

micronaut.netty.event-loops.*.executor

java.lang.String

micronaut.netty.event-loops.*.shutdown-quiet-period

java.time.Duration

micronaut.netty.event-loops.*.shutdown-timeout

java.time.Duration

例如,如果您与 HTTP 客户端的交互涉及 CPU 密集型工作,则可能值得为一个或所有客户端配置一个单独的 EventLoopGroup。

以下示例配置了一个名为“other”的附加事件循环组,其中包含 10 个线程:

配置额外的事件循环

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.netty.event-loops.other.num-threads=10
micronaut.netty.event-loops.other.prefer-native-transport=true
micronaut:
  netty:
    event-loops:
      other:
        num-threads: 10
        prefer-native-transport: true
[micronaut]
  [micronaut.netty]
    [micronaut.netty.event-loops]
      [micronaut.netty.event-loops.other]
        num-threads=10
        prefer-native-transport=true
micronaut {
  netty {
    eventLoops {
      other {
        numThreads = 10
        preferNativeTransport = true
      }
    }
  }
}
{
  micronaut {
    netty {
      event-loops {
        other {
          num-threads = 10
          prefer-native-transport = true
        }
      }
    }
  }
}
{
  "micronaut": {
    "netty": {
      "event-loops": {
        "other": {
          "num-threads": 10,
          "prefer-native-transport": true
        }
      }
    }
  }
}

配置附加事件循环后,您可以更改 HTTP 客户端配置以使用它:

改变客户端使用的事件循环组

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.client.event-loop-group=other
micronaut:
  http:
    client:
      event-loop-group: other
[micronaut]
  [micronaut.http]
    [micronaut.http.client]
      event-loop-group="other"
micronaut {
  http {
    client {
      eventLoopGroup = "other"
    }
  }
}
{
  micronaut {
    http {
      client {
        event-loop-group = "other"
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "client": {
        "event-loop-group": "other"
      }
    }
  }
}

错误响应

如果返回代码为 400 或更高的 HTTP 响应,则会创建 HttpClientResponseException。异常包含原始响应。如何抛出异常取决于方法的返回类型。

对于阻塞客户端,抛出异常并应由调用者捕获和处理。对于反应式客户端,异常作为错误通过发布者传递。

绑定错误

如果请求成功,您通常希望使用端点并绑定到 POJO,如果发生错误则绑定到不同的 POJO。以下示例显示如何调用具有成功和错误类型的交换。

 Java Groovy  Kotlin 
@Controller("/books")
public class BooksController {

    @Get("/{isbn}")
    public HttpResponse find(String isbn) {
        if (isbn.equals("1680502395")) {
            Map<String, Object> m = new HashMap<>();
            m.put("status", 401);
            m.put("error", "Unauthorized");
            m.put("message", "No message available");
            m.put("path", "/books/" + isbn);
            return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(m);
        }

        return HttpResponse.ok(new Book("1491950358", "Building Microservices"));
    }
}
@Controller("/books")
class BooksController {

    @Get("/{isbn}")
    HttpResponse find(String isbn) {
        if (isbn == "1680502395") {
            Map<String, Object> m = [
                    status : 401,
                    error  : "Unauthorized",
                    message: "No message available",
                    path   : "/books/" + isbn]
            return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(m)
        }

        return HttpResponse.ok(new Book("1491950358", "Building Microservices"))
    }
}
@Controller("/books")
class BooksController {

    @Get("/{isbn}")
    fun find(isbn: String): HttpResponse<*> {
        if (isbn == "1680502395") {
            val m = mapOf(
                "status" to 401,
                "error" to "Unauthorized",
                "message" to "No message available",
                "path" to "/books/$isbn"
            )
            return HttpResponse.status<Any>(HttpStatus.UNAUTHORIZED).body(m)
        }

        return HttpResponse.ok(Book("1491950358", "Building Microservices"))
    }
}
 Java Groovy  Kotlin 
@Test
public void afterAnHttpClientExceptionTheResponseBodyCanBeBoundToAPOJO() {
    try {
        client.toBlocking().exchange(HttpRequest.GET("/books/1680502395"),
                Argument.of(Book.class), // (1)
                Argument.of(CustomError.class)); // (2)
    } catch (HttpClientResponseException e) {
        assertEquals(HttpStatus.UNAUTHORIZED, e.getResponse().getStatus());
        Optional<CustomError> jsonError = e.getResponse().getBody(CustomError.class);
        assertTrue(jsonError.isPresent());
        assertEquals(401, jsonError.get().status);
        assertEquals("Unauthorized", jsonError.get().error);
        assertEquals("No message available", jsonError.get().message);
        assertEquals("/books/1680502395", jsonError.get().path);
    }
}
def "after an HttpClientException the response body can be bound to a POJO"() {
    when:
    client.toBlocking().exchange(HttpRequest.GET("/books/1680502395"),
            Argument.of(Book), // (1)
            Argument.of(CustomError)) // (2)

    then:
    def e = thrown(HttpClientResponseException)
    e.response.status == HttpStatus.UNAUTHORIZED

    when:
    Optional<CustomError> jsonError = e.response.getBody(CustomError)

    then:
    jsonError.isPresent()
    jsonError.get().status == 401
    jsonError.get().error == 'Unauthorized'
    jsonError.get().message == 'No message available'
    jsonError.get().path == '/books/1680502395'
}
"after an httpclient exception the response body can be bound to a POJO" {
    try {
        client.toBlocking().exchange(HttpRequest.GET<Any>("/books/1680502395"),
                Argument.of(Book::class.java), // (1)
                Argument.of(CustomError::class.java)) // (2)
    } catch (e: HttpClientResponseException) {
        e.response.status shouldBe HttpStatus.UNAUTHORIZED
    }
}
  1. 成功类型
  2. 错误类型