This page looks best with JavaScript enabled

hertz库源码阅读

 ·  ☕ 11 min read · 👀... views

初步阅读项目代码后发现,不能用简单的单分支流程来描述程序控制流(因为network部分有两套并行实现),用目录的方式来解释更加直观一些

目录可以很清晰地看出结构设计,pkg是主要部分

app提供了client和server引擎
common提供基础程序服务
network提供网络 包括go net包和netpoll
protocol提供protocolSuite包括http/1.1和h2
route则提供路由

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
pkg
├── app
│   ├── client
│   │   ├── client.go
│   │   ├── client_test.go
│   │   ├── middleware.go
│   │   ├── middleware_test.go
│   │   ├── option.go
│   │   └── option_test.go
│   ├── context.go
│   ├── context_test.go
│   ├── fs.go
│   ├── fs_test.go
│   ├── middlewares
│   │   └── server
│   │       ├── basic_auth
│   │       │   ├── basic_auth.go
│   │       │   ├── basic_auth_test.go
│   │       │   └── doc.go
│   │       └── recovery
│   │           ├── recovery.go
│   │           └── recovery_test.go
│   └── server
│       ├── binding
│       │   ├── binding.go
│       │   ├── binding_test.go
│       │   └── request.go
│       ├── hertz.go
│       ├── hertz_test.go
│       ├── option.go
│       ├── option_test.go
│       └── render
│           ├── data.go
│           ├── doc.go
│           ├── html.go
│           ├── json.go
│           ├── json_test.go
│           ├── protobuf.go
│           ├── render.go
│           ├── render_test.go
│           ├── text.go
│           └── xml.go
├── common
│   ├── adaptor
│   │   ├── request.go
│   │   ├── request_test.go
│   │   └── response.go
│   ├── bytebufferpool
│   │   ├── bytebuffer.go
│   │   ├── bytebuffer_test.go
│   │   ├── doc.go
│   │   ├── pool.go
│   │   └── pool_test.go
│   ├── compress
│   │   ├── compress.go
│   │   ├── compress_test.go
│   │   └── doc.go
│   ├── config
│   │   ├── client_option.go
│   │   ├── option.go
│   │   ├── request_option.go
│   │   └── request_option_test.go
│   ├── errors
│   │   ├── errors.go
│   │   └── errors_test.go
│   ├── hlog
│   │   ├── default.go
│   │   └── log.go
│   ├── stackless
│   │   ├── doc.go
│   │   ├── func.go
│   │   ├── func_test.go
│   │   ├── func_timing_test.go
│   │   ├── writer.go
│   │   └── writer_test.go
│   ├── test
│   │   ├── assert
│   │   │   └── assert.go
│   │   └── mock
│   │       ├── body_data.go
│   │       ├── body_data_test.go
│   │       ├── network.go
│   │       └── reader.go
│   ├── testdata
│   │   ├── conf
│   │   │   └── p_s_m.yaml
│   │   ├── proto
│   │   │   ├── test.pb.go
│   │   │   └── test.proto
│   │   ├── template
│   │   │   ├── htmltemplate.html
│   │   │   └── index.tmpl
│   │   ├── test.png
│   │   └── test.txt
│   ├── timer
│   │   ├── doc.go
│   │   ├── timer.go
│   │   └── timer_test.go
│   ├── tracer
│   │   ├── stats
│   │   │   ├── event.go
│   │   │   ├── event_test.go
│   │   │   └── status.go
│   │   ├── traceinfo
│   │   │   ├── httpstats.go
│   │   │   ├── interface.go
│   │   │   └── traceinfo.go
│   │   └── tracer.go
│   ├── ut
│   │   ├── request.go
│   │   ├── request_test.go
│   │   ├── response.go
│   │   └── response_test.go
│   └── utils
│       ├── bufpool.go
│       ├── chunk.go
│       ├── chunk_test.go
│       ├── ioutil.go
│       ├── ioutil_test.go
│       ├── network.go
│       ├── path.go
│       ├── path_test.go
│       ├── utils.go
│       └── utils_test.go
├── network
│   ├── connection.go
│   ├── dialer
│   │   ├── default.go
│   │   ├── default_windows.go
│   │   └── dialer.go
│   ├── netpoll
│   │   ├── connection.go
│   │   ├── dial.go
│   │   └── transport.go
│   ├── standard
│   │   ├── buffer.go
│   │   ├── connection.go
│   │   ├── connection_test.go
│   │   ├── dial.go
│   │   └── transport.go
│   ├── transport.go
│   ├── utils.go
│   ├── utils_test.go
│   ├── writer.go
│   └── writer_test.go
├── protocol
│   ├── args.go
│   ├── args_test.go
│   ├── client
│   │   └── client.go
│   ├── consts
│   │   ├── default.go
│   │   ├── fs.go
│   │   ├── headers.go
│   │   ├── http2.go
│   │   ├── methods.go
│   │   └── status.go
│   ├── cookie.go
│   ├── cookie_test.go
│   ├── doc.go
│   ├── header.go
│   ├── header_test.go
│   ├── header_timing_test.go
│   ├── http1
│   │   ├── client.go
│   │   ├── client_test.go
│   │   ├── ext
│   │   │   ├── common.go
│   │   │   ├── common_test.go
│   │   │   ├── error.go
│   │   │   ├── headerscanner.go
│   │   │   ├── headerscanner_test.go
│   │   │   └── stream.go
│   │   ├── factory
│   │   │   ├── client.go
│   │   │   └── server.go
│   │   ├── proxy
│   │   │   └── proxy.go
│   │   ├── req
│   │   │   ├── header.go
│   │   │   ├── header_test.go
│   │   │   ├── request.go
│   │   │   └── request_test.go
│   │   ├── resp
│   │   │   ├── header.go
│   │   │   ├── header_test.go
│   │   │   ├── response.go
│   │   │   └── response_test.go
│   │   └── server.go
│   ├── multipart.go
│   ├── multipart_test.go
│   ├── request.go
│   ├── request_test.go
│   ├── response.go
│   ├── response_test.go
│   ├── server.go
│   ├── suite
│   │   ├── client.go
│   │   └── server.go
│   ├── uri.go
│   ├── uri_test.go
│   ├── uri_timing_test.go
│   ├── uri_unix.go
│   └── uri_windows.go
└── route
    ├── consts
    │   └── const.go
    ├── default.go
    ├── default_windows.go
    ├── engine.go
    ├── engine_test.go
    ├── param
    │   └── param.go
    ├── routergroup.go
    ├── routergroup_test.go
    ├── routes_test.go
    ├── routes_timing_test.go
    ├── tree.go
    └── tree_test.go

