Python

数据类型

数字型

数字型包括:

  • int:20
  • float:3.3
  • bool:True(True = 1, False = 0)
  • complex:4+2j, complex(4, 2)

运算方式有:加(+),减(-),乘(*),除-浮点(/),除-整数(//),取余(%),乘方(**)

常用方法:

  • range(n):产生一个从0到n的序列。
  • range(m, n):产生一个从m到n的序列。
  • sum():对列表的所有元素求和
  • all():判断列表中所有元素是否都是True
  • any():判断列表中是否包含True

运算方法:

  • abs()
  • max():参数可以为序列。
  • min():参数可以为序列。
  • pow():乘幂运算。
  • round():四舍五入,可以指定小数点后的位数。
  • math.ceil():向上取整。
  • math.floor():向下取整。
  • math.exp()
  • math.fabs()
  • math.log()
  • math.log10()
  • math.modf():返回小数部分和整数部分(元组,先小数,后整数)。
  • math.sqrt()
  • math.pi:内置常量,PI
  • math.e:内置常量,E

随机函数:

  • random.choice():从给定的序列中随机选择一个元素。
  • random.randrange([start,] stop [, step]):在指定范围内按照基数底层的集合中获取一个随机数。基数默认为1。
  • random.random():生成一个0到1之间的实数。
  • random.seed():改变种子生成器。
  • random.shuffle():将给定的序列随机排列。
  • random.uniform(x, y):生成一个x到y之间的实数。

字符串

可以使用'inline'"inline"'''multi lines'''方式。 可以使用转义字符。 索引从前面开始为0;从后面开始为-1。 字符串只读,不能修改某一个字符。

运算方式有(设str_a="123456789"):

  • 加(str_a + str_b):字符串拼接。
  • 乘(str_a * 2):字符串重复。
  • 切片(str_a[0:-1], str_a[3:]):返回字串”12345678”,”456789”,遵循左闭右开。
  • 禁用转义字符(r'\n'):返回”\n”,R作用一以。
  • in:查看是否在其中。
  • not in
  • %:格式字符串
字符 描述 字符 描述 字符 描述
\ 续行 \\ 反斜线 \' 单引号
\" 双引号 \a 响铃 \b 退格
\0 \n 换行 \v 纵向制表符
\r 回车 \f 换页 \t 横向制表符
\oYY 八进制 \xYY 十六进制

常用方法

  • join():连接字符串
  • split():分隔字符串,返回列表
  • count(s):返回子串s出现次数
  • len():返回字符串长度
  • strip():去掉两边空格
  • find():查找子串位置,未找到返回 -1
  • replace(old, new):替换子串
  • bytes.decode(encoding=’UTF-8’,errors=’strict’):解码为字符串
  • encode(encoding=’UTF-8’,errors=’strict’):编码为二进制数据
  • ljust(width, fillchar):左对齐,返回填充后的字符串
  • rstrip(width, fillchar):右对齐,返回填充后的字符串
  • center(width, fillchar):居中对齐,返回填充后的字符串

原始格式化字符串

原始格式化字符串:

  • %c:字符
  • %s:字符串
  • %d:整数
  • %u:无符号整数
  • %o:无符号八进制
  • %x:无符号十六进制
  • %X:无符号十六进制(大写)
  • %f:浮点,可以指定小数点后位数
  • %e:科学计数法
  • %E:科学计数法
  • %g:作用同%f%e
  • %G:作用同%f%E
  • %p:变量地址

例如:

1
"%s use python %f" % ('User', 3.7)

辅助指令:

  • *:定义宽度或小数点精度
  • -:左对齐
  • +:在正数前面显示加号
  • <sp>:在正数前面显示空格
  • #:在八进制数前面显示零(‘0’),在十六进制前面显示’0x’或者’0X’(取决于用的是’x’还是’X’)
  • 0:显示的数字前面填充’0’而不是默认的空格
  • %:’%%’输出一个单一的’%’
  • var:映射变量(字典参数)
  • m.n.:m 是显示的最小总宽度,n 是小数点后的位数

参考

增强型格式化字符串

增强型写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 指定输出位置
"{1} {0} {1}".format("hello", "world")
# 输出
'world hello world'

# 指定输出
"姓名:{name}, 年龄 {age}".format(name="John", age=3)
# 输出列表
my_list = ['First', 'Second']
print("姓:{0[0]}, 名 {0[1]}".format(my_list))
# 格式化输出
"{:.2f}".format(3.1415926) # 输出 3.14
# 转义输出
"{{}}".format() # 输出 {}
格式 描述 格式 描述 格式 描述
{:.2f} 保留小数点后两位 {:+.2f} 带符号保留 {:.0f} 不带小数
{:0>2d} 数字补零 (填充左边, 宽度为2) {:x<4d} 数字补x (填充右边, 宽度为4) {:x<4d} 数字补x (填充右边, 宽度为4)
{:,} 以逗号分隔 {:.2%} 百分比格式 {:.2e} 指数记法
{:>10d} 右对齐 {:<10d} 左对齐 {:^10d} 中间对齐

参考

f-string

Python 3.6 新增写法:字符串以f开头,字符串中的变量将会自动运算解析为结果。

1
2
3
f'Hello {name}'
f'{1+2}'
f'{w["name"]}: {w["age"]}'

f-string 使用{content:format}设置字符串格式。其中 content 是替换并填入字符串的内容,可以是变量、表达式或函数等,format 是格式描述符。

格式 描述 格式 描述 格式 描述
< 左对齐 > 右对齐 ^ 居中
+ 加正负号 - 负数加负号 空格 正数加空格,负数加负号
# 进制数切换数字显示方式 width 指定数字宽度 0width 指定宽度并高位补0
width.precision 宽度.显示精度 , 千位分隔符 _ 千位分隔符
b/c/d/o/x 进制显示方式 s 字符串 e/E/f/F/g/G/% 浮点数显示方式
%a/%A/%w/%u 星期 %d %b/%B/%m
%y/%Y %H/%I 小时 %p 上下午
%M/%S/%f 分钟,秒,微秒 %j 一年的第几天 %z UTC偏移量

例如

1
2
3
4
5
c = 12345678
f'c is {c:015,d}' # 'c is 000,012,345,678'

e = datetime.datetime.today()
f'the time is {e:%Y-%m-%d (%a) %H:%M:%S}'

Unicode 字符串

在Python2中,普通字符串是以8位ASCII码进行存储的,而Unicode字符串则存储为16位unicode字符串,这样能够表示更多的字符集。使用的语法是在字符串前面加上前缀 u。

在Python3中,所有的字符串都是Unicode字符串。

列表

列表中的元素的类型可以不同。

运算方式有(设list_a=[1, 2, "a", True]):

  • 加(list_a + list_b):列表拼接。
  • 乘(list_a * 2):列表重复。
  • 切片(list_a[0:-1], list_a[3:]):返回列表[1, 2, "a"][True]

常用方法:

  • count():统计某元素出现次数
  • len():返回元素个数
  • append():在末尾添加元素
  • pop(n):移除第n个元素,默认最后一个
  • index():返回该元素的索引
  • insert():插入一个元素
  • extend():在末尾追加另一个列表的所有元素
  • remove():移除第一个此元素
  • reverse():翻转列表
  • sort():元素排序
  • clear():清空
  • copy():复制
  • del list[x]:删除第x个元素

内置函数:

  • filter:过滤函数
1
2
3
4
5
def is_odd(n):
return n % 2 == 1

newlist = filter(is_odd, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(newlist) # [1, 3, 5, 7, 9]
  • map:映射函数
1
2
3
4
5
def square(x):
return x ** 2

map(square, [1,2,3,4,5]) # [1, 4, 9, 16, 25]
map(lambda x: x ** 2, [1, 2, 3, 4, 5]) # 使用 lambda 匿名函数
  • reduce:累积函数
1
2
3
4
5
def add(x, y) :            # 两数相加
return x + y

reduce(add, [1,2,3,4,5]) # 计算列表和:1+2+3+4+5=15
reduce(lambda x, y: x+y, [1,2,3,4,5]) # 使用 lambda 匿名函数

元组

元组的元素不可修改。 元组中的元素的类型可以不同。 如果元组包含了对象,那么对象是可以修改的。 构造空元组写作tup1 = (),构造单元素元组tup2 = (20,),注意有逗号。

运算方式有(设tuple_a=(1, 2, "a", True)):

  • 加(tuple_a + tuple_b):元组拼接。
  • 乘(tuple_a * 2):元组重复。
  • 切片(tuple_a[0:-1], tuple_a[3:]):返回元组(1, 2, "a")(True)

集合

集合中的元素类型必须一致。 集合中的元素不能重复。 创建一个空集合必须使用set()

运算方式有:

  • 求差集(a - b)
  • 求并集(a | b)
  • 求交集(a & b)
  • 求二者中不同时拥有的元素(a ^ b)

常用操作:

  • add():添加元素
  • update():批量添加元素
  • remove():移除元素,如果不存在,会报错
  • discard():移除元素,如果不存在,不会报错
  • pop():随机删除一个
  • len():求元素个数
  • clear():情况集合
  • x in s:判断使用包含元素

字典

字典中的元素按照键值对存取。 字典中Key必须是不可变的数据类型(字符串,元组,常量)。 字典中Key值必须是唯一的。

常用操作:

  • clear():清空字典
  • copy():浅拷贝
  • get():返回某键的值,否则返回default值
  • key in dict:判断是否有该key
  • pop():删除某个key

类型转换

int():转换为整型,可以接受:字符串,Bytes对象,数字。 float():转换为浮点,可以接受:字符串,数字。 str():转换为字符串,可接受几乎所有对象,转换结果适用于人类阅读。 repr():转换为字符串,可接受几乎所有对象,转换结果适用于机器使用。 eval():将字符串作为Python语句执行,返回执行结果。 tuple():转化为元组,可以接受:列表,字符串等。 list():转化为列表,可以接受:元组,字符串等。 set():转化为集合。 dict():转化为字典,可以接受:(key, value)的序列。 chr():整数转化为字符。 ord():字符转整数。 unichr():整数转换为Unicode字符。 hex():整数转十六进制字符串。 oct():整数转八进制字符串。

chr():将数字按照ASCII转化为字符 ord():转换ASCII字符为整数 bytearray():返回一个Byte数组,参数是整数n,则初始化数组长度为n;如果是字符串,则将字符串转换为Bytes compile(source, filename, mode):将字符串编译为字节码,mode可以为exec、eval、single

类型注解

在Python中,我们创建变量,传递变量是不需要注明类型的。但是这也造成了不便。因此 Python 3 提供了类型注解的功能,来表明变量类型。

1
2
3
4
5
# x:int 注明x是一个int型变量,-> 指明了返回值类型为int
def add(x:int, y:int) -> int:
# 声明一个int行变量,并赋值
z:int = 10
return x + y

内置函数

input():标准输入 print():标准输出

1
2
3
4
5
6
7
8
# 输出的结尾:以逗号结尾,默认是以换行结尾
print(end=',')
# 输出对象间隔号:以逗号间隔,默认是空格
print(sep=',')
# 输出到文件
print(file='')
# 是否强制刷新流
print(flush=',')

exec():执行Python语句,无返回值 eval():将给定表达式用Python执行,并返回执行结果 execfile(filename):执行一个文件,返回执行结果 file():创建一个FILE对象,同open() memoryview():查看对象的在内存的存储形式,对使用缓冲区的地方非常友好,尤其是对str与bytearray

1
2
3
4
5
6
7
8
9
10
11
12
a = 'aaaaaa'
ma = memoryview(a)
ma.readonly # True,只读的memoryview
mb = ma[:2] # 不会产生新的字符串

a = bytearray('aaaaaa')
ma = memoryview(a)
ma.readonly # False,可写的memoryview
mb = ma[:2] # 不会会产生新的bytearray
mb[:2] = 'bb' # 对mb的改动就是对ma的改动
mb.tobytes() # 'bb'
ma.tobytes() # 'bbaaaa'

globals():返回当前位置的全局变量,字典形式 locals():返回当前位置的局部变量,字典形式

id():获取对象的内存地址 hash():返回对象的哈希值 super():调用父类 vars():将对象转化为字典

reload():重新加载模块 __import__():动态加载类或模块 help():查看模块或函数的帮助信息

查看类型

1
2
3
type(x)  # 查看变量的类型,子类与父类不一致。
isinstance(x, int) # 查看变量是否是某种类型,子类和父类被认为一直。
issubclass(father, son) # 判断是否为子类

基本操作

for 循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 遍历列表
for x in x_list:
pass
# 遍历字符串
for c in "abcdefg":
pass
# 带索引遍历
for i, name in enumerate(name_list, start_index):
print(f'index is {i},name is {name}')
# 打包成元组遍历 a = [1,2,3], c = [4,5,6,7,8], zip(a,c) --> [(1, 4), (2, 5), (3, 6)]
for i in zip(albums, years):
print(i)
# 单行 for 循环
s.split() for s in sentence
# 相当于
for s in sentence:
s.split()
# 遍历字典
for k, v in knights.items():
print(k, v)
# 遍历排序后的集合
for f in sorted(set(basket)):
print(f)

迭代器

1
2
3
4
5
6
7
list=[1,2,3,4]
it = iter(list) # 创建迭代器对象
for x in it:
print (x, end=" ")
it = iter(list) # 创建迭代器对象
print (next(it)) # 输出迭代器的下一个元素
print (next(it)) # 输出迭代器的下一个元素

StopIteration 异常用于标识迭代的完成,防止出现无限循环的情况,在 __next__() 方法中我们可以设置在完成指定循环次数后触发 StopIteration 异常来结束迭代。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MyNumbers:
def __iter__(self):
self.a = 1
return self

def __next__(self):
if self.a <= 20:
x = self.a
self.a += 1
return x
else:
raise StopIteration

myclass = MyNumbers()
myiter = iter(myclass)

for x in myiter:
print(x)

在 Python 中,使用了 yield 的函数被称为生成器。

跟普通函数不同的是,生成器是一个返回迭代器的函数,只能用于迭代操作,更简单点理解生成器就是一个迭代器。

在调用生成器运行的过程中,每次遇到 yield 时函数会暂停并保存当前所有的运行信息,返回 yield 的值, 并在下一次执行 next() 方法时从当前位置继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import sys

def fibonacci(n): # 生成器函数 - 斐波那契
a, b, counter = 0, 1, 0
while True:
if (counter > n):
return
yield a
a, b = b, a + b
counter += 1
f = fibonacci(10) # f 是一个迭代器,由生成器返回生成

while True:
try:
print (next(f), end=" ")
except StopIteration:
sys.exit()

不定长传参

转为元组传入 *

1
def printinfo( arg1, *vartuple ):

转为字典传入 **

1
def printinfo( arg1, **var_args_dict ):

匿名函数

1
sum = lambda arg1, arg2: arg1 + arg2

强制位置参数

/前面的参数不能使用关键字参数。 *后面的参数必须使用关键字参数。

1
2
def f(a, b, /, c, d, *, e, f):
print(a, b, c, d, e, f)

数据结构

堆栈

可以使用列表作为堆栈。

1
2
3
stack = [1, 2, 3]
stack.append(4)
x = stack.pop()

可以使用模块。

1
2
3
4
5
from queue import LifoQueue
stack = LifoQueue()
stack.put(1)
while not stack.empty():
x = stack.get()

队列

使用列表作为队列,但是效率不高。

1
2
3
que = [1, 2, 3]
que.append(4)
x = stack.popleft()

可以使用模块。

1
2
3
4
5
from queue import Queue
que = Queue()
q.put(1)
while not q.empty():
x = q.get()

优先队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from queue import PriorityQueue
q = PriorityQueue()
q.put(Task(5, 'Mid-level job'))
while not q.empty():
next_job = q.get()

class Task(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
return

# 运算符重载
def __lt__(self, other):
return self.priority < other.priority

列表推导式

可以方便用于创建列表

1
2
3
4
vec1 = [2, 4, 6]
vec2 = [4, 3, -9]
[3*x for x in vec1] # [6, 12, 18]
[x*y for x in vec1 for y in vec2] # [8, 6, -18, 16, 12, -36, 24, 18, -54]

模块与包

__name__属性来使该程序块仅在该模块自身运行时执行。

dir() 函数可以找到模块内定义的所有名称。以一个字符串列表的形式返回:

包:管理 Python 模块命名空间的形式,如sound包下的effects包下的echo模块,使用方法:

1
import sound.effects.echo

对应的目录结构为:

  • sound/
    • __init__.py
    • effects/
      • __init__.py
      • echo.py

包的下面必须有__init__.py文件(可以是空文件),否则将不会识别为包。

如果包定义文件 __init__.py 存在一个叫做 __all__ 的列表变量,那么在使用 from package import * 的时候就把这个列表中的所有名字作为包内容导入。

读写文件

使用open()可以打开文件,其完整的参数表为:

  • file: 必需,文件路径(相对或者绝对路径)。
  • mode: 可选,文件打开模式
  • buffering: 设置缓冲
  • encoding: 一般使用utf8
  • errors: 报错级别
  • newline: 区分换行符
  • closefd: 传入的file参数类型
  • opener:

mode指定了文件打开模式,默认为只读,常见方式有:

模式 描述 模式 描述 模式 描述
x 写模式,文件已存在则报错 b 二进制 + 读写
r 只读 rb 只读,二进制 r+ 读写
rb+ 读写,二进制 w 只写 wb 只写,二进制
w+ 读写 wb+ 读写,二进制 a 追加写
ab 追加写,二进制 a+ 追加读写 ab+ 追加读写,二进制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
f = open(filename)
# 读取n个字符或字节,默认是全部内容
f.read(n)
# 读取一行,如果为空,说明已经最后一行了
f.readline()
# 读取所有行
f.readlines()
# 写入
f.write(data)
# 返回当前指针位置
f.tell()
# 移动指针位置:0 从开头向后移动m个字节,1 从当前位置向后移动m个字节,2 从结尾向后移动m个字节
f.seek(m, 0)
# 获取文件描述符
f.fileno()
# 判断是否为终端设备
f.isatty()
# 刷新缓冲区到文件
f.flush()
# 关闭文件,释放资源
f.close()

如果觉得打开文件再关闭文件操作繁琐,Python还提供了with as功能,可以打开后不管释放:

1
2
3
# 执行完毕自动释放
with open("/tmp/file.txt") as file:
data = file.read()

with语句不仅可以用来操作文件,线程等资源也可以使用。

pickle模块,可以用来序列化和反序列化对象。利用这个模块,我们可以用来保存数据结构到文件中。

1
2
3
4
import pickle

pickle.dump(obj, file)
x = pickle.load(file)

OS 模块

目录与权限:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 检验权限模式,尝试使用UserID,GroupID访问目录,检验是否有权限访问。mode:
# os.F_OK 测试path是否存在。
# os.R_OK 测试path是否可读。
# os.W_OK 测试path是否可写。
# os.X_OK 测试path是否可执行。
os.access(path, mode)

# 更改当前进程的看到的根目录
# 例如 os.chroot('/tmp')
# 则 对于进程'/'目录就是系统的'/tmp'目录
os.chroot(path)

# 切换工作目录
os.chdir(path)
# 获取工作目录
os.getcwd()

# 更改权限
os.chmod(path, mode)
# 更改文件所有者
os.chown(path, uid, gid)

# 获取路劲下的文件和文件夹
os.listdir(path)
# 创建路径,mode为权限 0o755
os.makedirs(path[, mode])
# 删除路径为path的文件
os.remove(path)
# 删除空目录,如果非空则异常
os.rmdir(path)
# 重命名
os.rename(src, dst)
# 获取path信息
os.stat(path)
# 获取文件系统信息
os.statvfs(path)

文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 获取文件描述符
fx = f.fileno()
fx = os.open(filepath, os.O_RDONLY)
# 关闭文件
os.close(fx)
# 关闭所有文件,左闭右开
os.closerange(fx, fy)
# 复制文件描述符
os.dup(fx)
# 通过描述符改变工作目录,fx指向目录
os.fchdir(fx)
# 修改文件所有权
os.fchown(fx, uid, gid)
# 强制写入磁盘
os.fdatasync(fx)
# 打开的文件的系统配置信息,name,'PC_LINK_MAX' 文件最大连接数,'PC_NAME_MAX' 文件名最长长度
os.fpathconf(fx, name)
# 获取描述符状态,包括设备信息,文件修改时间,用户ID等
os.fstat(fx)
# 获取描述符状态,包括文件系统块大小,可用块数,文件结点总数
os.fstatvfs(fx)

# 创建命名管道,mode为权限 默认0o666
os.mkfifo(path[, mode])
# 打开一个终端
os.openpty()
# 创建一个管道
os.pipe()
# 从command打开一个管道,command 使用的命令,mode r默认 w,bufsize 0无缓冲 1有缓冲
os.popen(command[, mode[, bufsize]])

os.path 模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 绝对路径
os.path.abspath(path)
# 文件名
os.path.basename(path)
# 文件路径
os.path.dirname(path)
# 路径是否存在
os.path.exists(path)
# 文件访问时间
os.path.getatime(path)
# 文件修改时间
os.path.getmtime(path)
# 路径创建时间
os.path.getctime(path)
# 文件大小
os.path.getsize(path)
# 是否为绝对路径
os.path.isabs(path)
# 是否为文件
os.path.isfile(path)
# 是否为目录
os.path.isdir(path)
# 是否为链接
os.path.islink(path)
# 是否为挂载点
os.path.ismount(path)
# 合并目录与文件名
os.path.join(path1[, path2[, ...]])
# 转换path大小写与斜杠
os.path.normcase(path)
# 规范path形式
os.path.normpath(path)
# 返回path真实路径
os.path.realpath(path)
# 判断目录,文件是否相同
os.path.samefile(path1, path2)
# 判断是否指向同一文件
os.path.sameopenfile(fp1, fp2)
# 分割路径与文件名 元组
os.path.split(path)
# 返回驱动器名和路径 windows下 元组
os.path.splitdrive(path)
# 分割路径,返回路径名 扩展名 元组
os.path.splitext(path)
# 分割为加载点与文件
os.path.splitunc(path)
# 遍历path,每个目录都调用visit函数 visit(arg, dirname 目录, names 目录下所有文件名)
os.path.walk(path, visit, arg)

异常与断言

1
2
3
4
5
6
7
8
9
10
11
12
try:
pass
# except 后可加元组,可以包含多个Exception
except (ZeroDivisionError, KeyboardInterrupt):
pass
else:
pass
except Exception:
pass
# finally不论发生异常与否都会执行,如果异常未被接住,则会在finally执行完毕后抛出
finally:
pass

Python assert(断言)用于判断一个表达式,在表达式条件为 False 的时候触发异常。

1
2
3
4
# 语法 assert expression[, arguments] 
# 例如
assert 3 + 2 == 5, '结果不为 5'
# 输出 AssertionError: 结果不为 5

对象

类的属性与方法的访问权限:

1
2
3
4
5
6
# 默认为公有
def fun():
# 保护 一个下划线
def _fun():
# 私有 两个下划线
def __fun():

类的专用方法有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 构造函数
def __init__():
# 析构函数
def __del__():
# 打印
def __repr__():
# 按索引赋值
def __setitem__():
# 按索引取值
def __getitem__():
# 获取长度
def __len__():
# 比较
def __cmp__():
# 调用
def __call__():
# 运算符重载
# 加
def __add__():
# 减
def __sub__():
# 乘
def __mul__():
# 除
def __truediv__():
# 取余
def __mod__():
# 乘方
def __pow__():
# 小于
def __lt__():
# 等于
def __eq__():
# 大于
def __gt__():
# 小于等于
def __le__():
# 不等于
def __ne__():
# 大于等于
def __ge__():

在类的继承中,子类不重写 __init__,实例化子类时,会自动调用父类定义的 __init__。子类重写 __init__,就不会调用父类的初始化函数。如果都想执行,可以使用super()调用。

标准库

shutil

shutil模块提供了针对日常的文件和目录管理任务:

1
2
3
import shutil
shutil.copyfile('a.txt','b.txt')
shutil.move('/dir_a/a.txt','/dir_b')

blog

glob模块提供了一个函数用于从目录通配符搜索中生成文件列表

1
2
import glob
glob.glob('*.py') # ['primes.py', 'random.py', 'quote.py']

sys

sys可以读取命令行参数

1
2
import sys
print(sys.argv) # ['demo.py', 'arg1', 'arg2', 'arg3']

也可以重定向输出,如stdin,stdout,stderr

1
sys.stderr.write('Warning, log file not found starting a new one\n')

re

re模块为高级字符串处理提供了正则表达式工具。

1
2
3
4
5
import re
re.findall(r'\bf[a-z]*', 'which foot or hand fell fastest')
# ['foot', 'fell', 'fastest']
re.sub(r'(\b[a-z]+) \1', r'\1', 'cat in the the hat')
# 'cat in the hat'

datetime

datetime模块为日期和时间处理同时提供了简单和复杂的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from datetime import date, time, datetime
# 格式化输出
now = date.today()
now.strftime("%m-%d-%y. %d %b %Y is a %A on the %d day of %B.")
# '12-02-03. 02 Dec 2003 is a Tuesday on the 02 day of December.'

# 日期天数差
birthday = date(1964, 7, 31)
age = now - birthday
age.days # 14368

# 当前时间戳
time_stamp = time.time()
# 转为日期时间
datetime.fromtimestamp(time_stamp)
# 转为时间戳
int(time.mktime(today.timetuple()))
# 补时差
today + datetime.timedelta(hours=8)

数据压缩

以下模块直接支持通用的数据打包和压缩格式:zlib,gzip,bz2,zipfile,以及 tarfile。

1
2
3
4
5
6
7
import zlib
s = b'witch which has which witches wrist watch'
len(s) # 41
t = zlib.compress(s)
len(t) # 37
zlib.decompress(t) # b'witch which has which witches wrist watch'
zlib.crc32(s) # 226805979

计时器

1
2
3
4
5
6
7
8
9
10
11
12
from timeit import Timer
Timer('t=a; a=b; b=t', 'a=1; b=2').timeit()

# 测试函数调用时间
def test():
L = []
for i in range(100):
L.append(i)

if __name__ == '__main__':
import timeit
print(timeit.timeit("test()", setup="from __main__ import test"))

测试

doctest模块提供了一个工具,扫描模块并根据程序中内嵌的文档字符串执行测试。

1
2
3
4
5
6
7
8
9
10
def average(values):
"""Computes the arithmetic mean of a list of numbers.

>>> print(average([20, 30, 70]))
40.0
"""
return sum(values) / len(values)

import doctest
doctest.testmod() # 根据所给注释,自动验证本文档所有函数

unittest模块可以在一个独立的文件里提供一个更全面的测试集。

1
2
3
4
5
6
7
8
9
10
11
import unittest

class TestStatisticalFunctions(unittest.TestCase):

def test_average(self):
self.assertEqual(average([20, 30, 70]), 40.0)
self.assertEqual(round(average([1, 5, 7]), 1), 4.3)
self.assertRaises(ZeroDivisionError, average, [])
self.assertRaises(TypeError, average, 20, 30, 70)

unittest.main() # 从命令行调用,执行所有测试

正则表达式

模式 描述 模式 描述 模式 描述
^ 开头 $ 末尾 . 任意字符,除了换行符
[...] 一组字符 [^...] 不在[]中的字符 re* 匹配0个或多个的表达式
re+ 匹配1个或多个的表达式 re? 匹配0个或1个表达式片段 re{n} 匹配n个前面表达式片段
re{n,} 精确匹配n个前面表达式片段 re{n,m} 匹配 n 到 m 次由前面的正则表达式片段 `a b`
(re) 匹配括号内的表达式 (?#...) 注释
\w 数字字母下划线 \W 非数字字母下划线
\s 任意空白字符 \S 任意非空字符 \d 任意数字
\D 任意非数字 \A 字符串开始 \Z 字符串结束或换行前
\z 字符串结束 \G 最后匹配完成的位置 \b 单词边界
\B 非单词边界 \n,\t 换行符,制表符 \1,…,\9 匹配第n个分组的内容

re.match 尝试从字符串的起始位置匹配一个模式,如果不是起始位置匹配成功的话,match()就返回none。

1
2
3
4
5
6
7
8
9
10
11
# pattern 正则表达式
# string 要匹配的字符串
# flag 标志位
# 未匹配返回None
print(re.match('a', 'a.b.c').span()) # span 返回匹配开始与结束的位置 返回(0, 1)
print(re.match('c', 'a.b.c')) # 返回 None

obj = re.match(pattern, string, flags=0)
obj.group() # 原始对象
obj.group(1) # 获取匹配的值
obj.group(2) # 获取匹配的值

re.search 扫描整个字符串并返回第一个成功的匹配。

1
re.search(pattern, string, flags=0).span() # 返回匹配的位置

re.sub用于替换字符串中的匹配项

1
2
3
4
# repl 替换的字符串,也可以是函数
# count 最大替换次数
# 返回 替换次数
re.sub(pattern, repl, string, count=0, flags=0)

compile 函数用于编译正则表达式,生成一个正则表达式( Pattern )对象,供 match() 和 search() 这两个函数使用。

1
2
3
4
pt = re.compile(pattern[, flags])
# 例如
pt = re.compile(r'\d+')
m = pt.match("abcd")

findall()在字符串中找到正则表达式所匹配的所有子串,并返回一个列表。

1
2
3
4
pt.findall(string[, pos[, endpos]])
# 例如
pt = re.compile(pattern[, flags])
pt.findall("abcd")

split 方法按照能够匹配的子串将字符串分割后返回列表

1
re.split(pattern, string[, maxsplit=0, flags=0])

正则表达式练习

网络

HTTP

HTTP请求头部格式为:HTTP 字段名: 字段内容,主要有以下几种:

头部 描述 头部 描述
Content-type:text/html 请求的MIME信息 Expires: Date 响应过期的日期和时间
Location: URL 重定向接收方到非请求URL的位置 Last-modified: Date 请求资源的最后修改时间
Content-length: N 请求的内容长度 Set-Cookie: String 设置Http Cookie

HTTP响应头部还包括了:

  • Allow:服务器支持的协议
  • Content-Encoding:编码
  • Location:如果是重定向301,则跳转到该页面
  • Date:服务器时间
  • Last-Modified:文档最后修改时间
  • Server:服务器名字
  • Set-Cookie:设置cookie

比较复杂的是Content-type,它包含:

  • text/html : HTML格式
  • text/plain :纯文本格式
  • text/xml : XML格式
  • image/gif :gif图片格式
  • image/jpeg :jpg图片格式
  • image/png:png图片格式
  • application/xhtml+xml:XHTML格式
  • application/xml:XML数据格式
  • application/atom+xml:Atom XML聚合格式
  • application/json:JSON数据格式
  • application/pdf:pdf格式
  • application/msword:Word文档格式
  • application/octet-stream:二进制流数据(如常见的文件下载)
  • multipart/form-data:需要在表单中进行文件上传时,就需要使用该格式

Socket

Socket API 中定义的协议族(family)参数是指调用者期待返回的套接字地址结构的类型,主要包含(AF有时也写作PF):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
AF_UNSPEC 0  /* 未指定 */ 
AF_UNIX 1 /* Unix domain sockets */
AF_LOCAL 1 /* POSIX name for AF_UNIX */
AF_INET 2 /* IPv4 */
AF_AX25 3 /* 业余无线电 AX.25 */
AF_IPX 4 /* Novell IPX */
AF_APPLETALK 5/* AppleTalk 地址 */
AF_NETROM 6 /* 业余无线电 NET/ROM */
AF_BRIDGE 7 /* 多协议网桥 */
AF_ATMPVC 8 /* ATM PVCs */
AF_X25 9 /* 保留 for X.25 project */
AF_INET6 10 /* IPv6 */
AF_ROSE 11 /* 业余无线电 X.25 PLP */
AF_DECnet 12 /* 保留 for DECnet project */
AF_NETBEUI 13/* 保留 for 802.2LLC project*/
AF_SECURITY 14/* Security callback pseudo AF */
AF_KEY 15 /* PF_KEY key management API */
AF_NETLINK 16 /* Only for Linux */
AF_ROUTE AF_NETLINK /* Alias to emulate 4.4BSD */
AF_PACKET 17 /* Packet family */
AF_ASH 18 /* Ash */
AF_ECONET 19 /* Acorn Econet */
AF_ATMSVC 20 /* ATM SVCs */
AF_RDS 21 /* RDS sockets */
AF_SNA 22 /* Linux SNA Project (nutters!) */
AF_IRDA 23 /* IRDA sockets */
AF_PPPOX 24 /* PPPoX sockets */
AF_WANPIPE 25 /* Wanpipe API Sockets */
AF_LLC 26 /* Linux LLC */
AF_IB 27 /* Native InfiniBand address */
AF_CAN 29 /* Controller Area Network */
AF_TIPC 30 /* TIPC sockets */
AF_BLUETOOTH 31/* Bluetooth sockets */
AF_IUCV 32 /* IUCV sockets */
AF_RXRPC 33 /* RxRPC sockets */
AF_ISDN 34 /* mISDN sockets */
AF_PHONET 35 /* Phonet sockets */
AF_IEEE802154 36/* IEEE802154 sockets */
AF_CAIF 37 /* CAIF sockets */
AF_ALG 38 /* Algorithm sockets */
AF_NFC 39 /* NFC sockets */
AF_VSOCK 40 /* vSockets */
AF_MAX 41 /* 保留 */

参考

定义的类型(type)包含:

1
2
3
4
5
6
7
SOCK_STREAM = 1,   // TCP
SOCK_DGRAM = 2, // UDP
SOCK_RAW = 3, // 原始类型,可以自定义
SOCK_RDM = 4, // 提供可靠的数据包连接
SOCK_SEQPACKET= 5, // 提供连续可靠的数据包连接
SOCK_DCCP = 6, // 数据报拥塞控制协议,具有内置拥塞控制的不可靠数据报的传输
SOCK_PACKET = 10, // 与网络驱动程序直接通信

定义的协议(protocol)包含:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
enum
{
IPPROTO_IP = 0, /* Dummy protocol for TCP */
IPPROTO_ICMP = 1, /* Internet Control Message Protocol */
IPPROTO_IGMP = 2, /* Internet Group Management Protocol */
IPPROTO_IPIP = 4, /* IPIP tunnels (older KA9Q tunnels use 94) */
IPPROTO_TCP = 6, /* Transmission Control Protocol */
IPPROTO_EGP = 8, /* Exterior Gateway Protocol */
IPPROTO_PUP = 12, /* PUP protocol */
IPPROTO_UDP = 17, /* User Datagram Protocol */
IPPROTO_IDP = 22, /* XNS IDP protocol */
IPPROTO_DCCP = 33, /* Datagram Congestion Control Protocol */
IPPROTO_RSVP = 46, /* RSVP protocol */
IPPROTO_GRE = 47, /* Cisco GRE tunnels (rfc 1701,1702) */
IPPROTO_IPV6 = 41, /* IPv6-in-IPv4 tunnelling */
IPPROTO_ESP = 50, /* Encapsulation Security Payload protocol */
IPPROTO_AH = 51, /* Authentication Header protocol */
IPPROTO_BEETPH = 94, /* IP option pseudo header for BEET */
IPPROTO_PIM = 103, /* Protocol Independent Multicast */
IPPROTO_COMP = 108, /* Compression Header protocol */
IPPROTO_SCTP = 132, /* Stream Control Transport Protocol */
IPPROTO_UDPLITE = 136, /* UDP-Lite (RFC 3828) */
IPPROTO_RAW = 255, /* Raw IP packets */
IPPROTO_MAX
};

在Python中,主要的使用方式如下:

1
socket.socket([family[, type[, proto]]])

family,套接字协议族,常见有:

  • socket.AF_UNIX:只能够用于单一的Unix系统进程间通信
  • socket.AF_INET:服务器之间网络通信,IPv4
  • socket.AF_INET6:服务器之间网络通信,IPv6

type: 套接字类型,包括:

  • socket.SOCK_STREAM:流式socket,用于TCP
  • socket.SOCK_DGRAM:数据报式socket,用于UDP
  • socket.SOCK_SEQPACKET:可靠的连续数据包服务
  • socket.SOCK_RDM:可靠的UDP数据报
  • socket.SOCK_RAW:原始套接字,普通的套接字无法处理ICMP、IGMP等网络报文,而SOCK_RAW可以;其次,SOCK_RAW也可以处理特殊的IPv4报文;此外,利用原始套接字,可以通过IP_HDRINCL套接字选项由用户构造IP头。

protocol:

  • 默认:写 0 即可
  • CAN_RAW / CAN_BCM:使用 AF_CAN 协议时

连接方面:

  • s.bind():绑定地址到套接字,IPv4下,使用(host, port)绑定。
  • s.listen():开启TCP监听。
  • s.accept():等待连接(阻塞)。
  • s.connect():主动连接服务器,IPv4下,使用(host, port),如果连接失败,返回socket.error
  • s.connect_ex():主动连接服务器,出错时返回出错码。
  • s.close():关闭套接字。
  • s.getpeername():返回远程地址。
  • s.getsockname():返回自己的地址。
  • s.settimeout(timeout):设置超时时间,例如连接等待时间。
  • s.gettimeout():获取超时时间。

数据传输:

  • s.recv():接收TCP数据,可以指定最大接收量。
  • s.send():发送TCP数据,返回发送的字节数。
  • s.sendall():发送完整TCP数据,如果失败抛出异常。
  • s.recvfrom():接收UDP数据,返回(data, address)。
  • s.sendto():发送UDP数据,参数为(data, (ip, port)),返回发送的字节数。
  • s.setsockopt(level,optname,value):设置套接字。
  • s.getsockopt(level,optname[.buflen]):获取设置。
  • s.fileno():返回套接字的文件描述符。
  • s.setblocking(flag):设置为非阻塞模式。
  • s.makefile():创建套接字文件。

TCP服务器例程:

1
2
3
4
5
6
7
server = socket.socket()  # 默认 TCP
serber.bind(("127.0.0.1", 9001))
server.listen(5)
conn, address = server.accept()
conn.send("".encode("utf8"))
data = conn.recv(1024)
conn.close()

TCP客户端例程:

1
2
3
4
client = socket.socket()
client.connect(("127.0.0.1", 9001))
data = client.recv(1024)
client.close()

UDP服务器例程:

1
2
3
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
serber.bind(("127.0.0.1", 9001))
data, client = conn.recvfrom(1024)

UDP客户端例程:

1
2
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(data, ("127.0.0.1", 9001))

uWSGI

uWSGI 是Python搭建Web服务所用的中间件,是调和Web服务于Web应用直接的协议问题。

首先安装uWSGI:

1
2
3
pip install uwsgi
# uwsgitop 用于监控数据
pip install uwsgitop

假设当前Nginx配置为:

1
2
3
4
location / {
include uwsgi_params;
uwsgi_pass 127.0.0.1:3031;
}

我们启动一个uWSGI服务:

1
2
3
4
5
6
7
# --processes 添加更多的进程,用于并发
# --threads 添加更多的线程,用于并发
# --stats 使用 stats 子系统,可以执行监控任务 (uwsgitop)
# --http-socket 启动地址,结合Nginx用
# --wsgi-file 指定入口文件
# --chdir 指定项目目录,如Django项目目录
uwsgi --http-socket 127.0.0.1:3031 --chdir /home/foobar/myproject/ --wsgi-file myproject/wsgi.py --master --processes 4 --threads 2 --stats 127.0.0.1:9191

也可以写成配置文件:

1
2
3
4
5
6
7
[uwsgi]
socket = 127.0.0.1:3031
chdir = /home/foobar/myproject/
wsgi-file = myproject/wsgi.py
processes = 4
threads = 2
stats = 127.0.0.1:9191

接着执行:

1
uwsgi yourfile.ini

如果不用Django框架,而是单独文件server.py,或是Flask框架:

1
2
3
4
# uWSGI Python 加载器将会搜索的默认函数 application 
def application(env, start_response):
start_response('200 OK', [('Content-Type','text/html')])
return [b"Hello World"]

多线程

Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。也就是说,尽管有了线程模块,Python几乎依然是单线程处理

尽管如此,在IO密集型的多线程应用中,Python的多线程threading库表现却依然还行。但在并行计算型应用中,如果想真正实现多线程,就得在Python中可以使用多线程threading,并自行设计锁结构,或使用多进程multiprocessing,并在主进程设置消息队列,共享内存,管道等方式传递数据。

threading 模块

创建线程,可以直接使用:

1
2
3
4
5
6
7
8
9
10
11
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)

if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.start()
print('主线程')
threading.enumerate() # 所有线程列表

也可以通过子类继承后使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from threading import Thread
import time
class Sayhi(Thread):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
time.sleep(2)
print('%s say hello' % self.name)


if __name__ == '__main__':
t = Sayhi('egon')
t.start()
print('主线程')

Thead 对象的常用方法有:

  • isAlive():是否运行
  • getName():获取线程名称
  • setName():设置线程名称
  • x.join():当前线程等待x线程结束再继续执行。
  • setDaemon(True):设置为守护线程

守护线性:如果设置一个线程为守护线程,就表示这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。主线程只会等待所有非守护线程都结束后才退出。

threading 模块的常用方法有:

  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

使用 同步锁 可以防止数据竞争问题:

1
2
3
4
5
6
R = threading.Lock()
R.acquire()
'''
临界区
'''
R.release()

但是使用锁的时候,一定要解决好死锁的问题。解决方法可以参考《操作系统》相关章节。

线程间通信,可以使用消息队列,可以使用共享内存的方式进行通信。下面使用队列方式通信(注意互斥访问队列):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# JoinableQueue
# 队列长度,多线程下不够准确
Queue.qsize()
# 队列判空
Queue.empty()
# 队列判满
Queue.full()
# 入队,是否阻塞
Queue.put(item, block=True, timeout=None)
# 入队,不阻塞
Queue.put_nowait(item)
# 出队,是否阻塞
Queue.get(block=True, timeout=None)
# 出队,不阻塞
Queue.get_nowait()
# 提示让出队列,提示join停止阻塞
Queue.task_done()
# 阻塞直到队列为空
Queue.join()

multiprocessing 模块

多进程的创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import os
def work():
print('hello',os.getpid())

if __name__ == '__main__':
# 会发现每一个进程都有不同的 PID
# 且进程的数据各自保留一份,互不相关
# 之间传递数据必须使用工具
p1=Process(target=work)
p2=Process(target=work)
p1.start()
p2.start()
print('主线程/主进程pid',os.getpid())

创建共享内存实现主进程与子进程通信:

1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing

def f(a):
a[0] = 5

# 创建共享内存
arr = multiprocessing.Array('i', range(10))
# 子进程处理
p = multiprocessing.Process(target=f, args=(arr,))
p.start()
p.join()
print(arr[0])

使用Manger通信,本质也是共享内存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import multiprocessing

def f(ls):
ls.append('Hello')

# Manager 要在主进程创建
server = multiprocessing.Manager()
# 每调用一次list产生一个共享内存
# 除了list外,也可以是其他形式,如队列、锁、字典、数组等
ls = server.list()
# ls = server.Queue()
# 子进程处理
proc = multiprocessing.Process(target=f, args=(ls,))
proc.start()
proc.join()
print(ls)

多进程间通过队列通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import multiprocessing
from multiprocessing import Queue

que = Queue(3) # 队列容量 3
que.put("msg 1")
que.put("msg 2")
que.full() # False
que.put("msg 3")
que.full() # True
que.empty() # False

def fun(que):
d = que.get()

p1 = multiprocessing.Process(target=fun, args=(que,))

进程池:

1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing
from multiprocessing import Pool

def fun(arg):
pass

pool = Pool(3) # 进程容量 3
for i in range(10):
pool.apply_async(fun, (arg,)) # 添加任务

pool.close() # 关闭,不再接受新请求
pool.join() # 等待退出,必须在close之后

ctypes

ctypes可以让Python直接调用任意的C动态库的导出函数,由于ctypes会在调用C函数前释放GIL,因此也可以实现多线程。

我们可以将写好的Task编译为C的动态库,例如lib_task.solib_task.dll,然后在Python中调用该库。打包动态库可以使用Visual Studio建立相关项目,Visual Studio就会自动生成一个DLL模板。或使用GCC创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
lib_task.h
#ifndef LIB_TASK_H
#define LIB_TASK_H

#ifdef __cplusplus
extern " C " {
#endif

// DLL 关键字 __declspec (dllexport)
extern __declspec (dllexport) void Task(int arg);

#ifdef __cplusplus
}
#endif

#endif
lib_task.c
#include "print.h"

// 在这里实现多线程
void Task(int arg)
{
while(arg);
return;
}

编辑DLL:

1
2
3
4
5
6
7
8
9
10
11
gcc --share lib_task.c -o lib_task.dll
from ctypes import *
from threading import Thread

# lib_task.h 与 lib_task.dll 必须在这个目录下
# 给DLL传递参数时,要将参数转化为C的类型
lib = cdll.LoadLibrary("lib_task.dll")
t = Thread(target=lib.Task, args=(1,))
t.start()

lib.Task()

线程池

线程池可以帮助我们自动调度线程,在需要多线程任务量巨大的情况下是非常好用的工具,省去我们考虑线程同步的问题,也节省了上下文切换的时间。

第三方线程池 threadpool:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pip install threadpool
from threadpool import ThreadPool, makeRequests

# 创建一个容纳4个线程的线程池
pool = ThreadPool(4)
requests = makeRequests(
some_callable, # 多线程的任务
list_of_args, # 参数
callback # 回调函数,可空
)
for req in requests:
pool.putRequest(req)
# 等待线程池完成任务
pool.wait()

另外还有ThreadPoolExecutor,ProcessPoolExecutor,线程(进程)池也可以使用。

Executor提供了以下常用的方法:

submit(fn, *args,**kwargs):将fn函提交给池子;*args是传给fn函数的参数;**kwargs表示以关键字的形式为fn的参数。

map(func, *iterables, timeout=None, chunksize=1):类似于全局函数的map,只是该函数将会启动多个线程,以异步的方式立即对*iterables执行map处理,就是把for循环和submit结合在一起了。

shutdown(wait=True):关闭池子,wait=True时等待池内所有任务执行完毕回收完资源后才继续;wait=False时立即返回,并不会等待池内的任务执行完毕;但不管wait参数为何值,整个程序都会等到所有任务执行完毕才会清空池子,所以submit和map必须在shutdown之前执行。

程序将task函数submit之后,submit会返回一个Future对象,Future类主要用于获取线程或进程任务函数的返回值。Future中提供了一下方法:

cancel():取消Future代表的线程或者进程任务,如果任务正在执行,不可取消,返回False;否则任务取消,返回Ture。

cancelled():返回Future代表的任务是否被成功取消。

running():返回Future代表的任务是否增正在执行。

done():返回Future代表的任务是否已经结束。

result(timmeout=None):返回Future代表的任务的结果,如果任务没有完成,该方法将会阻塞当前线程,timeout指定阻塞多长时间。

exception():返回Future代表的任务的异常,如果没有异常,则返回None。

add_done_callback(fn):给Future代表的任务加一个’回调函数’,当该任务成功之后,执行这个fn函数。

创建线程池 ThreadPoolExecutor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time,threading
from concurrent.futures import ThreadPoolExecutor

def f(n):
time.sleep(2)
print(f"线程号 {threading.get_ident()}",n)
return n*n

if __name__ == '__main__':
# 创建线程池,线程数 5
t_pool = ThreadPoolExecutor(max_workers=5)
t_l = list()
for i in range(1,5):
t = t_pool.submit(f,i)
t_l.append(t)
t_pool.shutdown()
for i in t_l:
print('===',i.result())

创建进程池 ProcessPoolExecutor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time,threading
from concurrent.futures import ProcessPoolExecutor

def callback_fun(x):
pass

def f(n):
time.sleep(2)
print(f"进程PID {os.getpid()}",n)
return n*n

if __name__ == '__main__':
# 创建进程池,进程数 5
p_pool = ProcessPoolExecutor(max_workers=5)
p_l = list()
for i in range(5):
t = p_pool.submit(f,i)
# 也可以设置回调函数,回调的参数由任务函数提供
# t.add_done_callback(callback_fun)
p_l.append(t)
# 也可以写成
# s = p_pool.map(f,range(1,5))
p_pool.shutdown(wait = True)
for i in p_l:
print('===',i.result())

multiprocessing 模块也提供了进程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import os,time
from multiprocessing import Process,Pool

def f(n):
print(f"进程PID {os.getpid()}")
time.sleep(1)
return n*n # 返回值交给回调函数

def cb_fun(n):
pass

if __name__ == '__main__':
# 创建工作进程
p = Pool(3)
p_l = list()
for i in range(1,10):
re = p.apply(
f, # 多线程工作函数
args=(i,), # 传递的参数
callback=cb_fun # 回调函数
)
p_l.append(re)
print(p_l)
p_l.close()
p_l.join()

迭代器

迭代器:可以节省内存空间,用每步生成的方式代替查表的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from collections import Iterable

isinstance("abc", Iterable) # 可迭代类型

class Student(object):
names = []
def __iter__(self):
return StudentIterator(self)

class StudentIterator(object):
def __init__(self, obj):
self.obj = obj
self.cnt = 0
def __next__(self):
if cnt > len(self.obj.names):
raise StopIteration # 告诉 for 已经迭代完毕
self.cnt += 1
return self.obj.names[0]
def __iter__(self):
pass

# 也可以合并为一个类
class Fibonacci(object):
def __init__(self, all_num):
self.all_num = all_num
self.cnt = 0
self.a = 0
self.b = 1
def __iter__(self):
return self
def __next__(self):
if cnt >= all_num:
raise StopIteration # 告诉 for 已经迭代完毕
self.cnt += 1
self.a, self.b = self.b, self.a + self.b
return self.b

fib = Fibonacci(10)
for x in fib:
print(x)

对于如下代码,执行步骤为:

1
for item in obj: ...
  1. 判断obj是否是迭代类型(是否有__iter__方法);
  2. 调用__iter__方法得到迭代器,获取__iter__的返回值(返回了一个迭代器)。
  3. 每for一次,调用一次__next__

迭代器实例:

1
2
range(10)   # 直接生成数据
xrange(10) # 底层为迭代器,可以节省内存

生成器

生成器是一种特殊的迭代器。

1
2
3
4
5
6
7
# 普通方式创建数组
nums = [x for x in range(10)]
# 生成器方式,节省空间
nums = (x for x in range(10))
# 迭代生成器
for x in nums:
print(x)

将函数变为生成器:只要函数中包含yield即可,此时调用函数,得到一个生成器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def fib(n):
a, b = 0, 1
cnt = 0
while cnt < n:
yield a
a, b = b, a + b
cnt += 1
# return 可有可无
return "over"

obj = fib(10) # 得到生成器
for x in obj:
print(x)

obj2 = fib(4)
while True:
try:
ret = next(obj2)
except Except as ex:
print(ex.value) # 此处存储 fib 的 return
break

通过send方法启动生成器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def fib(n):
a, b = 0, 1
cnt = 0
while cnt < n:
ret = yield a # 此处 ret 接收 send 方法给的值
a, b = b, a + b
cnt += 1

obj = fib(10)
ret = next(obj) # 启动生成器 得到第一个元素 给 ret
ret = obj.send("xx") # 给 生成器 中的 ret 传递值 xx
print(ret)

# 注:send 必须在 next 之后执行;要么在next之前执行,传递 None

协程

greenlet 模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from greenlet import greenlet

def fun1():
...
gr2.switch()
...
pass
def fun2():
...
gr1.switch()
...
pass

g1 = greenlet(fun1)
g2 = greenlet(fun2)

g1.switch() # 切入 fun1 运行

gevent 模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import gevent

def fun(n):
for i in range(n):
print(gevent.getcurrent(), i)
gevent.sleep(0.5) # gevent 内部必须使用自己的延时等待

g1 = gevent.spawn(fun, 5) # 创建协程 函数 fun 参数 5
g2 = gevent.spawn(fun, 5)
g3 = gevent.spawn(fun, 5)

g1.join()
g2.join()
g3.join()

给代码打补丁升级:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import gevent
from gevent import monkey

monkey.patch_all() # 打补丁,临时改写代码

def fun(n):
for i in range(n):
print(gevent.getcurrent(), i)
time.sleep(0.5) # 有了补丁,可以不必更改延时函数

# 写法与上面的等效
gevent.joinall([
gevent.spawn(fun, 5),
gevent.spawn(fun, 5)
])

XML 与 JSON

XML

XML 指可扩展标记语言(eXtensible Markup Language),形式同HTML,是一种用于标记电子文件使其具有结构性的标记语言。XML也可以用于数据以文本格式存储下来。格式如下(DOM):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<collection shelf="New Arrivals">
<movie title="Enemy Behind">
<type>War, Thriller</type>
<format>DVD</format>
<year>2003</year>
<rating>PG</rating>
<stars>10</stars>
<description>Talk about a US-Japan war</description>
</movie>
<movie title="Transformers">
<type>Anime, Science Fiction</type>
<format>DVD</format>
<year>1989</year>
<rating>R</rating>
<stars>8</stars>
<description>A schientific fiction</description>
</movie>
<movie title="Trigun">
<type>Anime, Action</type>
<format>DVD</format>
<episodes>4</episodes>
<rating>PG</rating>
<stars>10</stars>
<description>Vash the Stampede!</description>
</movie>
<movie title="Ishtar">
<type>Comedy</type>
<format>VHS</format>
<rating>PG</rating>
<stars>2</stars>
<description>Viewable boredom</description>
</movie>
</collection>

解析 XML 可以使用 SAX 模块,SAX 模块用事件驱动模型,通过在解析 XML 的过程中触发一个个的事件并调用用户定义的回调函数来处理 XML 文件。SAX 模块非常适用于对大型文件进行处理,且只需要文件部分信息时使用。

通过使用ContentHandler类读取数据。ContentHandler的方法有:

  • startDocument():文档启动时调用。
  • endDocument():到达结尾时调用。
  • startElement(name, attrs):遇到开始标签<..>调用。
  • endElement(name):遇到结束标签</..>调用。
  • characters(content):分情况看,有
    • 从行开始,遇到标签之前,若存在字符,则content的值为这些字符串。
    • 从一个标签,遇到下一个标签之前,若存在字符,则content的值为这些字符串。
    • 从一个标签,遇到行结束符之前,若存在字符,则content的值为这些字符串。
    • 标签可以是开始标签,也可以是结束标签。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import xml.sax

class MovieHandler( xml.sax.ContentHandler ):
def __init__(self): # 此处定义对象属性
pass
# 元素开始调用
def startElement(self, tag, attributes):
pass
# 元素结束调用
def endElement(self, tag):
pass
# 读取字符时调用
def characters(self, content):
pass

# 创建XML阅读器
parser = xml.sax.make_parser()
# 关闭命名空间
parser.setFeature(xml.sax.handler.feature_namespaces, 0)
# 创建对象
Handler = MovieHandler()
# 设置XML阅读器
parser.setContentHandler(Handler)
# 开始解析
parser.parse("movies.xml")

如果解析的文件不大,且需要文件的全部信息,可以使用DOM解析器。这个解析器可以一次性将整个文档读入内存,且可读可写到文件。

1
2
3
4
5
6
7
8
9
10
11
from xml.dom.minidom import parse
import xml.dom.minidom

# 使用minidom解析器打开 XML 文档
DOMTree = xml.dom.minidom.parse("movies.xml")
collection = DOMTree.documentElement
if collection.hasAttribute("shelf"):
print (f"Root element : {collection.getAttribute('shelf')}")

# 在集合中获取所有电影
movies = collection.getElementsByTagName("movie")

Json

JSON (JavaScript Object Notation) 是一种轻量级的数据交换格式,适合于网络间传输数据,如前后端使用Ajax传输,则偏向于传输Json。

1
2
3
4
5
6
7
8
9
10
11
12
import json

data = {
'no' : 1,
'name' : 'Runoob',
'url' : 'http://www.runoob.com'
}

# Python 字典类型转换为 JSON 对象
json_str = json.dumps(data)
# 将 JSON 对象转换为 Python 字典
data = json.loads(json_str)

Python 技巧

数据结构

  1. 使用元组存储数据,节省空间。
1
2
3
4
5
6
7
8
9
10
# 方式 1
Name, Age, Gender, Email = 1, 2, 3, 4
student = ('mike', 18, 'male', '123@qq.com')
name = student[Name]

# 方式 2
from collections import namedtuple
Student = namedtuple('Student', ['NName', 'Age', 'Gender', 'Email'])
student = Student('jim', 16, ...)
name = student.name
  1. 统计序列中的元素出现频率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 产生数据
from random import randint
data = [randint(0, 20) for _ in range(30)]

# 方式 1: 产生结果为字典
c = dict.fromkeys(data, 0)
for x in data:
c[x] += 1

# 方式 2: 结果也是字典,但是还有其他统计信息
from collections import Counter
c2 = Counter(data)
c2.most_common(3) # 频数最大的 3 个元素

# 例如统计词频
c3 = Counter(re.split("\W+", filename))
  1. 根据字典中值的大小,排序字典序
1
2
3
4
5
6
7
8
9
# 产生数据
from random import randint
d = {x: randint(60, 100) for x in 'xyzabc'}

# 方式 1
sorted(zip(d.values(), d.keys()))

# 方式 2
sorted(d.items(), key=lambda x: x[1])
  1. 找到多个字典的公共键
1
2
3
4
5
6
7
8
9
10
11
12
# 产生数据
from random import randint, sample
s1 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
s2 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
s3 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}

# 方式 1
s1.keys() & s2.keys() & s3.keys()

# 方式 2
from functools import reduce
reduce(lambda a, b: a & b, map(dict.keys, [s1, s2, s3]))
  1. 让字典保持有序
1
2
3
4
5
6
7
# 方式 1: 使用有序字典
from collections import OrderedDict
d = OrderedDict()
d['a'] = (1, 10)
d['b'] = (2, 16)
d['c'] = (3, 20)
# 按照输入的顺序存储
  1. 实现历史记录功能
1
2
3
4
5
6
7
8
# 方式 1: 使用双端队列
from collections import deque
q = deque([], 5) # 初始值 容量
q.append(1)

import pickle # python 对象持久化
pickle.dump(q, open("filename", "w"))
q = pickle.load(open("filename"))
  1. 切片操作
1
2
3
4
5
6
d[1:3]

# 创建可切片的迭代器
from itertools import islice
for line in islice(d, 1, 3):
print(line)
  1. 同时迭代多个对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 产生数据
from random import randint
a = [randint(60, 100) for _ in range(40)]
b = [randint(60, 100) for _ in range(40)]

# 方式 1: 采用索引
for i in range(len(a)):
pass

# 方式 2: 采用zip
for x, y in zip(a, b):
pass

# 方式 3: 串行连接多个迭代对象
from itertools import chain
for x in chain(a, b):
pass
  1. 修改列表的元素
1
2
3
4
5
6
7
8
9
10
11
12
# 产生数据
from random import randint
a = [x: randint(60, 100) for _ in range(40)]

# 方式 1: 有Bug
for item in d:
i = d.index(item)
d[i] += 1

# 方式 2:
for i, item in enumerate(d):
pass

字符串

  1. 拆分字符串
1
2
3
4
5
6
7
8
9
10
# 产生数据
s = 'ab;cd|efg|hi,jkl|mn\topq;;rs t,uvw\txyz'

# 方式 1:
s.split() # 默认为 \t 空格
s.split(';')

# 方式 2: 正则
import re
re.split('[,;\t|]+', s)
  1. 是否有某一前缀、后缀
1
2
3
4
5
6
# 产生数据
s = 'www.baidu.com'

# 方式 1:
s.startwith("www")
s.endwith(("com", "org")) # 只能是元组
  1. 调整字符串格式
1
2
3
4
5
6
7
# 产生数据
s = '2016-05-01' # 改为 05/01/2016

# 方式 1:
import re
re.sub(r'(\d{4})-(\d{2})-(\d{2})', r'\2/\1/\3', s)
re.sub(r'(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})', r'\g<month>/\g<day>/\g<year>', s)
  1. 拼接多个字符串
1
2
3
4
5
6
# 产生数据
s1 = '12345'
s2 = '67890'

# 方式 1:
';'.join([s1, s2])
  1. 字符串对齐
1
2
3
4
5
6
7
8
9
10
11
12
# 产生数据
d = {'name': 'mike', 'age': 19}

# 方式 1:
d['name'].ljust(10)
d['name'].rjust(10, '.')
d['name'].center(10)

# 方式 2:
format(s, '<20')
format(s, '>20')
format(s, '^20')
  1. 去除不需要的字符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 产生数据
s1 = ' abc 123 '

# 方式 1
s.strip(' ') # 去掉两端字符,默认为 空格
s.lstrip()
s.rstrip()

# 方法 2
s = s[:3] + s[4:]

# 方法 3
s.replace('\t', '')

# 方法 4
re.sub(r'[\t\r]', '', s)

# 方法 5
import string
s.translate(string.maketrans('abcxyz', 'xyzabc'))
s.translate(None, ' ') # 删除字符

文件

  1. 处理二进制文件
1
2
3
4
5
6
7
8
9
10
# 产生数据
f = open("filename", "rb")

# 方式 1
import struct
# 模式:h-short i-int
struct.unpack('h', '\x01\x02') # 小端
struct.unpack('>h', '\x01\x02') # 大端
import array
buf = array.array('h', (0 for _ in range(length)))
  1. 文件缓冲
1
2
3
4
5
6
# 全缓冲
f = open('', 'w', buffering=2048) # 缓冲区默认 4096 大小
# 行缓冲
f = open('', 'w', buffering=1)
# 无缓冲
f = open('', 'w', buffering=0)
  1. 将文件映射到内存
1
2
3
4
5
6
f = open('', 'r+b')
# 映射文件描述符到内存,0表示映射区域为文件全文
m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE)
m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE, offset=mmap.PAGESIZE * 4)
m[1:10]
m[3] = '0x30'
  1. 文件状态
