Pyhton标准库

文本

1.string:capwords 所有单词首字母大写

2.str内建:maketrans 应用于多个替换规则时

3.string:string.Template 多个变量的文本拼接

4.re

1
2
3
4
一般正则三段论
1.pattern = '匹配规则'
2.text = '待匹配文本'
3.match对象 = re.search(pattern, text) # 不能匹配返回None,对象包含规则、文本、起始位置等
1
2
3
4
预编译正则三段论
1.regex = re.compile('匹配规则')
2.text = '待匹配文本'
3.match对象 = regex.search(text)
1
2
3
多重匹配,有多少匹配多少,search只匹配一个
for mat in re.finditer(pattern, text):
print(mat.start(), mat.end())
1
2
3
4
5
6
7
8
9
10
11
12
13
* 0-n
+ 1-n
?0/1,其余之后加?可消除贪心行为匹配最少的字符
{m}{m,n} 次数或范围
[abc][a-zA-Z] 字符集
[^] 非这些字符
. 任何单个字符
\d 一个数字 大写为非
\s 一个空白符(tab,space,\n)
\w 字母或数字
^ 字符串或行开始位置查找
$ 字符串或行结束位置查找
() 进行分组,match.group()返回匹配的字符串序列
1
re.match与re.search区别:match含一个开始的锚,只能匹配开头,否则为None

数据结构

1.collections.Counter 计算字符个数

1
2
3
4
两步走
text = 'my name is zhangpei. I love chenwenyan. I am a student. I love reading. I love sports'
1.初始化 count=collections.Counter(text.split‘ ’) # 计字符输入单个字符串,计词输入list
2.查找最高频词 print(count.most_common(3))
1
+ 计数相加 - 计数相减 & 取最小计数 | 取最大计数

2.collections.defaultdict 初始化时由你指定默认值

1
2
dic = collections.defaultdict(int) # 默认值可还为float/list/set,或由函数定义返回值用函数作参
print(dic['a']) # int默认为0

3.collections.deque 双端队列,线程安全,可控制为队列和栈

1
2
3
4
增:extend/append/extendleft/appendleft (extend将序列按个输入,append整体输入序列)
删:pop/popleft/remove
改/查:deq[index]
旋转:rotate(+index)顺时针移动 rotate(-index)逆时针移动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from collections import deque # 双端队列多线程删除
import threading
import time
deq = deque()
deq.extend(range(5))
deq.extendleft('abcde')
print(list(deq))
def delete(direction, process):
while True:
try:
pop_element = process()
except IndexError:
print('不好删了')
break
else:
print('%10s: %5s' % (direction, pop_element))
time.sleep(1)
print('%10s: done' % direction)
pop1 = threading.Thread(target=delete, args=('left', deq.popleft))
pop2 = threading.Thread(target=delete, args=('right', deq.pop))
pop1.start() # start()方法用于启动线程
pop2.start()
pop1.join() # 调用Thread.join将会使主调线程堵塞,直到被调用线程运行结束或超时
pop2.join()

4.collections.namedtuple 指定元素名字的元组,内存使用也高效

1
2
3
4
5
1.定义由类表示 Person = namedtuple('Person', 'name age gender grade')
# 类名+元素名字符串 # 参数rename=True可将冲突字段名自动改成下标
2.定义实例 zp = Person('zp', 23, 'male', 90)
3.查字段值 zp[1] == zp.age
4.查所有字段名 Person._fields

5.collections.OrderedDict 有序字典,记录增加元素的顺序,顺序不同字典不同

1
2
定义 people = OrderedDict()
增/改 people['age']=23

6.array 数组,元素类型相同时使用,省内存,用法同list

1
2
3
4
定义:a = array.array('i') # i为整型,u为字符,f为浮点型
增:a.append()/a.extend([])
删:a.pop()
查:a[2:5]

7.heapq 最小堆

1
2
3
4
5
6
data = [8, 6, 10, 1, 12, 4]
1.定义:heapq.heapify(data)
2.增:heapq.heappush(data, 3)
3.删:heapq.heappop(data)
4.改:heapq.heapreplace(data, 11) # 总是改最小值
5.查:heapq.nlargest(2, data)/heapq.nsmallest(2, data)

8.bisect 有序列表

1
2
3
4
5
6
7
8
9
10
import bisect
import random
l = list()
random.seed(1)
for i in range(1, 15):
r = random.randint(1, 100)
bisect.insort(l, r) # 按序插入,从左插入为insort_left
position = bisect.bisect(l, r) # 返回插入的位置,bisect_left
print(l)

9.queue.Queue 队列,线程安全,多线程可以处理同一个队列

1
2
3
4
5
6
1.定义:q = queue.Queue()
2.增:q.put(i)
3.删:p = q.get()
4.查:q.empty()
q.qsize() 返回队列的大小
q.full() 队列满

10.queue.LifoQueue 栈,线程安全

1
2
3
4
1.定义:q = queue.LifoQueue()
2.增:q.put(i)
3.删:p = q.get()
4.查:q.empty()

11.queue.PriorityQueue 优先权队列,默认调用类型内置比较函数,由堆实现,线程安全