47 directories, 171 files

网络库选择问题:
New调用的pkg/route/engine.go中NewEngine指定transport为defaultTransporter(opt)
defaultTransporter是一个包级变量(并非常量),初始值为standard.NewTransporter,故默认未使用netpoll
当在非windows平台编译时,会将其修改为defaultTransporter = netpoll.NewTransporter(见default.go),此时使用netpoll
当然也可以通过配置项显式指定

1
2
server.New(server.WithTransport(standard.NewTransporter))
server.New(server.WithTransport(netpoll.NewTransporter))

server/hertz.go一共三个接口

  • New根据Option创建一个server
  • Default在New的基础上添加了Recovery
  • Spin在example中可以看到使用,运行server并根据信号选择Close或者优雅Shutdown
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// New creates a hertz instance without any default config.
//
// The variadic opts are turned into an options value by config.NewOptions,
// which is passed to route.NewEngine; the resulting engine is stored in the
// Engine field of the returned Hertz. No middleware is registered here.
func New(opts ...config.Option) *Hertz {
	options := config.NewOptions(opts)
	h := &Hertz{ Engine: route.NewEngine(options), }
	return h
}

// Default creates a hertz instance with default middlewares.
//
// It builds the instance with New(opts...) and then registers
// recovery.Recovery() through Use before returning it.
func Default(opts ...config.Option) *Hertz {
	h := New(opts...)
	h.Use(recovery.Recovery())

	return h
}

