快速使用
2. Tutorial
2.1 编写NIO Server
现在我们开始编写一个
package main
import (
"context"
"time"
"github.com/cloudwego/netpoll"
)
func main() {
network, address := "tcp", "127.0.0.1:8888"
// 创建 listener
listener, err := netpoll.CreateListener(network, address)
if err != nil {
panic("create netpoll listener fail")
}
// handle: 连接读数据和处理逻辑
var onRequest netpoll.OnRequest = handler
// options: EventLoop 初始化自定义配置项
var opts = []netpoll.Option{
netpoll.WithReadTimeout(1 * time.Second),
netpoll.WithIdleTimeout(10 * time.Minute),
netpoll.WithOnPrepare(nil),
}
// 创建 EventLoop
eventLoop, err := netpoll.NewEventLoop(onRequest, opts...)
if err != nil {
panic("create netpoll event-loop fail")
}
// 运行 Server
err = eventLoop.Serve(listener)
if err != nil {
panic("netpoll server exit")
}
}
// 读事件处理
func handler(ctx context.Context, connection netpoll.Connection) error {
return connection.Writer().Flush()
}
2.1.1 创建Listener
首先我们先创建一个 netpoll.Listener
,和 net.Listener
创建方式相似,通过 network
和 address
构建。
listener, err := netpoll.CreateListener(network, address)
if err != nil {
panic("create netpoll listener fail")
}
2.1.2 创建EventLoop
EventLoop
是
// handle: 连接读数据和处理逻辑
var handle netpoll.OnRequest
// options: EventLoop 初始化自定义配置项
var opts = []netpoll.Option{
netpoll.WithReadTimeout(1 * time.Second),
netpoll.WithIdleTimeout(10 * time.Minute),
netpoll.WithOnPrepare(nil),
...
}
// 创建 EventLoop
eventLoop, err := netpoll.NewEventLoop(handle, opts...)
if err != nil {
panic("create netpoll event-loop fail")
}
2.1.3 运行Server
EventLoop
通过绑定 Listener
来提供对外服务,范例如下。其中 Serve()
方法只在异常下退出,如
// 运行 Server
err = eventLoop.Serve(listener)
if err != nil {
panic("netpoll server exit")
}
2.1.4 关闭Server
// 关闭 Server
eventLoop.Shutdown()
2.2 使用/ 编写Dialer
Netpoll 同时具备在Dialer
的方式,与 net.Dialer
相似。同样我们先给出完整使用范例。
package main
import (
"time"
"github.com/cloudwego/netpoll"
)
func main() {
network, address := "tcp", "127.0.0.1:8888"
// 直接创建连接
conn, err := netpoll.DialConnection(network, address, 50*time.Millisecond)
if err != nil {
panic("dial netpoll connection fail")
}
// 通过 dialer 创建连接
dialer := netpoll.NewDialer()
conn, err = dialer.DialConnection(network, address, 50*time.Millisecond)
if err != nil {
panic("dialer netpoll connection fail")
}
// conn write & flush message
conn.Writer().WriteBinary([]byte("hello world"))
conn.Writer().Flush()
}
2.2.1 缺省方式创建连接
Netpoll 提供了快速建立连接的
// 创建任意连接
DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)
// 创建 TCP 连接
DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error)
// 创建 Unix 连接
DialUnix(network string, laddr, raddr *UnixAddr) (*UnixConnection, error)
2.2.2 创建Dialer
Netpoll 也支持通过 Dialer
对象创建连接,支持可扩展的自定义配置(目前尚未开放
// 通过 dialer 创建连接
dialer := netpoll.NewDialer()
conn, err = dialer.DialConnection(network, address, 50*time.Millisecond)
if err != nil {
panic("dialer netpoll connection fail")
}
2.3 使用Connection
Connection
专为net.Conn
性能更高,内存和net.Conn
,但不推荐使用
type Connection interface {
net.Conn // API 对齐,不推荐使用
// 推荐使用的 zero-copy API
Reader() Reader
Writer() Writer
... // 更多参见注释部分
2.3.1 使用zero-copy API
推荐使用 Connection
的零拷贝
// 读取 n 字节, 返回底层缓存切片, 同时缓存减少 n 字节
conn.Reader().Next(n)
// 预读取 n 字节, 返回底层缓存切片, 缓存大小不变, 可重复预读
conn.Reader().Peek(n)
// 丢弃缓存最前的 n 字节, 不可找回
conn.Reader().Skip(n)
// 释放已读部分的底层缓存, (在此之前读取的)上层读缓存切片将全部失效
conn.Reader().Release()
// 在连接写缓存区顺序分配 n 字节
conn.Writer().Malloc(n)
// 将已分配的写缓存全部发送到连接对端, (在此之前分配的)上层写缓存切片将全部失效
conn.Writer().Flush()
2.3.2 继承zero-copy 能力
连接提供了一些高级能力,不仅可以在连接上做零拷贝读写,而且还可以将零拷贝读写能力传递下去。
我们开发了一种 LinkBuffer
,不仅支持Slice/Append
接口,LinkBuffer
支持逻辑上的任意切分和拼接,实际仅基于同一个底层缓存,切分和拼接的过程是Slice/Append
接口,可以读写整个的 LinkBuffer
分片。
使得上层逻辑可以基于 LinkBuffer
分片继续
// 读取 n 字节 LinkBuffer 切片
Slice(n int) (r Reader, err error)
// 拼接(写) LinkBuffer 切片
Append(w Writer) (n int, err error)
// 持续继承 zero-copy 读写
buf1, _ := conn.Reader().Slice(n1)
buf2, _ := buf1.Slice(n2)
buf1.Append(buf2)
conn.Writer().Append(buf1)
3. How To
3.1 如何配置poller 个数
NumLoops
是 Netpoll 底层的runtime.GOMAXPROCS(0)
动态调整了
// 设置合适的 poller 数量
netpoll.SetNumLoops(num_you_want)
3.2 如何配置poller 连接负载均衡策略
一般情况下,Netpoll 底层存在多个
- Random
- 新建立的连接将被简单随机地,分配给任意一个
poller
- 新建立的连接将被简单随机地,分配给任意一个
- RoundRobin
- 新建立的连接将被循环式的,依次分配给有序排列的
poller Netpoll 默认使用RoundRobin 策略,用户可以通过以下方式自定义改变该策略
- 新建立的连接将被循环式的,依次分配给有序排列的
// 负载均衡策略设置
netpoll.SetLoadBalance(netpoll.Random)
// or
netpoll.SetLoadBalance(netpoll.RoundRobin)
3.3 如何配置gopool 协程池
Netpoll 默认开启 gopool 协程池,
因为基于
3.4 如何初始化新连接
- 在
Server 端,定义了OnPrepare
用于自定义初始化连接,同时支持初始化一个context
,提供给后续的读事件处理时重复使用。OnPrepare
需要在创建EventLoop
时,通过option
WithOnPrepare
注入。Server 端在Accept
新连接时,会自动调度执行注册的OnPrepare
方法,完成连接初始化工作,代码范例如下。
package main
// EventLoop 注册连接初始化逻辑 范例
func InitEventLoop() {
// handle: 连接读数据和处理逻辑
var onRequest netpoll.OnRequest = handler
// prepare: 连接初始化, 返回读事件处理时使用的 context
var onPrepare netpoll.OnPrepare = prepare
// 创建 EventLoop 时, 注册 OnPrepare
eventLoop, err := netpoll.NewEventLoop(onRequest, netpoll.WithOnPrepare(onPrepare))
if err != nil {
panic("create netpoll event-loop fail")
}
}
// 连接初始化
func prepare(connection netpoll.Connection) context.Context {
return context.Background()
}
// 读事件处理
func handler(ctx context.Context, connection netpoll.Connection) error {
return connection.Writer().Flush()
}
- 在
Client 端,连接初始化需要自行额外完成。一般认为,当通过Dialer
创建新的连接后,不存在需要连接来感知的初始化工作, 因此这部分( 初始化) 工作由上层逻辑完成,最后在需要时注册读事件回调即可(参见How To - 3.6 如何配置连接读事件回调)
3.5 如何配置连接超时
目前支持两种超时配置
- 连接异步读超时
read timeout
- 为了保持和
net.Conn
相同的操作风格,Connection
在读数据是也是阻塞读取的,允许使用conn.Reader().Next(n)
的方式阻塞读取足额的n 字节。 而由于 Netpoll 是异步回调模型,连接读等待唤醒取决于对端是否返回了数据,并且读事件被调度。 因此这里支持读阻塞到指定超时时间后主动返回。 read timeout
没有默认值( 无限等待) ,可以通过Connection
API 或者EventLoop
option
配置
- 为了保持和
// option 方式
netpoll.WithReadTimeout(1 * time.Second)
// api 方式
connection.SetReadTimeout(1 * time.Second)
- 连接空闲超时
idle timeout
// option 方式
netpoll.WithIdleTimeout(1 * time.Second)
// api 方式
connection.SetIdleTimeout(1 * time.Second)
3.6 如何配置连接读事件回调
读事件回调 OnRequest
是指,连接在底层读事件到来时,由 Netpoll 底层调度触发的回调。
该回调是以EventLoop
时强制需要 OnRequest
,并在每个连接数据到来时触发,用于执行
// handle: 连接读数据和处理逻辑
var onRequest netpoll.OnRequest = handler
// Server 端
eventLoop, err := netpoll.NewEventLoop(onRequest, opts...)
// Client 端
connection.SetOnRequest(onRequest)
3.7 如何配置连接关闭回调
连接关闭回调 CloseCallback
是指,连接在被关闭时,由 Netpoll 底层调度触发的回调。
该回调用以在连接被关闭后,执行额外的处理逻辑。Netpoll 能够感知连接状态,当连接对端关闭、清理死连
等情况下,底层会主动触发连接关闭,此时 CloseCallback
起到通知的作用。触发主动的处理连接关闭,而不是在下一次读写连接时报错(net.Conn
的做法Connection
提供了CloseCallback
,已被添加的回调不可以移除,支持添加多个回调。
// 添加 CloseCallback 范例
func addCloseCallback() {
// 回调方法定义
var cb netpoll.CloseCallback = callback
// 添加回调
conn.AddCloseCallback(cb)
}
func callback(connection netpoll.Connection) error {
return nil
}
3.8 如何使用LinkBuffer
Netpoll 提供的 LinkBuffer
支持并发的单个读和单个写操作,有较小的
4. Attention
4.1 错误设置NumLoops
NumLoops
是 Netpoll 底层的runtime.GOMAXPROCS(0)
动态调整了runtime.GOMAXPROCS(0)
拿到的是物理机核心数,可能会导致性能下降。解决办法有以下几种:
- 使用
taskset
命令来限制CPU 的使用
taskset 0-3 ./output/bootstrap.sh
- 主动设置
P 的数量
func init() {
runtime.GOMAXPROCS(num_you_want)
}
- 主动设置
poller 数量
func init() {
netpoll.SetNumLoops(num_you_want)
}
{“mode”:“full”,“isActive”:false}