朴素的解决方案
简单的算法是
- 对原始嵌套可迭代执行按元素复制。
- Make
n
元素复制的副本。
- 获取与每个独立副本相关的坐标。
这可以像这样实现
from itertools import tee
from operator import itemgetter
from typing import (Any,
Iterable,
Tuple,
TypeVar)
Domain = TypeVar('Domain')
def copy_nested_iterable(nested_iterable: Iterable[Iterable[Domain]],
*,
count: int = 2
) -> Tuple[Iterable[Iterable[Domain]], ...]:
def shallow_copy(iterable: Iterable[Domain]) -> Tuple[Iterable[Domain], ...]:
return tee(iterable, count)
copies = shallow_copy(map(shallow_copy, nested_iterable))
return tuple(map(itemgetter(index), iterables)
for index, iterables in enumerate(copies))
Pros:
Cons:
- 如果我们想以更高的嵌套级别扩展我们的可迭代方法(例如嵌套可迭代的可迭代等等),那么这种方法看起来没有帮助。
我们可以做得更好。
改进的解决方案
如果我们看一下itertools.tee功能文档 https://docs.python.org/3/library/itertools.html#itertools.tee,它包含Python配方,在以下帮助下functools.singledispatch装饰者 https://docs.python.org/3/library/functools.html#functools.singledispatch可以重写为
from collections import (abc,
deque)
from functools import singledispatch
from itertools import repeat
from typing import (Iterable,
Tuple,
TypeVar)
Domain = TypeVar('Domain')
@functools.singledispatch
def copy(object_: Domain,
*,
count: int) -> Iterable[Domain]:
raise TypeError('Unsupported object type: {type}.'
.format(type=type(object_)))
# handle general case
@copy.register(object)
# immutable strings represent a special kind of iterables
# that can be copied by simply repeating
@copy.register(bytes)
@copy.register(str)
# mappings cannot be copied as other iterables
# since they are iterable only by key
@copy.register(abc.Mapping)
def copy_object(object_: Domain,
*,
count: int) -> Iterable[Domain]:
return itertools.repeat(object_, count)
@copy.register(abc.Iterable)
def copy_iterable(object_: Iterable[Domain],
*,
count: int = 2) -> Tuple[Iterable[Domain], ...]:
iterator = iter(object_)
# we are using `itertools.repeat` instead of `range` here
# due to efficiency of the former
# more info at
# https://stackoverflow.com/questions/9059173/what-is-the-purpose-in-pythons-itertools-repeat/9098860#9098860
queues = [deque() for _ in repeat(None, count)]
def replica(queue: deque) -> Iterable[Domain]:
while True:
if not queue:
try:
element = next(iterator)
except StopIteration:
return
element_copies = copy(element,
count=count)
for sub_queue, element_copy in zip(queues, element_copies):
sub_queue.append(element_copy)
yield queue.popleft()
return tuple(replica(queue) for queue in queues)
Pros:
- 处理更深层次的嵌套,甚至是同一级别上的可迭代元素和不可迭代元素等混合元素,
- 可以扩展为用户定义的结构(例如,用于制作它们的独立深层副本)。
Cons:
- 可读性较差(但据我们所知“实用性胜过纯粹性” https://www.python.org/dev/peps/pep-0020/#the-zen-of-python),
- 提供了一些与调度相关的开销(但没关系,因为它基于字典查找,其中有
O(1)
复杂)。
Test
准备
让我们定义嵌套迭代如下
nested_iterable = [range(10 ** index) for index in range(1, 7)]
由于迭代器的创建没有提及底层副本的性能,因此让我们定义迭代器耗尽的函数(描述here https://mail.python.org/pipermail/python-ideas/2013-September/023488.html)
exhaust_iterable = deque(maxlen=0).extend
Time
Using timeit
package
import timeit
def naive(): exhaust_iterable(copy_nested_iterable(nested_iterable))
def improved(): exhaust_iterable(copy_iterable(nested_iterable))
print('naive approach:', min(timeit.repeat(naive)))
print('improved approach:', min(timeit.repeat(improved)))
我的笔记本电脑运行 Windows 10 x64,运行 Python 3.5.4
naive approach: 5.1863865
improved approach: 3.5602296000000013
Memory
Using memory_profiler package https://pypi.org/project/memory-profiler/
Line # Mem usage Increment Line Contents
================================================
78 17.2 MiB 17.2 MiB @profile
79 def profile_memory(nested_iterable: Iterable[Iterable[Any]]) -> None:
80 68.6 MiB 51.4 MiB result = list(flatten(flatten(copy_nested_iterable(nested_iterable))))
对于“天真的”方法和
Line # Mem usage Increment Line Contents
================================================
78 17.2 MiB 17.2 MiB @profile
79 def profile_memory(nested_iterable: Iterable[Iterable[Any]]) -> None:
80 68.7 MiB 51.4 MiB result = list(flatten(flatten(copy_iterable(nested_iterable))))
为“改进”之一。
Note:我已经运行了不同的脚本,因为立即运行它们不会具有代表性,因为第二条语句将重用之前在后台创建的内容int
对象。
结论
正如我们所看到的,这两个函数具有相似的性能,但最后一个函数支持更深层次的嵌套,并且看起来非常可扩展。
广告
我添加了“改进”的解决方案lz package https://pypi.org/project/lz from 0.4.0
可以像这样使用的版本
>>> from lz.replication import replicate
>>> iterable = iter(range(5))
>>> list(map(list, replicate(iterable,
count=3)))
[[0, 1, 2, 3, 4], [0, 1, 2, 3, 4], [0, 1, 2, 3, 4]]
它是基于属性的测试,使用hypothesis框架 https://hypothesis.works/,所以我们可以确定它按预期工作。