0%

go 库学习之 bufio

标准库的 bufio 包实现了带缓冲的 I/O。它对 io.Reader 或 io.Writer 进行封装,并返回一个新的 io.Reader 或 io.Writer 对象。新的 I/O 对象在原有的 io.Reader 或 io.Writer 对象之上提供缓冲功能,从而减少系统调用次数,以提高 I/O 性能。

带缓冲的 Reader

bufio.Reader 类型实现了带缓冲的 io.Reader。它在实现读取操作时,会尽量一次性地从底层 Reader 读取数据并保存到缓冲区中。之后用户从缓冲 Reader 中读取数据时,它会尝试先返回缓冲区中的数据,只有必要时才继续从底层 Reader 读取数据。bufio.Reader 的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Reader struct {
// 所使用的缓冲区
buf []byte
// 封装的底层 Reader
rd io.Reader // reader provided by the client
// r 表示缓冲区中当前读取的位置,即用户代码从缓冲 Reader 读取数据时,会从 r 处读取
// w 表示缓冲区中当前写入的位置,即从底层 Reader 读取到数据后,会从 w 处写入
r, w int // buf read and write positions
// 错误信息
err error
// 上次读取的最后一个字节,以支持 UnreadByte 操作
lastByte int // last byte read for UnreadByte; -1 means invalid
// 上次读取的 rune,以支持 UnreadRune 操作
lastRuneSize int // size of last rune read for UnreadRune; -1 means invalid
}

缓冲 Reader 管理相关函数

NewReader() 函数以默认缓冲区大小 4096 创建一个带缓冲的 Reader:

1
2
3
func NewReader(rd io.Reader) *Reader {
return NewReaderSize(rd, defaultBufSize)
}

NewReaderSize() 函数在创建缓冲 Reader 时,可以指定缓冲区的大小,但是最小长度为 16。

1
func NewReaderSize(rd io.Reader, size int) *Reader

值得说明的是,Reader 所使用的缓冲区在创建时长度就已经设置为对应的缓冲区大小了,它是靠 Reader.rReader.w 来实现对缓冲区读、写位置的管理。

1
2
3
4
5
func NewReaderSize(rd io.Reader, size int) *Reader {
......
r.reset(make([]byte, max(size, minReadBufferSize)), rd)
return r
}

Reader.Reset() 方法可以重新设置新的底层 Reader,该方法会丢弃缓冲区中已有的数据:

1
func NewReader(rd io.Reader) *Reader

Reader.Size() 方法用于返回缓冲区的大小:

1
func (b *Reader) Size() int { return len(b.buf) }

Reader.Buffered() 方法返回该 Reader 底层缓冲区中尚未处理的字节数,即可以直接从缓冲区中读取的数据长度。

1
func (b *Reader) Buffered() int { return b.w - b.r }

Read 系列方法

Reader.Read() 方法用于从缓冲 Reader 中读取数据,它的代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (b *Reader) Read(p []byte) (n int, err error) {
// 如果 len(p) 为 0,无法读取
n = len(p)
if n == 0 {
// 如果我们自己内部有数据,返回 0, nil
if b.Buffered() > 0 {
return 0, nil
}
// 否则返回 0,可能的错误信息
return 0, b.readErr()
}

// 如果当前缓冲区中无数据
if b.r == b.w {
// 判断 b.err
if b.err != nil {
return 0, b.readErr()
}
// 需要读取的字节数大于缓冲区大小,直接从底层 Reader 中读取数据数到 p 中
// 这样省去了 copy 的开销
if len(p) >= len(b.buf) {
n, b.err = b.rd.Read(p)
if n < 0 {
panic(errNegativeRead)
}
if n > 0 {
b.lastByte = int(p[n-1])
b.lastRuneSize = -1
}
// 总是返回
return n, b.readErr()
}

// 需要读取的字节数小于 buffer 大小
// 尝试从底层 Reader 中一次读取 len(buf) 个字节
b.r = 0
b.w = 0
n, b.err = b.rd.Read(b.buf)
if n < 0 {
panic(errNegativeRead)
}
// 如果没有读到数据,则直接返回,因为没有数据可返回
if n == 0 {
return 0, b.readErr()
}
b.w += n
}

// 将现有数据进行返回
n = copy(p, b.buf[b.r:b.w])
b.r += n
b.lastByte = int(b.buf[b.r-1])
b.lastRuneSize = -1
return n, nil
}

