阅读(4453) (0)

Micronaut WebSocket 支持

2023-02-23 11:27:02 更新

Micronaut 专门支持创建 WebSocket 客户端和服务器。 io.micronaut.websocket.annotation 包包含用于定义客户端和服务器的注释。

使用@ServerWebSocket

@ServerWebSocket 注释可以应用于应映射到 WebSocket URI 的任何类。以下示例是一个简单的聊天 WebSocket 实现:

WebSocket 聊天示例

 Java Groovy  Kotlin 
import io.micronaut.websocket.WebSocketBroadcaster;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;

import java.util.function.Predicate;

@ServerWebSocket("/chat/{topic}/{username}") // (1)
public class ChatServerWebSocket {

    private final WebSocketBroadcaster broadcaster;

    public ChatServerWebSocket(WebSocketBroadcaster broadcaster) {
        this.broadcaster = broadcaster;
    }

    @OnOpen // (2)
    public void onOpen(String topic, String username, WebSocketSession session) {
        String msg = "[" + username + "] Joined!";
        broadcaster.broadcastSync(msg, isValid(topic, session));
    }

    @OnMessage // (3)
    public void onMessage(String topic, String username,
                          String message, WebSocketSession session) {
        String msg = "[" + username + "] " + message;
        broadcaster.broadcastSync(msg, isValid(topic, session)); // (4)
    }

    @OnClose // (5)
    public void onClose(String topic, String username, WebSocketSession session) {
        String msg = "[" + username + "] Disconnected!";
        broadcaster.broadcastSync(msg, isValid(topic, session));
    }

    private Predicate<WebSocketSession> isValid(String topic, WebSocketSession session) {
        return s -> s != session &&
                topic.equalsIgnoreCase(s.getUriVariables().get("topic", String.class, null));
    }
}
import io.micronaut.websocket.WebSocketBroadcaster
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.OnClose
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import io.micronaut.websocket.annotation.ServerWebSocket

import java.util.function.Predicate

@ServerWebSocket("/chat/{topic}/{username}") // (1)
class ChatServerWebSocket {

    private final WebSocketBroadcaster broadcaster

    ChatServerWebSocket(WebSocketBroadcaster broadcaster) {
        this.broadcaster = broadcaster
    }

    @OnOpen // (2)
    void onOpen(String topic, String username, WebSocketSession session) {
        String msg = "[$username] Joined!"
        broadcaster.broadcastSync(msg, isValid(topic, session))
    }

    @OnMessage // (3)
    void onMessage(String topic, String username,
                   String message, WebSocketSession session) {
        String msg = "[$username] $message"
        broadcaster.broadcastSync(msg, isValid(topic, session)) // (4)
    }

    @OnClose // (5)
    void onClose(String topic, String username, WebSocketSession session) {
        String msg = "[$username] Disconnected!"
        broadcaster.broadcastSync(msg, isValid(topic, session))
    }

    private Predicate<WebSocketSession> isValid(String topic, WebSocketSession session) {
        return { s -> s != session && topic.equalsIgnoreCase(s.uriVariables.get("topic", String, null)) }
    }
}
import io.micronaut.websocket.WebSocketBroadcaster
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.OnClose
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import io.micronaut.websocket.annotation.ServerWebSocket

import java.util.function.Predicate

@ServerWebSocket("/chat/{topic}/{username}") // (1)
class ChatServerWebSocket(private val broadcaster: WebSocketBroadcaster) {

    @OnOpen // (2)
    fun onOpen(topic: String, username: String, session: WebSocketSession) {
        val msg = "[$username] Joined!"
        broadcaster.broadcastSync(msg, isValid(topic, session))
    }

    @OnMessage // (3)
    fun onMessage(topic: String, username: String,
                  message: String, session: WebSocketSession) {
        val msg = "[$username] $message"
        broadcaster.broadcastSync(msg, isValid(topic, session)) // (4)
    }

    @OnClose // (5)
    fun onClose(topic: String, username: String, session: WebSocketSession) {
        val msg = "[$username] Disconnected!"
        broadcaster.broadcastSync(msg, isValid(topic, session))
    }

