This page looks best with JavaScript enabled

quic-go库源码阅读

 ·  ☕ 8 min read · 👀... views

https://github.com/lucas-clemente/quic-go

quic-go——A QUIC implementation in pure go

quic-go实现了QUIC协议,并实现了HTTP3,本文对其源码进行简单阅读

quic-go/http3

http3子包一共5000行代码,3000行是test,具体实现只有2000行(当然该库还有一些没有实现的地方),阅读起来较为简单

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
➜  http3 git:(master) cloc *test.go
      11 text files.
      11 unique files.                              
       0 files ignored.

github.com/AlDanial/cloc v 1.90  T=0.02 s (732.9 files/s, 235656.0 lines/s)
-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Go                              11            390             25           3122
-------------------------------------------------------------------------------
SUM:                            11            390             25           3122
-------------------------------------------------------------------------------
➜  http3 git:(master) cloc .       
      22 text files.
      22 unique files.                              
       0 files ignored.

github.com/AlDanial/cloc v 1.90  T=0.02 s (982.9 files/s, 264218.7 lines/s)
-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Go                              22            661            329           4924
-------------------------------------------------------------------------------
SUM:                            22            661            329           4924
-------------------------------------------------------------------------------

一共2000行代码而server.go近600行,其也是向外提供接口的地方

http3子包server.go expose两个函数,ListenAndServeQUIC和ListenAndServe,其中ListenAndServe同时提供HTTP_TLS和HTTP3服务,而ListenAndServeQUIC顾名思义提供QUIC服务

ListenAndServe

1
2
3
4
5
6
7
8
9
->LoadX509KeyPair
  ->config := &tls.Config{Certificates: certs,}
  ->udpConn := net.ListenUDP
    ->quicServer := &Server{TLSConfig: config,}
    ->quicServer.Serve(udpConn)
  ->tcpConn := net.ListenTCP()
    ->tlsConn := tls.NewListener()
    ->httpServer := &http.Server{}
    ->httpServer.Serve(tlsConn)

// ListenAndServeQUIC listens on the UDP network address addr and calls the
// handler for HTTP/3 requests on incoming connections. http.DefaultServeMux is
// used when handler is nil.
ListenAndServeQUIC

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// ListenAndServeTLS listens on the UDP address s.Addr and calls s.Handler to handle HTTP/3 requests on incoming connections.
->ListenAndServeTLS
  ->LoadX509KeyPair
  ->config := &tls.Config{Certificates: certs,}
  ->s.serveConn(config, nil)
    ->quicConf = &quic.Config{}
    // An EarlyListener listens for incoming QUIC connections,and returns them before the handshake completes.
    ->var ln quic.EarlyListener
    ->ln, err = quicListenAddr(addr, baseConf, quicConf)
      // ListenAddrEarly works like ListenAddr, but it returns connections before the handshake completes.
      ->quic.ListenAddrEarly(addr, baseConf, quicConf)
    ->s.addListener(&ln)
      ->s.listeners[l] = listenerInfo{port}
    ->s.serveListener(ln)
      ->for {
          conn, err := ln.Accept(context.Background())
            ->s.baseServer.accept(ctx)
            ->conn := <-s.connQueue
          go s.handleConn(conn)
            // 开启单向流Control Streams传输SETTINGS frame
            ->str, err := conn.OpenUniStream()
            // http3/frames.go 中多个Write函数封装了对应frame的QUICvarint写入 提供透明服务
            ->(&settingsFrame{...}).Write(buf)
            ->str.Write(buf.Bytes())
            // 根据RFC 9114 控制流(Control Stream)贯穿连接始终 开启goroutine接收并处理对端打开的单向流
            ->go s.handleUnidirectionalStreams(conn)
            // 再然后监听处理请求数据
            ->for {
                str, err := conn.AcceptStream(context.Background())
                go func() {
                    s.handleRequest(conn, str, decoder, func(errfun) {})
                      ->frame, err := parseNextFrame(str, ufh)
                        // 根据RFC定义的首字节类型标识判断Frame类型
                        ->switch quicvarint.Read(qr){case FrameType: return &Frame{}}
                      ->hf, ok := frame.(*headersFrame)
                      ->headerBlock := make([]byte, hf.Length)
                      // RFC9204 QPACK Decoder
                      ->hfs, err := decoder.DecodeFull(headerBlock)
                      // http3/request.go 解析请求伪标头字段 
                      ->req, err := requestFromHeaders(hfs)
                      ->req.RemoteAddr = conn.RemoteAddr().String()
                      ->body := newRequestBody(newStream(str, onFrameError))
                      ->req.Body = body
                      ->r := newResponseWriter(str, conn, s.logger)
                      // Handler可由外部框架实现并指定 如gin可使用c.Next()执行handler链
                      ->handler := s.Handler
                      ->func(){
                          // 逐一执行handler函数
                          // ServeHTTP(http.ResponseWriter, *http.Request)
                          handler.ServeHTTP(r, req)
                          // 这就回到了开头ListenAndServeQUIC和ListenAndServeTLS的注释
                        }()
                      ->r.WriteHeader(200)
                    str.close()
                }()
              }
        }
    ->s.removeListener(&ln)
      ->delete(s.listeners, l)