可以看到该 Reader.Read() 方法的核心逻辑是:

  • 只有当缓冲区中没有数据时,才会从底层 Reader 中读取数据
    • 如果需要的数据大于缓冲区大小,此时直接从底层 Reader 中读取数据到 p 中
    • 否则一次性地从底层 Reader 中读取 len(buf) 长度的数据到缓冲区中
  • 如果缓冲区中存在数据,则本次直接返回缓冲区中的数据

Reader.ReadByte() 方法会尝试读取一个字节,它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 缓冲 Reader 实现了 ByteReader 接口,每次读取一个字节
func (b *Reader) ReadByte() (byte, error) {
b.lastRuneSize = -1
// 如果缓冲区中没有数据可读
for b.r == b.w {
// 出现错误,返回 0 以及对应的错误码
if b.err != nil {
return 0, b.readErr()
}
b.fill() // buffer is empty
}
// 返回当前字节
c := b.buf[b.r]
b.r++
// 记录本次读取的字节,以支持 UnreadByte 操作
b.lastByte = int(c)
return c, nil
}

该函数的核心逻辑是:

  • 如果底层缓冲区中有数据,则直接从缓冲区返回一个数据即可
  • 如果缓冲区中没有数据,会尝试调用 b.fill() 尝试从底层 reader 中读取数据,直到遇到错误

Reader.fill() 非导出方法实现了从底层 Reader 中读取数据并保存到缓冲区中,它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 从底层 reader 中读取数据
func (b *Reader) fill() {
// 将当前尚未被用户读取的数据移动到 buffer 最前面
if b.r > 0 {
copy(b.buf, b.buf[b.r:b.w])
b.w -= b.r
b.r = 0
}

// 如果写入位置已经超过 buffer 总长度
if b.w >= len(b.buf) {
panic("bufio: tried to fill full buffer")
}

// 尝试从底层 Reader 中读取数据
for i := maxConsecutiveEmptyReads; i > 0; i-- {
// 从 b.rd 中读取数据,数据保存到 buf[b.w:] 处
// 因此最多读取 len(buf) - w 个字节数据
n, err := b.rd.Read(b.buf[b.w:])
if n < 0 {
panic(errNegativeRead)
}
// 先处理 n
b.w += n
// 如果返回了错误,包括 EOF 错误,则记录该错误
if err != nil {
b.err = err
return
}
// 只要成功读取数据,就返回
if n > 0 {
return
}

// 0, nil 则继续尝试读取
// 但是 0, nil 的次数是有限制的
}
// 连续 maxConsecutiveEmptyReads 次都读到空数据,记录成 io.ErrNoProgress 错误
b.err = io.ErrNoProgress
}

Reader.ReadRune() 方法会尝试读一个 Rune,返回该 rune、该 rune 的字节长度以及错误信息:

1
func (b *Reader) ReadRune() (r rune, size int, err error)
  • 如果无法读取到数据,该函数返回 0,0 以及错误信息
  • 即使字节序列是无效编码,该函数也会消耗一个字节,并返回 unicode.ReplacementChar (U+FFFD),size 认为是 1,而错误信息则为 nil

Reader.ReadSlice() 会尝试一直读取,直到遇到 delim 字节或者遇到错误(对于该函数,底层缓冲区满了也是一种错误)。该函数返回所读取的所有内容(包括结尾的 delim 字节)以及错误信息:

1
func (b *Reader) ReadSlice(delim byte) (line []byte, err error)
  • 如果在找到 delim 字节之前,就遇到了错误,该函数会返回 buffer 中所有的数据以及错误信息
  • 如果缓冲区已经满了,但是还是没有遇到 delim 字节,ReadSlice 返回的错误是 ErrBufferFull
  • 当且仅当返回的 line 不是以 delim 字节结尾时,err 才不为 nil
  • 由于该函数是直接返回底层 buffer 的切片而不是副本,因此在下次 I/O 操作后,返回的切片数据可能就被覆盖掉了。为了避免这个问题,可以使用 Reader.ReadBytes 或者 Reader.ReadString 方法
  • 只要返回了数据,该接口都支持 UnreadByte 以回退上一个字节

Reader.ReadLine() 方法是一个低级别的行读取行数,一般不会用到该函数。它的原型如下:

1
func (b *Reader) ReadLine() (line []byte, isPrefix bool, err error)
  • 该函数尝试读取一行,并返回该行内容(不包过结尾的结尾的 \n 或者 \r\n
  • 如果因为缓冲区满而无法完整地读取改行,此时会先返回该行中已经读取到的数据部分并将 isPrefix 设置为 true,期待下次继续调用该函数已返回剩余的内容
  • 如果返回的是最后一个分段(不管结尾是否已 /n 或者 /r/n),isPrefix 设置为 false

Reader.ReadBytes() 函数类似于 ReadSlice 函数,但是它是以副本形式返回所读取到的数据(而不是直接返回底层 buffer 的切片),因此该函数不会返回 ErrBufferFull 错误,因为它总是新申请内存来保存所读取到的数据,因此它会一直尝试读取,直到遇到 delim 字节或者遇到错误,而不需要考虑底层缓冲区满的问题。同样,当且仅当返回的数据不是以 delim 字节结尾时,err 才不为 nil。

1
func (b *Reader) ReadBytes(delim byte) ([]byte, error)

Reader.ReadString() 函数类似于 Reader.ReadBytes() 函数,但是它是以字符串的形式返回数据,因此也可以认为返回的是底层 buffer 数据的副本。同样,当且仅当返回的数据不是以 delim 字节结尾时,err 才不为 nil。

1
func (b *Reader) ReadString(delim byte) (string, error)

Unread 系列方法

缓冲 Reader 实现了 UnreadByte() 方法,可以回退上一次读取操作的最后一个字节:

1
func (b *Reader) UnreadByte() error

缓冲 Reader 实现了 UnreadRune() 方法,可以回退上一次 ReadRune 操作所读取的 rune:

1
func (b *Reader) UnreadRune() error

bufio.Reader 实现了 io.WriterTo 接口,因此提供了 WriteTo() 方法,其核心逻辑是:

1
func (b *Reader) WriteTo(w io.Writer) (n int64, err error)
  • 首先将缓冲区中剩余的、未被读取的数据写入 w 中
  • 如果底层 rd 实现了 io.WriterTo 接口,则直接调用其 rd.WriteTo 即可
  • 如果 w 实现了 io.ReaderFrom 接口,则直接调用 w.ReadFrom 方法即可
  • 否则一直尝试从底层 rd 中读取数据,并写入 w 中

需要注意,WriteTo 会使 Unread 系列函数失效。

其他方法

Reader.Peek() 方法用于 Peek n 个字节的数据,即获取 Reader 中接下来的 n 个字节数据,但不会影响 Reader 的状态

1
func (b *Reader) Peek(n int) ([]byte, error)
  • 由于是直接返回底层 buffer 的切片而不是副本,因此下次读取 I/O 操作后,这个字节切片可能就失效了
  • 如果返回的字节数小于 n,则错误信息一定非 nil。如果返回的字节数等于 n,则错误信息为 nil
  • 如果是因为缓冲区满而导致返回的字节数小于 n,则错误值为 ErrBufferFull
  • Peek 操作会使 Unread 系列函数失效

Reader.Discard() 方法丢弃 Reader 中的 n 个字节。它返回实际丢弃的字节数以及遇到的错误信息:

1
func (b *Reader) Discard(n int) (discarded int, err error)
  • 如果返回值 为 n,则 err 为 nil。如果返回值小于 n,则 err 肯定不为 nil
  • 如果 0 <= n <= Reader.Buffered(),该函数肯定不会失败,因为它不需要从底层 Reader 中读取数据
  • 只有需要从底层 Reader 中读取数据进行丢弃时,才有可能遇到错误
  • Discard 方法会使 Unread 系列函数失效

带缓冲的 Writer

bufio.Writer 实现了带缓冲的 io.Writer,它在实现写入操作时,首先尝试将数据写入缓冲 buffer 中,当缓冲区满了的时候会尝试将缓冲区数据写入底层 io.Writer 中。bufio.Writer 的定义如下:

1
2
3
4
5
6
7
8
9
10
type Writer struct {
// 错误信息
err error
// 所使用的缓冲区
buf []byte
// 当前已经缓冲的字节数
n int
// 封装的底层 Writer
wr io.Writer
}

缓冲 Writer 管理相关函数

NewWriter() 函数以默认缓冲区大小 4096 创建一个带缓冲的 Writer:

1
2
3
func NewWriter(w io.Writer) *Writer {
return NewWriterSize(w, defaultBufSize)
}

NewWriterSize() 函数以指定大小创建带缓冲的 Writer:

1
func NewWriterSize(w io.Writer, size int) *Writer

同样,Writer 所使用的缓冲区在创建时长度就已经设置为对应的缓冲区大小了,它是靠 Writer.n 来实现对写入位置的管理。

Writer.Reset() 方法可以重新设置新的底层 Writer,同时也会丢弃缓冲区中已有的数据、错误信息等:

1
func (b *Writer) Reset(w io.Writer)

Writer.Size() 方法返回整个缓冲区的大小:

1
func (b *Writer) Size() int { return len(b.buf) }

Writer.Buffered() 方法返回缓冲区中已经缓存的字节数:

1
func (b *Writer) Buffered() int { return b.n }

Writer.Available() 方法返回缓冲区中剩余可用的缓冲区长度,即当前缓冲区还可以继续缓存的字节数:

1
func (b *Writer) Available() int { return len(b.buf) - b.n }

Writer.AvailableBuffer() 方法以字节切片的形式返回当前缓冲剩余可用的缓冲区,返回的字节切片长度为 0,但是 capactityWriter.Available()

1
return b.buf[b.n:][:0]

一种典型用法是通过 append 等函数修改返回的 buffer 后,立即调用 Writer 方法进行写入:

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
w := bufio.NewWriter(os.Stdout)

for _, i := range []int64{1, 2, 3, 4, 5} {
b := w.AvailableBuffer()
b = strconv.AppendInt(b, i, 10)
b = append(b, ' ')
w.Write(b)
}

w.Flush()
}