// Spin runs the server until catching os.Signal.
// SIGTERM triggers an immediate close.
// SIGHUP|SIGINT triggers graceful shutdown.
//
// Run is started on its own goroutine and its result is reported through
// errCh. waitSignal decides the outcome: a non-nil result leads to
// h.Engine.Close(), a nil result leads to h.Shutdown with a context bounded by
// the configured ExitWaitTimeout.
func (h *Hertz) Spin() {
	// Capacity 1 lets the goroutine below always deliver Run's result and
	// exit, even when waitSignal has already returned and nobody is receiving
	// from errCh any more. With an unbuffered channel that send would block
	// forever.
	errCh := make(chan error, 1)
	go func() {
		errCh <- h.Run()
	}()

	if err := waitSignal(errCh); err != nil {
		// Non-nil result: close right away instead of shutting down gracefully.
		hlog.Errorf("HERTZ: Receive close signal: error=%v", err)
		if closeErr := h.Engine.Close(); closeErr != nil {
			hlog.Errorf("HERTZ: Close error=%v", closeErr)
		}
		return
	}

	hlog.Infof("HERTZ: Begin graceful shutdown, wait at most num=%d seconds...", h.GetOptions().ExitWaitTimeout/time.Second)

	// The shutdown context expires after ExitWaitTimeout.
	ctx, cancel := context.WithTimeout(context.Background(), h.GetOptions().ExitWaitTimeout)
	defer cancel()

	if err := h.Shutdown(ctx); err != nil {
		hlog.Errorf("HERTZ: Shutdown error=%v", err)
	}
}

从New开始,设置options,然后初始化Engine

1
2
3
4
5
6
7
8
// New creates a hertz instance without any default config.
//
// The variadic opts are turned into an options value by config.NewOptions,
// which is passed to route.NewEngine; the resulting engine is stored in the
// Engine field of the returned Hertz. No middleware is registered here.
func New(opts ...config.Option) *Hertz {
	options := config.NewOptions(opts)
	h := &Hertz{
		Engine: route.NewEngine(options),
	}
	return h
}

NewEngine进行参数初始化 包括RouterGroup transport RequestContext_pool protocolSuite等

其中 defaultTransporter 是一个包级变量,默认值为 standard.NewTransporter(在非windows平台上会被default.go改为netpoll.NewTransporter)
protocolSuite是支持的协议,可以通过engine.AddProtocol添加,后续会将支持的协议serve接口放到protocolServers中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// NewEngine builds an Engine from opt and returns it.
//
// It fills in the root RouterGroup, the transport, the tracer controller, an
// empty protocol server map, the RequestContext pool constructor and the
// protocol suite. opt is kept on the engine as its options.
func NewEngine(opt *config.Options) *Engine {
	engine := &Engine{
		// Starts empty with room for 9 entries
		// (presumably one tree per HTTP method — TODO confirm).
		trees: make(MethodTrees, 0, 9),
		// Root router group: no handlers yet, mounted at "/".
		RouterGroup: RouterGroup{
			Handlers: nil,
			basePath: "/",
			root:     true,
		},
		// defaultTransporter is defined elsewhere; it is called with opt to
		// produce the transport.
		transport:       defaultTransporter(opt),
		tracerCtl:       &internalStats.Controller{},
		protocolServers: make(map[string]protocol.Server),
		enableTrace:     true,
		options:         opt,
	}
	// Back-reference from the root group to the engine that owns it.
	engine.RouterGroup.engine = engine

	// initTrace is defined elsewhere; its result is applied to the trace stats
	// of every context the pool allocates below.
	traceLevel := initTrace(engine)

	// prepare RequestContext pool
	engine.ctxPool.New = func() interface{} {
		ctx := engine.allocateContext()
		// enableTrace is read each time the pool allocates, not once here.
		if engine.enableTrace {
			ti := traceinfo.NewTraceInfo()
			ti.Stats().SetLevel(traceLevel)
			ctx.SetTraceInfo(ti)
		}
		return ctx
	}

	// Init protocolSuite
	engine.protocolSuite = suite.New()

	return engine
}

New创建好引擎后可以使用Get Post等添加路由及对应handler,其中包括路由和路由组的设定及解析

再然后是Spin运行server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Spin runs the server until catching os.Signal.
// SIGTERM triggers an immediate close.
// SIGHUP|SIGINT triggers graceful shutdown.
//
// Run is started on its own goroutine and its result is reported through
// errCh. waitSignal decides the outcome: a non-nil result leads to
// h.Engine.Close(), a nil result leads to h.Shutdown with a context bounded by
// the configured ExitWaitTimeout.
func (h *Hertz) Spin() {
	// Capacity 1 lets the goroutine below always deliver Run's result and
	// exit, even when waitSignal has already returned and nobody is receiving
	// from errCh any more. With an unbuffered channel that send would block
	// forever.
	errCh := make(chan error, 1)
	go func() {
		errCh <- h.Run()
	}()

	if err := waitSignal(errCh); err != nil {
		// Non-nil result: close right away instead of shutting down gracefully.
		hlog.Errorf("HERTZ: Receive close signal: error=%v", err)
		if closeErr := h.Engine.Close(); closeErr != nil {
			hlog.Errorf("HERTZ: Close error=%v", closeErr)
		}
		return
	}

	hlog.Infof("HERTZ: Begin graceful shutdown, wait at most num=%d seconds...", h.GetOptions().ExitWaitTimeout/time.Second)

	// The shutdown context expires after ExitWaitTimeout.
	ctx, cancel := context.WithTimeout(context.Background(), h.GetOptions().ExitWaitTimeout)
	defer cancel()

	if err := h.Shutdown(ctx); err != nil {
		hlog.Errorf("HERTZ: Shutdown error=%v", err)
	}
}

