将来自nifi流的传入json消息解析到hbase表中

nbnkbykc  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(408)

​大家好,我从nifi的一个Kafka主题中得到了一条信息流,我正在通过一个消费过程来阅读。消息的格式为json(伪json值,json格式与原始格式相同):

{   "schema": {
    "type": "struct",
    "name": "emp_table",
    "fields": [
      {
        "field": "emp_id",
        "type": "string"
      },
      {
        "field": "emp_name",
        "type": "String"
      },
      {
        "field": "city",
        "type": "string"
      },
      {
        "field": "emp_sal",
        "type": "string"
      },
      {
        "field": "manager_name",
        "type": "string"
      }
    ]   },   "payload": {
    "emp_id": "1",
    "emp_name": "abc",
    "city": "NYK",
    "emp_sal": "100000",
    "manager_name": "xyz"   } }

正如您在这里看到的,实际的表名在schema下,列值在payload下。我能够在nifi中使用evaluatejsonpath和puthbasejson处理器解析列值并将其放入hbase表中。
我能够实现的是手动放置表名和rowid。但我的问题是我想从json中获取tablename(在上面的示例emp\u table中)和rowid(在上面的示例emp\u id中),并在运行时将这些值与列值一起提供给nifi中的puthbasejson处理器。

bvpmtnay

bvpmtnay1#

您应该能够向evaluatejsonpath添加另一个json路径表达式,如下所示:

table = $.schema.name

然后在puthbasejson中,使表名${table},或在evaluatejsonpath中命名的任何名称。

相关问题