Apache Spark stringsableConverter是用来做什么的

2skhul33  于 6个月前  发布在  Apache
关注(0)|答案(1)|浏览(49)

SparkContext.scala中,有两个为基本类型定义的到其可写对应项的隐式转换,例如Int,Long,String
以String为例,有两种定义,定义如下:

implicit def stringWritableConverter(): WritableConverter[String] =
    simpleWritableConverter[String, Text](_.toString)

字符串

implicit val stringWritableConverterFn: () => WritableConverter[String] =
    () => simpleWritableConverter[String, Text](_.toString)


我想问一下这些方法和变量是如何使用的。它们用于隐式类型转换,但没有输入参数(stringWritableConverter不带参数,stringWritableConverterFn也不带参数)。
这些隐式转换是如何被转换为WritableConverter
它们没有参数,我只是不知道如何/何时使用这些转换
谢谢.

更新

为了更好地用代码来说明问题,我编写了以下简单的代码来减少Spark

import scala.reflect.{ClassTag, classTag}

trait Writable

class IntWritable(value: Int) extends Writable

class Text(value: String) extends Writable

class WritableConverter[T](
                            val writableClass: ClassTag[T] => Class[_ <: Writable],
                            val convert: Writable => T)

object implicit_test {

  private def simpleWritableConverter[T, W <: Writable : ClassTag](convert: W => T)
  : WritableConverter[T] = {
    println("Hello, simpleWritableConverter")
    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
  }

  implicit val stringWritableConverterFn: () => WritableConverter[String] = {
    println("Hello  stringWritableConverterFn")
    () => simpleWritableConverter[String, Text](_.toString)
  }

  implicit def stringWritableConverter(): WritableConverter[String] = {
    println("Hello  stringWritableConverter")
    simpleWritableConverter[String, Text](_.toString)
  }

  def do_convert(a: String) = println(s"a is $a")

  def main(args: Array[String]): Unit = {

    //Compile Error: Required String, Found Text
    do_convert(new Text("abc"))
  }

}


当我调用do_convert(new Text("abc"))时,编译器会抱怨Required String, Found Text,这意味着隐式转换没有生效。

fcwjkofz

fcwjkofz1#

我并不是100%理解这个故事,但我觉得我可能会给你带来一些有用的见解。我将使用Spark版本3.4.1源代码(以及相应的Hadoop版本3.3.4)进行解释。
整个故事看起来有点像这样:

/**
 * A class encapsulating how to convert some type `T` from `Writable`. It stores both the `Writable`
 * class corresponding to `T` (e.g. `IntWritable` for `Int`) and a function for doing the
 * conversion.
 * The getter for the writable class takes a `ClassTag[T]` in case this is a generic object
 * that doesn't know the type of `T` when it is created. This sounds strange but is necessary to
 * support converting subclasses of `Writable` to themselves (`writableWritableConverter()`).
 */
private[spark] class WritableConverter[T](
    val writableClass: ClassTag[T] => Class[_ <: Writable],
    val convert: Writable => T)
  extends Serializable

object WritableConverter {

  // Helper objects for converting common types to Writable
  private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
  : WritableConverter[T] = {
    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
  }

  // The following implicit functions were in SparkContext before 1.3 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, we still keep the old functions in SparkContext for backward
  // compatibility and forward to the following functions directly.

  // The following implicit declarations have been added on top of the very similar ones
  // below in order to enable compatibility with Scala 2.12. Scala 2.12 deprecates eta
  // expansion of zero-arg methods and thus won't match a no-arg method where it expects
  // an implicit that is a function of no args.

  ...

  implicit val stringWritableConverterFn: () => WritableConverter[String] =
    () => simpleWritableConverter[String, Text](_.toString)
  
  ...

  // These implicits remain included for backwards-compatibility. They fulfill the
  // same role as those above.

  ...

  implicit def stringWritableConverter(): WritableConverter[String] =
    simpleWritableConverter[String, Text](_.toString)

  ...
}

字符串

这些可复制对象是什么?

我在这里引用this great answer的一部分:
我们已经知道,在分布式计算环境中,数据需要在不同节点之间传输。这需要数据的序列化和重复化,以将结构化格式的数据转换为字节流,反之亦然。因此,Hadoop使用简单高效的序列化协议在map和reduce阶段之间序列化数据,这些称为可重复性(s)前面已经提到的一些可写的例子是不可写的,可写的,布尔可写的和浮点可写的。
因此,我们知道在使用Hadoop时,我们将跳转到一堆这样的Writable对象上。为了使用原语类型进行可靠的计算,我们需要有一种方法从Writable转到我们的原语。

为什么我们看起来有两个stringWritableConverter

如果你仔细看我贴在这里的代码注解,你会发现stringWritableConverterstringWritableConverterFn做的事情完全一样。
所以我们可以简化我们的问题:这里只有stringWritableConverterFn重要。

这个stringWritableConverterFn会发生什么?

在Scala中,implant是在编译时解析的。所以在编译时,下面的代码将被调用:

simpleWritableConverter[String, Text](_.toString)


请密切注意这里的类型[String, Text]String是原始类型,TextWritableorg.apache.hadoop.io.Text
让我们单独看看simpleWritableConverter函数:

private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
  : WritableConverter[T] = {
    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
  }


因此,创建了一个新的WritableConverter对象,它存储了两件事:

  • 可继承阶级
  • convert函数,其签名为W => T。这是Hadoop中TexttoString方法。

总结

在Hadoop中,可转换类被广泛使用。如果我们希望能够检索它们所表示的原语类型,我们需要能够将这些类转换为它们的原语类型。这就是这些WritableConverter对象的用途。
最后,我们创建了这个WritableConverter[String]类,它包含了一种通过toString方法将Text可转换为String原语的方法!

回复您的更新

如果你想看看这个转换器的实际效果,你可以看一个测试示例:

test("BytesWritable implicit conversion is correct") {
    // Regression test for SPARK-3121
    val bytesWritable = new BytesWritable()
    val inputArray = (1 to 10).map(_.toByte).toArray
    bytesWritable.set(inputArray, 0, 10)
    bytesWritable.set(inputArray, 0, 5)

    val converter = WritableConverter.bytesWritableConverter()
    val byteArray = converter.convert(bytesWritable)
    assert(byteArray.length === 5)

    bytesWritable.set(inputArray, 0, 0)
    val byteArray2 = converter.convert(bytesWritable)
    assert(byteArray2.length === 0)
  }


在这里,您可以看到我们创建了这个converter对象,它可以使用其convert方法(来自本文开头的WritableConverter[T]类)为您进行转换。
这个例子的不同之处在于,你没有调用WritableConverterconvert方法。所以Scala找不到任何将Text转换为String的函数。

相关问题