…
数据类型 数字型 数字型包括:
int:20
float:3.3
bool:True(True = 1, False = 0)
complex:4+2j, complex(4, 2)
运算方式有:加(+
),减(-
),乘(*
),除-浮点(/
),除-整数(//
),取余(%
),乘方(**
)
常用方法:
range(n):产生一个从0到n的序列。
range(m, n):产生一个从m到n的序列。
sum():对列表的所有元素求和
all():判断列表中所有元素是否都是True
any():判断列表中是否包含True
运算方法:
abs()
max():参数可以为序列。
min():参数可以为序列。
pow():乘幂运算。
round():四舍五入,可以指定小数点后的位数。
math.ceil():向上取整。
math.floor():向下取整。
math.exp()
math.fabs()
math.log()
math.log10()
math.modf():返回小数部分和整数部分(元组,先小数,后整数)。
math.sqrt()
math.pi:内置常量,PI
math.e:内置常量,E
随机函数:
random.choice():从给定的序列中随机选择一个元素。
random.randrange([start,] stop [, step]):在指定范围内按照基数底层的集合中获取一个随机数。基数默认为1。
random.random():生成一个0到1之间的实数。
random.seed():改变种子生成器。
random.shuffle():将给定的序列随机排列。
random.uniform(x, y):生成一个x到y之间的实数。
字符串 可以使用'inline'
、"inline"
、'''multi lines'''
方式。 可以使用转义字符。 索引从前面开始为0
;从后面开始为-1
。 字符串只读,不能修改某一个字符。
运算方式有(设str_a="123456789"
):
加(str_a + str_b
):字符串拼接。
乘(str_a * 2
):字符串重复。
切片(str_a[0:-1]
, str_a[3:]
):返回字串”12345678”,”456789”,遵循左闭右开。
禁用转义字符(r'\n'
):返回”\n”,R作用一以。
in:查看是否在其中。
not in
%:格式字符串
字符
描述
字符
描述
字符
描述
\
续行
\\
反斜线
\'
单引号
\"
双引号
\a
响铃
\b
退格
\0
空
\n
换行
\v
纵向制表符
\r
回车
\f
换页
\t
横向制表符
\oYY
八进制
\xYY
十六进制
常用方法
join():连接字符串
split():分隔字符串,返回列表
count(s):返回子串s出现次数
len():返回字符串长度
strip():去掉两边空格
find():查找子串位置,未找到返回 -1
replace(old, new):替换子串
bytes.decode(encoding=’UTF-8’,errors=’strict’):解码为字符串
encode(encoding=’UTF-8’,errors=’strict’):编码为二进制数据
ljust(width, fillchar):左对齐,返回填充后的字符串
rstrip(width, fillchar):右对齐,返回填充后的字符串
center(width, fillchar):居中对齐,返回填充后的字符串
原始格式化字符串 原始格式化字符串:
%c:字符
%s:字符串
%d:整数
%u:无符号整数
%o:无符号八进制
%x:无符号十六进制
%X:无符号十六进制(大写)
%f:浮点,可以指定小数点后位数
%e:科学计数法
%E:科学计数法
%g:作用同%f%e
%G:作用同%f%E
%p:变量地址
例如:
1 "%s use python %f" % ('User', 3.7)
辅助指令:
*
:定义宽度或小数点精度
-
:左对齐
+
:在正数前面显示加号
<sp>
:在正数前面显示空格
#
:在八进制数前面显示零(‘0’),在十六进制前面显示’0x’或者’0X’(取决于用的是’x’还是’X’)
0
:显示的数字前面填充’0’而不是默认的空格
%
:’%%’输出一个单一的’%’
var
:映射变量(字典参数)
m.n.
:m 是显示的最小总宽度,n 是小数点后的位数
参考
增强型格式化字符串 增强型写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 指定输出位置 "{1} {0} {1}".format("hello", "world") # 输出 'world hello world' # 指定输出 "姓名:{name}, 年龄 {age}".format(name="John", age=3) # 输出列表 my_list = ['First', 'Second'] print("姓:{0[0]}, 名 {0[1]}".format(my_list)) # 格式化输出 "{:.2f}".format(3.1415926) # 输出 3.14 # 转义输出 "{{}}".format() # 输出 {}
格式
描述
格式
描述
格式
描述
{:.2f}
保留小数点后两位
{:+.2f}
带符号保留
{:.0f}
不带小数
{:0>2d}
数字补零 (填充左边, 宽度为2)
{:x<4d}
数字补x (填充右边, 宽度为4)
{:x<4d}
数字补x (填充右边, 宽度为4)
{:,}
以逗号分隔
{:.2%}
百分比格式
{:.2e}
指数记法
{:>10d}
右对齐
{:<10d}
左对齐
{:^10d}
中间对齐
参考
f-string Python 3.6 新增写法:字符串以f
开头,字符串中的变量将会自动运算解析为结果。
1 2 3 f'Hello {name}' f'{1+2}' f'{w["name"]}: {w["age"]}'
f-string 使用{content:format}
设置字符串格式。其中 content 是替换并填入字符串的内容,可以是变量、表达式或函数等,format 是格式描述符。
格式
描述
格式
描述
格式
描述
<
左对齐
>
右对齐
^
居中
+
加正负号
-
负数加负号
空格
正数加空格,负数加负号
#
进制数切换数字显示方式
width
指定数字宽度
0width
指定宽度并高位补0
width.precision
宽度.显示精度
,
千位分隔符
_
千位分隔符
b/c/d/o/x
进制显示方式
s
字符串
e/E/f/F/g/G/%
浮点数显示方式
%a/%A/%w/%u
星期
%d
日
%b/%B/%m
月
%y/%Y
年
%H/%I
小时
%p
上下午
%M/%S/%f
分钟,秒,微秒
%j
一年的第几天
%z
UTC偏移量
例如
1 2 3 4 5 c = 12345678 f'c is {c:015,d}' # 'c is 000,012,345,678' e = datetime.datetime.today() f'the time is {e:%Y-%m-%d (%a) %H:%M:%S}'
Unicode 字符串 在Python2中,普通字符串是以8位ASCII码进行存储的,而Unicode字符串则存储为16位unicode字符串,这样能够表示更多的字符集。使用的语法是在字符串前面加上前缀 u。
在Python3中,所有的字符串都是Unicode字符串。
列表 列表中的元素的类型可以不同。
运算方式有(设list_a=[1, 2, "a", True]
):
加(list_a + list_b
):列表拼接。
乘(list_a * 2
):列表重复。
切片(list_a[0:-1]
, list_a[3:]
):返回列表[1, 2, "a"]
,[True]
。
常用方法:
count():统计某元素出现次数
len():返回元素个数
append():在末尾添加元素
pop(n):移除第n个元素,默认最后一个
index():返回该元素的索引
insert():插入一个元素
extend():在末尾追加另一个列表的所有元素
remove():移除第一个此元素
reverse():翻转列表
sort():元素排序
clear():清空
copy():复制
del list[x]:删除第x个元素
内置函数:
1 2 3 4 5 def is_odd(n): return n % 2 == 1 newlist = filter(is_odd, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) print(newlist) # [1, 3, 5, 7, 9]
1 2 3 4 5 def square(x): return x ** 2 map(square, [1,2,3,4,5]) # [1, 4, 9, 16, 25] map(lambda x: x ** 2, [1, 2, 3, 4, 5]) # 使用 lambda 匿名函数
1 2 3 4 5 def add(x, y) : # 两数相加 return x + y reduce(add, [1,2,3,4,5]) # 计算列表和:1+2+3+4+5=15 reduce(lambda x, y: x+y, [1,2,3,4,5]) # 使用 lambda 匿名函数
元组 元组的元素不可修改。 元组中的元素的类型可以不同。 如果元组包含了对象,那么对象是可以修改的。 构造空元组写作tup1 = ()
,构造单元素元组tup2 = (20,)
,注意有逗号。
运算方式有(设tuple_a=(1, 2, "a", True)
):
加(tuple_a + tuple_b
):元组拼接。
乘(tuple_a * 2
):元组重复。
切片(tuple_a[0:-1]
, tuple_a[3:]
):返回元组(1, 2, "a")
,(True)
。
集合 集合中的元素类型必须一致。 集合中的元素不能重复。 创建一个空集合必须使用set()
。
运算方式有:
求差集(a - b)
求并集(a | b)
求交集(a & b)
求二者中不同时拥有的元素(a ^ b)
常用操作:
add():添加元素
update():批量添加元素
remove():移除元素,如果不存在,会报错
discard():移除元素,如果不存在,不会报错
pop():随机删除一个
len():求元素个数
clear():情况集合
x in s:判断使用包含元素
字典 字典中的元素按照键值对存取。 字典中Key
必须是不可变的数据类型(字符串,元组,常量)。 字典中Key
值必须是唯一的。
常用操作:
clear():清空字典
copy():浅拷贝
get():返回某键的值,否则返回default值
key in dict:判断是否有该key
pop():删除某个key
类型转换 int():转换为整型,可以接受:字符串,Bytes对象,数字。 float():转换为浮点,可以接受:字符串,数字。 str():转换为字符串,可接受几乎所有对象,转换结果适用于人类阅读。 repr():转换为字符串,可接受几乎所有对象,转换结果适用于机器使用。 eval():将字符串作为Python语句执行,返回执行结果。 tuple():转化为元组,可以接受:列表,字符串等。 list():转化为列表,可以接受:元组,字符串等。 set():转化为集合。 dict():转化为字典,可以接受:(key, value)的序列。 chr():整数转化为字符。 ord():字符转整数。 unichr():整数转换为Unicode字符。 hex():整数转十六进制字符串。 oct():整数转八进制字符串。
chr():将数字按照ASCII转化为字符 ord():转换ASCII字符为整数 bytearray():返回一个Byte数组,参数是整数n,则初始化数组长度为n;如果是字符串,则将字符串转换为Bytes compile(source, filename, mode):将字符串编译为字节码,mode可以为exec、eval、single
类型注解 在Python中,我们创建变量,传递变量是不需要注明类型的。但是这也造成了不便。因此 Python 3 提供了类型注解的功能,来表明变量类型。
1 2 3 4 5 # x:int 注明x是一个int型变量,-> 指明了返回值类型为int def add(x:int, y:int) -> int: # 声明一个int行变量,并赋值 z:int = 10 return x + y
内置函数 input():标准输入 print():标准输出
1 2 3 4 5 6 7 8 # 输出的结尾:以逗号结尾,默认是以换行结尾 print(end=',') # 输出对象间隔号:以逗号间隔,默认是空格 print(sep=',') # 输出到文件 print(file='') # 是否强制刷新流 print(flush=',')
exec():执行Python语句,无返回值 eval():将给定表达式用Python执行,并返回执行结果 execfile(filename):执行一个文件,返回执行结果 file():创建一个FILE对象,同open() memoryview():查看对象的在内存的存储形式,对使用缓冲区的地方非常友好,尤其是对str与bytearray
1 2 3 4 5 6 7 8 9 10 11 12 a = 'aaaaaa' ma = memoryview(a) ma.readonly # True,只读的memoryview mb = ma[:2] # 不会产生新的字符串 a = bytearray('aaaaaa') ma = memoryview(a) ma.readonly # False,可写的memoryview mb = ma[:2] # 不会会产生新的bytearray mb[:2] = 'bb' # 对mb的改动就是对ma的改动 mb.tobytes() # 'bb' ma.tobytes() # 'bbaaaa'
globals():返回当前位置的全局变量,字典形式 locals():返回当前位置的局部变量,字典形式
id():获取对象的内存地址 hash():返回对象的哈希值 super():调用父类 vars():将对象转化为字典
reload():重新加载模块 __import__
():动态加载类或模块 help():查看模块或函数的帮助信息
查看类型 1 2 3 type(x) # 查看变量的类型,子类与父类不一致。 isinstance(x, int) # 查看变量是否是某种类型,子类和父类被认为一直。 issubclass(father, son) # 判断是否为子类
基本操作 for 循环 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # 遍历列表 for x in x_list: pass # 遍历字符串 for c in "abcdefg": pass # 带索引遍历 for i, name in enumerate(name_list, start_index): print(f'index is {i},name is {name}') # 打包成元组遍历 a = [1,2,3], c = [4,5,6,7,8], zip(a,c) --> [(1, 4), (2, 5), (3, 6)] for i in zip(albums, years): print(i) # 单行 for 循环 s.split() for s in sentence # 相当于 for s in sentence: s.split() # 遍历字典 for k, v in knights.items(): print(k, v) # 遍历排序后的集合 for f in sorted(set(basket)): print(f)
迭代器 1 2 3 4 5 6 7 list=[1,2,3,4] it = iter(list) # 创建迭代器对象 for x in it: print (x, end=" ") it = iter(list) # 创建迭代器对象 print (next(it)) # 输出迭代器的下一个元素 print (next(it)) # 输出迭代器的下一个元素
StopIteration 异常用于标识迭代的完成,防止出现无限循环的情况,在 __next__()
方法中我们可以设置在完成指定循环次数后触发 StopIteration 异常来结束迭代。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class MyNumbers: def __iter__(self): self.a = 1 return self def __next__(self): if self.a <= 20: x = self.a self.a += 1 return x else: raise StopIteration myclass = MyNumbers() myiter = iter(myclass) for x in myiter: print(x)
在 Python 中,使用了 yield 的函数被称为生成器。
跟普通函数不同的是,生成器是一个返回迭代器的函数,只能用于迭代操作,更简单点理解生成器就是一个迭代器。
在调用生成器运行的过程中,每次遇到 yield 时函数会暂停并保存当前所有的运行信息,返回 yield 的值, 并在下一次执行 next() 方法时从当前位置继续运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import sys def fibonacci(n): # 生成器函数 - 斐波那契 a, b, counter = 0, 1, 0 while True: if (counter > n): return yield a a, b = b, a + b counter += 1 f = fibonacci(10) # f 是一个迭代器,由生成器返回生成 while True: try: print (next(f), end=" ") except StopIteration: sys.exit()
不定长传参 转为元组传入 *
1 def printinfo( arg1, *vartuple ):
转为字典传入 **
1 def printinfo( arg1, **var_args_dict ):
匿名函数 1 sum = lambda arg1, arg2: arg1 + arg2
强制位置参数 /
前面的参数不能使用关键字参数。 *
后面的参数必须使用关键字参数。
1 2 def f(a, b, /, c, d, *, e, f): print(a, b, c, d, e, f)
数据结构 堆栈 可以使用列表作为堆栈。
1 2 3 stack = [1, 2, 3] stack.append(4) x = stack.pop()
可以使用模块。
1 2 3 4 5 from queue import LifoQueue stack = LifoQueue() stack.put(1) while not stack.empty(): x = stack.get()
队列 使用列表作为队列,但是效率不高。
1 2 3 que = [1, 2, 3] que.append(4) x = stack.popleft()
可以使用模块。
1 2 3 4 5 from queue import Queue que = Queue() q.put(1) while not q.empty(): x = q.get()
优先队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from queue import PriorityQueue q = PriorityQueue() q.put(Task(5, 'Mid-level job')) while not q.empty(): next_job = q.get() class Task(object): def __init__(self, priority, description): self.priority = priority self.description = description return # 运算符重载 def __lt__(self, other): return self.priority < other.priority
列表推导式 可以方便用于创建列表
1 2 3 4 vec1 = [2, 4, 6] vec2 = [4, 3, -9] [3*x for x in vec1] # [6, 12, 18] [x*y for x in vec1 for y in vec2] # [8, 6, -18, 16, 12, -36, 24, 18, -54]
模块与包 __name__
属性来使该程序块仅在该模块自身运行时执行。
dir() 函数可以找到模块内定义的所有名称。以一个字符串列表的形式返回:
包:管理 Python 模块命名空间的形式,如sound包下的effects包下的echo模块,使用方法:
1 import sound.effects.echo
对应的目录结构为:
包的下面必须有__init__.py
文件(可以是空文件),否则将不会识别为包。
如果包定义文件 __init__.py
存在一个叫做 __all__
的列表变量,那么在使用 from package import * 的时候就把这个列表中的所有名字作为包内容导入。
读写文件 使用open()
可以打开文件,其完整的参数表为:
file: 必需,文件路径(相对或者绝对路径)。
mode: 可选,文件打开模式
buffering: 设置缓冲
encoding: 一般使用utf8
errors: 报错级别
newline: 区分换行符
closefd: 传入的file参数类型
opener:
mode指定了文件打开模式,默认为只读,常见方式有:
模式
描述
模式
描述
模式
描述
x
写模式,文件已存在则报错
b
二进制
+
读写
r
只读
rb
只读,二进制
r+
读写
rb+
读写,二进制
w
只写
wb
只写,二进制
w+
读写
wb+
读写,二进制
a
追加写
ab
追加写,二进制
a+
追加读写
ab+
追加读写,二进制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 f = open(filename) # 读取n个字符或字节,默认是全部内容 f.read(n) # 读取一行,如果为空,说明已经最后一行了 f.readline() # 读取所有行 f.readlines() # 写入 f.write(data) # 返回当前指针位置 f.tell() # 移动指针位置:0 从开头向后移动m个字节,1 从当前位置向后移动m个字节,2 从结尾向后移动m个字节 f.seek(m, 0) # 获取文件描述符 f.fileno() # 判断是否为终端设备 f.isatty() # 刷新缓冲区到文件 f.flush() # 关闭文件,释放资源 f.close()
如果觉得打开文件再关闭文件操作繁琐,Python还提供了with as
功能,可以打开后不管释放:
1 2 3 # 执行完毕自动释放 with open("/tmp/file.txt") as file: data = file.read()
with语句不仅可以用来操作文件,线程等资源也可以使用。
pickle模块,可以用来序列化和反序列化对象。利用这个模块,我们可以用来保存数据结构到文件中。
1 2 3 4 import pickle pickle.dump(obj, file) x = pickle.load(file)
OS 模块 目录与权限:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 # 检验权限模式,尝试使用UserID,GroupID访问目录,检验是否有权限访问。mode: # os.F_OK 测试path是否存在。 # os.R_OK 测试path是否可读。 # os.W_OK 测试path是否可写。 # os.X_OK 测试path是否可执行。 os.access(path, mode) # 更改当前进程的看到的根目录 # 例如 os.chroot('/tmp') # 则 对于进程'/'目录就是系统的'/tmp'目录 os.chroot(path) # 切换工作目录 os.chdir(path) # 获取工作目录 os.getcwd() # 更改权限 os.chmod(path, mode) # 更改文件所有者 os.chown(path, uid, gid) # 获取路劲下的文件和文件夹 os.listdir(path) # 创建路径,mode为权限 0o755 os.makedirs(path[, mode]) # 删除路径为path的文件 os.remove(path) # 删除空目录,如果非空则异常 os.rmdir(path) # 重命名 os.rename(src, dst) # 获取path信息 os.stat(path) # 获取文件系统信息 os.statvfs(path)
文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 # 获取文件描述符 fx = f.fileno() fx = os.open(filepath, os.O_RDONLY) # 关闭文件 os.close(fx) # 关闭所有文件,左闭右开 os.closerange(fx, fy) # 复制文件描述符 os.dup(fx) # 通过描述符改变工作目录,fx指向目录 os.fchdir(fx) # 修改文件所有权 os.fchown(fx, uid, gid) # 强制写入磁盘 os.fdatasync(fx) # 打开的文件的系统配置信息,name,'PC_LINK_MAX' 文件最大连接数,'PC_NAME_MAX' 文件名最长长度 os.fpathconf(fx, name) # 获取描述符状态,包括设备信息,文件修改时间,用户ID等 os.fstat(fx) # 获取描述符状态,包括文件系统块大小,可用块数,文件结点总数 os.fstatvfs(fx) # 创建命名管道,mode为权限 默认0o666 os.mkfifo(path[, mode]) # 打开一个终端 os.openpty() # 创建一个管道 os.pipe() # 从command打开一个管道,command 使用的命令,mode r默认 w,bufsize 0无缓冲 1有缓冲 os.popen(command[, mode[, bufsize]])
os.path 模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 # 绝对路径 os.path.abspath(path) # 文件名 os.path.basename(path) # 文件路径 os.path.dirname(path) # 路径是否存在 os.path.exists(path) # 文件访问时间 os.path.getatime(path) # 文件修改时间 os.path.getmtime(path) # 路径创建时间 os.path.getctime(path) # 文件大小 os.path.getsize(path) # 是否为绝对路径 os.path.isabs(path) # 是否为文件 os.path.isfile(path) # 是否为目录 os.path.isdir(path) # 是否为链接 os.path.islink(path) # 是否为挂载点 os.path.ismount(path) # 合并目录与文件名 os.path.join(path1[, path2[, ...]]) # 转换path大小写与斜杠 os.path.normcase(path) # 规范path形式 os.path.normpath(path) # 返回path真实路径 os.path.realpath(path) # 判断目录,文件是否相同 os.path.samefile(path1, path2) # 判断是否指向同一文件 os.path.sameopenfile(fp1, fp2) # 分割路径与文件名 元组 os.path.split(path) # 返回驱动器名和路径 windows下 元组 os.path.splitdrive(path) # 分割路径,返回路径名 扩展名 元组 os.path.splitext(path) # 分割为加载点与文件 os.path.splitunc(path) # 遍历path,每个目录都调用visit函数 visit(arg, dirname 目录, names 目录下所有文件名) os.path.walk(path, visit, arg)
异常与断言 1 2 3 4 5 6 7 8 9 10 11 12 try: pass # except 后可加元组,可以包含多个Exception except (ZeroDivisionError, KeyboardInterrupt): pass else: pass except Exception: pass # finally不论发生异常与否都会执行,如果异常未被接住,则会在finally执行完毕后抛出 finally: pass
Python assert(断言)用于判断一个表达式,在表达式条件为 False 的时候触发异常。
1 2 3 4 # 语法 assert expression[, arguments] # 例如 assert 3 + 2 == 5, '结果不为 5' # 输出 AssertionError: 结果不为 5
对象 类的属性与方法的访问权限:
1 2 3 4 5 6 # 默认为公有 def fun(): # 保护 一个下划线 def _fun(): # 私有 两个下划线 def __fun():
类的专用方法有:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 # 构造函数 def __init__(): # 析构函数 def __del__(): # 打印 def __repr__(): # 按索引赋值 def __setitem__(): # 按索引取值 def __getitem__(): # 获取长度 def __len__(): # 比较 def __cmp__(): # 调用 def __call__(): # 运算符重载 # 加 def __add__(): # 减 def __sub__(): # 乘 def __mul__(): # 除 def __truediv__(): # 取余 def __mod__(): # 乘方 def __pow__(): # 小于 def __lt__(): # 等于 def __eq__(): # 大于 def __gt__(): # 小于等于 def __le__(): # 不等于 def __ne__(): # 大于等于 def __ge__():
在类的继承中,子类不重写 __init__
,实例化子类时,会自动调用父类定义的 __init__
。子类重写 __init__
,就不会调用父类的初始化函数。如果都想执行,可以使用super()调用。
标准库 shutil shutil模块提供了针对日常的文件和目录管理任务:
1 2 3 import shutil shutil.copyfile('a.txt','b.txt') shutil.move('/dir_a/a.txt','/dir_b')
blog glob模块提供了一个函数用于从目录通配符搜索中生成文件列表
1 2 import glob glob.glob('*.py') # ['primes.py', 'random.py', 'quote.py']
sys sys可以读取命令行参数
1 2 import sys print(sys.argv) # ['demo.py', 'arg1', 'arg2', 'arg3']
也可以重定向输出,如stdin,stdout,stderr
1 sys.stderr.write('Warning, log file not found starting a new one\n')
re re模块为高级字符串处理提供了正则表达式工具。
1 2 3 4 5 import re re.findall(r'\bf[a-z]*', 'which foot or hand fell fastest') # ['foot', 'fell', 'fastest'] re.sub(r'(\b[a-z]+) \1', r'\1', 'cat in the the hat') # 'cat in the hat'
datetime datetime模块为日期和时间处理同时提供了简单和复杂的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from datetime import date, time, datetime # 格式化输出 now = date.today() now.strftime("%m-%d-%y. %d %b %Y is a %A on the %d day of %B.") # '12-02-03. 02 Dec 2003 is a Tuesday on the 02 day of December.' # 日期天数差 birthday = date(1964, 7, 31) age = now - birthday age.days # 14368 # 当前时间戳 time_stamp = time.time() # 转为日期时间 datetime.fromtimestamp(time_stamp) # 转为时间戳 int(time.mktime(today.timetuple())) # 补时差 today + datetime.timedelta(hours=8)
数据压缩 以下模块直接支持通用的数据打包和压缩格式:zlib,gzip,bz2,zipfile,以及 tarfile。
1 2 3 4 5 6 7 import zlib s = b'witch which has which witches wrist watch' len(s) # 41 t = zlib.compress(s) len(t) # 37 zlib.decompress(t) # b'witch which has which witches wrist watch' zlib.crc32(s) # 226805979
计时器 1 2 3 4 5 6 7 8 9 10 11 12 from timeit import Timer Timer('t=a; a=b; b=t', 'a=1; b=2').timeit() # 测试函数调用时间 def test(): L = [] for i in range(100): L.append(i) if __name__ == '__main__': import timeit print(timeit.timeit("test()", setup="from __main__ import test"))
测试 doctest模块提供了一个工具,扫描模块并根据程序中内嵌的文档字符串执行测试。
1 2 3 4 5 6 7 8 9 10 def average(values): """Computes the arithmetic mean of a list of numbers. >>> print(average([20, 30, 70])) 40.0 """ return sum(values) / len(values) import doctest doctest.testmod() # 根据所给注释,自动验证本文档所有函数
unittest模块可以在一个独立的文件里提供一个更全面的测试集。
1 2 3 4 5 6 7 8 9 10 11 import unittest class TestStatisticalFunctions(unittest.TestCase): def test_average(self): self.assertEqual(average([20, 30, 70]), 40.0) self.assertEqual(round(average([1, 5, 7]), 1), 4.3) self.assertRaises(ZeroDivisionError, average, []) self.assertRaises(TypeError, average, 20, 30, 70) unittest.main() # 从命令行调用,执行所有测试
正则表达式
模式
描述
模式
描述
模式
描述
^
开头
$
末尾
.
任意字符,除了换行符
[...]
一组字符
[^...]
不在[]
中的字符
re*
匹配0个或多个的表达式
re+
匹配1个或多个的表达式
re?
匹配0个或1个表达式片段
re{n}
匹配n个前面表达式片段
re{n,}
精确匹配n个前面表达式片段
re{n,m}
匹配 n 到 m 次由前面的正则表达式片段
`a
b`
(re)
匹配括号内的表达式
(?#...)
注释
\w
数字字母下划线
\W
非数字字母下划线
\s
任意空白字符
\S
任意非空字符
\d
任意数字
\D
任意非数字
\A
字符串开始
\Z
字符串结束或换行前
\z
字符串结束
\G
最后匹配完成的位置
\b
单词边界
\B
非单词边界
\n
,\t
换行符,制表符
\1
,…,\9
匹配第n个分组的内容
re.match 尝试从字符串的起始位置匹配一个模式,如果不是起始位置匹配成功的话,match()就返回none。
1 2 3 4 5 6 7 8 9 10 11 # pattern 正则表达式 # string 要匹配的字符串 # flag 标志位 # 未匹配返回None print(re.match('a', 'a.b.c').span()) # span 返回匹配开始与结束的位置 返回(0, 1) print(re.match('c', 'a.b.c')) # 返回 None obj = re.match(pattern, string, flags=0) obj.group() # 原始对象 obj.group(1) # 获取匹配的值 obj.group(2) # 获取匹配的值
re.search 扫描整个字符串并返回第一个成功的匹配。
1 re.search(pattern, string, flags=0).span() # 返回匹配的位置
re.sub用于替换字符串中的匹配项
1 2 3 4 # repl 替换的字符串,也可以是函数 # count 最大替换次数 # 返回 替换次数 re.sub(pattern, repl, string, count=0, flags=0)
compile 函数用于编译正则表达式,生成一个正则表达式( Pattern )对象,供 match() 和 search() 这两个函数使用。
1 2 3 4 pt = re.compile(pattern[, flags]) # 例如 pt = re.compile(r'\d+') m = pt.match("abcd")
findall()在字符串中找到正则表达式所匹配的所有子串,并返回一个列表。
1 2 3 4 pt.findall(string[, pos[, endpos]]) # 例如 pt = re.compile(pattern[, flags]) pt.findall("abcd")
split 方法按照能够匹配的子串将字符串分割后返回列表
1 re.split(pattern, string[, maxsplit=0, flags=0])
正则表达式练习
网络 HTTP HTTP请求头部格式为:HTTP 字段名: 字段内容
,主要有以下几种:
头部
描述
头部
描述
Content-type:text/html
请求的MIME信息
Expires: Date
响应过期的日期和时间
Location: URL
重定向接收方到非请求URL的位置
Last-modified: Date
请求资源的最后修改时间
Content-length: N
请求的内容长度
Set-Cookie: String
设置Http Cookie
HTTP响应头部还包括了:
Allow:服务器支持的协议
Content-Encoding:编码
Location:如果是重定向301,则跳转到该页面
Date:服务器时间
Last-Modified:文档最后修改时间
Server:服务器名字
Set-Cookie:设置cookie
比较复杂的是Content-type,它包含:
text/html : HTML格式
text/plain :纯文本格式
text/xml : XML格式
image/gif :gif图片格式
image/jpeg :jpg图片格式
image/png:png图片格式
application/xhtml+xml:XHTML格式
application/xml:XML数据格式
application/atom+xml:Atom XML聚合格式
application/json:JSON数据格式
application/pdf:pdf格式
application/msword:Word文档格式
application/octet-stream:二进制流数据(如常见的文件下载)
multipart/form-data:需要在表单中进行文件上传时,就需要使用该格式
Socket Socket API 中定义的协议族(family)参数是指调用者期待返回的套接字地址结构的类型,主要包含(AF有时也写作PF):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 AF_UNSPEC 0 /* 未指定 */ AF_UNIX 1 /* Unix domain sockets */ AF_LOCAL 1 /* POSIX name for AF_UNIX */ AF_INET 2 /* IPv4 */ AF_AX25 3 /* 业余无线电 AX.25 */ AF_IPX 4 /* Novell IPX */ AF_APPLETALK 5/* AppleTalk 地址 */ AF_NETROM 6 /* 业余无线电 NET/ROM */ AF_BRIDGE 7 /* 多协议网桥 */ AF_ATMPVC 8 /* ATM PVCs */ AF_X25 9 /* 保留 for X.25 project */ AF_INET6 10 /* IPv6 */ AF_ROSE 11 /* 业余无线电 X.25 PLP */ AF_DECnet 12 /* 保留 for DECnet project */ AF_NETBEUI 13/* 保留 for 802.2LLC project*/ AF_SECURITY 14/* Security callback pseudo AF */ AF_KEY 15 /* PF_KEY key management API */ AF_NETLINK 16 /* Only for Linux */ AF_ROUTE AF_NETLINK /* Alias to emulate 4.4BSD */ AF_PACKET 17 /* Packet family */ AF_ASH 18 /* Ash */ AF_ECONET 19 /* Acorn Econet */ AF_ATMSVC 20 /* ATM SVCs */ AF_RDS 21 /* RDS sockets */ AF_SNA 22 /* Linux SNA Project (nutters!) */ AF_IRDA 23 /* IRDA sockets */ AF_PPPOX 24 /* PPPoX sockets */ AF_WANPIPE 25 /* Wanpipe API Sockets */ AF_LLC 26 /* Linux LLC */ AF_IB 27 /* Native InfiniBand address */ AF_CAN 29 /* Controller Area Network */ AF_TIPC 30 /* TIPC sockets */ AF_BLUETOOTH 31/* Bluetooth sockets */ AF_IUCV 32 /* IUCV sockets */ AF_RXRPC 33 /* RxRPC sockets */ AF_ISDN 34 /* mISDN sockets */ AF_PHONET 35 /* Phonet sockets */ AF_IEEE802154 36/* IEEE802154 sockets */ AF_CAIF 37 /* CAIF sockets */ AF_ALG 38 /* Algorithm sockets */ AF_NFC 39 /* NFC sockets */ AF_VSOCK 40 /* vSockets */ AF_MAX 41 /* 保留 */
参考
定义的类型(type)包含:
1 2 3 4 5 6 7 SOCK_STREAM = 1, // TCP SOCK_DGRAM = 2, // UDP SOCK_RAW = 3, // 原始类型,可以自定义 SOCK_RDM = 4, // 提供可靠的数据包连接 SOCK_SEQPACKET= 5, // 提供连续可靠的数据包连接 SOCK_DCCP = 6, // 数据报拥塞控制协议,具有内置拥塞控制的不可靠数据报的传输 SOCK_PACKET = 10, // 与网络驱动程序直接通信
定义的协议(protocol)包含:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 enum { IPPROTO_IP = 0, /* Dummy protocol for TCP */ IPPROTO_ICMP = 1, /* Internet Control Message Protocol */ IPPROTO_IGMP = 2, /* Internet Group Management Protocol */ IPPROTO_IPIP = 4, /* IPIP tunnels (older KA9Q tunnels use 94) */ IPPROTO_TCP = 6, /* Transmission Control Protocol */ IPPROTO_EGP = 8, /* Exterior Gateway Protocol */ IPPROTO_PUP = 12, /* PUP protocol */ IPPROTO_UDP = 17, /* User Datagram Protocol */ IPPROTO_IDP = 22, /* XNS IDP protocol */ IPPROTO_DCCP = 33, /* Datagram Congestion Control Protocol */ IPPROTO_RSVP = 46, /* RSVP protocol */ IPPROTO_GRE = 47, /* Cisco GRE tunnels (rfc 1701,1702) */ IPPROTO_IPV6 = 41, /* IPv6-in-IPv4 tunnelling */ IPPROTO_ESP = 50, /* Encapsulation Security Payload protocol */ IPPROTO_AH = 51, /* Authentication Header protocol */ IPPROTO_BEETPH = 94, /* IP option pseudo header for BEET */ IPPROTO_PIM = 103, /* Protocol Independent Multicast */ IPPROTO_COMP = 108, /* Compression Header protocol */ IPPROTO_SCTP = 132, /* Stream Control Transport Protocol */ IPPROTO_UDPLITE = 136, /* UDP-Lite (RFC 3828) */ IPPROTO_RAW = 255, /* Raw IP packets */ IPPROTO_MAX };
在Python中,主要的使用方式如下:
1 socket.socket([family[, type[, proto]]])
family,套接字协议族,常见有:
socket.AF_UNIX:只能够用于单一的Unix系统进程间通信
socket.AF_INET:服务器之间网络通信,IPv4
socket.AF_INET6:服务器之间网络通信,IPv6
type: 套接字类型,包括:
socket.SOCK_STREAM:流式socket,用于TCP
socket.SOCK_DGRAM:数据报式socket,用于UDP
socket.SOCK_SEQPACKET:可靠的连续数据包服务
socket.SOCK_RDM:可靠的UDP数据报
socket.SOCK_RAW:原始套接字,普通的套接字无法处理ICMP、IGMP等网络报文,而SOCK_RAW可以;其次,SOCK_RAW也可以处理特殊的IPv4报文;此外,利用原始套接字,可以通过IP_HDRINCL套接字选项由用户构造IP头。
protocol:
默认:写 0 即可
CAN_RAW / CAN_BCM:使用 AF_CAN 协议时
连接方面:
s.bind():绑定地址到套接字,IPv4下,使用(host, port)
绑定。
s.listen():开启TCP监听。
s.accept():等待连接(阻塞)。
s.connect():主动连接服务器,IPv4下,使用(host, port)
,如果连接失败,返回socket.error
。
s.connect_ex():主动连接服务器,出错时返回出错码。
s.close():关闭套接字。
s.getpeername():返回远程地址。
s.getsockname():返回自己的地址。
s.settimeout(timeout):设置超时时间,例如连接等待时间。
s.gettimeout():获取超时时间。
数据传输:
s.recv():接收TCP数据,可以指定最大接收量。
s.send():发送TCP数据,返回发送的字节数。
s.sendall():发送完整TCP数据,如果失败抛出异常。
s.recvfrom():接收UDP数据,返回(data, address)。
s.sendto():发送UDP数据,参数为(data, (ip, port)),返回发送的字节数。
s.setsockopt(level,optname,value):设置套接字。
s.getsockopt(level,optname[.buflen]):获取设置。
s.fileno():返回套接字的文件描述符。
s.setblocking(flag):设置为非阻塞模式。
s.makefile():创建套接字文件。
TCP服务器例程:
1 2 3 4 5 6 7 server = socket.socket() # 默认 TCP serber.bind(("127.0.0.1", 9001)) server.listen(5) conn, address = server.accept() conn.send("".encode("utf8")) data = conn.recv(1024) conn.close()
TCP客户端例程:
1 2 3 4 client = socket.socket() client.connect(("127.0.0.1", 9001)) data = client.recv(1024) client.close()
UDP服务器例程:
1 2 3 server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) serber.bind(("127.0.0.1", 9001)) data, client = conn.recvfrom(1024)
UDP客户端例程:
1 2 client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client.sendto(data, ("127.0.0.1", 9001))
uWSGI uWSGI 是Python搭建Web服务所用的中间件,是调和Web服务于Web应用直接的协议问题。
首先安装uWSGI:
1 2 3 pip install uwsgi # uwsgitop 用于监控数据 pip install uwsgitop
假设当前Nginx配置为:
1 2 3 4 location / { include uwsgi_params; uwsgi_pass 127.0.0.1:3031; }
我们启动一个uWSGI服务:
1 2 3 4 5 6 7 # --processes 添加更多的进程,用于并发 # --threads 添加更多的线程,用于并发 # --stats 使用 stats 子系统,可以执行监控任务 (uwsgitop) # --http-socket 启动地址,结合Nginx用 # --wsgi-file 指定入口文件 # --chdir 指定项目目录,如Django项目目录 uwsgi --http-socket 127.0.0.1:3031 --chdir /home/foobar/myproject/ --wsgi-file myproject/wsgi.py --master --processes 4 --threads 2 --stats 127.0.0.1:9191
也可以写成配置文件:
1 2 3 4 5 6 7 [uwsgi] socket = 127.0.0.1:3031 chdir = /home/foobar/myproject/ wsgi-file = myproject/wsgi.py processes = 4 threads = 2 stats = 127.0.0.1:9191
接着执行:
如果不用Django框架,而是单独文件server.py
,或是Flask框架:
1 2 3 4 # uWSGI Python 加载器将会搜索的默认函数 application def application(env, start_response): start_response('200 OK', [('Content-Type','text/html')]) return [b"Hello World"]
多线程 Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。也就是说,尽管有了线程模块,Python几乎依然是单线程处理 。
尽管如此,在IO密集型的多线程应用中,Python的多线程threading库表现却依然还行。但在并行计算型应用中,如果想真正实现多线程,就得在Python中可以使用多线程threading,并自行设计锁结构,或使用多进程multiprocessing,并在主进程设置消息队列,共享内存,管道等方式传递数据。
threading 模块 创建线程,可以直接使用:
1 2 3 4 5 6 7 8 9 10 11 from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主线程') threading.enumerate() # 所有线程列表
也可以通过子类继承后使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主线程')
Thead 对象的常用方法有:
isAlive():是否运行
getName():获取线程名称
setName():设置线程名称
x.join():当前线程等待x线程结束再继续执行。
setDaemon(True):设置为守护线程
守护线性 :如果设置一个线程为守护线程,就表示这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。主线程只会等待所有非守护线程都结束后才退出。
threading 模块的常用方法有:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
使用 同步锁 可以防止数据竞争问题:
1 2 3 4 5 6 R = threading.Lock() R.acquire() ''' 临界区 ''' R.release()
但是使用锁的时候,一定要解决好死锁的问题。解决方法可以参考《操作系统》相关章节。
线程间通信,可以使用消息队列,可以使用共享内存的方式进行通信。下面使用队列方式通信(注意互斥访问队列):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # JoinableQueue # 队列长度,多线程下不够准确 Queue.qsize() # 队列判空 Queue.empty() # 队列判满 Queue.full() # 入队,是否阻塞 Queue.put(item, block=True, timeout=None) # 入队,不阻塞 Queue.put_nowait(item) # 出队,是否阻塞 Queue.get(block=True, timeout=None) # 出队,不阻塞 Queue.get_nowait() # 提示让出队列,提示join停止阻塞 Queue.task_done() # 阻塞直到队列为空 Queue.join()
multiprocessing 模块 多进程的创建:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': # 会发现每一个进程都有不同的 PID # 且进程的数据各自保留一份,互不相关 # 之间传递数据必须使用工具 p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主线程/主进程pid',os.getpid())
创建共享内存实现主进程与子进程通信:
1 2 3 4 5 6 7 8 9 10 11 12 import multiprocessing def f(a): a[0] = 5 # 创建共享内存 arr = multiprocessing.Array('i', range(10)) # 子进程处理 p = multiprocessing.Process(target=f, args=(arr,)) p.start() p.join() print(arr[0])
使用Manger通信,本质也是共享内存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import multiprocessing def f(ls): ls.append('Hello') # Manager 要在主进程创建 server = multiprocessing.Manager() # 每调用一次list产生一个共享内存 # 除了list外,也可以是其他形式,如队列、锁、字典、数组等 ls = server.list() # ls = server.Queue() # 子进程处理 proc = multiprocessing.Process(target=f, args=(ls,)) proc.start() proc.join() print(ls)
多进程间通过队列通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import multiprocessing from multiprocessing import Queue que = Queue(3) # 队列容量 3 que.put("msg 1") que.put("msg 2") que.full() # False que.put("msg 3") que.full() # True que.empty() # False def fun(que): d = que.get() p1 = multiprocessing.Process(target=fun, args=(que,))
进程池:
1 2 3 4 5 6 7 8 9 10 11 12 import multiprocessing from multiprocessing import Pool def fun(arg): pass pool = Pool(3) # 进程容量 3 for i in range(10): pool.apply_async(fun, (arg,)) # 添加任务 pool.close() # 关闭,不再接受新请求 pool.join() # 等待退出,必须在close之后
ctypes ctypes可以让Python直接调用任意的C动态库的导出函数,由于ctypes会在调用C函数前释放GIL,因此也可以实现多线程。
我们可以将写好的Task编译为C的动态库,例如lib_task.so
或lib_task.dll
,然后在Python中调用该库。打包动态库可以使用Visual Studio建立相关项目,Visual Studio就会自动生成一个DLL模板。或使用GCC创建:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 lib_task.h #ifndef LIB_TASK_H #define LIB_TASK_H #ifdef __cplusplus extern " C " { #endif // DLL 关键字 __declspec (dllexport) extern __declspec (dllexport) void Task(int arg); #ifdef __cplusplus } #endif #endif lib_task.c #include "print.h" // 在这里实现多线程 void Task(int arg) { while(arg); return; }
编辑DLL:
1 2 3 4 5 6 7 8 9 10 11 gcc --share lib_task.c -o lib_task.dll from ctypes import * from threading import Thread # lib_task.h 与 lib_task.dll 必须在这个目录下 # 给DLL传递参数时,要将参数转化为C的类型 lib = cdll.LoadLibrary("lib_task.dll") t = Thread(target=lib.Task, args=(1,)) t.start() lib.Task()
线程池 线程池可以帮助我们自动调度线程,在需要多线程任务量巨大的情况下是非常好用的工具,省去我们考虑线程同步的问题,也节省了上下文切换的时间。
第三方线程池 threadpool:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 pip install threadpool from threadpool import ThreadPool, makeRequests # 创建一个容纳4个线程的线程池 pool = ThreadPool(4) requests = makeRequests( some_callable, # 多线程的任务 list_of_args, # 参数 callback # 回调函数,可空 ) for req in requests: pool.putRequest(req) # 等待线程池完成任务 pool.wait()
另外还有ThreadPoolExecutor,ProcessPoolExecutor,线程(进程)池也可以使用。
Executor提供了以下常用的方法:
submit(fn, *args,**kwargs)
:将fn函提交给池子;*args是传给fn函数的参数;**kwargs表示以关键字的形式为fn的参数。
map(func, *iterables, timeout=None, chunksize=1)
:类似于全局函数的map,只是该函数将会启动多个线程,以异步的方式立即对*iterables执行map处理,就是把for循环和submit结合在一起了。
shutdown(wait=True)
:关闭池子,wait=True时等待池内所有任务执行完毕回收完资源后才继续;wait=False时立即返回,并不会等待池内的任务执行完毕;但不管wait参数为何值,整个程序都会等到所有任务执行完毕才会清空池子,所以submit和map必须在shutdown之前执行。
程序将task函数submit之后,submit会返回一个Future对象,Future类主要用于获取线程或进程任务函数的返回值。Future中提供了一下方法:
cancel()
:取消Future代表的线程或者进程任务,如果任务正在执行,不可取消,返回False;否则任务取消,返回Ture。
cancelled()
:返回Future代表的任务是否被成功取消。
running()
:返回Future代表的任务是否增正在执行。
done()
:返回Future代表的任务是否已经结束。
result(timmeout=None)
:返回Future代表的任务的结果,如果任务没有完成,该方法将会阻塞当前线程,timeout指定阻塞多长时间。
exception()
:返回Future代表的任务的异常,如果没有异常,则返回None。
add_done_callback(fn)
:给Future代表的任务加一个’回调函数’,当该任务成功之后,执行这个fn函数。
创建线程池 ThreadPoolExecutor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import time,threading from concurrent.futures import ThreadPoolExecutor def f(n): time.sleep(2) print(f"线程号 {threading.get_ident()}",n) return n*n if __name__ == '__main__': # 创建线程池,线程数 5 t_pool = ThreadPoolExecutor(max_workers=5) t_l = list() for i in range(1,5): t = t_pool.submit(f,i) t_l.append(t) t_pool.shutdown() for i in t_l: print('===',i.result())
创建进程池 ProcessPoolExecutor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import time,threading from concurrent.futures import ProcessPoolExecutor def callback_fun(x): pass def f(n): time.sleep(2) print(f"进程PID {os.getpid()}",n) return n*n if __name__ == '__main__': # 创建进程池,进程数 5 p_pool = ProcessPoolExecutor(max_workers=5) p_l = list() for i in range(5): t = p_pool.submit(f,i) # 也可以设置回调函数,回调的参数由任务函数提供 # t.add_done_callback(callback_fun) p_l.append(t) # 也可以写成 # s = p_pool.map(f,range(1,5)) p_pool.shutdown(wait = True) for i in p_l: print('===',i.result())
multiprocessing 模块也提供了进程池:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import os,time from multiprocessing import Process,Pool def f(n): print(f"进程PID {os.getpid()}") time.sleep(1) return n*n # 返回值交给回调函数 def cb_fun(n): pass if __name__ == '__main__': # 创建工作进程 p = Pool(3) p_l = list() for i in range(1,10): re = p.apply( f, # 多线程工作函数 args=(i,), # 传递的参数 callback=cb_fun # 回调函数 ) p_l.append(re) print(p_l) p_l.close() p_l.join()
迭代器 迭代器:可以节省内存空间,用每步生成的方式代替查表的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from collections import Iterable isinstance("abc", Iterable) # 可迭代类型 class Student(object): names = [] def __iter__(self): return StudentIterator(self) class StudentIterator(object): def __init__(self, obj): self.obj = obj self.cnt = 0 def __next__(self): if cnt > len(self.obj.names): raise StopIteration # 告诉 for 已经迭代完毕 self.cnt += 1 return self.obj.names[0] def __iter__(self): pass # 也可以合并为一个类 class Fibonacci(object): def __init__(self, all_num): self.all_num = all_num self.cnt = 0 self.a = 0 self.b = 1 def __iter__(self): return self def __next__(self): if cnt >= all_num: raise StopIteration # 告诉 for 已经迭代完毕 self.cnt += 1 self.a, self.b = self.b, self.a + self.b return self.b fib = Fibonacci(10) for x in fib: print(x)
对于如下代码,执行步骤为:
判断obj是否是迭代类型(是否有__iter__
方法);
调用__iter__
方法得到迭代器,获取__iter__
的返回值(返回了一个迭代器)。
每for一次,调用一次__next__
迭代器实例:
1 2 range(10) # 直接生成数据 xrange(10) # 底层为迭代器,可以节省内存
生成器 生成器是一种特殊的迭代器。
1 2 3 4 5 6 7 # 普通方式创建数组 nums = [x for x in range(10)] # 生成器方式,节省空间 nums = (x for x in range(10)) # 迭代生成器 for x in nums: print(x)
将函数变为生成器:只要函数中包含yield
即可,此时调用函数,得到一个生成器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def fib(n): a, b = 0, 1 cnt = 0 while cnt < n: yield a a, b = b, a + b cnt += 1 # return 可有可无 return "over" obj = fib(10) # 得到生成器 for x in obj: print(x) obj2 = fib(4) while True: try: ret = next(obj2) except Except as ex: print(ex.value) # 此处存储 fib 的 return break
通过send
方法启动生成器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def fib(n): a, b = 0, 1 cnt = 0 while cnt < n: ret = yield a # 此处 ret 接收 send 方法给的值 a, b = b, a + b cnt += 1 obj = fib(10) ret = next(obj) # 启动生成器 得到第一个元素 给 ret ret = obj.send("xx") # 给 生成器 中的 ret 传递值 xx print(ret) # 注:send 必须在 next 之后执行;要么在next之前执行,传递 None
协程 greenlet
模块:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from greenlet import greenlet def fun1(): ... gr2.switch() ... pass def fun2(): ... gr1.switch() ... pass g1 = greenlet(fun1) g2 = greenlet(fun2) g1.switch() # 切入 fun1 运行
gevent
模块:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import gevent def fun(n): for i in range(n): print(gevent.getcurrent(), i) gevent.sleep(0.5) # gevent 内部必须使用自己的延时等待 g1 = gevent.spawn(fun, 5) # 创建协程 函数 fun 参数 5 g2 = gevent.spawn(fun, 5) g3 = gevent.spawn(fun, 5) g1.join() g2.join() g3.join()
给代码打补丁升级:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import gevent from gevent import monkey monkey.patch_all() # 打补丁,临时改写代码 def fun(n): for i in range(n): print(gevent.getcurrent(), i) time.sleep(0.5) # 有了补丁,可以不必更改延时函数 # 写法与上面的等效 gevent.joinall([ gevent.spawn(fun, 5), gevent.spawn(fun, 5) ])
XML 与 JSON XML XML 指可扩展标记语言(eXtensible Markup Language),形式同HTML,是一种用于标记电子文件使其具有结构性的标记语言。XML也可以用于数据以文本格式存储下来。格式如下(DOM):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 <collection shelf="New Arrivals"> <movie title="Enemy Behind"> <type>War, Thriller</type> <format>DVD</format> <year>2003</year> <rating>PG</rating> <stars>10</stars> <description>Talk about a US-Japan war</description> </movie> <movie title="Transformers"> <type>Anime, Science Fiction</type> <format>DVD</format> <year>1989</year> <rating>R</rating> <stars>8</stars> <description>A schientific fiction</description> </movie> <movie title="Trigun"> <type>Anime, Action</type> <format>DVD</format> <episodes>4</episodes> <rating>PG</rating> <stars>10</stars> <description>Vash the Stampede!</description> </movie> <movie title="Ishtar"> <type>Comedy</type> <format>VHS</format> <rating>PG</rating> <stars>2</stars> <description>Viewable boredom</description> </movie> </collection>
解析 XML 可以使用 SAX 模块,SAX 模块用事件驱动模型,通过在解析 XML 的过程中触发一个个的事件并调用用户定义的回调函数来处理 XML 文件。SAX 模块非常适用于对大型文件进行处理,且只需要文件部分信息时使用。
通过使用ContentHandler类读取数据。ContentHandler的方法有:
startDocument():文档启动时调用。
endDocument():到达结尾时调用。
startElement(name, attrs):遇到开始标签<..>调用。
endElement(name):遇到结束标签</..>调用。
characters(content):分情况看,有
从行开始,遇到标签之前,若存在字符,则content的值为这些字符串。
从一个标签,遇到下一个标签之前,若存在字符,则content的值为这些字符串。
从一个标签,遇到行结束符之前,若存在字符,则content的值为这些字符串。
标签可以是开始标签,也可以是结束标签。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import xml.sax class MovieHandler( xml.sax.ContentHandler ): def __init__(self): # 此处定义对象属性 pass # 元素开始调用 def startElement(self, tag, attributes): pass # 元素结束调用 def endElement(self, tag): pass # 读取字符时调用 def characters(self, content): pass # 创建XML阅读器 parser = xml.sax.make_parser() # 关闭命名空间 parser.setFeature(xml.sax.handler.feature_namespaces, 0) # 创建对象 Handler = MovieHandler() # 设置XML阅读器 parser.setContentHandler(Handler) # 开始解析 parser.parse("movies.xml")
如果解析的文件不大,且需要文件的全部信息,可以使用DOM解析器。这个解析器可以一次性将整个文档读入内存,且可读可写到文件。
1 2 3 4 5 6 7 8 9 10 11 from xml.dom.minidom import parse import xml.dom.minidom # 使用minidom解析器打开 XML 文档 DOMTree = xml.dom.minidom.parse("movies.xml") collection = DOMTree.documentElement if collection.hasAttribute("shelf"): print (f"Root element : {collection.getAttribute('shelf')}") # 在集合中获取所有电影 movies = collection.getElementsByTagName("movie")
Json JSON (JavaScript Object Notation) 是一种轻量级的数据交换格式,适合于网络间传输数据,如前后端使用Ajax传输,则偏向于传输Json。
1 2 3 4 5 6 7 8 9 10 11 12 import json data = { 'no' : 1, 'name' : 'Runoob', 'url' : 'http://www.runoob.com' } # Python 字典类型转换为 JSON 对象 json_str = json.dumps(data) # 将 JSON 对象转换为 Python 字典 data = json.loads(json_str)
Python 技巧 数据结构
使用元组存储数据,节省空间。
1 2 3 4 5 6 7 8 9 10 # 方式 1 Name, Age, Gender, Email = 1, 2, 3, 4 student = ('mike', 18, 'male', '123@qq.com') name = student[Name] # 方式 2 from collections import namedtuple Student = namedtuple('Student', ['NName', 'Age', 'Gender', 'Email']) student = Student('jim', 16, ...) name = student.name
统计序列中的元素出现频率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # 产生数据 from random import randint data = [randint(0, 20) for _ in range(30)] # 方式 1: 产生结果为字典 c = dict.fromkeys(data, 0) for x in data: c[x] += 1 # 方式 2: 结果也是字典,但是还有其他统计信息 from collections import Counter c2 = Counter(data) c2.most_common(3) # 频数最大的 3 个元素 # 例如统计词频 c3 = Counter(re.split("\W+", filename))
根据字典中值的大小,排序字典序
1 2 3 4 5 6 7 8 9 # 产生数据 from random import randint d = {x: randint(60, 100) for x in 'xyzabc'} # 方式 1 sorted(zip(d.values(), d.keys())) # 方式 2 sorted(d.items(), key=lambda x: x[1])
找到多个字典的公共键
1 2 3 4 5 6 7 8 9 10 11 12 # 产生数据 from random import randint, sample s1 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))} s2 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))} s3 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))} # 方式 1 s1.keys() & s2.keys() & s3.keys() # 方式 2 from functools import reduce reduce(lambda a, b: a & b, map(dict.keys, [s1, s2, s3]))
让字典保持有序
1 2 3 4 5 6 7 # 方式 1: 使用有序字典 from collections import OrderedDict d = OrderedDict() d['a'] = (1, 10) d['b'] = (2, 16) d['c'] = (3, 20) # 按照输入的顺序存储
实现历史记录功能
1 2 3 4 5 6 7 8 # 方式 1: 使用双端队列 from collections import deque q = deque([], 5) # 初始值 容量 q.append(1) import pickle # python 对象持久化 pickle.dump(q, open("filename", "w")) q = pickle.load(open("filename"))
切片操作
1 2 3 4 5 6 d[1:3] # 创建可切片的迭代器 from itertools import islice for line in islice(d, 1, 3): print(line)
同时迭代多个对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # 产生数据 from random import randint a = [randint(60, 100) for _ in range(40)] b = [randint(60, 100) for _ in range(40)] # 方式 1: 采用索引 for i in range(len(a)): pass # 方式 2: 采用zip for x, y in zip(a, b): pass # 方式 3: 串行连接多个迭代对象 from itertools import chain for x in chain(a, b): pass
修改列表的元素
1 2 3 4 5 6 7 8 9 10 11 12 # 产生数据 from random import randint a = [x: randint(60, 100) for _ in range(40)] # 方式 1: 有Bug for item in d: i = d.index(item) d[i] += 1 # 方式 2: for i, item in enumerate(d): pass
字符串
拆分字符串
1 2 3 4 5 6 7 8 9 10 # 产生数据 s = 'ab;cd|efg|hi,jkl|mn\topq;;rs t,uvw\txyz' # 方式 1: s.split() # 默认为 \t 空格 s.split(';') # 方式 2: 正则 import re re.split('[,;\t|]+', s)
是否有某一前缀、后缀
1 2 3 4 5 6 # 产生数据 s = 'www.baidu.com' # 方式 1: s.startwith("www") s.endwith(("com", "org")) # 只能是元组
调整字符串格式
1 2 3 4 5 6 7 # 产生数据 s = '2016-05-01' # 改为 05/01/2016 # 方式 1: import re re.sub(r'(\d{4})-(\d{2})-(\d{2})', r'\2/\1/\3', s) re.sub(r'(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})', r'\g<month>/\g<day>/\g<year>', s)
拼接多个字符串
1 2 3 4 5 6 # 产生数据 s1 = '12345' s2 = '67890' # 方式 1: ';'.join([s1, s2])
字符串对齐
1 2 3 4 5 6 7 8 9 10 11 12 # 产生数据 d = {'name': 'mike', 'age': 19} # 方式 1: d['name'].ljust(10) d['name'].rjust(10, '.') d['name'].center(10) # 方式 2: format(s, '<20') format(s, '>20') format(s, '^20')
去除不需要的字符
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # 产生数据 s1 = ' abc 123 ' # 方式 1 s.strip(' ') # 去掉两端字符,默认为 空格 s.lstrip() s.rstrip() # 方法 2 s = s[:3] + s[4:] # 方法 3 s.replace('\t', '') # 方法 4 re.sub(r'[\t\r]', '', s) # 方法 5 import string s.translate(string.maketrans('abcxyz', 'xyzabc')) s.translate(None, ' ') # 删除字符
文件
处理二进制文件
1 2 3 4 5 6 7 8 9 10 # 产生数据 f = open("filename", "rb") # 方式 1 import struct # 模式:h-short i-int struct.unpack('h', '\x01\x02') # 小端 struct.unpack('>h', '\x01\x02') # 大端 import array buf = array.array('h', (0 for _ in range(length)))
文件缓冲
1 2 3 4 5 6 # 全缓冲 f = open('', 'w', buffering=2048) # 缓冲区默认 4096 大小 # 行缓冲 f = open('', 'w', buffering=1) # 无缓冲 f = open('', 'w', buffering=0)
将文件映射到内存
1 2 3 4 5 6 f = open('', 'r+b') # 映射文件描述符到内存,0表示映射区域为文件全文 m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE) m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE, offset=mmap.PAGESIZE * 4) m[1:10] m[3] = '0x30'
文件状态
1 2 3 4 5 6 7 8 9 10 11 12 13 # 系统调用 import os os.stat(path) os.lstat(path) # 不跟随符号链接 os.fstat(open("", "")) # 包括: 文件类型,文件权限,时间(访问,修改,创建),文件大小 os.path.isdir() os.path.isfile() os.path.islink() os.path.getatime() os.path.getsize()
使用临时文件
1 2 3 from tempfile import TemporaryFile, NamedTemporaryFile f = TemporaryFile() f.write()
读写 CSV
1 2 3 4 5 6 7 8 9 import csv rf = open('', 'rb') # 一定是二进制打开 reader = csv.reader(rf) head = reader.next() for row in reader: print(row[0], row[1]) writer = csv.writer(wf) writer.writerow() writer.flush()
读写 JSON
1 2 json.loads() json.dumps()
读写 XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from xml.etree.ElementTree import parse f = open() et = parse(f) root = et.getroot() root.tag root.attrib root.text for child in root: print(child.get('name')) # 在子元素中找 root.find() root.findall() # 在所有子孙中找 root.iter() root.findall(".//node") # 这里使用 XPATH 表示
读写 Excel
1 2 3 4 5 6 7 8 import xlrd xlwt book = xlrd.open_workbook() sheet = book.sheets()[0] sheet.nrows sheet.ncols cell = sheet.cell(3, 5) cell.ctype sheet.row(1)
对象
创建大量实例并节省内存
1 2 3 4 5 6 7 8 class Player(object): # __slots__ 限制有哪些属性,关闭动态字典属性 __slots__ = ['uid', 'name', 'stat', 'level'] def __init__(self, uid, name, status=0, level=1): self.uid = uid self.name = name self.stat = status self.level = level
使用上下文管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class Client(object): def __init__(self): pass def start(self): pass def cleanup(self): pass def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): # 后面参数为异常 pass return True # 不向上抛出异常 return False / None # 向上抛出异常 with Client() as c: pass
创建可管理的对象属性
1 2 3 4 5 6 7 8 9 10 11 12 class Circle(object): radius = 0 def getRadius(self): return self.radius def setRadius(self, r): if not isinstance(r, (int, float)): raise ValueError('wrong type.') self.radius = float(r) R = property(getRadius, setRadius) c = Circle() c.R = 1 print(c.R)
比较操作
1 2 3 4 5 6 7 8 9 10 11 12 class Circle(object): radius = 0 def getRadius(self): return self.radius def setRadius(self, r): if not isinstance(r, (int, float)): raise ValueError('wrong type.') self.radius = float(r) def __lt__(self, obj): pass def __ge__(self, obj): pass
类型检查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class Attr(object): def __init__(self, name, type_): self.name = name self.type_ = type_ def __get__(self, instance, clazz): return instance.__dict__[self.name] def __set__(self, instance, value): # 此处插入类型检查 if not isinstance(value, self.type_): raise TypeError(f'expected an {self.type_}') instance.__dict__[self.name] = value def __delete__(self, instance): del instance.__dict__[self.name] class Person(object): name = Attr('name', str) age = Attr('name', int)
循环引用垃圾回收问题
1 2 3 4 # 使用弱引用 import wearref a = A() a_wref = weakref.ref(a)
通过字符串调用实例方法
1 2 3 4 5 6 7 8 # 方法 1: s = Person() fun = getattr(s, 'getName', None) # 第三个参数为找不到时的默认值 if fun: fun() # 方法 2: from operator import methodcaller methodcaller('findPerson', 'name', age)(s)
函数装饰器