一,线程的锁
Q1:线程为什么要有锁
1.线程之间的数据安全问题:
+=,-=,*=,/=赋值操作不安全,如果涉及这些这些一定要加锁
2.pop.append是线程安全
3.队列也是数据安全的
4.多线程中,别在线程中操作全局变量
5.可以使用dic模块中的方法来查看某个操作对应的cpu指令
互斥锁与递归锁
#互斥锁Lockfrom threading import Thread,Lockn = 1500000def func(lock): global n for i in range(1500000): with lock: #如果不加锁,则会出现数据不安全的问题 n -= 1def func2(lcok): global n for i in range(1500000): lock.acquire() #如果不加锁,则会出现数据不安全的问题 n += 1 lock.release()if __name__ == '__main__': t_lst = [] lock = Lock() for i in range(10): t = Thread(target=func,args=(lock,)) t2 = Thread(target=func2,args=(lock,)) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('-->',n)
#互斥锁的死锁现象: #一共有两把锁 #线程之间是异步的 #操作的时候,抢到一把锁之后还要再去抢第二把锁 #一个线程抢到一把锁 #另一个线程抢到了另一把锁import timefrom threading import Thread,Locknoodle_lock = Lock()fork_lock = Lock()def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s吃面'%name) time.sleep(0.3) fork_lock.release() print('%s放下叉子'%name) noodle_lock.release() print('%s放下面'%name)def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s吃面'%name) time.sleep(0.3) noodle_lock.release() print('%s放下面'%name) fork_lock.release() print('%s放下叉子' % name)if __name__ == '__main__': name_list = ['alex','wusir'] name_list2 = ['nezha','yuan'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start()
递归锁:
1.递归锁可以解决互斥锁的死锁问题
当多个线程抢占资源时,使用一把递归锁,解决死锁问题
2.递归锁的优缺点:
递归锁并不是一个好的解决方法
死锁现象的发生不是互斥锁的问题,而是程序员的逻辑问题导致的
递归锁能够快速的解决死锁问题
3.递归锁的特点:
当程序发生死锁的时候,能够迅速恢复服务,使用递归锁替换互斥锁
在后续修复中,逐步把递归锁替换成互斥锁:完善代码逻辑,提高代码的效率
4.递归锁的逻辑:
多个线程之间,一个线程用完一个资源再到另外一个线程取使用
先释放一个资源,再去获取一个资源的锁
#递归锁RLockfrom threading import Thread,RLockrlock = RLock()def func(name): rlock.acquire() print(name,1) rlock.acquire() print(name,2) rlock.acquire() print(name,3) rlock.acquire() print(name,4) rlock.release() rlock.release() rlock.release() rlock.release()if __name__ == '__main__': for i in range(10): Thread(target=func,args=('name%s'%i,)).start()
#科学家吃面问题,把两把互斥锁改为一把递归锁修复死锁问题import timefrom threading import Thread,RLock# noodle_fork_lock = Lock()noodle_lock = fork_lock = RLock()def eat1(name): # noodle_fork_lock.acquire() noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s吃面'%name) time.sleep(0.3) fork_lock.release() print('%s放下叉子'%name) noodle_lock.release() # noodle_fork_lock.release() print('%s放下面'%name) def eat2(name): # noodle_fork_lock.acquire() fork_lock.acquire() print('%s拿到叉子了'%name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s吃面'%name) time.sleep(0.3) noodle_lock.release() print('%s放下面' % name) fork_lock.release() # noodle_fork_lock.release() print('%s放下叉子' % name)if __name__ == '__main__': name_list = ['alex','wusir'] name_list2 = ['nezha','yuan'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start()
二,线程的信号量
原理为:锁+计数器
1.并没有减少线程需要的并发数量
2.造成多个线程在等待资源的释放
#信号量Semaphoreimport timefrom threading import Semaphore,Threaddef func(index,sem): sem.acquire() print(index) time.sleep(1) sem.release()if __name__ == '__main__': sem = Semaphore(5) for i in range(10): Thread(target=func,args=(i,sem)).start()
三,线程的事件
线程与线程之间同步执行,一个线程等待wait()的信号,并由信号决定,线程释放执行,另一个线程负责决定信号的状态,主要用于防止线程长时间或一直处于阻塞状态
#事件:Event #实例:检测数据库连接 #先来测试一下数据库是否能被连接 #网通不通 #用户名,密码是否正确#wait()等待 事件内的信号变成True#set() 把信号变成True#clear() 把信号变成False#is_set() 查看信号状态是否为Trueimport timeimport randomfrom threading import Event,Threaddef check(e): print('开始检测数据库连接') time.sleep(random.randint(1,5)) #检测数据库连接的操作类比 e.set() #成功了def connect(e): for i in range(3): e.wait(timeout=0.5) if e.is_set(): print('数据库连接成功') break else: print('尝试连接数据库%s次失败'%(i+1)) else: raise TimeoutError #若超过连接次数,则主动抛出异常 if __name__ == '__main__': e = Event() Thread(target=check,args=(e,)).start() Thread(target=connect,args=(e,)).start()
四,线程的条件
1.notify:控制流量,通知有多少人可以通过了,即设置多少个线程可执行,在使用前后都需要加锁
2.wait:在门口等待的所有人,即处于阻塞状态的线程,使用前后都需要加锁
#条件:Conditionfrom threading import Condition,Threaddef func(con,index): print('%s在等待'%index) con.acquire() print('%s在wait' % index) con.wait() print('% sdo something'%index) con.release()if __name__ == '__main__': con = Condition() for i in range(10): t = Thread(target=func,args=(con,i)) t.start() # con.acquire() # con.notify_all() #可以设置流量为不限制 # con.release() count = 10 while count > 0: num = int(input('>>>')) con.acquire() con.notify(num) count -= num con.release()
五,线程的定时器
1.子线程延迟执行不影响主程序的运行
2.可以实现 linux操作系统上的定时任务的工具 crontab
#定时器:Timerfrom threading import Timerdef func(): print('执行我啦')t = Timer(5,func) #延迟5秒执行t.start()print('主线程')
六,线程的queue
1.队列:queue
线程安全,一般用于排队相关逻辑
保证qps每秒钟接收请求数
帮助维护程序的响应的顺序
应用场景,服务器响应大量并发连接,将来不及响应的连接,存入队列中,并依次响应
#队列:queueimport queueq = queue.Queue()#队列的方法q.put()q.get()q.put_nowait()q.get_nowait()q.full() #不准确q.empty() #不准确
2.栈:LifoQueue
维护先进后出的顺序
应用场景:完成算法
#栈:LifoQueuefrom queue import LifoQueuelq = LifoQueue()#栈的方法lq.put()lq.get()
#实例:线程中的生产者消费者模型from threading import Threadimport queueimport timedef consumer(q,name): while True: ret = q.get() if ret is None:break print(name,'吃了',ret)def producer(q,name,food): for i in range(10): time.sleep(0.1) q.put(food+str(i)) print(name,'生产了',food+str(i))if __name__ == '__main__': q1 = queue.Queue() #用于线程间的IPC c = Thread(target=consumer,args=(q1,'wusir')) p = Thread(target=producer,args=(q1,'alex','饭菜')) c.start() p.start() p.join() q1.put(None)
3.优先级队列:PriorityQueue
按照优先级条件决定谁先出队列
队列中元素,数据类型需要一致,并且能狗满足比较大小的条件,否则报错
#优先级队列:PriorityQueue from queue import PriorityQueuepq = PriorityQueue()pq.put((15,'abc'))pq.put((12,'def')) #优先级按第二个元素的第一个字母的ascii码大小pq.put((12,'aaa')) #当第一个元素优先级一样是pq.put((5,'ghi'))# pq.put(5)# pq.put(12)# pq.put(15)print(pq.get())print(pq.get())print(pq.get())
七、总结
# 线程安全的问题# 数据类型是否线程安全# += -=不安全# set list dict 基础方法线程安全## 某块数据安全# logging模块 是线程安全的## 互斥锁和递归锁之间的区别# 出现死锁问题的本质# 两把锁,锁两个资源# 分别被不同线程抢占,导致死锁# 解决方法;# 速度上# 逻辑上# 线程中用到的模块# 锁,互斥锁,递归锁# 信号量# 事件# 队列
八,线程中local的概念
多个线程之间使用threading.local对象,可以实现多个线程之间的数据隔离
import timeimport randomfrom threading import local,Threadloc = local()def func2(): global loc print(loc.name,loc.age)def func(name,age): global loc loc.name = name loc.age = age time.sleep(random.random()) func2()Thread(target=func,args=('alex',40)).start()Thread(target=func,args=('boss_jin',38)).start()