在Scala中解组protobufs

0sgqnhkj  于 6个月前  发布在  Scala
关注(0)|答案(2)|浏览(51)

我之前已经在Scala中使用ScalaPB和以下编组器实现了Unmarshalling:

implicit def marshaller[T <: GeneratedMessage]: ToEntityMarshaller[T] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[T](r => r.toByteArray)

implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[Request]](implicit companion: GeneratedMessageCompanion[Request]): FromEntityUnmarshaller[Request] = {
    Unmarshaller.byteArrayUnmarshaller.map[Request](bytes => companion.parseFrom(bytes))
  }

字符串
这允许我的Route接受Request类型的传入消息,定义为:

syntax = "proto3";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message RegisterRequest {
  string username = 1;
  optional string password = 2;
}

message Request {
  string hostID = 1;

  oneof requestType {
    RegisterRequest registerRequest = 2;
  }
}


我在系统中添加了另一个Route,它接受DataRequest类型。这被定义为:

syntax = "proto3";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message DataRequest {
  string hostID = 1;
  string data = 2;
}


因此,我修改了我的AKKA参与者和路由,以使用它们接收和响应的消息类型的类型,定义为:

final case class ActorRequest[T, E](request: T, replyTo: ActorRef[ActorResponse[E]])

  final case class ActorResponse[T](response: T)


为了减少重复代码,我将Route的创建移到了超类中。超类Layer看起来像:

trait Marshalling extends DefaultJsonProtocol with SprayJsonSupport {
  
  implicit def marshaller[E <: GeneratedMessage]: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)

  implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[T]](implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
    Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
  }
  
}

abstract class Layer[T <: GeneratedMessage, E <: GeneratedMessage](name: String, directivePath: String)
  extends CORSHandler with Marshalling {

  implicit val timeout: Timeout = Timeout.create(SYSTEM.settings.config.getDuration("my-app.routes.ask-timeout"))

  private var systemActor: ActorRef[ActorRequest[T, E]] = null

  def createResponse(request: T): ActorResponse[E]

  private def createRoutes(): Route = {
    pathPrefix(HOST_ID) {
      path(directivePath) {
        post {
          entity(as[T]) { request =>
            onComplete(handle(request)) {
              case Success(response) =>
                complete(response.response)
              case Failure(exception) => complete(InternalServerError, s"An error occurred ${exception.getMessage}")
            }
          }
        }
      }
    }
  }

...
}


当切换到Unmarshaller时,我得到以下错误:

I found:

    akka.http.scaladsl.unmarshalling.Unmarshaller.
      messageUnmarshallerFromEntityUnmarshaller[T](
      akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.
        sprayJsonUnmarshaller[T](/* missing */summon[spray.json.RootJsonReader[T]])
    )

But no implicit values were found that match type spray.json.RootJsonReader[T].
          entity(as[T]) { request =>


有谁是这方面的Maven,可以帮助我确定这个问题?错误似乎是抱怨它不是一个FromRequestMarshaller,但也不是Unmarshaller以前定义类类型时。任何建议?
最小可重现性示例:https://github.com/ritcat14/hydra_broken_marshalling

ergxz8rk

ergxz8rk1#

trait Marshalling中的implicit def unmarshaller不能从Layer类中使用:unmarshaller需要一个GeneratedMessageCompanion[T],但是Layer类不能保证这样一个同伴将可用于它将示例化的T,解决方案是将隐式同伴作为构造函数参数添加到class Layer中,以便将其提供给def unmarshaller。 这将是Marshalling`的最小定义(不必要的JSON内容被清除,但这不是问题的原因):

trait Marshalling[T <: GeneratedMessage, E <: GeneratedMessage] {
  implicit def protobufMarshaller: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)

  implicit def protobufUnmarshaller(implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
    Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
  }
}

字符串
然后,Layer类签名可以捕获隐式同伴:

abstract class Layer[T <: GeneratedMessage, E <: GeneratedMessage](name: String, directivePath: String)(implicit cmp: GeneratedMessageCompanion[T])
  extends CORSHandler with Marshalling[T, E] {`


然而,由于示例cmpLayer的实现中实际上并不直接需要,因此可以重写为:

abstract class Layer[T <: GeneratedMessage : GeneratedMessageCompanion, E <: GeneratedMessage](name: String, directivePath: String)
  extends CORSHandler with Marshalling[T, E] {

bvpmtnay

bvpmtnay2#

因此,虽然这“解决”了问题,但这是一个讨厌的工作,并将修复放在protobuf端,而不是修复数据库编组,但目前这是有效的。
我创建了一个ProtoMessage protobuf,它只有一个所有消息类型的oneof字段,如下所示:

syntax = "proto3";

import "Request.proto";
import "DataRequest.proto";
import "Response.proto";
import "DataResponse.proto";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message UnknownType {}

message ProtoMessage {
  string hostID = 1;

  oneof messageType {
    Request request = 2;
    DataRequest dataRequest = 3;

    Response response = 4;
    DataResponse dataResponse = 5;

    UnknownType unknownType = 6;
  }
}

字符串
然后我将这个对象马歇尔/解编组:

implicit def protobufMarshaller[T <: GeneratedMessage]: ToEntityMarshaller[ProtoMessage] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[ProtoMessage](r => r.toByteArray)

  implicit def requestMarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[ProtoMessage]](implicit companion: GeneratedMessageCompanion[ProtoMessage]): FromEntityUnmarshaller[ProtoMessage] = {
    Unmarshaller.byteArrayUnmarshaller.map[ProtoMessage](bytes => companion.parseFrom(bytes))
  }


我用ProtoMessage替换了所有的类类型。
这是一个解决方案,是的。它是可怕的吗?绝对的。它是一个永久性的修复吗?不。仍然开放的建议,因为这个修复,我必须过滤的messageType字段每次我收到一个消息,对每个Route

相关问题