Run就是engine.Run
先是engine.Init()初始化引擎
然后是engine.transport.ListenAndServe(engine.onData)
transport是之前设定的defaultTransporter,transport实现了Transporter接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// pkg/network/transport.go

// Transporter is the server-side transport abstraction: it can start
// listening and serving, be closed immediately, or be shut down gracefully.
type Transporter interface {
	// Close the transporter immediately
	Close() error

	// Graceful shutdown the transporter
	Shutdown(ctx context.Context) error

	// Start listen and ready to accept connection
	// onData is the callback the transporter is given for handling connections.
	ListenAndServe(onData OnData) error
}
// pkg/network/standard/transport.go

// ListenAndServe stores onData as the transport's handler and then runs
// t.serve, returning whatever error serve returns.
func (t *transport) ListenAndServe(onData network.OnData) (err error) {
	t.handler = onData
	return t.serve()
}

t.serve非常简单 就是持续监听 创建连接并使用handler处理

此处transport.serve等同于 quic-go/http3 Server.serveListener

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// serve opens the listener and then accepts connections in an endless loop,
// handing each accepted connection to t.handler on its own goroutine.
// It returns only when creating the listener or accepting a connection fails.
func (t *transport) serve() (err error) {
	network.UnlinkUdsFile(t.network, t.addr) //nolint:errcheck

	// A configured ListenConfig takes precedence over plain net.Listen.
	if t.listenConfig == nil {
		t.ln, err = net.Listen(t.network, t.addr)
	} else {
		t.ln, err = t.listenConfig.Listen(context.Background(), t.network, t.addr)
	}
	if err != nil {
		return err
	}

	for {
		rawConn, acceptErr := t.ln.Accept()
		if acceptErr != nil {
			hlog.Errorf("HERTZ: Error=%s", acceptErr.Error())
			return acceptErr
		}

		// Wrap the accepted connection, adding TLS when it is configured.
		var wrapped network.Conn
		switch {
		case t.tls != nil:
			wrapped = newTLSConn(tls.Server(rawConn, t.tls), t.readBufferSize)
		default:
			wrapped = newConn(rawConn, t.readBufferSize)
		}
		go t.handler(context.Background(), wrapped)
	}
}

engine.onData是回调函数,它调用engine.Serve(c, conn)处理连接:Serve根据options中的配置(H2C、ALPN)判断协议,再从engine.protocolServers(即engine.Init时保存的serverMap)中找出对应的protocol.Server进行处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Serve dispatches a connection to the protocol server that should handle it.
// It tries the H2C path first, then the ALPN path, and finally falls back to
// the HTTP/1 server.
//
// The returned error is also passed, together with conn, to errProcess by the
// deferred call.
func (engine *Engine) Serve(c context.Context, conn network.Conn) (err error) {
	defer func() {
		errProcess(conn, err)
	}()

	// H2C path
	if engine.options.H2C {
		// protocol sniffer: the Peek error is ignored on purpose, because a
		// failed or short peek simply does not equal the preface and the
		// connection falls through to the paths below.
		buf, _ := conn.Peek(len(bytestr.StrClientPreface))
		if bytes.Equal(buf, bytestr.StrClientPreface) {
			if h2 := engine.protocolServers[suite.HTTP2]; h2 != nil {
				return h2.Serve(c, conn)
			}
			// Warn only when the client really sent the HTTP/2 preface but no
			// HTTP/2 server is registered. Connections that do not start with
			// the preface must not trigger this message.
			hlog.Warnf("HERTZ: HTTP2 server is not loaded, request is going to fallback to HTTP1 server")
		}
	}

	// ALPN path
	if engine.options.ALPN && engine.options.TLS != nil {
		proto, err1 := engine.getNextProto(conn)
		if err1 != nil {
			// The client closes the connection when handshake. So just ignore it.
			if err1 == io.EOF {
				return nil
			}
			if re, ok := err1.(tls.RecordHeaderError); ok && re.Conn != nil && utils.TLSRecordHeaderLooksLikeHTTP(re.RecordHeader) {
				// Best-effort plain-text reply to a client that sent HTTP to
				// the TLS port; write/close errors are not acted upon because
				// the connection is being dropped anyway.
				io.WriteString(re.Conn, "HTTP/1.0 400 Bad Request\r\n\r\nClient sent an HTTP request to an HTTPS server.\n")
				re.Conn.Close()
				return re
			}
			return err1
		}
		// Use the server registered for the negotiated protocol, if any;
		// otherwise fall through to HTTP/1.
		if server, ok := engine.protocolServers[proto]; ok {
			return server.Serve(c, conn)
		}
	}

	// HTTP1 path
	err = engine.protocolServers[suite.HTTP1].Serve(c, conn)

	return
}

