gevent中与"线程"相关的几个例子

标题虽然说是线程,其实gevent用的是“greenlet”,可能翻译成"微线程"更合适一些。

1、线程池

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import time
import gevent
from gevent.threadpool import ThreadPool
def my_func(text, num):
print text, num
pool = ThreadPool(100)
start = time.time()
for i in xrange(100000):
pool.spawn(my_func, "Hello", i)
pool.join()
delay = time.time() - start
print('Take %.3f seconds' % delay)
import time import gevent from gevent.threadpool import ThreadPool def my_func(text, num): print text, num pool = ThreadPool(100) start = time.time() for i in xrange(100000): pool.spawn(my_func, "Hello", i) pool.join() delay = time.time() - start print('Take %.3f seconds' % delay)
import time
import gevent
from gevent.threadpool import ThreadPool

def my_func(text, num):
    """Print ``text`` and ``num`` on one line, separated by a space."""
    print text, num

# Submit 100000 calls of my_func to a ThreadPool of size 100 and report how
# long it takes for all of them to finish.
pool = ThreadPool(100)
began = time.time()
for seq in xrange(100000):
    pool.spawn(my_func, "Hello", seq)
pool.join()  # wait for the submitted tasks before stopping the clock
print('Take %.3f seconds' % (time.time() - began))

2、 一生产者多消费者

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import time
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import JoinableQueue
from gevent.queue import Empty
q = JoinableQueue()
def consumer():
while True:
item = q.get()
try:
print "consume", item
finally:
q.task_done()
# Start
start = time.time()
# Start consumers
jobs = [gevent.spawn(consumer) for i in xrange(20)]
# Make something to be consume
for i in xrange(100000):
#time.sleep(0.1)
q.put(i)
q.join()
#gevent.joinall(jobs)
delay = time.time() - start
print('Take %.3f seconds' % delay)
import time import gevent from gevent.threadpool import ThreadPool from gevent.queue import JoinableQueue from gevent.queue import Empty q = JoinableQueue() def consumer(): while True: item = q.get() try: print "consume", item finally: q.task_done() # Start start = time.time() # Start consumers jobs = [gevent.spawn(consumer) for i in xrange(20)] # Make something to be consume for i in xrange(100000): #time.sleep(0.1) q.put(i) q.join() #gevent.joinall(jobs) delay = time.time() - start print('Take %.3f seconds' % delay)
import time
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import JoinableQueue
from gevent.queue import Empty

# Module-level work queue that consumer() reads from.
q = JoinableQueue()

def consumer():
    """Take items from ``q`` in an endless loop and print each one.

    The loop has no exit condition of its own. Every item taken is
    acknowledged with ``q.task_done()``.
    """
    while True:
        item = q.get()
        try:
            print "consume", item
        finally:
            # Acknowledge the item even if printing raised, so that the
            # queue's unfinished-task count still goes down.
            q.task_done()

# The clock starts before the consumers are spawned.
began = time.time()

# Launch 20 consumer greenlets.
jobs = []
for _ in xrange(20):
    jobs.append(gevent.spawn(consumer))

# Produce the integers 0..99999, then block until the queue reports that
# every item has been marked done.
for value in xrange(100000):
    q.put(value)
q.join()

# The consumer greenlets are not joined here: consumer() loops forever.
elapsed = time.time() - began
print('Take %.3f seconds' % elapsed)

3、多生产者(用Pool),1消费者(单独线程)

这个略反常规:假设生产者是I/O密集的Job,数据由Pool中的多个Job产生,而消费者只有1个。

用了kill,写得不太优雅,各路大神可以给提提意见。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import time
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue
from gevent.queue import Empty
q = Queue()
def consumer():
while True:
item = q.get(0.1)
try:
print "consume", item
except Empty:
pass
def producer(n):
for i in xrange(10):
#time.sleep(0.1)
q.put(1000*n+i)
# Start
start = time.time()
# Start producer using pool
pool = ThreadPool(10)
for i in xrange(100):
pool.spawn(producer, i)
job = gevent.spawn(consumer)
pool.join()
job.kill()
#gevent.joinall(jobs)
delay = time.time() - start
print('Take %.3f seconds' % delay)
import time import gevent from gevent.threadpool import ThreadPool from gevent.queue import Queue from gevent.queue import Empty q = Queue() def consumer(): while True: item = q.get(0.1) try: print "consume", item except Empty: pass def producer(n): for i in xrange(10): #time.sleep(0.1) q.put(1000*n+i) # Start start = time.time() # Start producer using pool pool = ThreadPool(10) for i in xrange(100): pool.spawn(producer, i) job = gevent.spawn(consumer) pool.join() job.kill() #gevent.joinall(jobs) delay = time.time() - start print('Take %.3f seconds' % delay)
import time
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue
from gevent.queue import Empty

