我之前已经在Scala中使用ScalaPB和以下编组器实现了Unmarshalling:
implicit def marshaller[T <: GeneratedMessage]: ToEntityMarshaller[T] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[T](r => r.toByteArray)
implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[Request]](implicit companion: GeneratedMessageCompanion[Request]): FromEntityUnmarshaller[Request] = {
Unmarshaller.byteArrayUnmarshaller.map[Request](bytes => companion.parseFrom(bytes))
}
字符串
这允许我的Route
接受Request
类型的传入消息,定义为:
syntax = "proto3";
package PROTOS;
option java_package = "hydra.core.messaging.protobuf";
message RegisterRequest {
string username = 1;
optional string password = 2;
}
message Request {
string hostID = 1;
oneof requestType {
RegisterRequest registerRequest = 2;
}
}
型
我在系统中添加了另一个Route
,它接受DataRequest
类型。这被定义为:
syntax = "proto3";
package PROTOS;
option java_package = "hydra.core.messaging.protobuf";
message DataRequest {
string hostID = 1;
string data = 2;
}
型
因此,我修改了我的AKKA参与者和路由,以使用它们接收和响应的消息类型的类型,定义为:
final case class ActorRequest[T, E](request: T, replyTo: ActorRef[ActorResponse[E]])
final case class ActorResponse[T](response: T)
型
为了减少重复代码,我将Route
的创建移到了超类中。超类Layer
看起来像:
trait Marshalling extends DefaultJsonProtocol with SprayJsonSupport {
implicit def marshaller[E <: GeneratedMessage]: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)
implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[T]](implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
}
}
abstract class Layer[T <: GeneratedMessage, E <: GeneratedMessage](name: String, directivePath: String)
extends CORSHandler with Marshalling {
implicit val timeout: Timeout = Timeout.create(SYSTEM.settings.config.getDuration("my-app.routes.ask-timeout"))
private var systemActor: ActorRef[ActorRequest[T, E]] = null
def createResponse(request: T): ActorResponse[E]
private def createRoutes(): Route = {
pathPrefix(HOST_ID) {
path(directivePath) {
post {
entity(as[T]) { request =>
onComplete(handle(request)) {
case Success(response) =>
complete(response.response)
case Failure(exception) => complete(InternalServerError, s"An error occurred ${exception.getMessage}")
}
}
}
}
}
}
...
}
型
当切换到Unmarshaller时,我得到以下错误:
I found:
akka.http.scaladsl.unmarshalling.Unmarshaller.
messageUnmarshallerFromEntityUnmarshaller[T](
akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.
sprayJsonUnmarshaller[T](/* missing */summon[spray.json.RootJsonReader[T]])
)
But no implicit values were found that match type spray.json.RootJsonReader[T].
entity(as[T]) { request =>
型
有谁是这方面的Maven,可以帮助我确定这个问题?错误似乎是抱怨它不是一个FromRequestMarshaller
,但也不是Unmarshaller以前定义类类型时。任何建议?
最小可重现性示例:https://github.com/ritcat14/hydra_broken_marshalling
2条答案
按热度按时间ergxz8rk1#
trait Marshalling
中的implicit def unmarshaller
不能从Layer
类中使用:unmarshaller
需要一个GeneratedMessageCompanion[T]
,但是Layer
类不能保证这样一个同伴将可用于它将示例化的T
,解决方案是将隐式同伴作为构造函数参数添加到class Layer
中,以便将其提供给def unmarshaller。 这将是
Marshalling`的最小定义(不必要的JSON内容被清除,但这不是问题的原因):字符串
然后,
Layer
类签名可以捕获隐式同伴:型
然而,由于示例
cmp
在Layer
的实现中实际上并不直接需要,因此可以重写为:型
bvpmtnay2#
因此,虽然这“解决”了问题,但这是一个讨厌的工作,并将修复放在protobuf端,而不是修复数据库编组,但目前这是有效的。
我创建了一个
ProtoMessage
protobuf,它只有一个所有消息类型的oneof
字段,如下所示:字符串
然后我将这个对象马歇尔/解编组:
型
我用
ProtoMessage
替换了所有的类类型。这是一个解决方案,是的。它是可怕的吗?绝对的。它是一个永久性的修复吗?不。仍然开放的建议,因为这个修复,我必须过滤的
messageType
字段每次我收到一个消息,对每个Route
。