sparksql with scalapb:从dataframe转换到proto dataset时跳过proto字段时出错

jecbmhm3  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(401)

我有以下原型消息,需要使用scalapb通过spark编写:

message EnforcementData
{
  required int32 id = 1;
  required int32 source = 2;
  required int32 flagsEnforceOption = 4;
  required int32 categoryEnforceOption = 5;

  optional TypeA a= 100;
  optional TypeB b= 101;
}
``` `TypeA` 以及 `TypeB` 儿童班是 `EnforcementData` 在接收器端,它使用protobuf-net来反序列化。
现在,我的sparkDataframe可以有a列或b列。假设df是我的Dataframe,我称之为: `df.withColumn(b, null).as[EnforcementData].map(_.toByteArray)` 对于typea消息 `df.withColumn(a, null).as[EnforcementData].map(_.toByteArray)` 对于类型B消息
但是使用protobuf-net反序列化消息的接收器抛出stackoverflow异常。我还尝试过传递一个伪case类而不是null,但它似乎仍然不起作用。
请告诉我怎么处理?
nwnhqdif

nwnhqdif1#

我可以通过重构case类并显式跳过可选的子类字段来解决这个问题。即

//for TypeA messages,

 df.withColumn(b, null)
   .as[EnforcementData]
   .map{case EnforcementData(id, source, flag, cat, a, _) => EnforcementData(id, source, flag, cat, a = a) 
   } 

 //for TypeB messages,    

 df.withColumn(s, null)
   .as[EnforcementData]
   .map{case EnforcementData(id, source, flag, cat, _, b) => EnforcementData(id, source, flag, cat, b = b) 
    }

相关问题