在Python编程中,迭代器和生成器是处理大数据流和延迟计算的核心工具。它们通过按需生成数据而非一次性加载全部内容,显著提升内存效率。本文将深入剖析其工作机制,并通过实战案例展示如何利用它们优化代码性能。
核心概念解析
迭代器协议
迭代器是实现了__iter__()
和__next__()
方法的对象。当容器类实现__iter__()
返回迭代器对象,而迭代器本身实现__next__()
返回下一个元素时,便满足迭代器协议:
class Squares:
def __init__(self, limit):
self.limit = limit
self.n = 0
def __iter__(self):
return self
def __next__(self):
if self.n >= self.limit:
raise StopIteration
result = self.n ** 2
self.n += 1
return result
# 使用示例
for num in Squares(5):
print(num) # 输出 0, 1, 4, 9, 16
关键特性:
– 状态保持:迭代器内部维护状态(如self.n
)
– 惰性求值:仅在调用__next__()
时计算
– 一次性消费:迭代耗尽后无法重复使用
生成器函数
生成器是通过yield
关键字实现的特殊函数,自动满足迭代器协议:
def fibonacci(limit):
a, b = 0, 1
while a < limit:
yield a
a, b = b, a + b
# 使用示例
gen = fibonacci(100)
print(list(gen)) # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89]
执行机制:
1. 调用生成器函数返回生成器对象(不立即执行)
2. 每次next()
执行到yield
暂停,保存局部变量状态
3. 再次调用next()
时从暂停点恢复执行
高级应用场景
大数据处理管道
生成器可构建高效的数据处理管道,避免内存爆炸:
def read_large_file(file_path):
with open(file_path) as f:
for line in f:
yield line.strip()
def filter_keywords(lines, keywords):
for line in lines:
if any(keyword in line for keyword in keywords):
yield line
# 构建处理管道
lines = read_large_file('server.log')
filtered = filter_keywords(lines, ['ERROR', 'CRITICAL'])
stats = {line: len(line) for line in filtered}
优势:
– 单行数据常驻内存
– 支持TB级文件处理
– 管道组件可独立测试
协程与双向通信
生成器通过.send()
方法支持双向通信,实现简单协程:
def coroutine():
total = 0
while True:
value = yield total
total += value
co = coroutine()
next(co) # 启动生成器
print(co.send(10)) # 输出10
print(co.send(20)) # 输出30
性能优化对比
内存占用测试
通过sys.getsizeof()
对比不同实现的内存消耗:
import sys
# 列表推导式
list_comp = [x**2 for x in range(1000000)]
print(sys.getsizeof(list_comp)) # 约8448728字节
# 生成器表达式
gen_exp = (x**2 for x in range(1000000))
print(sys.getsizeof(gen_exp)) # 约128字节
执行时间分析
使用timeit
模块测试不同方案的耗时:
import timeit
setup = '''
def count_up(n):
i = 0
while i < n:
yield i
i += 1
'''
print(timeit.timeit('sum(count_up(1000000))', setup=setup, number=100))
# 约4.3秒(生成器版本)
print(timeit.timeit('sum([i for i in range(1000000)])', number=100))
# 约3.8秒(列表版本)
结论:
– 生成器在内存敏感场景优势明显
– 列表在CPU密集型操作中更快
– 实际选择需权衡空间与时间成本
行业最佳实践
现代Python改进
Python 3.3+引入yield from
语法简化嵌套生成器:
def chain(*iterables):
for it in iterables:
yield from it
list(chain('ABC', 'DEF')) # ['A', 'B', 'C', 'D', 'E', 'F']
异步生成器
Python 3.6+的异步生成器支持异步流处理:
async def async_fetch(urls):
for url in urls:
data = await aiohttp.request('GET', url)
yield await data.json()
async for result in async_fetch(url_list):
process(result)
常见陷阱与解决方案
-
已耗尽迭代器:
gen = (x for x in range(3)) list(gen) # [0, 1, 2] list(gen) # []
解决方案:使用
itertools.tee
创建副本或重构生成器 -
资源泄漏:
def read_files(): for file in glob('*.log'): yield open(file).read() # 文件句柄未关闭
修复方案:
def read_files_safe(): for file in glob('*.log'): with open(file) as f: yield f.read()
-
过早求值:
data = filter(lambda x: x>0, [-1, 0, 1]) processed = [x*2 for x in data] # 正确用法 length = len(list(data)) # 消耗迭代器
设计模式应用
观察者模式实现
利用生成器构建事件消费者:
class EventStream:
def __init__(self):
self._subscribers = []
def subscribe(self):
gen = self._create_generator()
self._subscribers.append(gen)
return gen
def _create_generator(self):
while True:
msg = yield
print(f"Received: {msg}")
def publish(self, msg):
for sub in self._subscribers:
sub.send(msg)
# 使用示例
stream = EventStream()
consumer = stream.subscribe()
next(consumer) # 启动生成器
stream.publish("ALERT") # 输出"Received: ALERT"
该模式在微服务架构中常用于实现消息广播,相比回调函数方案更易维护。