0%

python cookbook(05):文件与 I/O

所有程序都要处理输入和输出,这里将介绍与文件处理、I/O 操作相关的 Python 编码技巧。

读写文本数据

问题

你需要读写各种不同编码的文本数据,例如 ASCII、UTF-8 或 UTF-16 编码等。

解决方案

  • 使用 rt 模式读取文本文件
  • 使用 wt 模式写入文本文件。此时如果该文件之前包含了内容,则这些内容会被清除。如果想以追加的方式写入,可以使用 at
  • 文件的读写操作默认使用系统编码,可以通过 sys.getdefaultencoding() 得到系统编码。如果想指定编码,可以在 open() 函数中指定 encoding 参数
  • latin-1 编码是字节 0-255 到 U+0000 至 U+00FF 范围内 Unicode 字符的直接映射。当读取一个未知编码的文本时,使用 latin-1 编码永远不会产生解码错误。虽然不一定会产生正确的文本解码数据,但是它能够提取出足够多的有用数据。而且将数据写回时,原先的数据还是会被保留。也就是说,对任何编码的字节流使用 latin-1 编码都不会有解码问题
  • 使用 with 语句可以被使用的文件创建一个上下文环境,with 块结束时,文件会自动关闭。如果不使用 with 语句,就要记得手动关闭文件
  • 默认情况下,Python 会以统一的方式处理换行符。此时读取文本时,Python 可以识别系统换行符(Windows 下是 \r\n,Unix 下是 \n)并将其转换为单个 \n 字符。输出时也是类似处理,将 \n 转换为系统默认的换行符。如果不希望这样处理,可以在 open() 函数中指定 newline=''
  • 可以在 open() 函数中使用 errors 参数来处理编码错误,但这不是推荐的方式。对于文本处理的首要原则就是要确保你总是使用正确的编码

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> with open('somefile.txt', 'rt') as f:
... data = f.read()
...

>>> with open('somefile.txt', 'rt') as f:
... for line in f:
... print(line)
...

>>> with open('somefile.txt', 'wt') as f:
... f.write('hello, world')
...
>>> with open('somefile.txt', 'wt') as f:
... print('hello, world', file=f)
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
>>> with open('somefile.txt', 'wt') as f:
... f.write('中文')
...
2

>>> with open('somefile.txt', 'rt', encoding='ascii') as f:
... data = f.read()
... print(data)
...
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "/usr/lib/python3.10/encodings/ascii.py", line 26, in decode
return codecs.ascii_decode(input, self.errors)[0]
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 0: ordinal not in range(128)

>>> with open('somefile.txt', 'rt', encoding='utf-8') as f:
... data = f.read()
... print(data)
...
中文