1
2
3
4
5
6
7
8
9
10
11
12
13
# 系统调用
import os
os.stat(path)
os.lstat(path) # 不跟随符号链接
os.fstat(open("", ""))

# 包括: 文件类型,文件权限,时间(访问,修改,创建),文件大小

os.path.isdir()
os.path.isfile()
os.path.islink()
os.path.getatime()
os.path.getsize()
  1. 使用临时文件
1
2
3
from tempfile import TemporaryFile, NamedTemporaryFile
f = TemporaryFile()
f.write()
  1. 读写 CSV
1
2
3
4
5
6
7
8
9
import csv
rf = open('', 'rb') # 一定是二进制打开
reader = csv.reader(rf)
head = reader.next()
for row in reader:
print(row[0], row[1])
writer = csv.writer(wf)
writer.writerow()
writer.flush()
  1. 读写 JSON
1
2
json.loads()
json.dumps()
  1. 读写 XML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from xml.etree.ElementTree import parse
f = open()
et = parse(f)
root = et.getroot()
root.tag
root.attrib
root.text
for child in root:
print(child.get('name'))
# 在子元素中找
root.find()
root.findall()
# 在所有子孙中找
root.iter()
root.findall(".//node") # 这里使用 XPATH 表示
  1. 读写 Excel
