根据scala flink中的另一个数据集过滤数据集

dwthyt8l  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(414)

我正在尝试复制以下python代码:

cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])

哪里 x 以及 y 是向量,和 uy 是唯一的值吗 y ,例如 0,1 .
在Flink,我有:

val uy = y.distinct.collect
val condHx = for (i ← uy)
    yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))

然而,似乎 filterWithBcVariable 不是每个人都重视 y ,只需要第一个。
我也试过:

for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)

但我的记性没了。
我怎么过滤 x 在价值观方面 y ?
像这样的 x.zip(y) 会这样做,但不支持。
有什么想法吗?

798qvoo8

798qvoo81#

我想出了一个解决办法,也许不是最好的,但至少它起作用了。
现在,不是过去 x 以及 y 分开的 DataSets ,我路过一个 DataSet[LabeledVector] 只有一列:

val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))

那我就过去了 xy 我的职责是:

def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {
    // Get the label
    val y = xy map (_.label)
    // Get probs for the label
    val p = probs(y).toArray.asBreeze
    // Get unique values in label
    val values = y.distinct.collect
    // Compute Conditional Entropy
    val condH = for (i ← values)
      yield entropy(xy.filter(_.label == i))
    p.dot(seq2Breeze(condH))
  }

相关问题