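- The paramiko module
- Processes vs. threads
- The Python GIL (Global Interpreter Lock)
- Multithreading: syntax, join, thread locks (Lock / RLock / Semaphore), daemon threads, Event, queue, the producer-consumer model
- Multiprocessing: syntax, join, process Queue, process Pipe, process Manager, process synchronization, process pool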
Installation: if pip is already installed, run: pip install paramiko

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Upload a file over SFTP, authenticating with a password
    import paramiko

    transport = paramiko.Transport(("", 22))  # (host, port); fill in the server address
    transport.connect(username="root", password="admin123")
    sftp = paramiko.SFTPClient.from_transport(transport)
    # put(local path, remote path): the remote path must include the file name
    sftp.put("rsa.txt", "/data/rsa.txt")
    # sftp.get("/root/.ssh/id_rsa", "id_rsa_1")  # download: (remote path, local path)
    transport.close()

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Upload a file over SFTP, authenticating with an RSA private key
    import paramiko

    private_key = paramiko.RSAKey.from_private_key_file("id_rsa_1")  # file that holds the private key
    transport = paramiko.Transport(("", 22))  # (host, port); fill in the server address
    transport.connect(username="root", pkey=private_key)
    sftp = paramiko.SFTPClient.from_transport(transport)
    # put(local path, remote path): the remote path must include the file name
    sftp.put("rsa.txt", "/data/python6term/rsa.txt")
    # sftp.get("/root/.ssh/id_rsa", "id_rsa_1")  # download: (remote path, local path)
    transport.close()
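A process is one run of a program over a set of data. It is the basic unit the operating system uses for resource allocation and scheduling, and the foundation of the operating system's structure. In early process-oriented computer architectures, the process was the basic execution entity of a program; in modern thread-oriented architectures, the process is a container for threads. A program is a description of instructions, data and how they are organized; a process is the running instance of that program. A process holds its calls to various resources, its memory management, its use of network interfaces, and so on: this collection of managed resources is what is called a process.

(3) Creating a new thread is simple; creating a new process requires cloning its parent process.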
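One difference that matters for everything below is memory: the threads of one process share that process's memory, while each process has its own. A minimal sketch that makes this visible (increment, value_after_thread and value_after_process are hypothetical names): the same function bumps a module-level counter once in a thread and once in a child process, and the parent then reads the counter.

```python
import multiprocessing
import threading

counter = 0  # module-level state, used only for this comparison


def increment():
    """Add 1 to the module-level counter in whichever thread/process runs it."""
    global counter
    counter += 1


def value_after_thread():
    """Run increment() in a new thread, then return the parent's counter."""
    t = threading.Thread(target=increment)
    t.start()
    t.join()
    return counter


def value_after_process():
    """Run increment() in a child process, then return the parent's counter."""
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    return counter


if __name__ == "__main__":
    print("after thread:", value_after_thread())    # 1: the thread changed shared memory
    print("after process:", value_after_process())  # still 1: the child changed its own copy
```

The child's increment stays in the child. Getting data back out of a process needs an explicit channel such as the Queue, Pipe or Manager covered in section 5.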
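3. Python GIL (Global Interpreter Lock)

No matter how many threads you start or how many CPUs you have, Python calmly lets only one thread execute at any given moment. So how can this even be called multithreading?

The GIL is not a feature of the Python language; it is a concept introduced in the implementation of the Python interpreter CPython. By analogy, C++ is a language (syntax) standard, but it can be compiled into executable code by different compilers, well-known ones being GCC, Intel C++ and Visual C++. Python is the same: the same code can run under different Python execution environments such as CPython, PyPy and Jython, and Jython has no GIL. However, because CPython is the default Python execution environment in most settings, many people think of CPython as Python itself and take it for granted that the GIL is a flaw of the Python language.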
more: http://www.dabeaz.com/python/UnderstandingGIL.pdf
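One consequence worth seeing in code: CPython releases the GIL while a thread is blocked in calls such as time.sleep() or socket I/O, so I/O-bound threads still overlap even though only one thread runs Python bytecode at a time. A minimal sketch, using time.sleep() as a stand-in for blocking I/O (wait_io and run_threads are hypothetical helper names):

```python
import threading
import time


def wait_io(seconds):
    """Stand-in for a blocking I/O call; time.sleep() releases the GIL while it waits."""
    time.sleep(seconds)


def run_threads(n, seconds):
    """Run n threads that each block for `seconds`; return the elapsed wall-clock time."""
    threads = [threading.Thread(target=wait_io, args=(seconds,)) for _ in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = run_threads(5, 0.2)
    # roughly 0.2 s rather than 5 * 0.2 = 1.0 s, because the five sleeps overlap
    print("elapsed: %.2f s" % elapsed)
```

For CPU-bound pure-Python loops the threads instead take turns holding the GIL, so adding threads does not make the work finish sooner; that case is what multiprocessing (section 5) is for.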
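
    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    import threading
    import time


    def func(name):
        print("hello", name)
        time.sleep(3)


    t1 = threading.Thread(target=func, args=("alex",))
    t2 = threading.Thread(target=func, args=("zingp",))
    t1.start()  # runs concurrently with t2
    t2.start()  # runs concurrently with t1

    # func("alex")   # a plain call runs first...
    # func("zingp")  # ...and this one only after it returns

Method 2 for starting threads (subclassing threading.Thread):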
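
    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    import threading
    import time


    class MyThread(threading.Thread):
        """Subclass Thread and override run(); start() calls run() in the new thread."""

        def __init__(self, name):
            super().__init__()
            self.name = name  # Thread.name is also the thread's display name

        def run(self):
            print("hello", self.name)
            time.sleep(3)


    t1 = MyThread("alex")
    t2 = MyThread("zingp")
    t1.start()  # runs concurrently
    t2.start()  # runs concurrently

4.2. join(): t.join() makes the calling thread (here the main thread) wait until thread t has finished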

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    import threading
    import time


    def func(name):
        print("i am", name)
        time.sleep(3)
        print("this thread done...")


    start_time = time.time()
    # The main thread and the 50 threads it starts run concurrently and independently:
    # 51 threads in total.
    res = []
    for i in range(50):
        t = threading.Thread(target=func, args=(i,))
        t.start()
        res.append(t)  # keep every started thread object in a list

    for j in res:  # iterate over the 50 started threads
        j.join()   # wait for each one to finish

    print("all threads have finished...", threading.current_thread())
    print("total time:", time.time() - start_time)

4.3 Daemon: setting daemon threads. The program waits for non-daemon threads to finish before it exits, but it does not wait for daemon threads.

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    import threading
    import time


    def func(name):
        print("i am", name)
        time.sleep(1)
        print("this thread done...")


    start_time = time.time()
    # The main thread and the 50 threads it starts run concurrently: 51 threads in total.
    for i in range(50):
        t = threading.Thread(target=func, args=(i,))
        t.daemon = True  # set before start(); setDaemon(True) is deprecated since Python 3.10
        t.start()

    # The program does not wait for daemon threads: once the main thread finishes, they are
    # stopped abruptly, so "this thread done..." is normally never printed.
    print("main thread done...", threading.current_thread(), threading.active_count())
    print("total time:", time.time() - start_time)

4.4 Thread locks: the mutex (Mutex)
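
    import time
    import threading


    def addNum():
        global num  # every thread works on this global variable
        print('--get num:', num)
        time.sleep(1)
        num -= 1  # decrement the shared variable


    num = 100  # the shared variable
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # wait for all threads to finish
        t.join()

    print('final num:', num)

Normally the final num should be 0, but if you run this several times on Python 2.7 you will find that the printed result is not always 0. Why does it differ between runs? Suppose threads A and B both want to decrement num. Because the two threads run concurrently, both may read the initial value num = 100 before either writes back. A computes 99, B also computes 99, and when both results are assigned to num it ends up as 99: one decrement is lost. The fix: when a thread modifies shared data, it first puts a lock on that data, so that no other thread can modify it while the change is in progress. Any other thread that wants to modify the data must wait until the first thread has finished and released the lock.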

    import time
    import threading


    def addNum():
        global num  # every thread works on this global variable
        print('--get num:', num)
        time.sleep(1)
        lock.acquire()  # take the lock before modifying the data
        num -= 1        # decrement the shared variable
        lock.release()  # release it after the change


    num = 100  # the shared variable
    thread_list = []
    lock = threading.Lock()  # create a global lock
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  # wait for all threads to finish
        t.join()

    print('final num:', num)
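threading.Lock also works as a context manager: "with lock:" acquires the lock on entry and releases it on exit, even if the body raises, so a missed release() cannot leave the other threads waiting forever. A minimal sketch of the same counter in that style; the sleep is dropped to keep it fast, and sub_num and run are hypothetical names:

```python
import threading

num = 100                # shared counter
lock = threading.Lock()  # protects num


def sub_num():
    global num
    with lock:           # acquire() on entry, release() on exit (even on an exception)
        num -= 1


def run(start=100, n_threads=100):
    """Reset num to start, let n_threads threads decrement it once each, return it."""
    global num
    num = start
    threads = [threading.Thread(target=sub_num) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return num


if __name__ == "__main__":
    print("final num:", run())  # 0
```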
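Python already has a GIL guaranteeing that only one thread executes at a time, so why is a lock still needed here? Note that this lock is a user-level lock and has nothing to do with the GIL: the GIL only ensures that one thread runs Python bytecode at any moment, and the interpreter may still switch threads in the middle of num -= 1, after num has been read and before the result is written back. For details, see the figure.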
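Why the GIL does not protect num -= 1 can be checked with the standard dis module: the statement is not one indivisible operation but several bytecode instructions (load the global, subtract, store the global), and a thread switch can fall between the load and the store. A small sketch; exact opcode names vary across Python versions, so it only looks at the load/store pair (decrement and opnames are hypothetical names):

```python
import dis

num = 100


def decrement():
    global num
    num -= 1


def opnames(func):
    """Return the names of the bytecode instructions func compiles to."""
    return [ins.opname for ins in dis.get_instructions(func)]


if __name__ == "__main__":
    # The list contains a LOAD_GLOBAL for num and, later, a separate STORE_GLOBAL:
    # another thread can run between the two, and its update is then overwritten.
    print(opnames(decrement))
    dis.dis(decrement)  # the full disassembly, one instruction per line
```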
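RLock (recursive lock): a thread that already holds an RLock can acquire it again, and must release it as many times as it acquired it. In the example below, run3() holds the lock while calling run1() and run2(), which acquire it again; with a plain threading.Lock the thread would block on itself and deadlock.

    import threading, time


    def run1():
        global num
        print("grab the first part data")
        lock.acquire()
        num += 1
        lock.release()
        return num


    def run2():
        global num2
        print("grab the second part data")
        lock.acquire()
        num2 += 1
        lock.release()
        return num2


    def run3():
        lock.acquire()
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
        lock.release()
        print(res, res2)


    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(1):
        t = threading.Thread(target=run3)
        t.start()

    while threading.active_count() != 1:  # busy-wait until only the main thread is left
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num, num2)

Semaphore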
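A mutex allows only one thread to modify the data at a time, whereas a Semaphore allows a fixed number of threads to do so at the same time. For example, if a restroom has 3 stalls, at most 3 people can use it at once; everyone else has to wait until someone comes out.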
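To make the "at most N at once" rule observable, the sketch below (hypothetical names; 3 slots to match the three-stall example) records how many threads are inside the semaphore at the same time. It uses threading.BoundedSemaphore, which also raises ValueError if release() is called more times than acquire(), so unbalanced releases are caught early.

```python
import threading
import time

MAX_INSIDE = 3                                     # the "3 stalls"
semaphore = threading.BoundedSemaphore(MAX_INSIDE)
state_lock = threading.Lock()                      # protects the two counters below
inside = 0                                         # threads currently holding the semaphore
peak = 0                                           # highest value `inside` has reached


def use_stall(seconds=0.05):
    global inside, peak
    with semaphore:                                # blocks while 3 threads are inside
        with state_lock:
            inside += 1
            peak = max(peak, inside)
        time.sleep(seconds)                        # simulate work while holding a slot
        with state_lock:
            inside -= 1


def run(n_threads=10):
    """Start n_threads users and return the peak number inside at once."""
    threads = [threading.Thread(target=use_stall) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


if __name__ == "__main__":
    print("peak concurrency:", run())              # never more than 3
```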

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    import threading
    import time


    def func(n):
        semaphore.acquire()
        time.sleep(1)
        print("this thread is %s\n" % n)
        semaphore.release()


    semaphore = threading.BoundedSemaphore(5)  # at most 5 threads may hold it at once
    for i in range(23):
        t = threading.Thread(target=func, args=(i,))
        t.start()

    while threading.active_count() != 1:
        pass  # print(threading.active_count())
    else:
        print("all threads are done...")

4.5. Events
An event is a simple synchronization object; the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.

    event = threading.Event()

    # a client thread can wait for the flag to be set
    event.wait()

    # a server thread can set or reset it
    event.set()
    event.clear()

If the flag is set, the wait method doesn't do anything. If the flag is cleared, wait will block until it becomes set again. Any number of threads may wait for the same event.

An Event can be used to coordinate two or more threads. Below is a traffic-light example: one thread acts as the traffic light and other threads act as cars, which follow the rule "stop on red, go on green".

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    import threading
    import time


    def lighter():
        count = 0
        event.set()  # start with green
        while True:
            if 5 < count <= 10:  # seconds 6-10 of each cycle: red
                event.clear()
                print("This is RED....")
            elif count > 10:     # cycle over: switch back to green
                event.set()
                count = 0
            else:
                print("This is GREEN...")
            time.sleep(1)
            count += 1


    def car(name):
        while True:
            if event.is_set():  # green: keep driving
                print("Green, the %s is running...." % name)
                time.sleep(1)
            else:               # red: block until the light turns green again
                print("RED, the %s is waiting..." % name)
                event.wait()
                print("Green, the %s starts going..." % name)


    event = threading.Event()
    light = threading.Thread(target=lighter)
    light.start()
    car1 = threading.Thread(target=car, args=("Tesla",))
    car1.start()

4.6. Queues
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class queue.Queue(maxsize=0)  # first in, first out
- class queue.LifoQueue(maxsize=0)  # last in, first out
- class queue.PriorityQueue(maxsize=0)  # a queue whose items can be given a priority when stored

  Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

  The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

- exception queue.Empty
  Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
- exception queue.Full
  Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
- Queue.qsize()
- Queue.empty()  # return True if empty
- Queue.full()  # return True if full
- Queue.put(item, block=True, timeout=None)
  Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
- Queue.put_nowait(item)
  Equivalent to put(item, False).
- Queue.get(block=True, timeout=None)
  Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
- Queue.get_nowait()
  Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

- Queue.task_done()
  Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
  If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
  Raises a ValueError if called more times than there were items placed in the queue.
- Queue.join()  # blocks until the queue has been fully consumed
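task_done() and join() are easiest to understand in a small worker pool. The sketch below (worker and process_all, the squaring work and the None sentinel are illustrative choices, not part of the queue API) starts a few consumer threads, feeds them through a bounded Queue, and uses join() to wait until every item has been marked done:

```python
import queue
import threading


def worker(tasks, results):
    """Consume numbers from tasks until the None sentinel arrives."""
    while True:
        item = tasks.get()
        if item is None:           # sentinel: no more work for this worker
            tasks.task_done()
            break
        results.put(item * item)   # "process" the item
        tasks.task_done()          # exactly one task_done() per get()


def process_all(items, n_workers=3):
    tasks = queue.Queue(maxsize=10)  # put() blocks while 10 items are waiting
    results = queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for item in items:             # producer side
        tasks.put(item)
    for _ in threads:
        tasks.put(None)            # one sentinel per worker
    tasks.join()                   # returns once every put() item has been task_done()
    for t in threads:
        t.join()
    out = []
    while not results.empty():
        out.append(results.get())
    return sorted(out)


if __name__ == "__main__":
    print(process_all(range(5)))   # [0, 1, 4, 9, 16]
```

Putting one None per worker is a common way to tell consumers to stop; structurally this is the producer-consumer model of section 4.7, with the main thread as the producer.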

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    import queue

    # A plain Queue is first in, first out
    q = queue.Queue(maxsize=10)  # queue size; the default (0) means unlimited
    q.put(1)
    q.put(8)
    q.put("alex")
    q.put("zingp")
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    # 1
    # 8
    # alex
    # zingp
    # print(q.get(timeout=5))  # put() and get() both accept a timeout; when it expires they
    #                          # raise queue.Full / queue.Empty. Without a timeout they block.

    q2 = queue.LifoQueue()  # last in, first out
    q2.put(1)
    q2.put(2)
    q2.put("zingp")
    print(q2.get())
    print(q2.get())
    print(q2.get())
    # zingp
    # 2
    # 1

    q3 = queue.PriorityQueue()  # items with the lowest priority number come out first
    q3.put((-1, "chenronghua"))
    q3.put((6, "hanyang"))
    q3.put((10, "jesson"))
    q3.put((4, "wangsen"))
    print(q3.get())
    print(q3.get())
    print(q3.get())
    print(q3.get())
    # (-1, 'chenronghua')
    # (4, 'wangsen')
    # (6, 'hanyang')
    # (10, 'jesson')

4.7. The producer-consumer model

    import threading, time
    import queue

    q = queue.Queue(maxsize=10)


    def Producer(name):
        count = 1
        while True:
            q.put("bone %s" % count)  # blocks while the queue already holds 10 bones
            print("produced bone", count)
            count += 1
            time.sleep(0.1)


    def Consumer(name):
        # while q.qsize() > 0:
        while True:
            print("[%s] got [%s] and ate it..." % (name, q.get()))
            time.sleep(1)


    p = threading.Thread(target=Producer, args=("Alex",))
    c = threading.Thread(target=Consumer, args=("ChengRonghua",))
    c1 = threading.Thread(target=Consumer, args=("Wang Sen",))
    p.start()
    c.start()
    c1.start()

5. Multiprocessing

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    import multiprocessing
    import threading
    import time


    def thread_id():
        """Print the ID of the current thread."""
        print(" thread..")
        print("thread_id:%s\n" % threading.get_ident())


    def hello(name):
        time.sleep(2)
        print("hello %s..." % name)
        # start a thread inside this child process
        t = threading.Thread(target=thread_id)
        t.start()


    if __name__ == "__main__":  # required on Windows; without it, starting a process raises an error
        for i in range(10):
            # starting a process looks almost the same as starting a thread
            p = multiprocessing.Process(target=hello, args=("process %s" % i,))
            p.start()
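The loop above starts ten processes but never waits for them. A minimal sketch (work and run_workers are hypothetical names) that keeps the Process objects, join()s them the way section 4.2 does for threads, and reads each exitcode, which is 0 when the target returned normally:

```python
import multiprocessing
import os


def work(n):
    """Target run inside each child process."""
    print("child %s running in pid %s" % (n, os.getpid()))


def run_workers(count=4):
    """Start count processes, wait for all of them, and return their exit codes."""
    procs = [multiprocessing.Process(target=work, args=(i,)) for i in range(count)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()                 # block until this child has exited
    return [p.exitcode for p in procs]


if __name__ == "__main__":       # needed with the spawn start method (default on Windows and macOS)
    print(run_workers())         # [0, 0, 0, 0]
```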
5.2 Every process is created by its parent process

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Every process is created by a parent process
    import multiprocessing
    import os


    def info(title):
        print(title)
        print("module name:", __name__)
        print("parent process:", os.getppid())
        print("process id:", os.getpid())
        print("\n")


    def f(name):
        info("child process..")
        print("hello", name)


    if __name__ == "__main__":
        info("\033[31;1m main process\033[0m ")
        p = multiprocessing.Process(target=f, args=("jack",))
        p.start()
        p.join()

5.3 Process Queue: communication (passing data) between processes

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    from multiprocessing import Process, Queue


    def f(q):
        q.put([42, None, 'hello'])


    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())  # prints "[42, None, 'hello']"
        p.join()

5.4 Process Pipe: communication between processes through a pipe
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Processes can also communicate through a pipe
    from multiprocessing import Process, Pipe


    def f(conn):
        conn.send([42, None, "hello from child"])  # send data
        conn.send([42, None, "hello from child"])  # send() can be called several times
        print("from parent:", conn.recv())         # receive data
        conn.close()


    if __name__ == "__main__":
        parent_conn, child_conn = Pipe()  # create a pipe; its two ends connect the processes
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())  # receive data
        print(parent_conn.recv())  # receive again
        parent_conn.send("hello zingp......")
        p.join()

5.5 Process Manager: true data sharing between processes (not just passing data)
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Sharing data between processes
    from multiprocessing import Process, Manager
    import os


    def f(d, l):
        d["name"] = "alex"
        d["sex"] = "Man"
        d["age"] = 33
        l.append(os.getpid())
        print(l)


    if __name__ == "__main__":
        with Manager() as manager:
            d = manager.dict()          # a dict that can be shared and passed between processes
            l = manager.list(range(5))  # a list that can be shared and passed between processes
            res = []
            for i in range(10):
                p = Process(target=f, args=(d, l))
                p.start()
                res.append(p)
            for j in res:  # wait for all processes to finish
                j.join()
            print(d)
            print(l)

5.6 Process synchronization
Without using the lock output from the different processes is liable to get all mixed up.

    from multiprocessing import Process, Lock


    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()


    if __name__ == '__main__':
        lock = Lock()
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

5.7 Process pool (Pool)
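The pool offers two ways to submit a task:
- apply: synchronous; each call blocks until its result is ready, so tasks run one after another
- apply_async: asynchronous; the call returns immediately, so tasks run in parallel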

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    from multiprocessing import Pool
    import os
    import time


    def Foo(i):
        time.sleep(2)
        print("in process", os.getpid())
        return i + 100


    def Bar(arg):
        print("-->exec done:", arg, os.getpid())


    if __name__ == "__main__":
        pool = Pool(processes=3)  # at most 3 worker processes run at the same time
        print("main process:", os.getpid())
        for i in range(10):
            # callback: called with Foo's return value in the main process, so if the results
            # must be written to a database, one callback is enough; the workers need not do it
            pool.apply_async(func=Foo, args=(i,), callback=Bar)
            # pool.apply(func=Foo, args=(i,))        # serial
            # pool.apply_async(func=Foo, args=(i,))  # parallel
        print("end")
        pool.close()
        pool.join()  # close() must come before join(), otherwise join() raises an error;
                     # without join(), the main process exits and the pool is shut down at once
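Since Python 3.3 a Pool can also be used as a context manager, but its __exit__ calls terminate(), not close() and join(), so results must be collected inside the with block. A minimal sketch (foo and run are hypothetical names) that collects them two ways: with map(), which blocks and preserves input order, and with apply_async(...).get():

```python
from multiprocessing import Pool


def foo(i):
    """Work done in a pool worker process."""
    return i + 100


def run(n=10, processes=3):
    """Submit n tasks via map() and via apply_async(); return both result lists."""
    with Pool(processes=processes) as pool:
        # map() blocks until every result is ready and keeps the input order
        mapped = pool.map(foo, range(n))
        # apply_async() returns an AsyncResult at once; get() waits for that one result
        pending = [pool.apply_async(foo, (i,)) for i in range(n)]
        collected = [r.get(timeout=10) for r in pending]
    # leaving the with block calls pool.terminate(), which is why results are gathered inside it
    return mapped, collected


if __name__ == "__main__":
    mapped, collected = run()
    print(mapped)               # [100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
    print(mapped == collected)  # True
```

The explicit close() then join() sequence above remains the way to wait for fire-and-forget apply_async() calls whose results only reach a callback.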