```python
>>> f = open('somefile.txt', 'rt', encoding='latin-1')
>>> data = f.read()
>>> len(data)
6
>>> bytes(data, encoding='latin-1')
b'\xe4\xb8\xad\xe6\x96\x87'
>>> f.close()

>>> f = open('somefile.txt', 'rt', encoding='utf-8')
>>> data = f.read()
>>> len(data)
2
>>> bytes(data, encoding='utf-8')
b'\xe4\xb8\xad\xe6\x96\x87'

>>> [hex(ord(c)) for c in bytes('中文', encoding='utf-8').decode('latin-1')]
['0xe4', '0xb8', '0xad', '0xe6', '0x96', '0x87']

# ls -l somefile.txt
-rw-r--r-- 1 root root 6 Jan 11 22:50 somefile.txt
# hexdump somefile.txt
0000000 b8e4 e6ad 8796
0000006
1
2
3
4
5
6
7
8
9
10
>>> f = open('somefile.txt', 'rt', encoding='ascii')
>>> f.read()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.10/encodings/ascii.py", line 26, in decode
return codecs.ascii_decode(input, self.errors)[0]
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 0: ordinal not in range(128)
>>> f = open('somefile.txt', 'rt', encoding='ascii', errors='ignore')
>>> f.read()
''

打印输出到文件中

问题

想将 print() 函数的输出重定向到一个文件中。

解决方案

  • print() 函数中可以指定 file 关键字参数。此时文件必须是以文本模式打开,如果是以二进制模式打开,会出错

示例

1
2
3
>>> with open('somefile.txt', 'wt') as f:
... print('hello world', file=f)
...

使用其他分隔符或行终止符打印

问题

在使用 print() 输出时,想改变默认的分隔符或行尾符。

解决方案

  • print() 函数支持使用 sep 参数设置分隔符(默认为空格)、使用 end 参数设置行尾符(默认为 \n

示例

1
2
3
4
5
6
>>> print('ACME', 50, 91.5)
ACME 50 91.5
>>> print('ACME', 50, 91.5, sep=',')
ACME,50,91.5
>>> print('ACME', 50, 91.5, sep=',', end='.')
ACME,50,91.5.>>>

读写字节数据

问题

需要读写二进制文件,例如图片、声音文件等。

解决方案

  • 在使用 open() 函数时,使用 rb 以二进制可读模式打开文件、使用 wb 以二进制可写模式打开文件
  • 在读取二进制数据时,所有返回的数据都是以 字节字符串,而不是 文本字符串。类似地,在写入对象时,必须保证参数也是字节形式
  • 需要时刻牢记,当读取到二进制数据后,在数据处理时,索引和迭代操作返回的是字节的值
  • 当需要从二进制模式的文件中读取或写入文本文件,需要进行解码和编码操作
  • 二进制 I/O 还有一个鲜为人知的特性是数组和 C 结构体类型能直接被写入,而不需要中间转换为自己的对象。这个适合于任何实现了被称之为 缓冲接口 的对象,这种对象会直接暴露其底层的内存缓冲区给能处理它的操作,二进制数据的写入就是这类操作之一。很多对象还允许直接通过文件对象的 readinto() 方法直接读取二进制数据到其底层内存中去。使用这种技术需要格外小心,因为它通常具有平台相关性,可能会依赖字长、字节序等

示例

1
2
3
4
5
6
7
8
>>> with open('somefile.bin', 'wb') as f:
... f.write(b'hello, wolrd')
...
12
>>> with open('somefile.bin', 'rb') as f:
... print(f.read())
...
b'hello, wolrd'
1
2
3
4
5
>>> f = open('somefile.bin', 'rb')
>>> d = f.read()
>>> d[0]
104
>>> f.close()
1
2
3
4
5
6
7
8
>>> with open('somefile.bin', 'wb') as f:
... f.write('hello, world'.encode('utf-8'))
...
12
>>> with open('somefile.bin', 'rb') as f:
... print(f.read().decode('utf-8'))
...
hello, world
1
2
3
4
5
6
7
8
9
10
11
12
13
>>> import array
>>> nums = array.array('i', [1, 2, 3, 4])
>>> with open('data.bin', 'wb') as f:
... f.write(nums)
...
16
>>> a = array.array('i', [0, 0, 0, 0, 0])
>>> with open('data.bin', 'rb') as f:
... f.readinto(a)
...
16
>>> a
array('i', [1, 2, 3, 4, 0])

文件不存在才能写入

问题

想在文件中写入数据,但是前提是这个文件当前不存在。即不允许覆盖已存在的文件内容。

解决方案

  • 由于 w 模式打开文件后会将文件内容清空,因此对于已存在的文件内容,都会被清除掉
  • open() 函数中,可以使用 x 模式来替代 w 来解决该问题。此时如果文件已经存在,则直接抛出异常

示例

1
2
3
4
5
6
7
>>> if not os.path.exists('somefile.txt'):
... with open('somefile.txt', 'wt') as f:
... f.write('hello\n')
... else:
... print('file exists')
...
file exists
1
2
3
4
5
6
>>> with open('somefile.txt', 'xt') as f:
... f.write('hello\n')
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
FileExistsError: [Errno 17] File exists: 'somefile.txt'

字符串的 I/O 操作

问题

想使用类似于操作文件对象的方法来操作文本或二进制字符串。

解决方案

  • 使用 io.StringIO()io.BytesIO() 类来创建 类文件对象,之后在这些对象上执行字符串、二进制字符串操作
  • 当想模拟一个普通文件的时候,StringIOBytesIO 类是很有用的,有时候,可以将该类型的对象传递到某个参数为普通文件对象的函数。但是由于它们没有正确的文件描述符,因此不能用在那些需要使用真实系统级文件的地方

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
>>> from io import StringIO
>>> from io import BytesIO
>>> s = StringIO()
>>> s.write('Hello World\n')
12
>>> s.getvalue()
'Hello World\n'

>>> print('this is test', file=s)
>>> s.getvalue()
'Hello World\nthis is test\n'
>>> s = StringIO('Hello\nWorld\n')
>>> s.read(4)
'Hell'
>>> s.read()
'o\nWorld\n'

>>> b = BytesIO()
>>> b.write(b'binary data')
11
>>> b.getvalue()
b'binary data'

读写压缩文件

问题

想读写 gzip 或 bzip2 格式的压缩文件。

解决方案

  • gzip 和 bzip2 模块可以很容易处理这些文件。两个模块都为 open() 函数提供了来解决该问题
  • gzip.open()bzip2.open() 接受和 open() 函数一样的参数,包括 encodingerrorsnewline 等等。但是默认打开是二进制模式
  • 当写入压缩数据时,可以使用 compresslevel 的可选关键字参数来指定一个压缩级别,默认等级是 9,即最高的压缩级别
  • gzip.open()bzip2.open() 还支持一个特性:可以作用在一个已经存在并以二进制模式打开的文件上。这就允许 gzipbz 模块可以作用在许多类文件对象上,例如套接字、管道和内存中的文件等等

示例

1
2
3
4
5
6
7
8
9
10
11
>>> import gzip
>>> with gzip.open('somefile.txt.gz', 'rt') as f:
... print(f.read())
...
hello world

>>> import bz2
>>> with bz2.open('somefile.txt.bz2', 'rt') as f:
... print(f.read())
...
hello, world
1
2
3
4
5
>>> import gzip
>>> with gzip.open('somefile.txt.gz', 'wt', compresslevel=5) as f:
... f.write('new line')
...
8
1
2
3
4
5
6
>>> import gzip
>>> f = open('somefile.txt.gz', 'rb')
>>> with gzip.open(f, 'rt') as g:
... print(g.read())
...
new line

固定大小记录的文件迭代

问题

想在一个固定长度记录或者数据块的集合上迭代。

解决方案

  • 通过 functools.partial() 函数可以对 read 函数进行包装,并将其 size 参数固化下来
  • 如果给 iter() 函数传递一个可调用对象和一个标记值。此时可以创建一个迭代器,每次迭代的时候都会调用该函数,并且当函数的返回值和指定的标记值相同时,表示迭代结束

示例

1
2
3
4
5
6
7
8
>>> from functools import partial

>>> RECORD_SIZE=32
>>> with open('somefile.bin', 'rb') as f:
... records = iter(partial(f.read, RECORD_SIZE), b'')
... for r in records:
... pass
>>>

读取二进制数据到可变缓冲区中

问题

想直接读取二进制数据到一个可变缓冲区中,而不需要做任何的中间复制操作。或者想原地修改数据并将它写回到一个文件中。

解决方案

  • 文件对象的 readinto 方法可以在预先分配的内存中填充数据,而不是为新对象重新分配内存再返回它们,因此可以用它来避免大量的内存分配操作
  • memoryview 可以通过零复制的方式对已经存在的缓冲区执行切片操作,甚至可以修改它的内容
  • Python 中其他函数库和模块也提供了一些 into 相关的函数,例如 recv_intopack_into 等,也是用来支持直接 I/O,即用来填充或修改缓冲区中的内容

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def read_into_buffer(filename):
buf = bytearray(os.path.getsize(filename))
with open(filename, 'rb') as f:
f.readinto(buf)
return buf


filename = 'sample.bin'
with open(filename, 'wb') as f:
f.write(b'hello world')


buf = read_into_buffer(filename)
print(buf)
1
2
3
4
5
6
7
8
9
10
11
>>> buf = bytearray(b'hello, world')
>>> m1 = memoryview(buf)
>>> m1
<memory at 0x7fa5a29847c0>
>>> m2 = m1[-5:]
>>> m2
<memory at 0x7fa5a2984880>
>>> m2 = m1[-5:]
>>> m2[:] = b'WORLD'
>>> buf
bytearray(b'hello, WORLD')

内存映射的二进制文件

问题

想内存映射一个二进制文件到一个可变字节数组中,目的是为了随机访问它的内容或者是原地做些修改。

解决方案

  • 使用 mmap 模块来内存映射文件,此时就将文件内容映射到了内存中,之后就可以随机访问文件内容。为了使用该函数,需要一个已经创建并且内容不为空的文件
  • 默认 mmap.map() 返回的 mmap 对象同时支持读和写操作,任何修改内容都会复制回原文件。如果需要只读模式,可以将参数 access 设置为 mmap.ACCESS_READ。如果想在本地修改数据,同时又不希望将修改写回到原始文件中,可以使用 mmap.ACCESS_COPY
  • mmap.map() 返回的 mmap 对象同样可以作为一个上下文管理器来使用,此时底层文件会自动关闭

内存映射并不会导致整个整个文件内容被读取到内存中,相反,操作系统仅仅为文件内容保留了一段虚拟内存。当你访问文件的不同区域时,这些区域的内容才会根据需要被读取到内存区域中。整个过程是透明的,在幕后完成。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import os
import mmap


def memory_map(filename, access=mmap.ACCESS_DEFAULT):
size = os.path.getsize(filename)
fd = os.open(filename, os.O_RDWR)
return mmap.mmap(fd, size, access=access)


with memory_map('sample.bin') as m:
print(len(m))
print(m[:])
m[:] = b'HELLO WORLD'


with open('sample.bin', 'rb') as f:
print(f.read())
1
2
3
4
# python3 mmap_test.py
11
b'hello world'
b'HELLO WORLD'

文件路径名的操作

问题

需要使用路径名来获取文件名、目录名、绝对目录等。

解决方案

  • 可以使用 os.path 模块中的函数来操作路径名
  • 对于文件名操作,不要使用标准字符串操作来构造自己的代码,因为这样的代码可移植性不佳

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
>>> import os

>>> path = '/root/code/private/python/python_cookbook/05/data.bin'
>>> os.path.basename(path)
'data.bin'

>>> os.path.dirname(path)
'/root/code/private/python/python_cookbook/05'

>>> os.path.join('tmp', 'data', os.path.basename(path))
'tmp/data/data.bin'

>>> path = '~/data'
>>> os.path.expanduser(path)
'/root/data'

>>> os.path.splitext('/root/code/private/python/python_cookbook/05/data.bin')
('/root/code/private/python/python_cookbook/05/data', '.bin')

测试文件是否存在

问题

想测试一个文件或者目录是否存在。

解决方案

  • 使用 os.path 模块可以测试一个文件或者目录是否存在。
  • os.path 模块也提供了一系列函数用于测试该文件的类型
  • os.path 模块也提供了一系列函数用于获取文件的元数据,例如文件大小、修改日期

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
>>> import os
>>>
>>> os.path.exists('/etc/password')
False
>>> os.path.exists('/etc/passwd')
True

>>> os.path.isfile('/etc/passwd')
True
>>> os.path.isdir('/etc/passwd')
False
>>> os.path.islink('/usr/local/bin/cz')
True
>>> os.path.realpath('/usr/local/bin/cz')
'/usr/local/lib/node_modules/commitizen/bin/git-cz'

>>> os.path.getsize('/etc/passwd')
1822
>>> os.path.getmtime('/etc/passwd')
1656403216.6407757

获取文件夹中的文件列表

问题

想获取文件系统中某个目录下的所有文件列表。

解决方案

  • 使用 os.listdir() 函数来获取某个目录中的文件列表。该函数会返回目录中的所有文件列表,包括文件、子目录、符号链接等
  • 如果需要对返回的列表进行过滤,可以考虑使用 os.path 库中的一些函数,对于文件名的匹配,也可以考虑使用 glob 或 fnmatch 模块
  • 函数 os.listdir() 返回的实体列表会根据系统默认的文件名编码来解码,如果碰到一些不能正常解码的文件名,此时可能需要进行特殊处理

示例

1
2
3
>>> import os
>>> os.listdir('.')
['somefile.txt.bz2', 'sample.bin', 'somefile.txt.gz', 'data.bin', 'readinto.py', 'mmap_test.py', 'somefile.bin']
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
>>> names = [name for name in os.listdir('.') if os.path.isfile(os.path.join('.', name))]
>>> names
['somefile.txt.bz2', 'sample.bin', 'somefile.txt.gz', 'data.bin', 'readinto.py', 'mmap_test.py', 'somefile.bin']
>>> dirnames = [name for name in os.listdir('.') if os.path.isdir(os.path.join('.', name))]
>>> dirnames
[]
>>> pyfiles = [name for name in os.listdir('.') if name.endswith('.py')]
>>> pyfiles
['readinto.py', 'mmap_test.py']

>>> import glob
>>> pyfiles = glob.glob('./*.py')
>>> pyfiles
['./readinto.py', './mmap_test.py']
>>>
>>> from fnmatch import fnmatch
>>> pyfiles = [name for name in os.listdir('.') if fnmatch(name, '*.py')]
>>> pyfiles
['readinto.py', 'mmap_test.py']

>>> name_sz_date = [(name, os.path.getsize(name), os.path.getmtime(name)) for name in pyfiles]
>>> name_sz_date
[('readinto.py', 359, 1673753877.911599), ('mmap_test.py', 425, 1673841335.510991)]

>>> file_meta = [(name, os.stat(name)) for name in pyfiles]
>>> [(name, m.st_size, m.st_mtime) for name, m in file_meta]
[('readinto.py', 359, 1673753877.911599), ('mmap_test.py', 425, 1673841335.510991)]

忽略文件名编码

问题

想使用原始文件名执行文件的 I/O 操作,即文件名没有经过系统默认编码去解码或者编码过。

解决方案

  • 默认情况下,所有的文件名都会根据 sys.getfilesystemencoding() 返回的文本编码来编码或者解码
  • 如果因为某种原因你想忽略这种编码,可以使用一个原始字节字符串来指定一个文件名即可

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
>>> import sys
>>> sys.getfilesystemencoding()
'utf-8'

>>> with open('中文.txt', 'w') as f:
... f.write('hello')
...
5

>>> os.listdir('.')
['somefile.txt.bz2', '中文.txt', 'sample.bin', 'somefile.txt.gz', 'data.bin', 'readinto.py', 'mmap_test.py', 'somefile.bin']

>>> os.listdir(b'.')
[b'somefile.txt.bz2', b'\xe4\xb8\xad\xe6\x96\x87.txt', b'sample.bin', b'somefile.txt.gz', b'data.bin', b'readinto.py', b'mmap_test.py', b'somefile.bin']

>>> with open(b'\xe4\xb8\xad\xe6\x96\x87.txt') as f:
... print(f.read())
...
hello

打印不合法的文件名

问题

程序在获取目录中的文件名列表后,尝试打印文件名出现崩溃,此时抛出 UnicodeEncodeError 异常并打印 surrogates not allowed 错误提示。

解决方案

默认情况下,Python 假定文件名都已经根据 sys.getfilesystemencoding() 的值编码过了。但是有时候也会出现一些文件名没有经过正确编码,此时执行 os.listdir() 就会出现问题,因为 Python 不知道如何将这些文件名转换为文本字符串。此时 Python 的解决方案是从文件名中尝试获取未解码的字节值 \xhh 并将它映射为 Unicode 字符 \udchh 表示的所谓 代理编码

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
>>> with open('b\udce4d.txt', 'w') as f:
... f.write('hello')
...
5
>>> files = os.listdir('.')
>>> for name in files:
... print(name)
...
somefile.txt.bz2
中文.txt
sample.bin
somefile.txt.gz
data.bin
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
UnicodeEncodeError: 'utf-8' codec can't encode character '\udce4' in position 1: surrogates not allowed

>>> def bad_filename(name):
... return repr(name)[1:-1]
...
>>> for name in files:
... try:
... print(name)
... except UnicodeEncodeError:
... print(bad_filename(name))
...
somefile.txt.bz2
中文.txt
sample.bin
somefile.txt.gz
data.bin
b\udce4d.txt
readinto.py
mmap_test.py
somefile.bin

增加或改变已打开文件的编码

问题

如何在不关闭一个已打开文件的前提下增加或改变它的 Unicode 编码。

解决方案

Python 的 I/O 系统由一系列层次构建而成:

1
2
3
4
5
6
7
>>> f = open('sample.txt', 'w')
>>> f
<_io.TextIOWrapper name='sample.txt' mode='w' encoding='UTF-8'>
>>> f.buffer
<_io.BufferedWriter name='sample.txt'>
>>> f.buffer.raw
<_io.FileIO name='sample.txt' mode='wb' closefd=True>
  • io.TextIOWrapper 是一个编码和解码 Unicode 的文本处理层
  • io.BufferedWriter 是一个处理二进制数据的带缓冲的 I/O 层
  • io.FileIO 则是一个表示操作系统底层文件描述符的原始文件

如果想给一个以二进制模式打开的文件添加 Unicode 编码/解码方式,可以使用 io.TextIOWrapper 对象包装它。如果你想修改一个已经打开的文本模式的文件的编码方式,可以先使用 detach() 方法移除掉已经存在的文本编码层,并使用新的编码方式替代。

  • 通过访问属性值来直接操作不同的层是不安全的
  • detach() 方法会断开文件的最顶层并返回第二层,一旦断开最顶层后,就可以给返回结果添加一个新的最顶层
  • 这种技术除了可以用于改变编码,还可以用于改变文件行处理、错误机制以及文件处理的其他方面

示例

1
2
3
4
5
6
>>> import urllib.request
>>> import io
>>>
>>> u = urllib.request.urlopen('http://www.baidu.com')
>>> f = io.TextIOWrapper(u, encoding='utf-8')
>>> text = f.read()
1
2
3
4
5
6
>>> import sys
>>> sys.stdout.encoding
'utf-8'
>>> sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='latin-1')
>>> sys.stdout.encoding
'latin-1'
1
2
3
4
5
6
7
8
>>> f
<_io.TextIOWrapper name='sample.txt' mode='w' encoding='UTF-8'>
>>> b = f.detach()
>>> b
<_io.BufferedWriter name='sample.txt'>
>>> f = io.TextIOWrapper(b, encoding='latin-1')
>>> f
<_io.TextIOWrapper name='sample.txt' encoding='latin-1'>
1
2
3
4
5
>>> print('Jalape\u00f1o')
Jalapeño
>>> sys.stdout=io.TextIOWrapper(sys.stdout.detach(), encoding='ascii', errors='xmlcharrefreplace')
>>> print('Jalape\u00f1o')
Jalape&#241;o

将字节写入文本文件

问题

你想在文本模式打开的文件中写入原始的字节数据。

解决方案

I/O 系统以层级结构的形式构建而成。文本文件是通过在一个拥有缓冲的二进制模式文件上增加一个 Unicode 编码/解码层来创建。buffer 属性就是对应的缓冲区,直接访问它的话就可以绕过文本编码/解码。

示例

1
2
3
4
5
6
7
8
9
>>> import sys

>>> sys.stdout.write(b'hello')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: write() argument must be str, not bytes

>>> sys.stdout.buffer.write(b'hello')
hello5

将文件描述符包装成文件对象

问题

想把一个已打开文件描述符包装成一个更高层的 Python 文件对象。

解决方案

  • 对于一个文件描述符,可以使用 open() 函数来将其包装为一个 Python 文件对象。此时只需要将该文件描述符作为第一个参数传递给 open() 即可(替代文件名)
  • 当高层的文件对象关闭或者破坏时,底层的文件描述符也会被关闭。如果你想改变该行为,可以给 open() 函数传递一个可选的 closefd=False

在 Unix 系统中,这种包装文件描述符的技术可以很方便地将一个类文件接口作用于一个以不同方式打开的 I/O 通道上(例如管道、套接字等)。你也可以使用该技术来构造一个别名,允许你以不同于第一次打开文件的方式使用它。

  • 尽管可以将一个已存在的文件描述符包装成一个正常的文件对象,但是需要注意并不是所有的文件模式都支持,并且对某些类型的文件描述符可能会有副作用,在不同的操作系统上行为也不一致
  • 如果想将类文件接口作用在一个套接字并希望代码可以跨平台,可以使用套接字对象的 makefile 方法

示例

1
2
3
4
5
6
>>> import os
>>> fd = os.open('sample.txt', os.O_WRONLY | os.O_CREAT)
>>> f = open(fd, 'wt')
>>> f.write('hello')
5
>>> f.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def echo_client(client_sock, addr):
print("Got connection from", addr)

client_in = open(client_sock.fileno(), 'rt', encoding='latin-1',
closed=False)
client_out = open(client_sock.fileno(), 'wt', encoding='latin-1',
closed=False)

for line in client_in:
client_out.write(line)
client_out.flush()

client_sock.close()


def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(address)
sock.listen(1)

while True:
client, addr = sock.accept()
echo_client(client, addr)
1
2
3
4
5
6
>>> import sys
>>> bstdout = open(sys.stdout.fileno(), 'wb', closefd=False)
>>> bstdout.write(b'hello')
5
>>> bstdout.flush()
hello>>>

创建临时文件和文件夹

问题

需要在程序运行时创建一个临时文件或目录,并且希望使用之后可以自动销毁掉。

解决方案

  • tempfile 模块中有很多函数可以完成该任务。使用 tempfile.TemporaryFile 创建一个匿名的临时文件,可以使用 NamedTemporaryFile() 来创建一个带名字的临时文件。
  • 关闭临时文件时,文件会自动删除,可以使用 delete=False 改变这一行为
  • 使用 tempfile.TemporaryDirectory() 创建一个临时目录
  • 更低级别的创建临时文件和目录的方法是 mkstemp()mkdtemp()

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> from tempfile import TemporaryFile
>>>
>>> with TemporaryFile('w+t') as f:
... f.write('Hello')
... f.write('World\n')
... f.seek(0)
... data = f.read()
...
5
6
0
>>> data
'HelloWorld\n'
1
2
3
4
5
>>> from tempfile import NamedTemporaryFile
>>> with NamedTemporaryFile('w+t') as f:
... print(f.name)
...
/tmp/tmp6bo6rv1m

与串行端口的数据通信

问题

想通过串行端口读写数据,典型场景是和一些硬件设备打交道。

解决方案

串行通信最好的选择是使用 pySerial 包。涉及到串口的 I/O 都是二进制模式的,因此需要确保代码操作的的是字节而不是文本。

示例

1
2
3
4
5
6
7
8
>>> import serial
>>> ser = serial.Serial('/dev/tty.usbmodem641', # Device name varies
baudrate=9600,
bytesize=8,
parity='N',
stopbits=1)
>>> ser.write(b'G1 X50 Y50\r\n')
>>> resp = ser.readline()

序列化 Python 对象

问题

需要将一个 Python 对象序列化为一个字节流,以便将它保存到一个文件、存储到数据库或者通过网络传输它。

解决方案

序列化操作最普遍的做法就是使用 pickle 模块:

  • 为了将一个对象保存到一个文件中,可以使用 dump(),从文件中恢复使用 load()
  • 为了将一个对象直接转换为字节序列,使用 dumps(),从字节流中恢复一个对象使用 loads()

pickle 是一种 Python 特有的自描述的数据编码,通过自描述,被序列化后的数据包含每个对象开始和结束以及它的类型信息。

有些类型的对象是不能序列化的,这些通常是那些依赖外部系统状态的对象,例如打开的文件、网络连接、线程、进程、栈帧等等。用户自定义类型可以通过提供 __getstate()____setstate__() 方法来绕过这些限制。pickle.dump() 会调用 __getstate()__ 获取序列化的对象。而 __setstate()__ 会在反序列化时被调用。

pickle 也有一些限制:

  • 对于大型数据结构比如 array、numpy 模块创建的二进制数组,pickle 并不是一个高效的编码方式
  • 由于 pickle 是 Python 特有的并且附着在源码之上,所以如果需要长期存储数据不应该选用它。对于在数据库和存档文件中存储数据时,最好选用更加标准的数据编码格式例如 XML、CSV 和 JSON

示例

1
2
3
4
5
6
7
8
9
10
11
12
>>> import pickle
>>>
>>> data = {'a': 1, 'b':2}
>>> f = open('sample.bin', 'wb')
>>> pickle.dump(data, f)
>>> f.close()

>>> t = open('sample.bin', 'rb')
>>> d = pickle.load(t)
>>> d
{'a': 1, 'b': 2}
>>> t.close()
1
2
3
4
5
6
>>> s = pickle.dumps(d)
>>> s
b'\x80\x04\x95\x11\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x01a\x94K\x01\x8c\x01b\x94K\x02u.'
>>> d = pickle.loads(s)
>>> d
{'a': 1, 'b': 2}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
>>> import pickle
>>> f = open('sample.bin', 'wb')
>>> pickle.dump([1,2,3,4], f)
>>> pickle.dump('hello', f)
>>> pickle.dump(100, f)
>>> f.close()
>>>
>>> f = open('sample.bin', 'rb')
>>> pickle.load(f)
[1, 2, 3, 4]
>>> pickle.load(f)
'hello'
>>> pickle.load(f)
100
>>> pickle.load(f)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
EOFError: Ran out of input
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
import threading


class Countdown:
def __init__(self, n):
self.n = n
self.thr = threading.Thread(target=self.run)
self.thr.daemon = True
self.thr.start()

def run(self):
while self.n > 0:
print('T-minus', self.n)
self.n -= 1
time.sleep(5)

def __getstate__(self):
return self.n

def __setstate__(self, n):
self.__init__(n)
1
2
3
4
5
6
7
8
9
10
11
12
>>> import countdown
>>> c = countdown.Countdown(30)
T-minus 30
T-minus 29
T-minus 28
T-minus 27
T-minus 26

>>> import pickle
>>> f = open('cstate.p', 'wb')
>>> pickle.dump(c, f)
>>> f.close()
1
2
3
4
5
6
>>> import pickle
>>> pickle.load(f)
T-minus 25
<countdown.Countdown object at 0x7f5e887803d0>
>>> T-minus 24
>>> f.close()