    private fun isValid(topic: String, session: WebSocketSession): Predicate<WebSocketSession> {
        return Predicate<WebSocketSession> {
            (it !== session && topic.equals(it.uriVariables.get("topic", String::class.java, null), ignoreCase = true))
        }
    }
}
  1. @ServerWebSocket 注释定义了 WebSocket 映射的路径。 URI 可以是 URI 模板。

  2. @OnOpen 注释声明了在打开 WebSocket 时要调用的方法。

  3. @OnMessage 注释声明了在收到消息时要调用的方法。

  4. 您可以使用 WebSocketBroadcaster 向每个 WebSocket 会话广播消息。您可以使用谓词过滤要发送到的会话。此外,您可以使用 WebSocketSession 实例通过 WebSocketSession::send 向它发送消息。

  5. @OnClose 注释声明了在关闭 WebSocket 时调用的方法。

可以在 Micronaut 指南中找到 WebSockets 的实际工作示例。

对于绑定,每个 WebSocket 方法的方法参数可以是:

  • 来自 URI 模板的变量(在上面的示例中,主题和用户名是 URI 模板变量)

  • WebSocketSession 的实例

@OnClose 方法

@OnClose 方法可以选择接收 CloseReason。 @OnClose 方法在会话关闭之前被调用。

@OnMessage 方法

@OnMessage 方法可以为消息体定义一个参数。该参数可以是以下之一:

  • 一个 Netty WebSocketFrame

  • 任何 Java 基本类型或简单类型(例如 String)。事实上,可以从 ByteBuf 转换的任何类型(您可以注册额外的 TypeConverter beans 以支持自定义类型)。

  • byte[]、ByteBuf 或 Java NIO ByteBuffer。

  • 一个POJO。在这种情况下,默认情况下将使用 JsonMediaTypeCodec 将其解码为 JSON。您可以注册自定义编解码器并使用 @Consumes 注释定义处理程序的内容类型。

  • WebSocketPongMessage。这是一种特殊情况:该方法不会接收常规消息,而是处理作为对发送给客户端的 ping 的答复到达的 WebSocket pong。

@OnError 方法

可以添加一个用@OnError 注解的方法来实现自定义错误处理。 @OnError 方法可以定义一个参数,接收要处理的异常类型。如果不存在@OnError 处理并且发生不可恢复的异常,WebSocket 将自动关闭。

非阻塞消息处理

前面的示例使用 WebSocketBroadcaster 接口的 broadcastSync 方法,该方法会阻塞直到广播完成。 WebSocketSession 中存在类似的 sendSync 方法,以阻塞方式向单个接收者发送消息。但是,您可以通过从每个 WebSocket 处理程序方法返回 Publisher 或 Future 来实现非阻塞 WebSocket 服务器。例如:

WebSocket 聊天示例

 Java Groovy  Kotlin 
@OnMessage
public Publisher<Message> onMessage(String topic, String username,
                                    Message message, WebSocketSession session) {
    String text = "[" + username + "] " + message.getText();
    Message newMessage = new Message(text);
    return broadcaster.broadcast(newMessage, isValid(topic, session));
}
@OnMessage
Publisher<Message> onMessage(String topic, String username,
                             Message message, WebSocketSession session) {
    String text = "[$username] $message.text"
    Message newMessage = new Message(text)
    broadcaster.broadcast(newMessage, isValid(topic, session))
}
@OnMessage
fun onMessage(topic: String, username: String,
              message: Message, session: WebSocketSession): Publisher<Message> {
    val text = "[" + username + "] " + message.text
    val newMessage = Message(text)
    return broadcaster.broadcast(newMessage, isValid(topic, session))
}

上面的示例使用了广播,它创建了 Publisher 的实例并将值返回给 Micronaut。 Micronaut 基于 Publisher 接口异步发送消息。类似的 send 方法通过 Micronaut 返回值异步发送单个消息。

