免责声明:我对spark和scala非常陌生。我和spark在scala中进行一个文档相似性项目。我有一个Dataframe,看起来像这样:
+--------+--------------------+------------------+
| text| shingles| hashed_shingles|
+--------+--------------------+------------------+
| qwerty|[qwe, wer, ert, rty]| [-4, -6, -1, -9]|
|qwerasfg|[qwe, wer, era, r...|[-4, -6, 6, -2, 2]|
+--------+--------------------+------------------+
在这里,我将文档文本分割成带状,并为每个带状计算一个哈希值。
想象一下我有一个 hash_function(integer, seed) -> integer
. 现在我想申请 n
将此形式的不同哈希函数 hashed_shingles
数组。i、 e.获得一个数组 n
使每个数组 hash_function(hashed_shingles, seed)
种子从1到n。
我正试着做这样的事,但我没能成功:
val n = 3
df = df.withColumn("tmp", array_repeat($"hashed_shingles", n)) // Repeat minhashes
val minhash_expr = "transform(tmp,(x,i) -> hash_function(x, i))"
df = df.withColumn("tmp", expr(minhash_expr)) // Apply hash to each array
我知道如何用一个 udf
,但据我所知,它们没有经过优化,我应该尽量避免使用它们,所以我尽量用 org.apache.spark.sql.functions
.
你有没有什么想法 udf
?
这个 udf
实现相同目标的是:
// Family of hashing functions
class Hasher(seed: Int, max_val : Int, p : Int = 104729) {
private val random_generator = new scala.util.Random(seed)
val a = 1 + 2*random_generator.nextInt((p-2)/2)// a odd in [1, p-1]
val b = 1 + random_generator.nextInt(p - 2) // b in [1, p-1]
def getHash(x : Int) : Int = ((a*x + b) % p) % max_val
}
// Compute a list of minhashes from a list of hashers given a set of ids
class MinHasher(hashes : List[Hasher]) {
def getMinHash(set : Seq[Int])(hasher : Hasher) : Int = set.map(hasher.getHash).min
def getMinHashes(set: Seq[Int]) : Seq[Int] = hashes.map(getMinHash(set))
}
// Minhasher
val minhash_len = 100
val hashes = List.tabulate(minhash_len)(n => new Hasher(n, shingle_bins))
val minhasher = new MinHasher(hashes)
// Compute Minhashes
val minhasherUDF = udf[Seq[Int], Seq[Int]](minhasher.getMinHashes)
df = df.withColumn("minhashes", minhasherUDF('hashed_shingles))
暂无答案!
目前还没有任何答案,快来回答吧!