On this page
itertools —创建用于高效循环的迭代器的函数
该模块实现了许多iterator构建块,这些构建块受 APL,Haskell 和 SML 的构造启发。每个都已经以适合 Python 的形式进行了重铸。
该模块标准化了一组核心的快速,高效内存工具,这些工具本身或结合使用很有用。它们共同构成了一个“迭代器代数”,从而可以在纯 Python 中简洁高效地构建专用工具。
例如,SML 提供了一个制表工具:tabulate(f)
,它产生一个序列f(0), f(1), ...
。pass将map()和count()组合成map(f, count())
,可以在 Python 中获得相同的效果。
这些工具及其内置的对应工具也可以与operator模块中的高速Function很好地配合使用。例如,可以将乘法运算符 Map 到两个向量上,以形成有效的点积:sum(map(operator.mul, vector1, vector2))
。
Infinite iterators:
Iterator | Arguments | Results | Example |
---|---|---|---|
count() | start, [step] | 开始,开始,开始 2 *步骤,... | count(10) --> 10 11 12 13 14 ... |
cycle() | p | p0,p1,…plast,p0,p1,… | cycle('ABCD') --> A B C D A B C D ... |
repeat() | elem [,n] | elem,elem,elem……无休止或最多 n 次 | repeat(10, 3) --> 10 10 10 |
迭代器以最短的 Importing 序列终止:
Iterator | Arguments | Results | Example |
---|---|---|---|
accumulate() | p [,func] | p0,p0 p1,p0 p1 p2,... | accumulate([1,2,3,4,5]) --> 1 3 6 10 15 |
chain() | p,q,... | p0,p1,…plast,q0,q1,… | chain('ABC', 'DEF') --> A B C D E F |
chain.from_iterable() | iterable | p0,p1,…plast,q0,q1,… | chain.from_iterable(['ABC', 'DEF']) --> A B C D E F |
compress() | data, selectors | (如果 s [0]为 d [0]),(如果 s [1]为 d [1]),... | compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F |
dropwhile() | pred, seq | seq [n],seq [n 1],在 pred 失败时开始 | dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1 |
filterfalse() | pred, seq | seq 的元素,其中 pred(elem)为 false | filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8 |
groupby() | iterable[, key] | 子重复项按键值(v)分组 | |
islice() | seq,[开始,]停止[,步骤] | seq 中的元素[开始:停止:步骤] | islice('ABCDEFG', 2, None) --> C D E F G |
starmap() | func, seq | func(* seq [0]),func(* seq [1]),… | starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000 |
takewhile() | pred, seq | seq [0],seq [1],直到 pred 失败 | takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4 |
tee() | it, n | it1,it2,…itn 将一个迭代器拆分为 n | |
zip_longest() | p,q,... | (p [0],q [0]),(p [1],q [1]),... | zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D- |
Combinatoric iterators:
Iterator | Arguments | Results |
---|---|---|
product() | p,q,…[repeat = 1] | 笛卡尔积,等效于嵌套的 for 循环 |
permutations() | p[, r] | r 长度 Tuples,所有可能的排序,没有重复的元素 |
combinations() | p, r | r 长度 Tuples,按排序 Sequences,无重复元素 |
combinations_with_replacement() | p, r | r 长度 Tuples,按排序 Sequences,带有重复的元素 |
Examples | Results |
---|---|
product('ABCD', repeat=2) |
AA AB AC AD BA BB BC BD CA CB CC CD DA DB DC DD |
permutations('ABCD', 2) |
AB AC AD BA BC BD CA CB CD DA DB DC |
combinations('ABCD', 2) |
AB AC AD BC BD CD |
combinations_with_replacement('ABCD', 2) |
AA AB AC AD BB BC BD CC CD DD |
Itertool functions
以下模块负责所有构造和返回迭代器。有些提供无限长的流,因此只能由截断该流的函数或循环访问。
itertools.
accumulate
(* iterable * [,* func ,**,* initial = None *])- 使迭代器返回累加的总和或其他二进制函数的累加结果(pass可选的* func *参数指定)。
如果提供了* func ,则它应该是两个参数的函数。Importing iterable 的元素可以是 func *的参数可以接受的任何类型。 (例如,使用默认的加法操作,元素可以是任何可加类型,包括Decimal或Fraction。)
通常,输出的元素数与 Importing 的可迭代数匹配。但是,如果提供了关键字参数* initial ,则累加会以 initial *值开始,因此输出的元素多于 Importing 的可迭代元素。
大致相当于:
def accumulate(iterable, func=operator.add, *, initial=None):
'Return running totals'
# accumulate([1,2,3,4,5]) --> 1 3 6 10 15
# accumulate([1,2,3,4,5], initial=100) --> 100 101 103 106 110 115
# accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120
it = iter(iterable)
total = initial
if initial is None:
try:
total = next(it)
except StopIteration:
return
yield total
for element in it:
total = func(total, element)
yield total
- func 参数有多种用途。对于最小运行量,可以将其设置为min();对于最大运行量,可以将其设置为max();对于运行中的产品,则可以设置为operator.mul()。可以pass累计利息和应用付款来构建摊销表。一阶recurrence relations可以pass在 iterable 中提供初始值并仅使用 func *参数中的累加总数来建模:
>>> data = [3, 4, 6, 2, 1, 9, 0, 7, 5, 8]
>>> list(accumulate(data, operator.mul)) # running product
[3, 12, 72, 144, 144, 1296, 0, 0, 0, 0]
>>> list(accumulate(data, max)) # running maximum
[3, 4, 6, 6, 6, 9, 9, 9, 9, 9]
# Amortize a 5% loan of 1000 with 4 annual payments of 90
>>> cashflows = [1000, -90, -90, -90, -90]
>>> list(accumulate(cashflows, lambda bal, pmt: bal*1.05 + pmt))
[1000, 960.0, 918.0, 873.9000000000001, 827.5950000000001]
# Chaotic recurrence relation https://en.wikipedia.org/wiki/Logistic_map
>>> logistic_map = lambda x, _: r * x * (1 - x)
>>> r = 3.8
>>> x0 = 0.4
>>> inputs = repeat(x0, 36) # only the initial value is used
>>> [format(x, '.2f') for x in accumulate(inputs, logistic_map)]
['0.40', '0.91', '0.30', '0.81', '0.60', '0.92', '0.29', '0.79', '0.63',
'0.88', '0.39', '0.90', '0.33', '0.84', '0.52', '0.95', '0.18', '0.57',
'0.93', '0.25', '0.71', '0.79', '0.63', '0.88', '0.39', '0.91', '0.32',
'0.83', '0.54', '0.95', '0.20', '0.60', '0.91', '0.30', '0.80', '0.60']
有关仅返回finally累加值的类似函数,请参见functools.reduce()。
3.2 版中的新Function。
在版本 3.3 中更改:添加了可选的* func *参数。
在 3.8 版中进行了更改:添加了可选的* initial *参数。
itertools.
chain
(** iterables *)- 创建一个迭代器,该迭代器从第一个可迭代对象返回元素,直到耗尽为止,然后 continue 进行下一个可迭代对象,直到所有可迭代对象都耗尽为止。用于将连续序列视为单个序列。大致相当于:
def chain(*iterables):
# chain('ABC', 'DEF') --> A B C D E F
for it in iterables:
for element in it:
yield element
- 类方法
chain.
from_iterable
(可迭代)- chain()的备用构造函数。从一个延迟计算的单个可迭代参数获取链接 Importing。大致相当于:
def from_iterable(iterables):
# chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
for it in iterables:
for element in it:
yield element
itertools.
combinations
(* iterable , r *)- 从* iterable Importing 中返回元素的 r *个长度子序列。
根据 Importing* iterable 的 Sequences,按字典 Sequences 发出组合 Tuples。因此,如果对 Importing iterable *进行排序,则将按排序 Sequences 生成组合 Tuples。
元素根据其位置而不是其价值被视为唯一。因此,如果 Importing 元素是唯一的,则每个组合中将没有重复值。
大致相当于:
def combinations(iterable, r):
# combinations('ABCD', 2) --> AB AC AD BC BD CD
# combinations(range(4), 3) --> 012 013 023 123
pool = tuple(iterable)
n = len(pool)
if r > n:
return
indices = list(range(r))
yield tuple(pool[i] for i in indices)
while True:
for i in reversed(range(r)):
if indices[i] != i + n - r:
break
else:
return
indices[i] += 1
for j in range(i+1, r):
indices[j] = indices[j-1] + 1
yield tuple(pool[i] for i in indices)
在过滤元素未按排序 Sequences(根据它们在 Importing 池中的位置)的条目之后,combinations()的代码也可以表示为permutations()的子序列:
def combinations(iterable, r):
pool = tuple(iterable)
n = len(pool)
for indices in permutations(range(n), r):
if sorted(indices) == list(indices):
yield tuple(pool[i] for i in indices)
返回的项目数是0 <= r <= n
时为n! / r! / (n-r)!
,或r > n
时为零。
itertools.
combinations_with_replacement
(* iterable , r *)- 从 Importing* iterable 返回元素的 r *长度子序列,从而允许单个元素重复多次。
根据 Importing* iterable 的 Sequences,按字典 Sequences 发出组合 Tuples。因此,如果对 Importing iterable *进行排序,则将按排序 Sequences 生成组合 Tuples。
元素根据其位置而不是其价值被视为唯一。因此,如果 Importing 元素是唯一的,则生成的组合也将是唯一的。
大致相当于:
def combinations_with_replacement(iterable, r):
# combinations_with_replacement('ABC', 2) --> AA AB AC BB BC CC
pool = tuple(iterable)
n = len(pool)
if not n and r:
return
indices = [0] * r
yield tuple(pool[i] for i in indices)
while True:
for i in reversed(range(r)):
if indices[i] != n - 1:
break
else:
return
indices[i:] = [indices[i] + 1] * (r - i)
yield tuple(pool[i] for i in indices)
在过滤元素未按排序 Sequences(根据它们在 Importing 池中的位置)的条目之后,combinations_with_replacement()的代码也可以表示为product()的子序列:
def combinations_with_replacement(iterable, r):
pool = tuple(iterable)
n = len(pool)
for indices in product(range(n), repeat=r):
if sorted(indices) == list(indices):
yield tuple(pool[i] for i in indices)
当n > 0
时,返回的项目数为(n+r-1)! / r! / (n-1)!
。
3.1 版中的新Function。
itertools.
compress
(* data , selectors *)- 创建一个迭代器,以从* data 过滤元素,仅返回那些在 selectors 中具有对应元素且其值为
True
的元素。当 data 或 selectors *可迭代项用尽时停止。大致相当于:
- 创建一个迭代器,以从* data 过滤元素,仅返回那些在 selectors 中具有对应元素且其值为
def compress(data, selectors):
# compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F
return (d for d, s in zip(data, selectors) if s)
3.1 版中的新Function。
itertools.
count
(* start = 0 , step = 1 *)
def count(start=0, step=1):
# count(10) --> 10 11 12 13 14 ...
# count(2.5, 0.5) -> 2.5 3.0 3.5 ...
n = start
while True:
yield n
n += step
用浮点数进行计数时,有时可以pass替换诸如(start + step * i for i in count())
的乘法代码来获得更好的精度。
在版本 3.1 中更改:添加了* step *参数,并允许使用非整数参数。
itertools.
cycle
(可迭代)- 创建一个迭代器,从迭代器返回元素,并保存每个元素的副本。当 iterable 耗尽时,从保存的副本中返回元素。无限重复。大致相当于:
def cycle(iterable):
# cycle('ABCD') --> A B C D A B C D A B C D ...
saved = []
for element in iterable:
yield element
saved.append(element)
while saved:
for element in saved:
yield element
请注意,工具箱的此成员可能需要大量辅助存储(取决于迭代对象的长度)。
itertools.
dropwhile
(* predicate , iterable *)- 只要谓词为真,就创建一个迭代器,从迭代器中删除元素。之后,返回每个元素。注意,在谓词首先变为假之前,迭代器不会产生* any *输出,因此启动时间可能很长。大致相当于:
def dropwhile(predicate, iterable):
# dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1
iterable = iter(iterable)
for x in iterable:
if not predicate(x):
yield x
break
for x in iterable:
yield x
itertools.
filterfalse
(* predicate , iterable *)- 创建一个迭代器,从可迭代的元素中筛选出仅返回谓词为
False
的元素。如果* predicate *为None
,则返回错误的项目。大致相当于:
- 创建一个迭代器,从可迭代的元素中筛选出仅返回谓词为
def filterfalse(predicate, iterable):
# filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8
if predicate is None:
predicate = bool
for x in iterable:
if not predicate(x):
yield x
itertools.
groupby
(* iterable , key = None *)- 创建一个迭代器,该迭代器从* iterable *返回连续的键和组。 * key 是一个为每个元素计算键值的函数。如果未指定或为
None
,则 key *默认为标识函数,并返回不变的元素。通常,可迭代项需要已经在相同的键Function上进行了排序。
- 创建一个迭代器,该迭代器从* iterable *返回连续的键和组。 * key 是一个为每个元素计算键值的函数。如果未指定或为
groupby()的操作类似于 Unix 中的uniq
过滤器。每当键函数的值更改时,它都会生成一个break或新组(这就是为什么通常需要使用相同的键函数对数据进行排序的原因)。这种行为与 SQL 的 GROUP BY 不同,后者会聚集通用元素,而不管它们的 ImportingSequences 如何。
返回的组本身是一个迭代器,与groupby()共享基础可迭代对象。因为源是共享的,所以当groupby()对象高级时,上一个组将不再可见。因此,如果以后需要该数据,则应将其存储为列表:
groups = []
uniquekeys = []
data = sorted(data, key=keyfunc)
for k, g in groupby(data, keyfunc):
groups.append(list(g)) # Store group iterator as a list
uniquekeys.append(k)
groupby()大致等于:
class groupby:
# [k for k, g in groupby('AAAABBBCCDAABBB')] --> A B C D A B
# [list(g) for k, g in groupby('AAAABBBCCD')] --> AAAA BBB CC D
def __init__(self, iterable, key=None):
if key is None:
key = lambda x: x
self.keyfunc = key
self.it = iter(iterable)
self.tgtkey = self.currkey = self.currvalue = object()
def __iter__(self):
return self
def __next__(self):
self.id = object()
while self.currkey == self.tgtkey:
self.currvalue = next(self.it) # Exit on StopIteration
self.currkey = self.keyfunc(self.currvalue)
self.tgtkey = self.currkey
return (self.currkey, self._grouper(self.tgtkey, self.id))
def _grouper(self, tgtkey, id):
while self.id is id and self.currkey == tgtkey:
yield self.currvalue
try:
self.currvalue = next(self.it)
except StopIteration:
return
self.currkey = self.keyfunc(self.currvalue)
itertools.
islice
(可重复,停止)itertools.
islice
((* iterable , start , stop * [,* step *])- 使一个迭代器返回可迭代对象中的选定元素。如果* start 不为零,则跳过 iterable 中的元素,直到到达 start 为止。此后,除非将 step 设置为大于一个,否则将连续返回元素,这会导致项目被跳过。如果 stop 为
None
,那么迭代将一直持续到迭代器耗尽为止。否则,它将停在指定位置。与常规切片不同,islice()不支持 start , stop 或 step *的负值。可用于从内部结构已被展平的数据中提取相关字段(例如,多行报告可能每三行列出一个名称字段)。大致相当于:
- 使一个迭代器返回可迭代对象中的选定元素。如果* start 不为零,则跳过 iterable 中的元素,直到到达 start 为止。此后,除非将 step 设置为大于一个,否则将连续返回元素,这会导致项目被跳过。如果 stop 为
def islice(iterable, *args):
# islice('ABCDEFG', 2) --> A B
# islice('ABCDEFG', 2, 4) --> C D
# islice('ABCDEFG', 2, None) --> C D E F G
# islice('ABCDEFG', 0, None, 2) --> A C E G
s = slice(*args)
start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1
it = iter(range(start, stop, step))
try:
nexti = next(it)
except StopIteration:
# Consume *iterable* up to the *start* position.
for i, element in zip(range(start), iterable):
pass
return
try:
for i, element in enumerate(iterable):
if i == nexti:
yield element
nexti = next(it)
except StopIteration:
# Consume to *stop*.
for i, element in zip(range(i + 1, stop), iterable):
pass
如果* start 为None
,则迭代从零开始。如果 step *为None
,则该步骤默认为 1.
itertools.
permutations
(* iterable , r = None *)- 返回* iterable 中元素的连续 r *长度排列。
如果未指定* r 或None
,则 r 默认为 iterable *的长度,并生成所有可能的全长排列。
根据 Importing* iterable 的 Sequences,按字典 Sequences 发出排列 Tuples。因此,如果对 Importing iterable *进行排序,则将按排序 Sequences 生成组合 Tuples。
元素根据其位置而不是其价值被视为唯一。因此,如果 Importing 元素是唯一的,则每个排列中都不会有重复值。
大致相当于:
def permutations(iterable, r=None):
# permutations('ABCD', 2) --> AB AC AD BA BC BD CA CB CD DA DB DC
# permutations(range(3)) --> 012 021 102 120 201 210
pool = tuple(iterable)
n = len(pool)
r = n if r is None else r
if r > n:
return
indices = list(range(n))
cycles = list(range(n, n-r, -1))
yield tuple(pool[i] for i in indices[:r])
while n:
for i in reversed(range(r)):
cycles[i] -= 1
if cycles[i] == 0:
indices[i:] = indices[i+1:] + indices[i:i+1]
cycles[i] = n - i
else:
j = cycles[i]
indices[i], indices[-j] = indices[-j], indices[i]
yield tuple(pool[i] for i in indices[:r])
break
else:
return
permutations()的代码也可以表示为product()的子序列,经过过滤以排除具有重复元素的条目(这些元素来自 Importing 池中的相同位置):
def permutations(iterable, r=None):
pool = tuple(iterable)
n = len(pool)
r = n if r is None else r
for indices in product(range(n), repeat=r):
if len(set(indices)) == r:
yield tuple(pool[i] for i in indices)
返回的项目数是0 <= r <= n
时为n! / (n-r)!
,或r > n
时为零。
itertools.
product
(** iterables , repeat = 1 *)- Importing 可迭代项的笛卡尔积。
大致等效于生成器表达式中的嵌套 for 循环。例如,product(A, B)
返回与((x,y) for x in A for y in B)
相同。
嵌套循环就像里程表一样循环,最右边的元素在每次迭代中都前进。此模式创建字典 Sequences,以便如果对 Importing 的可迭代对象进行排序,则将按排序 Sequences 发出产品 Tuples。
要计算自身的可乘积,请使用可选的* repeat *关键字参数指定重复次数。例如,product(A, repeat=4)
的含义与product(A, A, A, A)
相同。
此函数与以下代码大致等效,除了实际的实现不会在内存中构建中间结果之外:
def product(*args, repeat=1):
# product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy
# product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111
pools = [tuple(pool) for pool in args] * repeat
result = [[]]
for pool in pools:
result = [x+[y] for x in result for y in pool]
for prod in result:
yield tuple(prod)
itertools.
repeat
(* object * [,* times *])
大致相当于:
def repeat(object, times=None):
# repeat(10, 3) --> 10 10 10
if times is None:
while True:
yield object
else:
for i in range(times):
yield object
- repeat 的常见用法是向 map 或 zip *提供一个常量值流:
>>> list(map(pow, range(10), repeat(2)))
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
itertools.
starmap
(* function , iterable *)
def starmap(function, iterable):
# starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000
for args in iterable:
yield function(*args)
itertools.
takewhile
(* predicate , iterable *)- 只要谓词为 true,就可以使迭代器返回可迭代对象的元素。大致相当于:
def takewhile(predicate, iterable):
# takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4
for x in iterable:
if predicate(x):
yield x
else:
break
itertools.
tee
(* iterable , n = 2 *)- 从单个可迭代器返回* n *个独立的迭代器。
以下 Python 代码有助于说明* tee *的Function(尽管实际实现更为复杂,并且仅使用单个基础 FIFO 队列)。
大致相当于:
def tee(iterable, n=2):
it = iter(iterable)
deques = [collections.deque() for i in range(n)]
def gen(mydeque):
while True:
if not mydeque: # when the local deque is empty
try:
newval = next(it) # fetch a new value and
except StopIteration:
return
for d in deques: # load it to all the deques
d.append(newval)
yield mydeque.popleft()
return tuple(gen(d) for d in deques)
tee()分割后,原始的* iterable 不应在其他任何地方使用;否则,在不通知 tee 对象的情况下, iterable *可能会 Developing。
tee
迭代器不是线程安全的。当同时使用由同一tee()调用返回的迭代器时,即使原始的* iterable *是线程安全的,也可能引发RuntimeError。
此 itertool 可能需要大量辅助存储(取决于需要存储多少临时数据)。通常,如果一个迭代器在另一个迭代器启动之前使用了大部分或全部数据,则使用list()而不是tee()会更快。
itertools.
zip_longest
(** iterables , fillvalue = None *)- 创建一个迭代器,以聚合每个可迭代对象中的元素。如果可迭代项的长度不均匀,则用* fillvalue *填充缺失值。迭代一直持续到最长的可迭代耗尽为止。大致相当于:
def zip_longest(*args, fillvalue=None):
# zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
iterators = [iter(it) for it in args]
num_active = len(iterators)
if not num_active:
return
while True:
values = []
for i, it in enumerate(iterators):
try:
value = next(it)
except StopIteration:
num_active -= 1
if not num_active:
return
iterators[i] = repeat(fillvalue)
value = fillvalue
values.append(value)
yield tuple(values)
如果一个可迭代的对象可能是无限的,则zip_longest()函数应该用限制调用次数的内容包装(例如islice()或takewhile())。如果未指定,则* fillvalue *默认为None
。
Itertools Recipes
本节显示使用现有 itertools 作为构建块来创建扩展工具集的方法。
基本上所有这些食谱以及许多其他食谱都可以从 Python 软件包索引中的more-itertools project安装:
pip install more-itertools
扩展工具提供与基础工具集相同的高性能。pass一次处理一个元素,而不是一次将整个可迭代对象全部放入内存,可以保持卓越的内存性能。pass以一种Function样式将工具链接在一起,可以减少代码量,这有助于消除临时变量。与使用 for 循环和generator s 相比,优先使用“矢量化”构建块来保持高速,这会导致解释器开销。
def take(n, iterable):
"Return first n items of the iterable as a list"
return list(islice(iterable, n))
def prepend(value, iterator):
"Prepend a single value in front of an iterator"
# prepend(1, [2, 3, 4]) -> 1 2 3 4
return chain([value], iterator)
def tabulate(function, start=0):
"Return function(0), function(1), ..."
return map(function, count(start))
def tail(n, iterable):
"Return an iterator over the last n items"
# tail(3, 'ABCDEFG') --> E F G
return iter(collections.deque(iterable, maxlen=n))
def consume(iterator, n=None):
"Advance the iterator n-steps ahead. If n is None, consume entirely."
# Use functions that consume iterators at C speed.
if n is None:
# feed the entire iterator into a zero-length deque
collections.deque(iterator, maxlen=0)
else:
# advance to the empty slice starting at position n
next(islice(iterator, n, n), None)
def nth(iterable, n, default=None):
"Returns the nth item or a default value"
return next(islice(iterable, n, None), default)
def all_equal(iterable):
"Returns True if all the elements are equal to each other"
g = groupby(iterable)
return next(g, True) and not next(g, False)
def quantify(iterable, pred=bool):
"Count how many times the predicate is true"
return sum(map(pred, iterable))
def padnone(iterable):
"""Returns the sequence elements and then returns None indefinitely.
Useful for emulating the behavior of the built-in map() function.
"""
return chain(iterable, repeat(None))
def ncycles(iterable, n):
"Returns the sequence elements n times"
return chain.from_iterable(repeat(tuple(iterable), n))
def dotproduct(vec1, vec2):
return sum(map(operator.mul, vec1, vec2))
def flatten(list_of_lists):
"Flatten one level of nesting"
return chain.from_iterable(list_of_lists)
def repeatfunc(func, times=None, *args):
"""Repeat calls to func with specified arguments.
Example: repeatfunc(random.random)
"""
if times is None:
return starmap(func, repeat(args))
return starmap(func, repeat(args, times))
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
next(b, None)
return zip(a, b)
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def roundrobin(*iterables):
"roundrobin('ABC', 'D', 'EF') --> A D E B F C"
# Recipe credited to George Sakkis
num_active = len(iterables)
nexts = cycle(iter(it).__next__ for it in iterables)
while num_active:
try:
for next in nexts:
yield next()
except StopIteration:
# Remove the iterator we just exhausted from the cycle.
num_active -= 1
nexts = cycle(islice(nexts, num_active))
def partition(pred, iterable):
'Use a predicate to partition entries into false entries and true entries'
# partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
t1, t2 = tee(iterable)
return filterfalse(pred, t1), filter(pred, t2)
def powerset(iterable):
"powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
s = list(iterable)
return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
def unique_everseen(iterable, key=None):
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def unique_justseen(iterable, key=None):
"List unique elements, preserving order. Remember only the element just seen."
# unique_justseen('AAAABBBCCDAABBB') --> A B C D A B
# unique_justseen('ABBCcAD', str.lower) --> A B C A D
return map(next, map(operator.itemgetter(1), groupby(iterable, key)))
def iter_except(func, exception, first=None):
""" Call a function repeatedly until an exception is raised.
Converts a call-until-exception interface to an iterator interface.
Like builtins.iter(func, sentinel) but uses an exception instead
of a sentinel to end the loop.
Examples:
iter_except(functools.partial(heappop, h), IndexError) # priority queue iterator
iter_except(d.popitem, KeyError) # non-blocking dict iterator
iter_except(d.popleft, IndexError) # non-blocking deque iterator
iter_except(q.get_nowait, Queue.Empty) # loop over a producer Queue
iter_except(s.pop, KeyError) # non-blocking set iterator
"""
try:
if first is not None:
yield first() # For database APIs needing an initial cast to db.first()
while True:
yield func()
except exception:
pass
def first_true(iterable, default=False, pred=None):
"""Returns the first true value in the iterable.
If no true value is found, returns *default*
If *pred* is not None, returns the first item
for which pred(item) is true.
"""
# first_true([a,b,c], x) --> a or b or c or x
# first_true([a,b], x, f) --> a if f(a) else b if f(b) else x
return next(filter(pred, iterable), default)
def random_product(*args, repeat=1):
"Random selection from itertools.product(*args, **kwds)"
pools = [tuple(pool) for pool in args] * repeat
return tuple(random.choice(pool) for pool in pools)
def random_permutation(iterable, r=None):
"Random selection from itertools.permutations(iterable, r)"
pool = tuple(iterable)
r = len(pool) if r is None else r
return tuple(random.sample(pool, r))
def random_combination(iterable, r):
"Random selection from itertools.combinations(iterable, r)"
pool = tuple(iterable)
n = len(pool)
indices = sorted(random.sample(range(n), r))
return tuple(pool[i] for i in indices)
def random_combination_with_replacement(iterable, r):
"Random selection from itertools.combinations_with_replacement(iterable, r)"
pool = tuple(iterable)
n = len(pool)
indices = sorted(random.randrange(n) for i in range(r))
return tuple(pool[i] for i in indices)
def nth_combination(iterable, r, index):
'Equivalent to list(combinations(iterable, r))[index]'
pool = tuple(iterable)
n = len(pool)
if r < 0 or r > n:
raise ValueError
c = 1
k = min(r, n-r)
for i in range(1, k+1):
c = c * (n - k + i) // i
if index < 0:
index += c
if index < 0 or index >= c:
raise IndexError
result = []
while r:
c, n, r = c*r//n, n-1, r-1
while index >= c:
index -= c
c, n = c*(n-r)//n, n-1
result.append(pool[-1-n])
return tuple(result)