Writer.Flush() 方法将缓冲的数据都写入底层 io 中,并返回错误信息:

  • 如果写入时发生错误,此时将返回对应的错误信息。同时缓冲区中会保留剩余的未写入完成的数据
  • 如果写入的数据长度小于缓冲区长度,会返回 io.ErrShortWrite
  • 如果缓冲区中的数据都写入成功,且没有发生错误,返回 nil
1
func (b *Writer) Flush() error

Write 系列方法

Writer.Write() 方法用于实现将数据写入缓冲 Writer 中。它的代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (b *Writer) Write(p []byte) (nn int, err error) {
// 如果要写入的字节数超过当前 buffer 可用大小
for len(p) > b.Available() && b.err == nil {
var n int
// 当前没有缓冲的数据,直接写入
if b.Buffered() == 0 {
// Large write, empty buffer.
// Write directly from p to avoid copy.
n, b.err = b.wr.Write(p)
} else {
// 将 p 中的部分数据拷贝到 buffer 中,进行一次性写入
n = copy(b.buf[b.n:], p)
b.n += n
b.Flush()
}
nn += n
p = p[n:]
}
if b.err != nil {
return nn, b.err
}
// 剩余的数据直接保存到 buffer 中即可
n := copy(b.buf[b.n:], p)
b.n += n
nn += n
return nn, nil
}

可以看到,其核心逻辑是:

  • 当要写入的数据长度大于缓冲区剩余空间时
    • 如果缓冲区没有数据,直接写入底层 Writer,避免 copy 的开销
    • 否则将数据写入剩余缓冲区空间中,再通过 Writer.Flush 一次性写入整个缓冲区数据
  • 如果要写入的数据长度小于缓冲区空间时,直接保存到缓冲区中即可

Writer.WriteByte() 方法用于将单个字节写入缓冲 Write 中:

1
func (b *Writer) WriteByte(c byte) error

Writer.WriteRune() 方法用于将一个 rune 写入缓冲 Write 中:

1
func (b *Writer) WriteRune(r rune) (size int, err error)

Writer.WriteString() 方法用于将一个字符串写入底层 Writer 中,它的实现逻辑类似于 Writer.Write,只不过当要写入的数据长度大于缓冲区长度,且当前缓冲区中又没有数据时,其会判断底层 Writer 是否实现 WriteString 方法,如果实现了则直接调用该方法进行写入。

1
func (b *Writer) WriteString(s string) (int, error)

bufio.Writer 实现了 io.ReaderFrom 接口,其 ReadFrom 方法的核心逻辑如下:

1
func (b *Writer) ReadFrom(r io.Reader) (n int64, err error)
  • 如果缓冲区中没有数据,且底层 Writer 实现了 io.ReaderFrom,则直接调用底层 Writer 的 ReadFrom 方法
  • 否则会尝试从 r 中读取数据并保存到缓冲区中,每次缓冲区满了,则尝试一次 Flush 操作

带缓冲的 ReadWriter

bufio 包还提供了一个带缓冲区的 ReadWriter 类型,它是直接通过嵌入匿名类型来实现的:

1
2
3
4
type ReadWriter struct {
*Reader
*Writer
}
1
2
3
func NewReadWriter(r *Reader, w *Writer) *ReadWriter {
return &ReadWriter{r, w}
}

Scanner

bufio.Scanner 提供了一个方便的接口来读取数据,可以按行、按单词、或者自定义分隔符的方式来读取数据。每次调用 Scanner.Scan 方法,会返回输入源中的一段 token,tokens 之间的分隔字节会被跳过。可以通过 Scanner.Split 方法来指定如何分割 token,默认就是以 换行符 作为分隔字节。

Scanner 也是基于一段缓冲区来保存从底层 Reader 中读取的数据,并基于缓冲区中的数据进行分割。缓冲区的最大长度是有限的,最大不能超过 64k。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type Scanner struct {
// 底层 reader
r io.Reader // The reader provided by the client.
// 分割函数
split SplitFunc // The function to split the tokens.
// token 的最大大小
maxTokenSize int // Maximum size of a token; modified by tests.
// 分隔出的 token
token []byte // Last token returned by split.
// 所使用的底层 buffer
buf []byte // Buffer used as argument to split.
// buffer 中未处理数据的起始、结束位置
start int // First non-processed byte in buf.
end int // End of data in buf.
// 错误信息
err error // Sticky error.
// 连续读到空 token 的次数
empties int // Count of successive empty tokens.
// 是否已经调用过 Scan 方法了
scanCalled bool // Scan has been called; buffer is in use.
// 是否已经 done
done bool // Scan has finished.
}

NewScanner() 函数用于创建一个 Scanner 对象,其中参数 r 指定了底层 reader,即从该 reader 中读取数据,再按照指定的分隔方法来返回 token

1
func NewScanner(r io.Reader) *Scanner

Scanner.Scan() 方法从输入源中读取下一段 token,读取到的 token 可以通过 Scanner.Bytes() 或者 Scanner.Text() 获取。如果没有更多 tokens(有可能是遇到 EOF 或者其他错误),该函数返回 false,此时可以通过 Scanner.Err() 获取对应的错误(如果是因为 EOF 而终止,该函数返回 nil 错误)。如果存在 token,则返回 true。

1
func (s *Scanner) Scan() bool

Scan 函数的核心逻辑是:

  • 对缓冲区中已有的数据进行分割处理,判断是否能获取 token
  • 如果不能获取 token,则尝试从底层 reader 中继续读取数据并保存到缓冲区中,之后再重新进行分割处理
  • 如果缓冲区满了,会扩大缓冲区,但缓冲区大小不能超过 s.maxTokenSize

Scanner.Split() 方法用于指定分割函数,分隔函数的原型为 SplitFunc

1
2
3
4
5
// data 参数是待处理的数据,atEOF 表示是否还有更多数据
// 返回值:advance 表示表次前进处理的字节数、token 则表示分割出的 token,err 则表示错误信息
type SplitFunc func(data []byte, atEOF bool) (advance int, token []byte, err error)

func (s *Scanner) Split(split SplitFunc)

以下函数都是预定义的分隔函数:

1
2
3
4
5
6
7
8
9
10
11
// 每次返回一字节
func ScanBytes(data []byte, atEOF bool) (advance int, token []byte, err error)

// 每次返回一个 rune
func ScanRunes(data []byte, atEOF bool) (advance int, token []byte, err error)

// 每次返回一行
func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error)

// 每次返回一个单词
func ScanWords(data []byte, atEOF bool) (advance int, token []byte, err error)

Scanner.Buffer() 方法可以设置 Scanner 所使用的缓冲区,以及 token 的最大运行长度。

1
func (s *Scanner) Buffer(buf []byte, max int)

小结

bufio 包提供了带缓冲的 Reader 和 Writer,以提高 I/O 性能。同时 bufio 包还提供了一个 Scanner 类型,可以方便地从 io.Reader 中按照字节、rune、单词或行的方式来返回数据。