用typetag克服scala中的类型擦除

qf9go6mv  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(349)

我有以下案例课程:

trait Event
object Event {
  case class ProducerStreamActivated[T <: KafkaMessage](kafkaTopic: String, stream: SourceQueueWithComplete[T]) extends Event
}

trait KafkaMessage
object KafkaMessage {

  case class DefaultMessage(message: String, timestamp: DateTime) extends KafkaMessage {
    def this() = this("DEFAULT-EMPTY-MESSAGE", DateTime.now(DateTimeZone.UTC))
  }

  case class DefaultMessageBundle(messages: Seq[DefaultMessage], timeStamp: DateTime) extends KafkaMessage {
    def this() = this(Seq.empty, DateTime.now(DateTimeZone.UTC))
  }
}

在我的一个actor中,我有以下方法来标识实际类型:

class KafkaPublisher[T <: KafkaMessage: TypeTag] extends Actor {

  def paramInfo[T](x: T)(implicit tag: TypeTag[T]): Unit = {
    val targs = typeOf[T] match { case TypeRef(_, _, args) => args }
    println(s"type of $x has type arguments $targs")
  }

  implicit val system = context.system
  val log = Logging(system, this.getClass.getName)

  override final def receive = {
    case ProducerStreamActivated(_, stream) =>
      paramInfo(stream)
      log.info(s"Activated stream for Kafka Producer with ActorName >> ${self.path.name} << ActorPath >> ${self.path} <<")
      context.become(active(stream))

    case other =>
      log.warning("KafkaPublisher got some unknown message while producing: " + other)
  }

  def active(stream: SourceQueueWithComplete[KafkaMessage]): Receive = {
    case msg: T =>
      stream.offer(msg)

    case other =>
      log.warning("KafkaPublisher got the unknown message while producing: " + other)
  }
}
object KafkaPublisher {

  def props[T <: KafkaMessage: TypeTag] =
    Props(new KafkaPublisher[T])
}

我在父参与者中创建producerstreamactivated(…)的示例,如下所示:

val stream = producerStream[DefaultMessage](producerProperties)
  def producerStream[T: Converter](producerProperties: Map[String, String]): SourceQueueWithComplete[T] = {
    if (Try(producerProperties("isEnabled").toBoolean).getOrElse(false)) {
      log.info(s"Kafka is enabled for topic ${producerProperties("publish-topic")}")
      val streamFlow = flowToKafka[T](producerProperties)
      val streamSink = sink(producerProperties)
      source[T].via(streamFlow).to(streamSink).run()
    } else {
      // We just Log to the Console and by pass all Kafka communication
      log.info(s"Kafka is disabled for topic ${producerProperties("publish-topic")}")
      source[T].via(flowToLog[T](log)).to(Sink.ignore).run()
    }
  }

当我现在在我的子actor中打印流sourcequeuewithcomplete[t]中包含的类型时,我可以看到包含的基类kafkamessage,而不是预期的defaultmessage。有什么办法可以缓解这种情况吗?

jv4diomz

jv4diomz1#

在你的 KafkaPublisherreceive 方法,在 ProducerStreamActivated 如果没有任何类型参数(由于类型擦除,您无法在具有参数的类型上进行匹配),并且在该方法中,隐式 TypeTag 传递给 paramInfo 在编译时决定,此时它只是一个 TypeTag[KafkaMessage] .
解决这个问题的一个方法就是 ProducerStreamActivated 类带有自己的类型标签,即:

case class ProducerStreamActivated[T <: KafkaMessage](kafkaTopic: String, stream: SourceQueueWithComplete[T])(implicit val tag: TypeTag[T]) extends Event

而不是在 receive 方法,就这么做 msg.tag .
这应该是可行的,因为在您实际创建 ProducerStreamActivated ,您确实有编译时类型参数信息(它是 DefaultMessage )这将是编译器填充的类型标记,然后您可以保留对它的引用。

相关问题