消息发布与订阅

消息发布与订阅

Pub/Sub是分布式系统中的一种常见模式,该系统中有许多服务希望利用解耦、异步消息传递。使用Pub/Sub,您可以实现事件消费者与事件生产者解耦的场景。

Dapr提供了一个可扩展的Pub/Sub系统,具有At-Least-Once保证,允许开发人员发布和订阅主题。Daprpub/sub提供了组件,使运营商能够使用他们首选的基础设施,例如Redis StreamsKafka等。

Pub/Sub 组件

组件配置

首先需要设置Pub/Sub组件,当运行dapr init时,Redis Streams默认安装在本地机器上。在Windows上打开%UserProfile%/.dapr/components/pubsub.yaml或在Linux/MacOS上打开~/.dapr/components/pubsub.yaml下的组件文件来验证。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: localhost:6379
    - name: redisPassword
      value: ""

消息订阅

然后我们可以以不同的方式订阅消息,声明式和编程式方法都支持相同的功能。声明式方法从你的代码中删除了对Dapr的依赖性,例如,允许现有的应用程序订阅主题,而无需更改代码。程序化方法则是在您的代码中实现订阅。

声明式订阅

您可以使用以下自定义资源定义(CRD)订阅一个主题。创建一个名为subscription.yaml的文件,并粘贴以下内容。

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: deathStarStatus
  route: /dsstatus
  pubsubname: pubsub
scopes:
  - app1
  - app2
  • route字段告诉Dapr将所有主题消息发送到应用程序中的/dsstatus端点。
  • scopes字段为IDapp1app2的应用程序启用此订阅。

CRD放在你的./components目录下。当Dapr启动时,它会和组件一起加载订阅。注意:默认情况下,DaprMacOS/Linux下从$HOME/.dapr/components加载组件,在Windows下从%USERPROFILE/%.dapr/components加载组件。

$ dapr run --app-id myapp --components-path ./myComponents -- python3 app1.py

这里我们使用Flask构建基础的应用:

import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys

app = flask.Flask(__name__)
CORS(app)

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

app.run()

类似的Node.js的应用声明如下:

const express = require("express");
const bodyParser = require("body-parser");
const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));

const port = 3000;

app.post("/dsstatus", (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`));

编程式订阅

要订阅主题,请用你选择的编程语言启动一个网络服务器,并监听以下GET端点/dapr/subscribeDapr实例在启动时调用到你的应用程序中,并期待一个JSON响应的主题订阅:

  • pubsubname: Which pub/sub component Dapr should use.
  • topic: Which topic to subscribe to.
  • route: Which endpoint for Dapr to call on when a message comes to that topic.

使用Node实现的代码如下:

const express = require("express");
const bodyParser = require("body-parser");
const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));

const port = 3000;

app.get("/dapr/subscribe", (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "deathStarStatus",
      route: "dsstatus",
    },
  ]);
});

app.post("/dsstatus", (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`));

为了告诉Dapr消息已处理成功,请返回200 OK响应。如果Dapr收到任何其他返回状态码而不是200,或者如果您的应用程序崩溃,Dapr将尝试按照At-Least-Once语义重新交付消息。

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

发布主题

要发布一个主题,你需要运行一个Dapr Sidecar的实例来使用pubsub Redis组件。你可以使用安装在本地环境中的默认Redis组件。

$ dapr run --app-id testpubsub --dapr-http-port 3500

然后发布消息到deathStarStatus主题。

$ dapr publish --publish-app-id testpubapp --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'

$ curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'

当然,我们也可以通过程序化方式发布:

const express = require("express");
const path = require("path");
const request = require("request");
const bodyParser = require("body-parser");

const app = express();
app.use(bodyParser.json());

const daprPort = process.env.DAPR_HTTP_PORT || 3500;
const daprUrl = `http://localhost:${daprPort}/v1.0`;
const port = 8080;
const pubsubName = "pubsub";

app.post("/publish", (req, res) => {
  console.log("Publishing: ", req.body);
  const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
  request({ uri: publishUrl, method: "POST", json: req.body });
  res.sendStatus(200);
});

app.listen(process.env.PORT || port, () =>
  console.log(`Listening on port ${port}!`)
);