新的Dataframe列作为其他行(熊猫)的通用函数
在中创建 DataFrame
其他列中 最快的列的最快(最有效)方法 是pandas
什么?
考虑以下示例:
import pandas as pd
d = {
'id': [1, 2, 3, 4, 5, 6],
'word': ['cat', 'hat', 'hag', 'hog', 'dog', 'elephant']
}
pandas_df = pd.DataFrame(d)
产生:
id word
0 1 cat
1 2 hat
2 3 hag
3 4 hog
4 5 dog
5 6 elephant
假设我想创建一个新列bar
,该列包含一个值,该值基于使用函数foo
将当前行中的单词与中的其他行进行比较的输出而得出dataframe
。
def foo(word1, word2):
# do some calculation
return foobar # in this example, the return type is numeric
threshold = some_threshold
for index, _id, word in pandas_df.itertuples():
value = sum(
pandas_df[pandas_df['word'] != word].apply(
lambda x: foo(x['word'], word),
axis=1
) < threshold
)
pandas_df.loc[index, 'bar'] = value
这的确产生了正确的输出,但是它使用了itertuples()
and apply()
,这对于large而言并不是很有效DataFrames
。
有没有一种方法可以 矢量化 (正确的术语?)这种方法?还是有另一种更好(更快)的方法来做到这一点?
注释/更新:
- 在原始帖子中,我使用了“编辑距离/左手脚距”作为
foo
函数。我改变了这个问题,试图变得更通用。想法是要应用的功能是将当前行的值与所有其他行进行比较,并返回一些汇总值。
如果foo
wasnltk.metrics.distance.edit_distance
和thethreshold
被设置为2
(如原始文章中所示),则会产生以下输出:
id word bar
0 1 cat 1.0
1 2 hat 2.0
2 3 hag 2.0
3 4 hog 2.0
4 5 dog 1.0
5 6 elephant 0.0
-
我也有同样的问题的
spark dataframes
为好。我认为将这些分成两个职位是很有意义的,因此它们的范围不太广。但是,我通常发现,pandas
有时可以修改类似问题的解决方案以使其适用spark
。 -
灵感来自这个答案我的
spark
版本这个问题,我试图用一个笛卡尔积在pandas
。我的速度测试表明速度稍快(尽管我怀疑这可能随数据大小而变化)。不幸的是,我仍然无法拨通电话apply()
。
示例代码:
from nltk.metrics.distance import edit_distance as edit_dist
pandas_df2 = pd.DataFrame(d)
i, j = np.where(np.ones((len(pandas_df2), len(pandas_df2))))
cart = pandas_df2.iloc[i].reset_index(drop=True).join(
pandas_df2.iloc[j].reset_index(drop=True), rsuffix='_r'
)
cart['dist'] = cart.apply(lambda x: edit_dist(x['word'], x['word_r']), axis=1)
pandas_df2 = (
cart[cart['dist'] < 2].groupby(['id', 'word']).count()['dist'] - 1
).reset_index()
-
让我们尝试分析一下问题:
如果有
N
行,则N*N
在相似性函数中要考虑“对”。在一般情况下,对所有这些元素进行评估都是无可避免的(听起来很合理,但我无法证明这一点)。因此,您
至少具有O(n ^ 2)个时间复杂度 。但是,您可以尝试使用时间复杂度 恒定的因素 。我发现的可能选项是:
1.并行化:
由于您有一些大型的
DataFrame
,并行处理是最佳的选择。这将使您(几乎)在时间复杂度方面得到线性改善,因此,如果您有16名工作人员,您将获得(几乎)16倍的改进。例如,我们可以将的行划分
df
为不相交的部分,并分别处理每个部分,然后合并结果。一个非常基本的并行代码可能看起来像这样:from multiprocessing import cpu_count,Pool def work(part): """ Args: part (DataFrame) : a part (collection of rows) of the whole DataFrame. Returns: DataFrame: the same part, with the desired property calculated and added as a new column """ # Note that we are using the original df (pandas_df) as a global variable # But changes made in this function will not be global (a side effect of using multiprocessing). for index, _id, word in part.itertuples(): # iterate over the "part" tuples value = sum( pandas_df[pandas_df['word'] != word].apply( # Calculate the desired function using the whole original df lambda x: foo(x['word'], word), axis=1 ) < threshold ) part.loc[index, 'bar'] = value return part # New code starts here ... cores = cpu_count() #Number of CPU cores on your system data_split = np.array_split(data, cores) # Split the DataFrame into parts pool = Pool(cores) # Create a new thread pool new_parts = pool.map(work , data_split) # apply the function `work` to each part, this will give you a list of the new parts pool.close() # close the pool pool.join() new_df = pd.concat(new_parts) # Concatenate the new parts
注意:我试图使代码尽可能接近OP的代码。这只是一个基本的演示代码,并且存在许多更好的替代方法。
2.“低级”优化:
另一个解决方案是尝试优化相似度函数的计算和迭代/映射。与上一个或下一个选项相比,我认为这不会为您带来很大的提速。
3.取决于功能的修剪:
您可以尝试的最后一件事是依赖相似功能的改进。这在一般情况下不起作用,但如果您可以分析相似性函数,则将很好地工作。例如:
假设您使用的是Levenshtein距离(
LD
),则可以观察到任意两个字符串之间的距离> =长度之间的差。即LD(s1,s2) >= abs(len(s1)-len(s2))
。您可以使用此观察结果来修剪可能的相似对,以进行评估。因此,对于每个字符串长度
l1
,只有具有长度的字符串比较它l2
有abs(l1-l2) <= limit
。(限制为所接受的最大相似度,在您提供的示例中为2)。另一个观察是
LD(s1,s2) = LD(s2,s1)
。这样可以将对数减少2倍。该解决方案实际上可能使您陷入
O(n)
时间复杂性(高度依赖于数据)的问题。
为什么?你可能会问。
这是因为,如果我们有10^9
行,但平均而言10^3
,每行只有“接近”长度的行,那么我们需要针对约10^9 * 10^3 /2
对而不是10^9 * 10^9
对来评估函数。但这(再次)取决于数据。如果(在此示例中)您拥有长度均为3的字符串,则此方法将无用。