https://github.com/lucas-clemente/quic-go
quic-go——A QUIC implementation in pure go
quic-go实现了QUIC协议,并实现了HTTP3,本文对其源码进行简单阅读
quic-go/http3
http3子包一共5000行代码,3000行是test,具体实现只有2000行(当然该库还有一些没有实现的地方),阅读起来较为简单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
➜ http3 git:(master) cloc *test.go
11 text files.
11 unique files.
0 files ignored.
github.com/AlDanial/cloc v 1.90 T=0.02 s (732.9 files/s, 235656.0 lines/s)
-------------------------------------------------------------------------------
Language files blank comment code
-------------------------------------------------------------------------------
Go 11 390 25 3122
-------------------------------------------------------------------------------
SUM: 11 390 25 3122
-------------------------------------------------------------------------------
➜ http3 git:(master) cloc .
22 text files.
22 unique files.
0 files ignored.
github.com/AlDanial/cloc v 1.90 T=0.02 s (982.9 files/s, 264218.7 lines/s)
-------------------------------------------------------------------------------
Language files blank comment code
-------------------------------------------------------------------------------
Go 22 661 329 4924
-------------------------------------------------------------------------------
SUM: 22 661 329 4924
-------------------------------------------------------------------------------
|
一共2000行代码而server.go近600行,其也是向外提供接口的地方
http3子包server.go expose两个函数,ListenAndServeQUIC和ListenAndServe,其中ListenAndServe同时提供HTTP_TLS和HTTP3服务,而ListenAndServeQUIC顾名思义提供QUIC服务
ListenAndServe
1
2
3
4
5
6
7
8
9
|
->LoadX509KeyPair
->config := &tls.Config{Certificates: certs,}
->udpConn := net.ListenUDP
->quicServer := &Server{TLSConfig: config,}
->quicServer.Serve(udpConn)
->tcpConn := net.ListenTCP()
->tlsConn := tls.NewListener()
->httpServer := &http.Server{}
->httpServer.Serve(tlsConn)
|
// ListenAndServeQUIC listens on the UDP network address addr and calls the
// handler for HTTP/3 requests on incoming connections. http.DefaultServeMux is
// used when handler is nil.
ListenAndServeQUIC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
// ListenAndServeTLS listens on the UDP address s.Addr and calls s.Handler to handle HTTP/3 requests on incoming connections.
->ListenAndServeTLS
->LoadX509KeyPair
->config := &tls.Config{Certificates: certs,}
->s.serveConn(config, nil)
->quicConf = &quic.Config{}
// An EarlyListener listens for incoming QUIC connections,and returns them before the handshake completes.
->var ln quic.EarlyListener
->ln, err = quicListenAddr(addr, baseConf, quicConf)
// ListenAddrEarly works like ListenAddr, but it returns connections before the handshake completes.
->quic.ListenAddrEarly(addr, baseConf, quicConf)
->s.addListener(&ln)
->s.listeners[l] = listenerInfo{port}
->s.serveListener(ln)
->for {
conn, err := ln.Accept(context.Background())
->s.baseServer.accept(ctx)
->conn := <-s.connQueue
go s.handleConn(conn)
// 开启单向流Control Streams传输SETTINGS frame
->str, err := conn.OpenUniStream()
// http3/frames.go 中多个Write函数封装了对应frame的QUICvarint写入 提供透明服务
->(&settingsFrame{...}).Write(buf)
->str.Write(buf.Bytes())
// 根据RFC 9114 控制流(control stream)贯穿连接始终 SETTINGS frame只作为控制流的第一个frame发送一次 开启goroutine处理单向流
->go s.handleUnidirectionalStreams(conn)
// 再然后监听处理请求数据
->for {
str, err := conn.AcceptStream(context.Background())
go func() {
s.handleRequest(conn, str, decoder, func(errfun) {})
->frame, err := parseNextFrame(str, ufh)
// 根据RFC定义的首字节类型标识判断Frame类型
->switch quicvarint.Read(qr){case FrameType: return &Frame{}}
->hf, ok := frame.(*headersFrame)
->headerBlock := make([]byte, hf.Length)
// RFC9204 QPACK Decoder
->hfs, err := decoder.DecodeFull(headerBlock)
// http3/request.go 解析请求伪标头字段
->req, err := requestFromHeaders(hfs)
->req.RemoteAddr = conn.RemoteAddr().String()
->body := newRequestBody(newStream(str, onFrameError))
->req.Body = body
->r := newResponseWriter(str, conn, s.logger)
// Handler可由外部框架实现并指定 如gin可使用c.Next()执行handler链
->handler := s.Handler
->func(){
// 逐一执行handler函数
// ServeHTTP(http.ResponseWriter, *http.Request)
handler.ServeHTTP(r, req)
// 这就回到了开头ListenAndServeQUIC和ListenAndServeTLS的注释
}()
->r.WriteHeader(200)
str.close()
}()
}
}
->s.removeListener(&ln)
->delete(s.listeners, l)
|
net
QUIC实现前先把go/src/net的逻辑,尤其是UDP的逻辑再梳理一遍
如net/net.go开头注释所说,Package net提供了包括TCP/IP,UDP,DNS,Unix domain sockets在内的网络I/O接口
如最常见的函数接口都在此处定义
Dial function连接到server
Listen function创建一个server
1
2
3
4
5
6
7
|
// ln, err := net.Listen("tcp", ":8080")
// conn, err := ln.Accept()
type Listener interface {
Accept() (Conn, error)
Close() error
Addr() Addr
}
|
conn结构体内部持有一个*netFD网络文件描述符,Conn接口的方法最终都会作用在conn对象上。
Conn接口的对应实现在net/udpsock.go和net/tcpsock.go中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// conn, err := net.Dial("tcp", "golang.org:80")
type Conn interface {
// net.go
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() Addr
RemoteAddr() Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
type conn struct {
fd *netFD
}
// PacketConn is the packet-oriented counterpart of Conn: ReadFrom reports the
// address a packet came from and WriteTo takes the destination address.
// Note that net.Dial returns a Conn; a PacketConn is obtained by listening:
// pc, err := net.ListenPacket("udp", "127.0.0.1:8002")
type PacketConn interface {
// udpsock.go
ReadFrom(p []byte) (n int, addr Addr, err error)
WriteTo(p []byte, addr Addr) (n int, err error)
// net.go
Close() error
LocalAddr() Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// net/udpsock.go
type UDPConn struct {
conn
}
// ReadFrom implements the PacketConn ReadFrom method.
func (c *UDPConn) ReadFrom(b []byte) (int, Addr, error) {}
// ListenUDP acts like ListenPacket for UDP networks.
func ListenUDP(network string, laddr *UDPAddr) (*UDPConn, error) {}
// net/tcpsock.go
type TCPConn struct {
conn
}
func (c *TCPConn) ReadFrom(r io.Reader) (int64, error) {}
// ListenTCP acts like Listen for TCP networks.
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {}
|
另外他们的读写实现通过以下路径进行系统调用
net/udp[tcp]sock_posix.go
net/fd_posix.go(netFD)
internal/poll/fd_unix.go(poll.FD)
internal/syscall/net.go
quic
上文控制流中写到ListenAndServeTLS->serveConn会创建quic.EarlyListener添加到server的listener列表中并serve,
quic.EarlyListener Accept会返回 quic.EarlyConnection
其跟net标准库中的设计如出一辙
1
2
3
4
5
|
type EarlyListener interface {
Close() error
Addr() net.Addr
Accept(context.Context) (EarlyConnection, error)
}
|
quic.EarlyConnection是quic.Connection的wrapper,其设计也与net标准库有相似性
Accept单向流和双向流
同步异步创建单向双向流
通过UDP发送和接收message
而SetReadDeadline、SetWriteDeadline、SetDeadline则移动到对应的
ReceiveStream单向流、SendStream单向流、Stream双向流接口中了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
type Connection interface {
AcceptStream(context.Context) (Stream, error)
AcceptUniStream(context.Context) (ReceiveStream, error)
OpenStream() (Stream, error)
OpenStreamSync(context.Context) (Stream, error)
OpenUniStream() (SendStream, error)
OpenUniStreamSync(context.Context) (SendStream, error)
LocalAddr() net.Addr
RemoteAddr() net.Addr
CloseWithError(ApplicationErrorCode, string) error
Context() context.Context
ConnectionState() ConnectionState
// RFC9221 SendMessage sends a message as a datagram
SendMessage([]byte) error
// RFC9221 ReceiveMessage gets a message received in a datagram
ReceiveMessage() ([]byte, error)
}
type ReceiveStream interface {
StreamID() StreamID
io.Reader
CancelRead(StreamErrorCode)
SetReadDeadline(t time.Time) error
}
type SendStream interface {
StreamID() StreamID
io.Writer
io.Closer
CancelWrite(StreamErrorCode)
Context() context.Context
SetWriteDeadline(t time.Time) error
}
type Stream interface {
ReceiveStream
SendStream
SetDeadline(t time.Time) error
}
|
以上提及的interface全都在interface.go文件中
server.go提供了Listen{Addr}{Early}等一系列函数(http3库中ListenAndServeTLS调用的是ListenAddrEarly)通过内部listen函数 返回基于baseServer的Listener和EarlyListener
同时协程listen->go s.run()接收receivedPackets信号,通过handlePacketImpl解析其中数据并处理一些特殊情况(如版本协商),然后进入handleInitialImpl
handleInitialImpl根据情况发送重试包 生成connID并添加handler到packetHandlerMap中(其由multiplexer初始化时指定newPacketHandlerMap作为创建函数),其中使用rawConn等信息创建quicConn并用handlePacket将p传到s.receivedPackets,添加后将新的conn也启动go conn.run()并go s.handleNewConn(conn)
handleNewConn根据server是否接受EarlyConn,等待early connection就绪或等待握手完成,然后将conn放入s.connQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// handleNewConn waits until conn is ready to be handed out and then offers it
// on s.connQueue. If the connection's context is done at any point before
// that, it returns without enqueueing the connection.
func (s *baseServer) handleNewConn(conn quicConn) {
connCtx := conn.Context()
if s.acceptEarlyConns {
// wait until the early connection is ready (or the handshake fails)
select {
case <-conn.earlyConnReady():
case <-connCtx.Done():
return
}
} else {
// wait until the handshake is complete (or fails)
select {
case <-conn.HandshakeComplete().Done():
case <-connCtx.Done():
return
}
}
// Count the connection as queued before attempting the send; the count is
// rolled back below if the connection's context finishes first.
atomic.AddInt32(&s.connQueueLen, 1)
select {
case s.connQueue <- conn:
// blocks until the connection is accepted
case <-connCtx.Done():
atomic.AddInt32(&s.connQueueLen, -1)
// don't pass connections that were already closed to Accept()
}
}
|
与此同时accept通过connQueue接收conn并处理conn := <-s.connQueue
两边select语句都没有default选项,而connQueue在server启动时被创建为无缓冲channel(connQueue: make(chan quicConn),容量为0)
,所以发送方和接收方都会阻塞,直到对方就绪
这也是handleNewConn()中注释blocks until the connection is accepted的原因
packetHandlerMap
1
2
3
4
5
6
7
8
|
// server.go
type packetHandlerManager interface {
AddWithConnID(protocol.ConnectionID, protocol.ConnectionID, func() packetHandler) bool
Destroy() error
connRunner
SetServer(unknownPacketHandler)
CloseServer()
}
|
packetHandlerMap一经创建就通过listen不断ReadPacket并handlePacket
handlePacket先是从packetHandlerMap的handlers中找到entry的packetHandler处理 entry的packetHandler由baseServer.handleInitialImpl在AddWithConnID时指定
再是由server的handlePacket处理 即加到s.receivedPackets <- p
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// packet_handler_map.go
func (h *packetHandlerMap) listen() {
defer close(h.listening)
for {
p, err := h.conn.ReadPacket()
h.handlePacket(p)
}
}
func (h *packetHandlerMap) handlePacket(p *receivedPacket) {
connID, err := wire.ParseConnectionID(p.data, h.connIDLen)
...
if entry, ok := h.handlers[string(connID)]; ok {
if entry.is0RTTQueue { // only enqueue 0-RTT packets in the 0-RTT queue
if wire.Is0RTTPacket(p.data) {
entry.packetHandler.handlePacket(p)
return
}
} else { // existing connection
entry.packetHandler.handlePacket(p)
return
}
}
...
if wire.Is0RTTPacket(p.data) {
...
}
h.server.handlePacket(p)
}
|
上述用到的ReadPacket以及WritePacket 由sys_conn_oob.go实现
1
2
3
4
5
6
7
8
9
10
|
// packet_handler_map.go
// rawConn is a connection that allow reading of a receivedPacket.
type rawConn interface {
ReadPacket() (*receivedPacket, error)
WritePacket(b []byte, addr net.Addr, oob []byte) (int, error)
LocalAddr() net.Addr
io.Closer
}
// sys_conn_oob.go
func (c *oobConn) ReadPacket() (*receivedPacket, error) {}
|
send_conn.go中的Write也是调用sys_conn_oob.go中WritePacket
h3通过EarlyConnection.AcceptStream来获取请求流并处理请求
server.go中baseServer中维护一个newConn的函数指针,在listen的时候初始化为newConnection
1
2
3
4
5
6
|
// A Listener of QUIC
type baseServer struct {
...
newConn func(...) quicConn
...
}
|
connection.go中connection结构体维持一个streamsMap,并创建了自己的初始化函数newConnection
1
2
3
4
5
6
|
// A Connection is a QUIC connection
type connection struct {
...
streamsMap streamManager
...
}
|
[server.go]listen->s := &baseServer{newConn: newConnection}->handleInitialImpl()->s.connHandler.AddWithConnID(…,func() packetHandler {conn = s.newConn()})
[connection.go]newConnection->s.preSetup()->s.streamsMap = newStreamsMap(…)中初始化streamsMap
[streams_map.go]newStreamsMap->initMaps里面创建并维护出入单双向stream的map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
type streamsMap struct {
perspective protocol.Perspective
version protocol.VersionNumber
maxIncomingBidiStreams uint64
maxIncomingUniStreams uint64
sender streamSender
newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
mutex sync.Mutex
outgoingBidiStreams *outgoingBidiStreamsMap
outgoingUniStreams *outgoingUniStreamsMap
incomingBidiStreams *incomingBidiStreamsMap
incomingUniStreams *incomingUniStreamsMap
reset bool
}
|