q = Queue()

def consumer():
    while True:
        item = q.get(0.1)
        try:
            print "consume", item
        except Empty:
            pass

def producer(n):
    """Put the ten values ``1000*n`` .. ``1000*n + 9`` on ``q``, in order."""
    base = 1000 * n
    for offset in xrange(10):
        q.put(base + offset)

# The clock starts before the pool is created.
began = time.time()

# Run producer(0) .. producer(99) on a ThreadPool of size 10.
pool = ThreadPool(10)
for batch in xrange(100):
    pool.spawn(producer, batch)

# A single greenlet consumes what the producers put on the queue.
job = gevent.spawn(consumer)

# Wait for the producers, then stop the consumer.
# NOTE(review): kill() is issued as soon as the pool is done; items still
# sitting in the queue at that moment would never be printed -- confirm.
pool.join()
job.kill()

elapsed = time.time() - began
print('Take %.3f seconds' % elapsed)

Update下,我们可以稍微改进下:当pool中所有job都完成后,往queue中放入一个特殊的"StopIteration",consumer取到它后就退出 :-)

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import time
import random
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue
from gevent.queue import Empty
q = Queue()
def consumer():
while True:
#item = q.get(0.1)
item = q.get()
if item == StopIteration:
break
try:
print "consume", item
except StopIteration:
break
except Empty:
pass
def producer(n):
for i in xrange(10):
#time.sleep(random.random())
q.put(1000*n+i)
# Start
start = time.time()
# Start producer using pool
pool = ThreadPool(10)
for i in xrange(100):
pool.spawn(producer, i)
job = gevent.spawn(consumer)
pool.join()
q.put(StopIteration)
job.join()
#gevent.joinall(jobs)
delay = time.time() - start
print('Take %.3f seconds' % delay)
import time import random import gevent from gevent.threadpool import ThreadPool from gevent.queue import Queue from gevent.queue import Empty q = Queue() def consumer(): while True: #item = q.get(0.1) item = q.get() if item == StopIteration: break try: print "consume", item except StopIteration: break except Empty: pass def producer(n): for i in xrange(10): #time.sleep(random.random()) q.put(1000*n+i) # Start start = time.time() # Start producer using pool pool = ThreadPool(10) for i in xrange(100): pool.spawn(producer, i) job = gevent.spawn(consumer) pool.join() q.put(StopIteration) job.join() #gevent.joinall(jobs) delay = time.time() - start print('Take %.3f seconds' % delay)
import time
import random
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue
from gevent.queue import Empty

q = Queue()

def consumer():
    while True:
        #item = q.get(0.1)
        item = q.get()
        if item == StopIteration:
            break
        try:
            print "consume", item
        except StopIteration:
            break
        except Empty:
            pass

def producer(n):
    """Put the ten values ``1000*n`` .. ``1000*n + 9`` on ``q``, in order."""
    base = 1000 * n
    for offset in xrange(10):
        q.put(base + offset)

# The clock starts before the pool is created.
began = time.time()

# Run producer(0) .. producer(99) on a ThreadPool of size 10.
pool = ThreadPool(10)
for batch in xrange(100):
    pool.spawn(producer, batch)

# A single greenlet consumes what the producers put on the queue.
job = gevent.spawn(consumer)

# Once every producer has finished, enqueue the StopIteration sentinel so
# that consumer() leaves its loop, then wait for the consumer to return.
pool.join()
q.put(StopIteration)
job.join()

print('Take %.3f seconds' % (time.time() - began))

 

 

 

 

 

 

 

 

One thought on “gevent中与"线程"相关的几个例子”

Leave a Reply

Your email address will not be published. Required fields are marked *