要在 Micronaut 注释的处理程序方法之外异步发送消息,您可以在它们各自的 WebSocketBroadcaster 和 WebSocketSession 接口中使用 broadcastAsync 和 sendAsync 方法。对于阻塞发送,可以使用 broadcastSync 和 sendSync 方法。

@ServerWebSocket 和作用域

默认情况下,@ServerWebSocket 实例为所有 WebSocket 连接共享。必须特别注意同步本地状态以避免线程安全问题。

如果您希望每个连接都有一个实例,请使用@Prototype 注释该类。这使您可以从 @OnOpen 处理程序检索 WebSocketSession 并将其分配给 @ServerWebSocket 实例的字段。

与 HTTP 会话共享会话

WebSocketSession 默认由内存映射支持。如果添加会话模块,则可以在 HTTP 服务器和 WebSocket 服务器之间共享会话。

当会话由 Redis 等持久存储支持时,在处理每条消息后,会话将更新到后备存储。

使用 CLI

如果您使用应用程序类型 Micronaut 应用程序创建项目,则可以将 create-websocket-server 命令与 Micronaut CLI 结合使用来创建一个使用 ServerWebSocket 注释的类。

$ mn create-websocket-server MyChat
| Rendered template WebsocketServer.java to destination src/main/java/example/MyChatServer.java

连接超时

默认情况下,Micronaut 会在五分钟后将没有活动的空闲连接超时。通常这不是问题,因为浏览器会自动重新连接 WebSocket 会话,但是您可以通过设置 micronaut.server.idle-timeout 设置来控制此行为(负值不会导致超时):

设置服务器的连接超时

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.server.idle-timeout=30m
micronaut:
  server:
    idle-timeout: 30m
[micronaut]
  [micronaut.server]
    idle-timeout="30m"
micronaut {
  server {
    idleTimeout = "30m"
  }
}
{
  micronaut {
    server {
      idle-timeout = "30m"
    }
  }
}
{
  "micronaut": {
    "server": {
      "idle-timeout": "30m"
    }
  }
}

如果您使用 Micronaut 的 WebSocket 客户端,您可能还希望在客户端上设置超时:

为客户端设置连接超时

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.client.read-idle-timeout=30m
micronaut:
  http:
    client:
      read-idle-timeout: 30m
[micronaut]
  [micronaut.http]
    [micronaut.http.client]
      read-idle-timeout="30m"
