消息发布与订阅

消息发布与订阅

Pub/Sub 是分布式系统中的一种常见模式,该系统中有许多服务希望利用解耦、异步消息传递。使用 Pub/Sub,您可以实现事件消费者与事件生产者解耦的场景。

Dapr 提供了一个可扩展的 Pub/Sub 系统,具有 At-Least-Once 保证,允许开发人员发布和订阅主题。Dapr 为 pub/sub 提供了组件,使运营商能够使用他们首选的基础设施,例如 Redis Streams、Kafka 等。

Pub/Sub 组件

组件配置

首先需要设置 Pub/Sub 组件,当运行 dapr init 时,Redis Streams 默认安装在本地机器上。在 Windows 上打开 %UserProfile%/.dapr/components/pubsub.yaml 或在 Linux/MacOS 上打开 ~/.dapr/components/pubsub.yaml 下的组件文件来验证。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: localhost:6379
    - name: redisPassword
      value: ""

消息订阅

然后我们可以以不同的方式订阅消息,声明式和编程式方法都支持相同的功能。声明式方法从你的代码中删除了对 Dapr 的依赖性,例如,允许现有的应用程序订阅主题,而无需更改代码。程序化方法则是在您的代码中实现订阅。

声明式订阅

您可以使用以下自定义资源定义(CRD)订阅一个主题。创建一个名为 subscription.yaml 的文件,并粘贴以下内容。

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: deathStarStatus
  route: /dsstatus
  pubsubname: pubsub
scopes:
  - app1
  - app2
  • route 字段告诉 Dapr 将所有主题消息发送到应用程序中的 /dsstatus 端点。
  • scopes 字段为 ID 为 app1 和 app2 的应用程序启用此订阅。

将 CRD 放在你的./components 目录下。当 Dapr 启动时,它会和组件一起加载订阅。注意:默认情况下,Dapr 在 MacOS/Linux 下从 $HOME/.dapr/components 加载组件,在 Windows 下从 %USERPROFILE/%.dapr/components 加载组件。

$ dapr run --app-id myapp --components-path ./myComponents -- python3 app1.py

这里我们使用 Flask 构建基础的应用:

import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys

app = flask.Flask(__name__)
CORS(app)

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

app.run()

类似的 Node.js 的应用声明如下:

const express = require("express");
const bodyParser = require("body-parser");
const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));

const port = 3000;

app.post("/dsstatus", (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`));

编程式订阅

要订阅主题,请用你选择的编程语言启动一个网络服务器,并监听以下 GET 端点 /dapr/subscribe。Dapr 实例在启动时调用到你的应用程序中,并期待一个 JSON 响应的主题订阅:

  • pubsubname: Which pub/sub component Dapr should use.
  • topic: Which topic to subscribe to.
  • route: Which endpoint for Dapr to call on when a message comes to that topic.

使用 Node 实现的代码如下:

const express = require("express");
const bodyParser = require("body-parser");
const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));

const port = 3000;

app.get("/dapr/subscribe", (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "deathStarStatus",
      route: "dsstatus",
    },
  ]);
});

app.post("/dsstatus", (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`));

为了告诉 Dapr 消息已处理成功,请返回 200 OK 响应。如果 Dapr 收到任何其他返回状态码而不是 200,或者如果您的应用程序崩溃,Dapr 将尝试按照 At-Least-Once 语义重新交付消息。

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

发布主题

要发布一个主题,你需要运行一个 Dapr Sidecar 的实例来使用 pubsub Redis 组件。你可以使用安装在本地环境中的默认 Redis 组件。

$ dapr run --app-id testpubsub --dapr-http-port 3500

然后发布消息到 deathStarStatus 主题。

$ dapr publish --publish-app-id testpubapp --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'

$ curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'

当然,我们也可以通过程序化方式发布:

const express = require("express");
const path = require("path");
const request = require("request");
const bodyParser = require("body-parser");

const app = express();
app.use(bodyParser.json());

const daprPort = process.env.DAPR_HTTP_PORT || 3500;
const daprUrl = `http://localhost:${daprPort}/v1.0`;
const port = 8080;
const pubsubName = "pubsub";

app.post("/publish", (req, res) => {
  console.log("Publishing: ", req.body);
  const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
  request({ uri: publishUrl, method: "POST", json: req.body });
  res.sendStatus(200);
});

app.listen(process.env.PORT || port, () =>
  console.log(`Listening on port ${port}!`)
);