net

QUIC实现前先把go/src/net的逻辑,尤其是UDP的逻辑再梳理一遍

如net/net.go开头注释所说,Package net提供了包括TCP/IP、UDP、域名解析(DNS)、Unix domain sockets在内的网络I/O接口

如最常见的函数接口都在此处定义
Dial function连接到server
Listen function创建一个server

1
2
3
4
5
6
7
// ln, err := net.Listen("tcp", ":8080")
// conn, err := ln.Accept()
type Listener interface {
	Accept() (Conn, error)
	Close() error
	Addr() Addr
}

conn结构体内部持有一个*netFD类型的网络文件描述符,Conn接口的方法都会作用在conn对象上。
Conn接口的对应实现在net/udpsock.go和net/tcpsock.go中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// conn, err := net.Dial("tcp", "golang.org:80")
type Conn interface {
    // net.go
	Read(b []byte) (n int, err error)
	Write(b []byte) (n int, err error)
	Close() error
	LocalAddr() Addr
	RemoteAddr() Addr
	SetDeadline(t time.Time) error
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}

type conn struct {
	fd *netFD
}

// conn, err := net.ListenPacket("udp", "127.0.0.1:8002")
type PacketConn interface {
    // udpsock.go
	ReadFrom(p []byte) (n int, addr Addr, err error)
	WriteTo(p []byte, addr Addr) (n int, err error)
    // net.go
	Close() error
	LocalAddr() Addr
	SetDeadline(t time.Time) error
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// net/udpsock.go
type UDPConn struct {
	conn
}
// ReadFrom implements the PacketConn ReadFrom method.
func (c *UDPConn) ReadFrom(b []byte) (int, Addr, error) {}
// ListenUDP acts like ListenPacket for UDP networks.
func ListenUDP(network string, laddr *UDPAddr) (*UDPConn, error) {}
// net/tcpsock.go
type TCPConn struct {
	conn
}
func (c *TCPConn) ReadFrom(r io.Reader) (int64, error) {}
// ListenTCP acts like Listen for TCP networks.
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {}

另外它们的读写实现通过以下路径进行系统调用
net/udp[tcp]sock_posix.go
net/fd_posix.go(netFD)
internal/poll/fd_unix.go(poll.FD)
internal/syscall/net.go

quic

上文控制流中写到ListenAndServeTLS->serveConn会创建quic.EarlyListener添加到server的listener列表中并serve,
quic.EarlyListener Accept会返回 quic.EarlyConnection
其跟net标准库中的设计如出一辙

1
2
3
4
5
type EarlyListener interface {
	Close() error
	Addr() net.Addr
	Accept(context.Context) (EarlyConnection, error)
}

quic.EarlyConnection是quic.Connection的wrapper,其设计也与net标准库有相似性
Accept单向流和双向流
同步异步创建单向双向流
通过QUIC DATAGRAM帧(RFC 9221)发送和接收message

而SetReadDeadline、SetWriteDeadline、SetDeadline则移动到对应的
ReceiveStream单向流、SendStream单向流、Stream双向流接口中了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type Connection interface {
	AcceptStream(context.Context) (Stream, error)
	AcceptUniStream(context.Context) (ReceiveStream, error)
	OpenStream() (Stream, error)
	OpenStreamSync(context.Context) (Stream, error)
	OpenUniStream() (SendStream, error)
	OpenUniStreamSync(context.Context) (SendStream, error)
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	CloseWithError(ApplicationErrorCode, string) error
	Context() context.Context
	ConnectionState() ConnectionState
	// RFC9221 SendMessage sends a message as a datagram
	SendMessage([]byte) error
	// RFC9221 ReceiveMessage gets a message received in a datagram
	ReceiveMessage() ([]byte, error)
}
type ReceiveStream interface {
	StreamID() StreamID
	io.Reader
	CancelRead(StreamErrorCode)
	SetReadDeadline(t time.Time) error
}
type SendStream interface {
	StreamID() StreamID
	io.Writer
	io.Closer
	CancelWrite(StreamErrorCode)
	Context() context.Context
	SetWriteDeadline(t time.Time) error
}
type Stream interface {
	ReceiveStream
	SendStream
	SetDeadline(t time.Time) error
}

以上提及的interface全都在interface.go文件中

server.go提供了Listen{Addr}{Early}等一系列函数(http3库中ListenAndServeTLS调用的是ListenAddrEarly)通过内部listen函数 返回基于baseServer的Listener和EarlyListener

同时listen中通过go s.run()启动协程,从receivedPackets接收数据包,交给handlePacketImpl解析,并处理一些特殊情况(如版本协商),然后进入handleInitialImpl

handleInitialImpl根据情况发送重试包(Retry),生成connID并通过AddWithConnID将handler添加到packetHandlerMap中(其由multiplexer初始化时指定newPacketHandlerMap作为创建函数);添加时的回调中使用rawConn等信息创建quicConn,并用conn.handlePacket将p传给新连接的receivedPackets;添加成功后启动go conn.run()和go s.handleNewConn(conn)

handleNewConn根据server是否接受EarlyConn,等待early connection就绪或等待握手完成(若期间conn已关闭则直接返回),然后将conn加到s.connQueue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (s *baseServer) handleNewConn(conn quicConn) {
	connCtx := conn.Context()
	if s.acceptEarlyConns {
		// wait until the early connection is ready (or the handshake fails)
		select {
		case <-conn.earlyConnReady():
		case <-connCtx.Done():
			return
		}
	} else {
		// wait until the handshake is complete (or fails)
		select {
		case <-conn.HandshakeComplete().Done():
		case <-connCtx.Done():
			return
		}
	}

	atomic.AddInt32(&s.connQueueLen, 1)
	select {
	case s.connQueue <- conn:
		// blocks until the connection is accepted
	case <-connCtx.Done():
		atomic.AddInt32(&s.connQueueLen, -1)
		// don't pass connections that were already closed to Accept()
	}
}

与此同时accept通过connQueue接收conn并处理conn := <-s.connQueue
两边select语句都没有default选项,由于connQueue在server启动时被创建为无缓冲channel(connQueue: make(chan quicConn),容量为0),所以发送方和接收方都会阻塞,直到对方就绪
这也是handleNewConn()中注释blocks until the connection is accepted的原因

packetHandlerMap

1
2
3
4
5
6
7
8
// server.go
type packetHandlerManager interface {
	AddWithConnID(protocol.ConnectionID, protocol.ConnectionID, func() packetHandler) bool
	Destroy() error
	connRunner
	SetServer(unknownPacketHandler)
	CloseServer()
}

packetHandlerMap一经创建就通过listen不断ReadPacket并handlePacket
handlePacket先是从packetHandlerMap的handlers中查找connID对应的entry,找到则交给entry的packetHandler处理;entry的packetHandler由baseServer.handleInitialImpl在AddWithConnID时指定
若没有找到对应的entry,再交由server的handlePacket处理,即加到s.receivedPackets <- p

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// packet_handler_map.go
func (h *packetHandlerMap) listen() {
	defer close(h.listening)
	for {
		p, err := h.conn.ReadPacket()
		h.handlePacket(p)
	}
}
func (h *packetHandlerMap) handlePacket(p *receivedPacket) {
	connID, err := wire.ParseConnectionID(p.data, h.connIDLen)
	...
	if entry, ok := h.handlers[string(connID)]; ok {
		if entry.is0RTTQueue { // only enqueue 0-RTT packets in the 0-RTT queue
			if wire.Is0RTTPacket(p.data) {
				entry.packetHandler.handlePacket(p)
				return
			}
		} else { // existing connection
			entry.packetHandler.handlePacket(p)
			return
		}
	}
	...
	if wire.Is0RTTPacket(p.data) {
		...
	}
	h.server.handlePacket(p)
}

上述用到的ReadPacket以及WritePacket由sys_conn_oob.go实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// packet_handler_map.go
// rawConn is a connection that allow reading of a receivedPacket.
type rawConn interface {
	ReadPacket() (*receivedPacket, error)
	WritePacket(b []byte, addr net.Addr, oob []byte) (int, error)
	LocalAddr() net.Addr
	io.Closer
}
// sys_conn_oob.go
func (c *oobConn) ReadPacket() (*receivedPacket, error) {}

send_conn.go中的Write也是调用sys_conn_oob.go中WritePacket

h3通过EarlyConnection.AcceptStream来获取请求流并处理请求

server.go中baseServer中维护一个newConn的函数指针,在listen的时候初始化为newConnection

1
2
3
4
5
6
// A Listener of QUIC
type baseServer struct {
	...
	newConn func(...) quicConn
	...
}

connection.go中connection结构体维持一个streamsMap,并创建了自己的初始化函数newConnection

1
2
3
4
5
6
// A Connection is a QUIC connection
type connection struct {
	...
	streamsMap      streamManager
	...
}

[server.go]listen->s := &baseServer{newConn: newConnection}->handleInitialImpl()->s.connHandler.AddWithConnID(…,func() packetHandler {conn = s.newConn()})
[connection.go]newConnection->s.preSetup()->s.streamsMap = newStreamsMap(…)中初始化streamsMap
[streams_map.go]newStreamsMap->initMaps里面创建并维护出入单双向stream的map

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type streamsMap struct {
	perspective protocol.Perspective
	version     protocol.VersionNumber

	maxIncomingBidiStreams uint64
	maxIncomingUniStreams  uint64

	sender            streamSender
	newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController

	mutex               sync.Mutex
	outgoingBidiStreams *outgoingBidiStreamsMap
	outgoingUniStreams  *outgoingUniStreamsMap
	incomingBidiStreams *incomingBidiStreamsMap
	incomingUniStreams  *incomingUniStreamsMap
	reset               bool
}
Share on

ruokeqx
WRITTEN BY
ruokeqx