micronaut {
  http {
    client {
      readIdleTimeout = "30m"
    }
  }
}
{
  micronaut {
    http {
      client {
        read-idle-timeout = "30m"
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "client": {
        "read-idle-timeout": "30m"
      }
    }
  }
}

使用@ClientWebSocket

@ClientWebSocket 注释可以与 WebSocketClient 接口一起使用来定义 WebSocket 客户端。

您可以使用 @Client 注释注入对 WebSocketClient 的引用:

@Inject
@Client("http://localhost:8080")
WebSocketClient webSocketClient;

这使您可以为 WebSocket 客户端使用相同的服务发现和负载平衡功能。

一旦您获得了对 WebSocketClient 接口的引用,您就可以使用 connect 方法来获取使用 @ClientWebSocket 注释的 bean 的连接实例。

例如考虑以下实现:

WebSocket 聊天示例

 Java Groovy  Kotlin 
import io.micronaut.http.HttpRequest;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.ClientWebSocket;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import org.reactivestreams.Publisher;
import io.micronaut.core.async.annotation.SingleResult;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;

@ClientWebSocket("/chat/{topic}/{username}") // (1)
public abstract class ChatClientWebSocket implements AutoCloseable { // (2)

    private WebSocketSession session;
    private HttpRequest request;
    private String topic;
    private String username;
    private Collection<String> replies = new ConcurrentLinkedQueue<>();

    @OnOpen
    public void onOpen(String topic, String username,
                       WebSocketSession session, HttpRequest request) { // (3)
        this.topic = topic;
        this.username = username;
        this.session = session;
        this.request = request;
    }

    public String getTopic() {
        return topic;
    }

    public String getUsername() {
        return username;
    }

    public Collection<String> getReplies() {
        return replies;
    }

    public WebSocketSession getSession() {
        return session;
    }

    public HttpRequest getRequest() {
        return request;
    }

    @OnMessage
    public void onMessage(String message) {
        replies.add(message); // (4)
    }
import io.micronaut.http.HttpRequest
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.ClientWebSocket
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Future
import io.micronaut.core.async.annotation.SingleResult

@ClientWebSocket("/chat/{topic}/{username}") // (1)
abstract class ChatClientWebSocket implements AutoCloseable { // (2)

    private WebSocketSession session
    private HttpRequest request
    private String topic
    private String username
    private Collection<String> replies = new ConcurrentLinkedQueue<>()

    @OnOpen
    void onOpen(String topic, String username,
                WebSocketSession session, HttpRequest request) { // (3)
        this.topic = topic
        this.username = username
        this.session = session
        this.request = request
    }

    String getTopic() {
        topic
    }

    String getUsername() {
        username
    }

    Collection<String> getReplies() {
        replies
    }

    WebSocketSession getSession() {
        session
    }

    HttpRequest getRequest() {
        request
    }

    @OnMessage
    void onMessage(String message) {
        replies << message // (4)
    }
import io.micronaut.http.HttpRequest
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.ClientWebSocket
import io.micronaut.websocket.annotation.OnMessage
import io.micronaut.websocket.annotation.OnOpen
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Future

@ClientWebSocket("/chat/{topic}/{username}") // (1)
abstract class ChatClientWebSocket : AutoCloseable { // (2)

    var session: WebSocketSession? = null
        private set
    var request: HttpRequest<*>? = null
        private set
    var topic: String? = null
        private set
    var username: String? = null
        private set
    private val replies = ConcurrentLinkedQueue<String>()

    @OnOpen
    fun onOpen(topic: String, username: String,
               session: WebSocketSession, request: HttpRequest<*>) { // (3)
        this.topic = topic
        this.username = username
        this.session = session
        this.request = request
    }

    fun getReplies(): Collection<String> {
        return replies
    }

    @OnMessage
    fun onMessage(message: String) {
        replies.add(message) // (4)
    }
  1. 该类是抽象的(稍后会详细介绍)并使用 @ClientWebSocket 进行注释

  2. 客户端必须实现 AutoCloseable 并且您应该确保连接在某个时候关闭。

  3. 您可以使用与服务器上相同的注释,在本例中为@OnOpen 以获取对底层会话的引用。

  4. @OnMessage 注释定义了从服务器接收响应的方法。

您还可以定义以发送或广播开头的抽象方法,这些方法将在编译时为您实现。例如:

WebSocket 发送方法

public abstract void send(String message);

请注意,通过返回 void 这会告诉 Micronaut 该方法是阻塞发送。您可以改为定义返回期货或发布者的方法:

WebSocket 发送方法

public abstract reactor.core.publisher.Mono<String> send(String message);

上面的示例定义了一个返回 Mono 的发送方法。

WebSocket 发送方法

public abstract java.util.concurrent.Future<String> sendAsync(String message);

上面的示例定义了一个异步执行并返回 Future 以访问结果的发送方法。

一旦定义了客户端类,就可以连接到客户端套接字并开始发送消息:

连接客户端 WebSocket

ChatClientWebSocket chatClient = webSocketClient
    .connect(ChatClientWebSocket.class, "/chat/football/fred")
    .blockFirst();
chatClient.send("Hello World!");

出于说明目的,我们使用 blockFirst() 来获取客户端。然而,可以结合连接(它返回一个 Flux)来通过 WebSocket 执行非阻塞交互。

使用 CLI

如果您使用 Micronaut CLI 和默认(服务)配置文件创建项目,则可以使用 create-websocket-client 命令创建一个带有 WebSocketClient 的抽象类。

$ mn create-websocket-client MyChat
| Rendered template WebsocketClient.java to destination src/main/java/example/MyChatClient.java