1
2
3
4
1.定义:q = queue.PriorityQueue()
2.增:q.put(i) # 调用内置比较函数进行比较后加入堆
3.删:p = q.get()
4.查:q.empty()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from queue import PriorityQueue
import threading
class Job(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
# def __cmp__(self, other): # python3取消了内置cmp
# return cmp(self.priority, other.priority)
def __lt__(self, other):
return self.priority < other.priority
pq = PriorityQueue()
pq.put(Job(10, 'last to process'))
pq.put(Job(5, 'second to process'))
pq.put(Job(1, 'first to process'))
def process_job(q):
while True:
try:
member = q.get()
print(member.description)
except Exception as e:
print(e)
t1 = threading.Thread(target=process_job, args=(pq,))
t2 = threading.Thread(target=process_job, args=(pq,))
t = list()
t.append(t1)
t.append(t2)
for i in t:
i.start() # start()方法用于启动线程
pq.join() # q.join() 实际上意味着等到队列为空,再执行别的操作

12.struct 处理二进制

1
2
3
4
5
6
7
8
9
10
1.非缓存
import struct
import binascii
values = (1, 'ab'.encode('utf-8'), 2.7)
s = struct.Struct('I 2s f') # 对应各个值的类型
packed_data = s.pack(*values)
print(binascii.hexlify(packed_data))
unpacked_data = s.unpack(packed_data)
print(unpacked_data)
1
2
3
4
5
6
7
8
9
10
11
12
2.缓存
import binascii
import ctypes
import struct
s = struct.Struct('I 2s f')
values = (1, 'ab'.encode('utf-8'), 2.7)
b = ctypes.create_string_buffer(s.size) # 定义buffer的大小,可大于数据
print('Before :', binascii.hexlify(b.raw))
s.pack_into(b, 0, *values)
print('After :', binascii.hexlify(b.raw))
print('Unpacked:', s.unpack_from(b, 0))

算法(回头看)

日期与时间

1.time

1
2
3
4
5
6
import time
t1 = time.time() # 获取浮点型的秒数,可加减
t2 = time.ctime() # 获取格式化的时间
time.strptime(t2)) # 解析时间
time.strftime('%a %b %d %H:%M:%S %Y', time.strptime(t1)) # 格式化显示时间,(星期月日时分秒年)
time.sleep(2) # 交出当前线程,等待系统再次唤醒,只有一个线程即阻塞应用

2.datetime

3.calendar

数值计算

1
2
3
4
5
6
7
8
import decimal
a = decimal.Decimal('Infinity')
print(a + 1) # Infinity
b = decimal.Decimal('-Infinity')
print(b + 1) # -Infinity
c = decimal.Decimal('NaN')
print(c + 1) # NaN
1
2
3
4
5
6
7
import random
random.random() 0~1浮点数
random.seed(1) 伪随机数,程序生成随机值,种子同随机数同
random.uniform(m,n) m~n浮点数
random.randint(m,n) m~n整数
random.randrange(m,n,step) m~n按步长取的数
1
2
3
4
5
6
7
8
9
10
11
12
13
import math
pi = math.pi
e = math.e
math.trunc() 取小数的整数部分
math.modf() 返回浮点数的小数和整数
math.floor() 小于其一点的整数
math.ceil() 大于其一点的整数
math.fabs() 取绝对值
math.fsum() 求list和
math.pow(x,a) 幂运算
math.log(x,a) 取对数
math.log10(x) 取底数为10的对数

文件系统

1.linecache 高效读取文本文件

2.shutil 高级文件操作,如复制和设置权限

3.mmap 内存映射文件,进行读写,如原地修改文件某一行

4.StringIO 类文件的文本缓冲区,像读写文件一样处理内存缓冲区

5.os.path 文件路径的处理

数据持久存储

1.pickle 对象序列化

2.shelve 利用pickle持久存储对象

3.anydbm DBM数据库,以字符串为键的类字典接口

4.xml.etree.ElementTree 生成和解析XML文件

5.csv 处理csv文件

6.sqlite3 嵌入式关系型数据库,非MySQL等单独的数据库