所以说要在原来基础上添加HTTP3支持,需要在serverMap中添加对应的处理函数,然后再依据RFC对低版本协议的header进行特殊处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Core is the interface the engine promises to provide to protocol layer
// extensions. A ServerFactory receives a Core when it creates a protocol
// server.
type Core interface {
	// IsRunning reports whether the engine is running.
	IsRunning() bool
	// GetCtxPool returns a RequestContext pool ready for use by a protocol
	// server implementation.
	GetCtxPool() *sync.Pool
	// ServeHTTP is the business logic entrance.
	// After its pre-read work is done, a protocol server may call this method
	// to run the middlewares and handlers.
	ServeHTTP(c context.Context, ctx *app.RequestContext)
	// GetTracer returns the tracer controller, for tracing requirements.
	GetTracer() tracer.Controller
}

// ServerFactory creates a protocol.Server bound to the given Core.
type ServerFactory interface {
	// New builds the protocol server for core, or returns an error.
	New(core Core) (server protocol.Server, err error)
}

Core中的函数在route/engine.go中实现。各协议server的Serve函数在处理得到ctx之后,交由Core.ServeHTTP来处理,从而把框架的路由和中间件等逻辑与协议实现剥离

1
2
3
4
5
6
7
8
9
// Server is a protocol server. It embeds Option and holds the suite.Core that
// its Serve method calls back into.
type Server struct {
	Option
	Core suite.Core
}
// Serve handles one connection. This is an abridged excerpt: the "..." lines
// stand for omitted code. The part shown is that the request context is handed
// to s.Core.ServeHTTP.
func (s Server) Serve(c context.Context, conn network.Conn) (err error) {
	...
	s.Core.ServeHTTP(c, ctx)
	...
}

protocolServers[proto]返回protocol.Server,engine.Serve根据protocol选择protocol.Server.Serve(c, conn)

1
engine.Run->engine.Init->engine.protocolSuite.LoadAll->c.configMap[protocol].New(core)

Close和优雅Shutdown的区别就是有没有延迟关闭,一个是0一个是h.GetOptions().ExitWaitTimeout

1
2
3
4
5
// Close shuts the transport down without a grace period: it calls Shutdown
// with a context whose timeout is 0, i.e. one that is already expired when
// Shutdown receives it.
func (t *transport) Close() error {
	ctx, cancel := context.WithTimeout(context.Background(), 0)
	defer cancel()
	return t.Shutdown(ctx)
}

上面讲完pkg/network/transport.go中接口实现
下面是讲pkg/network/connection.go的定义接口实现

简单说可以分成两个来说Dialer和Conn

Dialer中函数会返回Conn,Conn封装了net.Conn和Reader、Writer两个接口

Dialer在各自的dial.go中实现
Conn和Reader、Writer在各自的connection.go中实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Dialer creates outgoing connections to a peer and can wrap an existing Conn
// with TLS.
type Dialer interface {
	// DialConnection is used to dial the peer end.
	DialConnection(network, address string, timeout time.Duration, tlsConfig *tls.Config) (conn Conn, err error)

	// DialTimeout is used to dial the peer end with a timeout.
	// Unlike DialConnection it returns a plain net.Conn rather than a Conn.
	//
	// NOTE: Not recommended to use this function. Just for compatibility.
	DialTimeout(network, address string, timeout time.Duration, tlsConfig *tls.Config) (conn net.Conn, err error)

	// AddTLS will transfer a common connection to a tls connection.
	AddTLS(conn Conn, tlsConfig *tls.Config) (Conn, error)
}