1
2
3
4
5
6
7
8
import xlrd xlwt
book = xlrd.open_workbook()
sheet = book.sheets()[0]
sheet.nrows
sheet.ncols
cell = sheet.cell(3, 5)
cell.ctype
sheet.row(1)

对象

  1. 创建大量实例并节省内存
1
2
3
4
5
6
7
8
class Player(object):
# __slots__ 限制有哪些属性,关闭动态字典属性
__slots__ = ['uid', 'name', 'stat', 'level']
def __init__(self, uid, name, status=0, level=1):
self.uid = uid
self.name = name
self.stat = status
self.level = level
  1. 使用上下文管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Client(object):
def __init__(self):
pass
def start(self):
pass
def cleanup(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb): # 后面参数为异常
pass
return True # 不向上抛出异常
return False / None # 向上抛出异常

with Client() as c:
pass
  1. 创建可管理的对象属性
1
2
3
4
5
6
7
8
9
10
11
12
class Circle(object):
radius = 0
def getRadius(self):
return self.radius
def setRadius(self, r):
if not isinstance(r, (int, float)):
raise ValueError('wrong type.')
self.radius = float(r)
R = property(getRadius, setRadius)
c = Circle()
c.R = 1
print(c.R)
  1. 比较操作
1
2
3
4
5
6
7
8
9
10
11
12
class Circle(object):
radius = 0
def getRadius(self):
return self.radius
def setRadius(self, r):
if not isinstance(r, (int, float)):
raise ValueError('wrong type.')
self.radius = float(r)
def __lt__(self, obj):
pass
def __ge__(self, obj):
pass
  1. 类型检查
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Attr(object):
def __init__(self, name, type_):
self.name = name
self.type_ = type_
def __get__(self, instance, clazz):
return instance.__dict__[self.name]
def __set__(self, instance, value):
# 此处插入类型检查
if not isinstance(value, self.type_):
raise TypeError(f'expected an {self.type_}')
instance.__dict__[self.name] = value
def __delete__(self, instance):
del instance.__dict__[self.name]
class Person(object):
name = Attr('name', str)
age = Attr('name', int)
  1. 循环引用垃圾回收问题
1
2
3
4
# 使用弱引用
import wearref
a = A()
a_wref = weakref.ref(a)
  1. 通过字符串调用实例方法
1
2
3
4
5
6
7
8
# 方法 1:
s = Person()
fun = getattr(s, 'getName', None) # 第三个参数为找不到时的默认值
if fun: fun()

# 方法 2:
from operator import methodcaller
methodcaller('findPerson', 'name', age)(s)

函数装饰器