1
1.定义数据库:db_filename = 'name.db'
1
2.连接数据库:with sqlite3.connect(db_filename) as conn: # 不存在数据库文件则新建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
3.建数据表:schema_filename = 'name.sql'
with open(schema_filename, 'rt') as f: # t为自动识别Windows的换行符
schema = f.read()
conn.executescript(schema) # execute是执行一句
----------------------------------------------------------
conn.executescript("""建表sql语句""") # 语句长,阅读性不佳
e.g. conn.executescript("""
create table project(
name text primary key,
description text,
deadline date
);
create table task(
id integer primary key autoincrement not null,
priority integer default 1,
details text,
status text,
deadline date,
completed_on date,
project text not null references project(name)
);
""")
1
2
3
4
5
6
7
8
9
4.添加数据:conn.executescript("""多句查表sql语句"""")
e.g. conn.executescript("""
insert into project (name, description, deadline)
values ('pymotw', 'python module of the week', '2017-7-19');
insert into task (details, status, deadline, project)
values ('write about select', 'done', '2017-7-20', 'pymotw');
""")
1
2
3
4
5
6
5.查询数据:cur = conn.cursor()
cur.execute('select * from project')
print(cur.fetchone()) # 返回一行数据的tuple,fetchall返回所有查询结果的list
cur.close() # fetchmany(x)返回x个结果
若查询语句中有变量用?占位,如query="""select * from ?""",执行cur.execute(query, (project,)) 用元组来包含变量
若查询语句中某些变量重复多次用命名参数,query = """select * from :project_name""",执行的方法 为cur.execute(query, {'project_name':project_name})
1
6.update、delete方法与insert类似,会相关sql语法即可
1
7.事务隔离(有用时再看)

进程与线程

1
2
3
4
5
6
7
并行:
并发:
进程:
线程:
协程:
同步:
异步:

1.subprocess 创建附加进程

2.signal 异步系统事件

3.threading 管理并发操作,程序可在同一进程空间运行多个操作(并发,IO密集型操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import logging # 线程安全
import time
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] (%(threadName)-10s) %(message)s '
)
# def worker(num):
# """thread worker function"""
# print('worker:%s,%s' % (threading.current_thread().getName(), num))
# logging.debug('exiting')
#
# for i in range(10):
# t = threading.Thread(name=str(i), target=worker, args=(i,))
# t.start()
def daemon():
logging.debug('开始打印')
for i in range(5):
time.sleep(2)
print('daemon:', i)
def non_daemon():
logging.debug('开始打印')
for i in range(5):
print('noo_daemon:', i)
t1 = threading.Thread(name='daemon', target=daemon)
t1.setDaemon(True) # 守护进程可在主程序和其他进程结束时隐式结束
t2 = threading.Thread(name='non_daemon', target=non_daemon)
t1.start()
t2.start()
t1.join(20) # join可以使的等待进程运行完
print('t1 is alive', t1.is_alive())
t2.join()

4.multiprocessing 像线程一样管理进程(并行,cpu密集型操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
一般开启新进程
import multiprocessing
import time
def worker(num):
"""worker function"""
time.sleep(1)
print('worker:%s' % num)
if __name__ == '__main__': # 主要的逻辑部分卸载main中,或者单独写在一个文件中,确保新进程启动
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,)) # 参数必须能用pickle串行化
p.start() # 看不出谁先谁后,每个进程都在争夺资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
给进程命名
import multiprocessing
import time
def worker(num):
"""worker function"""
time.sleep(1)
print('worker:{:5}, name:{:10s}'.format(num, multiprocessing.current_process().name))
def service(num):
"""service function"""
time.sleep(1)
print('service:{:5}, name:{:10s}'.format(num, multiprocessing.current_process().name))
if __name__ == '__main__':
s = multiprocessing.Process(name='service', target=service, args=(1,))
w1 = multiprocessing.Process(name='worker', target=worker, args=(2,))
w2 = multiprocessing.Process(target=worker, args=(3,))
s.start()
w1.start()
w2.start()
每个进程都有一个默认名称,可在创建进程时改变
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
创建守护进程p.daemon=True,一般主程序会等待所有子进程完成后退出,守护进程会在主程序结束时结束
主进程使用join()会等待守护进程退出
import multiprocessing
import time
def worker(num):
"""worker function"""
time.sleep(1)
print('worker:{:5}, name:{:10s}'.format(num, multiprocessing.current_process().name))
def service(num):
"""service function"""
time.sleep(5)
print('service:{:5}, name:{:10s}'.format(num, multiprocessing.current_process().name))
if __name__ == '__main__':
s = multiprocessing.Process(name='service', target=service, args=(1,))
s.daemon = True
w = multiprocessing.Process(name='worker', target=worker, args=(2,))
s.start()
w.start()
s.join()
w.join()
注意daemon状态一定要在start前设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
通过进程间传递消息来分解工作,具体使用进程池
import multiprocessing
import time
def calculate(data):
return data * 2
def start_p():
return
if __name__ == '__main__':
start = time.time()
inputs = list(range(1000))
builtin_outputs = map(calculate, inputs)
end = time.time()
print('builtin_time:', end - start)
start1 = time.time()
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size,
initializer=start_p,
)
pool_outputs = pool.map(calculate, inputs)
pool.close()
pool.join()
end1 = time.time()
print('pool_time:', end1 - start1)

Internet

1.urllib.parse.urlparse 解析url的组成部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
http://netloc/path;param?query=arg#frag
返回namedtuple
(scheme, netloc, path, params, query, fragment, username, password, hostname, port)
机制 网络位置 路径 路径段参数 查询 片段 用户名 密码 主机名 端口
-------------------------------------------------------------------------------------
from urllib.parse import urlparse
from urllib.parse import urldefrag
url = 'https://www.baidu.com/index.php?tn=monline_3_dg'
parsed = urlparse(url) # 解析元素
print(parsed)
print(parsed.scheme)
url, frag = urldefrag(url) # 分成两部分
print(url, frag)
print(parsed.geturl()) # 反解析

2.urllib 访问不需要验证的远程资源

3.urllib2 使用url标识的Internet资源

4.json