消息发布与订阅
消息发布与订阅
Pub/Sub 是分布式系统中的一种常见模式,该系统中有许多服务希望利用解耦、异步消息传递。使用 Pub/Sub,您可以实现事件消费者与事件生产者解耦的场景。
Dapr 提供了一个可扩展的 Pub/Sub 系统,具有 At-Least-Once 保证,允许开发人员发布和订阅主题。Dapr 为 pub/sub 提供了组件,使运营商能够使用他们首选的基础设施,例如 Redis Streams、Kafka 等。
组件配置
首先需要设置 Pub/Sub 组件,当运行 dapr init 时,Redis Streams 默认安装在本地机器上。在 Windows 上打开 %UserProfile%/.dapr/components/pubsub.yaml 或在 Linux/MacOS 上打开 ~/.dapr/components/pubsub.yaml 下的组件文件来验证。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
消息订阅
然后我们可以以不同的方式订阅消息,声明式和编程式方法都支持相同的功能。声明式方法从你的代码中删除了对 Dapr 的依赖性,例如,允许现有的应用程序订阅主题,而无需更改代码。程序化方法则是在您的代码中实现订阅。
声明式订阅
您可以使用以下自定义资源定义(CRD)订阅一个主题。创建一个名为 subscription.yaml 的文件,并粘贴以下内容。
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: deathStarStatus
route: /dsstatus
pubsubname: pubsub
scopes:
- app1
- app2
- route 字段告诉 Dapr 将所有主题消息发送到应用程序中的 /dsstatus 端点。
- scopes 字段为 ID 为 app1 和 app2 的应用程序启用此订阅。
将 CRD 放在你的./components 目录下。当 Dapr 启动时,它会和组件一起加载订阅。注意:默认情况下,Dapr 在 MacOS/Linux 下从 $HOME/.dapr/components 加载组件,在 Windows 下从 %USERPROFILE/%.dapr/components 加载组件。
$ dapr run --app-id myapp --components-path ./myComponents -- python3 app1.py
这里我们使用 Flask 构建基础的应用:
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
类似的 Node.js 的应用声明如下:
const express = require("express");
const bodyParser = require("body-parser");
const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));
const port = 3000;
app.post("/dsstatus", (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`));
编程式订阅
要订阅主题,请用你选择的编程语言启动一个网络服务器,并监听以下 GET 端点 /dapr/subscribe。Dapr 实例在启动时调用到你的应用程序中,并期待一个 JSON 响应的主题订阅:
- pubsubname: Which pub/sub component Dapr should use.
- topic: Which topic to subscribe to.
- route: Which endpoint for Dapr to call on when a message comes to that topic.
使用 Node 实现的代码如下:
const express = require("express");
const bodyParser = require("body-parser");
const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));
const port = 3000;
app.get("/dapr/subscribe", (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "deathStarStatus",
route: "dsstatus",
},
]);
});
app.post("/dsstatus", (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`));
为了告诉 Dapr 消息已处理成功,请返回 200 OK 响应。如果 Dapr 收到任何其他返回状态码而不是 200,或者如果您的应用程序崩溃,Dapr 将尝试按照 At-Least-Once 语义重新交付消息。
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
发布主题
要发布一个主题,你需要运行一个 Dapr Sidecar 的实例来使用 pubsub Redis 组件。你可以使用安装在本地环境中的默认 Redis 组件。
$ dapr run --app-id testpubsub --dapr-http-port 3500
然后发布消息到 deathStarStatus 主题。
$ dapr publish --publish-app-id testpubapp --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'
$ curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'
当然,我们也可以通过程序化方式发布:
const express = require("express");
const path = require("path");
const request = require("request");
const bodyParser = require("body-parser");
const app = express();
app.use(bodyParser.json());
const daprPort = process.env.DAPR_HTTP_PORT || 3500;
const daprUrl = `http://localhost:${daprPort}/v1.0`;
const port = 8080;
const pubsubName = "pubsub";
app.post("/publish", (req, res) => {
console.log("Publishing: ", req.body);
const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
request({ uri: publishUrl, method: "POST", json: req.body });
res.sendStatus(200);
});
app.listen(process.env.PORT || port, () =>
console.log(`Listening on port ${port}!`)
);