阅读(199) (0)

Micronaut 服务器发送事件

2023-02-23 11:26:48 更新

Micronaut HTTP 服务器支持使用事件 API 发出服务器发送事件 (SSE)。

要从服务器发出事件,请返回一个发出事件类型对象的 Reactive Streams Publisher。

发布者本身可以通过事件系统等从后台任务发布事件。

想象一个新闻标题的事件流;你可以定义一个数据类如下:

Headline

 Java Groovy  Kotlin 
public class Headline {

    private String title;
    private String description;

    public Headline() {}

    public Headline(String title, String description) {
        this.title = title;
        this.description = description;
    }

    public String getTitle() {
        return title;
    }

    public String getDescription() {
        return description;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public void setDescription(String description) {
        this.description = description;
    }
}
class Headline {

    String title
    String description

    Headline() {}

    Headline(String title, String description) {
        this.title = title;
        this.description = description;
    }
}
class Headline {

    var title: String? = null
    var description: String? = null

    constructor()

    constructor(title: String, description: String) {
        this.title = title
        this.description = description
    }
}

要发出新闻标题事件,请使用您喜欢的任何 Reactive 库编写一个控制器,该控制器返回一个事件发布者实例。下面的示例通过 generate 方法使用 Project Reactor 的 Flux:

从控制器发布服务器发送的事件

 Java Groovy  Kotlin 
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Controller("/headlines")
public class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = MediaType.TEXT_EVENT_STREAM)
    public Publisher<Event<Headline>> index() { // (1)
        String[] versions = {"1.0", "2.0"}; // (2)
        return Flux.generate(() -> 0, (i, emitter) -> { // (3)
            if (i < versions.length) {
                emitter.next( // (4)
                    Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
                );
            } else {
                emitter.complete(); // (5)
            }
            return ++i;
        });
    }
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux

@Controller("/headlines")
class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = MediaType.TEXT_EVENT_STREAM)
    Publisher<Event<Headline>> index() { // (1)
        String[] versions = ["1.0", "2.0"] // (2)
        Flux.generate(() -> 0, (i, emitter) -> {
            if (i < versions.length) {
                emitter.next( // (4)
                        Event.of(new Headline("Micronaut ${versions[i]} Released", "Come and get it"))
                )
            } else {
                emitter.complete() // (5)
            }
            return i + 1
        })
    }
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
import java.util.concurrent.Callable
import java.util.function.BiFunction


@Controller("/headlines")
class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = [MediaType.TEXT_EVENT_STREAM])
    fun index(): Publisher<Event<Headline>> { // (1)
        val versions = arrayOf("1.0", "2.0") // (2)
        return Flux.generate(
            { 0 },
            BiFunction { i: Int, emitter: SynchronousSink<Event<Headline>> ->  // (3)
                if (i < versions.size) {
                    emitter.next( // (4)
                        Event.of(
                            Headline(
                                "Micronaut " + versions[i] + " Released", "Come and get it"
                            )
                        )
                    )
                } else {
                    emitter.complete() // (5)
                }
                return@BiFunction i + 1
            })
    }
}
  1. 控制器方法返回事件的发布者

  2. 为每个版本的 Micronaut 发出一个标题

  3. Flux 类型的 generate 方法生成一个 Publisher。 generate 方法接受初始值和接受该值的 lambda 以及发射器。请注意,此示例与控制器操作在同一线程上执行,但您可以使用 subscribeOn 或映射现有的“热”Flux。

  4. Emitter 接口 onNext 方法发出 Event 类型的对象。 Event.of(ET) 工厂方法构造事件。

  5. Emitter 接口的 onComplete 方法指示何时完成发送服务器发送的事件。

您通常希望在单独的执行程序上安排 SSE 事件流。前面的示例使用@ExecuteOn 在 I/O 执行器上执行流。

上面的示例发回一个文本/事件流类型的响应,对于每个发出的事件,之前的标题类型将被转换为 JSON,从而产生如下响应:

服务器发送的事件响应输出

 data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
 data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}

可以使用Event接口的方法自定义发回的Server Sent Event数据,包括关联事件id、注释、重试超时时间等。