// Reader is a buffered reader: data can be inspected with Peek, discarded with
// Skip, or consumed with ReadByte and ReadBinary, and the memory behind
// previously read slices is recycled with Release.
type Reader interface {
	// Peek returns the next n bytes without advancing the reader.
	Peek(n int) ([]byte, error)

	// Skip discards the next n bytes.
	Skip(n int) error

	// Release the memory space occupied by all read slices. This method needs to be executed actively to
	// recycle the memory after confirming that the previously read data is no longer in use.
	// After invoking Release, the slices obtained by the method such as Peek will
	// become an invalid address and cannot be used anymore.
	Release() error

	// Len returns the total length of the readable data in the reader.
	Len() int

	// ReadByte is used to read one byte with advancing the read pointer.
	ReadByte() (byte, error)

	// ReadBinary is used to read next n byte with copy, and the read pointer will be advanced.
	ReadBinary(n int) (p []byte, err error)
}

// Writer is the write side of a connection: data is staged through Malloc or
// WriteBinary and sent to the peer by Flush.
type Writer interface {
	// Malloc will provide a n bytes buffer to send data.
	Malloc(n int) (buf []byte, err error)

	// WriteBinary will use the user buffer to flush.
	// NOTE: Before flush successfully, the buffer b should be valid.
	WriteBinary(b []byte) (n int, err error)

	// Flush will send data to the peer end.
	Flush() error
}

// ReadWriter groups the Reader and Writer interfaces.
type ReadWriter interface {
	Reader
	Writer
}

// Conn is a net.Conn that additionally offers the Reader and Writer methods
// and a read-timeout setter.
type Conn interface {
	net.Conn
	Reader
	Writer

	// SetReadTimeout should work for every Read process
	SetReadTimeout(t time.Duration) error
}

// ConnTLSer is the TLS-specific part of a connection: running the handshake
// and reporting the resulting tls.ConnectionState.
type ConnTLSer interface {
	// Handshake runs the TLS handshake and returns its error, if any.
	Handshake() error
	// ConnectionState returns the TLS state of the connection.
	ConnectionState() tls.ConnectionState
}

// HandleSpecificError lets an implementation examine an error and tell the
// caller, through needIgnore, whether that error should be ignored.
type HandleSpecificError interface {
	// HandleSpecificError inspects err and reports whether it should be ignored.
	// NOTE(review): rip is presumably the remote IP of the connection — confirm.
	HandleSpecificError(err error, rip string) (needIgnore bool)
}

其中Dialer是给client用的
pkg/network/dialer 中提供dialer,win和linux实现不同 封装了平台无关的wrapper ,提供了SetDialer接口可以设置dialer,有两个实现可选pkg/network/netpoll/dial.go和pkg/network/standard/dial.go
一个是net库的实现,一个是netpoll的实现。当然netpoll的实现还不成熟,AddTLS直接返回errNotSupportTLS
其中standard最后调用pkg/network/standard/connection.go中newConn和newTLSConn
netpoll则完全由netpoll仓库完成

1
2
3
4
// pkg/network/dialer/default_windows.go
// In the Windows-specific file the default dialer is the standard one.
defaultDialer = standard.NewDialer()
// pkg/network/dialer/default.go
// In the other file the default dialer is the netpoll one.
defaultDialer = netpoll.NewDialer()

不过他们调用dialer.DialConnection最后返回的都是conn即封装了net.Conn的 Conn interface
分别在pkg/network/netpoll/connection.go和pkg/network/standard/connection.go中实现

其中netpoll的Conn和Reader、Writer实现自然也是在其独立库中实现并优化了,
standard 为 ET 模型,netpoll 为 LT 模型,使得两个网络库的适用场景有一些不同。 在 ET 模型下,由框架处理 Read / Write 事件;在 LT 模型下,由网络库处理 Read / Write 事件。 使得在小包场景下,由于更优的调度策略使得 LT 性能更好;在大包场景下,由于读 / 写不受框架层控制,使得大量数据被读入内存而不能及时处理,可能会造成内存压力。

所以到这,如果我们要添加QUIC的实现,在network中添加一个子目录提供QUIC的接口是符合系统架构设计的

最后protocol/HTTP3调用network/QUIC接口实现整个HTTP/3功能的添加

Share on

ruokeqx
WRITTEN BY
ruokeqx