Publishing and Subscribing to Messages

Component configuration
First, set up a pub/sub component:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
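Subscribing to messages
You can then subscribe to messages in different ways. The declarative and programmatic approaches support the same features. The declarative approach removes the Dapr dependency from your code.
Declarative subscriptions
You can subscribe to a topic using the following Custom Resource Definition (CRD):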
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: deathStarStatus
  route: /dsstatus
  pubsubname: pubsub
scopes:
- app1
- app2
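The route field tells Dapr to send all messages for the topic to the /dsstatus endpoint in the application. The scopes field enables this subscription for the applications with IDs app1 and app2.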
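As a rough mental model of the scopes field (a toy sketch, not Dapr's actual implementation), the check below decides whether a subscription is visible to a given app ID. The function name subscription_applies and the app ID app3 are hypothetical, and treating an empty scopes list as "open to every app" is an assumption stated here rather than something the text above confirms.

```python
def subscription_applies(subscription: dict, app_id: str) -> bool:
    """Toy model: is this subscription enabled for the given app ID?"""
    scopes = subscription.get("scopes") or []
    # Assumption: when no scopes are listed, every app may use the subscription.
    return not scopes or app_id in scopes


# Mirrors the Subscription resource shown above.
subscription = {
    "topic": "deathStarStatus",
    "route": "/dsstatus",
    "pubsubname": "pubsub",
    "scopes": ["app1", "app2"],
}

for candidate in ("app1", "app2", "app3"):
    print(candidate, subscription_applies(subscription, candidate))
```

The practical consequence is that the app ID passed to dapr run has to be one of the listed scopes, otherwise the app never receives the topic's messages.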
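Place the subscription file in the components directory (./myComponents here) and start the app with Dapr. The app ID must be one of the IDs listed under scopes, and --app-port tells Dapr which port the app listens on:
$ dapr run --app-id app1 --app-port 5000 --components-path ./myComponents -- python3 app1.py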
Here we use Python (Flask) to handle the incoming messages:
import json

import flask
from flask import request
from flask_cors import CORS

app = flask.Flask(__name__)
CORS(app)

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    # Dapr delivers each message for the topic as a POST to this route.
    print(request.json, flush=True)
    return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}

app.run(port=5000)
Similarly, in Node.js (Express):
const express = require("express");
const bodyParser = require("body-parser");

const app = express();
// Accept JSON variants such as application/cloudevents+json.
app.use(bodyParser.json({ type: "application/*+json" }));
const port = 3000;

app.post("/dsstatus", (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`));
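Programmatic subscriptions
To subscribe to topics, start a web server in the programming language of your choice and listen on the following GET endpoint: /dapr/subscribe. Dapr calls this endpoint and expects a JSON response that describes the topic subscriptions with these fields: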
- pubsubname: Which pub/sub component Dapr should use.
- topic: Which topic to subscribe to.
- route: Which endpoint Dapr should call when a message arrives on that topic.
Using Node.js (Express):
const express = require("express");
const bodyParser = require("body-parser");

const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));
const port = 3000;

// Dapr reads this endpoint to discover the app's subscriptions.
app.get("/dapr/subscribe", (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "deathStarStatus",
      route: "dsstatus",
    },
  ]);
});

// Messages for deathStarStatus are delivered to this route.
app.post("/dsstatus", (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`));
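The same programmatic subscription can be sketched in Python. The version below is a minimal sketch that deliberately uses only the standard library (http.server) so it runs without Flask. The names SUBSCRIPTIONS, SubscriberHandler, and make_server, and the default port 5000, are illustrative assumptions, not part of any Dapr SDK.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

# Subscription list returned to Dapr; the values mirror the example above.
SUBSCRIPTIONS = [
    {"pubsubname": "pubsub", "topic": "deathStarStatus", "route": "dsstatus"},
]


class SubscriberHandler(BaseHTTPRequestHandler):
    def _send_json(self, status, payload):
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        # Dapr queries this endpoint to discover the app's subscriptions.
        if self.path == "/dapr/subscribe":
            self._send_json(200, SUBSCRIPTIONS)
        else:
            self._send_json(404, {"error": "not found"})

    def do_POST(self):
        # Messages for the subscribed topic arrive on the declared route.
        if self.path == "/dsstatus":
            length = int(self.headers.get("Content-Length", 0))
            event = json.loads(self.rfile.read(length) or b"{}")
            print(event, flush=True)
            self._send_json(200, {"success": True})
        else:
            self._send_json(404, {"error": "not found"})


def make_server(port=5000):
    """Build the HTTP server; the caller decides when to start serving."""
    return HTTPServer(("127.0.0.1", port), SubscriberHandler)
```

To run it, call make_server().serve_forever() at the end of the script and pass the same port to dapr run via --app-port.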
To tell Dapr that a message was processed successfully, return a 200 OK response:
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}
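The handler above prints the whole request body. A minimal sketch of the decision a subscriber makes for each message is shown below, under two assumptions that are not stated in the text: that Dapr wraps the published payload in a CloudEvents envelope with the application data under the "data" key, and that any status other than 200 leads Dapr to attempt redelivery. The function name process_event is hypothetical.

```python
import json


def process_event(raw_body: bytes) -> int:
    """Return the HTTP status code the subscriber should send back to Dapr."""
    try:
        envelope = json.loads(raw_body)
        # Assumption: the published payload sits under the envelope's "data" key.
        payload = envelope["data"]
        status = payload["status"]
    except (ValueError, KeyError, TypeError):
        # Assumption: a non-200 status asks Dapr to try the delivery again.
        return 500
    print(f"deathStarStatus is now: {status}", flush=True)
    return 200


# Example envelope carrying the payload published in this article.
sample = json.dumps(
    {"specversion": "1.0", "topic": "deathStarStatus", "data": {"status": "completed"}}
).encode("utf-8")
print(process_event(sample))
```

Keeping this logic in a plain function separates it from the web framework, so the same code can back a Flask route or any other HTTP handler. Returning an error for a message that can never be parsed may cause repeated redelivery, so a real service might prefer to log and acknowledge such messages instead.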
Publishing to a topic
To publish to a topic, you need a running Dapr sidecar:
$ dapr run --app-id testpubsub --dapr-http-port 3500
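Then publish a message to the deathStarStatus topic, either with the Dapr CLI or over HTTP. The --publish-app-id value must match the app ID of the running sidecar:
$ dapr publish --publish-app-id testpubsub --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'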
$ curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'
Of course, we can also publish programmatically:
const express = require("express");
const request = require("request");
const bodyParser = require("body-parser");

const app = express();
app.use(bodyParser.json());

// HTTP endpoint of the local Dapr sidecar.
const daprPort = process.env.DAPR_HTTP_PORT || 3500;
const daprUrl = `http://localhost:${daprPort}/v1.0`;
const port = 8080;
const pubsubName = "pubsub";

app.post("/publish", (req, res) => {
  console.log("Publishing: ", req.body);
  // Forward the request body to the deathStarStatus topic through the sidecar.
  const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
  request({ uri: publishUrl, method: "POST", json: req.body });
  res.sendStatus(200);
});

app.listen(process.env.PORT || port, () =>
  console.log(`Listening on port ${port}!`)
);
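The same publish call can be made from Python. The sketch below uses only the standard library and targets the URL from the curl example above. The helper names build_publish_request and publish are hypothetical, and the default port 3500 is taken from the dapr run command in this article.

```python
import json
import os
import urllib.request

# Port of the local Dapr sidecar; falls back to the value used in this article.
DAPR_HTTP_PORT = os.environ.get("DAPR_HTTP_PORT", "3500")


def build_publish_request(pubsub_name, topic, payload, port=DAPR_HTTP_PORT):
    """Build (but do not send) the POST request for Dapr's publish endpoint."""
    url = f"http://localhost:{port}/v1.0/publish/{pubsub_name}/{topic}"
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def publish(pubsub_name, topic, payload):
    """Send the message through the sidecar and return the HTTP status code."""
    req = build_publish_request(pubsub_name, topic, payload)
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status
```

With a sidecar running, publish("pubsub", "deathStarStatus", {"status": "completed"}) sends the same message as the curl command. Building the request separately from sending it makes the URL and body easy to inspect without a running sidecar.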