incubator-doris Load json data into Doris by json-path

omjgkv6w  posted on 2022-04-22  in  Java

Requirement description

I wish Doris could support JSON data, that is, loading JSON data into a table via routine load or stream load.

Here is my idea:

Routine load:

  1. Create table for books
    CREATE TABLE books(
    category varchar(32),
    author varchar(32),
    title varchar(32),
    dt int COMMENT 'daily partition, format YYYYMMDD',
    price int
    ) ENGINE=OLAP
    AGGREGATE KEY(category,author,title)
    PARTITION BY RANGE(dt) (
    PARTITION p0 VALUES less than ("20190101"),
    PARTITION p20190101 VALUES less than ("20190102"),
    PARTITION p20190102 VALUES less than ("20190103")
    )
    DISTRIBUTED BY HASH(category, author, title) BUCKETS 32
    PROPERTIES ("storage_type"="column");
  2. Create Routine Load
    CREATE ROUTINE LOAD example_db.books_label1 ON books
    COLUMNS(category, author, title, dt, price),
    PROPERTIES
    (
        "format" = "json",
        "desired_concurrent_number" = "3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "timezone" = "Africa/Abidjan"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol" = "ssl",
        "property.ssl.ca.location" = "FILE:ca.pem",
        "property.ssl.certificate.location" = "FILE:client.pem",
        "property.ssl.key.location" = "FILE:client.key",
        "property.ssl.key.password" = "abcdefg",
        "property.client.id" = "my_client_id"
    );

  3. Kafka JSON data
    {
        "jsonpath": [
            {"key": "author", "type": "string", "value": "$.store.book.author"},
            {"key": "category", "type": "string", "value": "$.store.book.category"},
            {"key": "price", "type": "float", "value": "$.store.book.price"},
            {"key": "title", "type": "string", "value": "$.store.book.title"},
            {"key": "dt", "type": "integer", "value": "$.date"}
        ],
        "userdata": [
            {
                "store": {
                    "book": [
                        {"category": "reference", "author": "NigelRees", "title": "SayingsoftheCentury", "price": 8.95},
                        {"category": "fiction", "author": "EvelynWaugh", "title": "SwordofHonour", "price": 12.99}
                    ],
                    "bicycle": {"color": "red", "price": 19.95}
                },
                "expensive": 10,
                "date": 20190202
            }
        ]
    }

PS:

  1. The format must be specified explicitly as "json".
  2. Each JSON message in Kafka must contain a jsonpath object:
    key is the name of a column in the table,
    type is the data type of the value in the JSON data,
    value is the json-path expression.
  3. userdata is an array, so it can contain many application data objects.
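
To make the proposed message layout concrete, here is a minimal Python sketch of how a consumer might turn one such message into table rows. It only illustrates the idea above and is not Doris code: the helpers `resolve` and `rows_from_message` are hypothetical, the `type` field is ignored, and the sketch assumes a simplified path syntax (`$.a.b`) in which a step that lands on an array fans out over its elements.

```python
import json


def resolve(record, path):
    """Resolve a simplified dotted path such as "$.store.book.author".

    Assumption: only plain keys are supported. When a step reaches a list,
    the key is looked up in every element. Returns the list of matches.
    """
    values = [record]
    for key in path.lstrip("$").strip(".").split("."):
        matched = []
        for node in values:
            for item in (node if isinstance(node, list) else [node]):
                if isinstance(item, dict) and key in item:
                    matched.append(item[key])
        values = matched
    return values


def rows_from_message(message):
    """Build one dict per output row from the proposed message layout."""
    rows = []
    for record in message["userdata"]:
        columns = {m["key"]: resolve(record, m["value"]) for m in message["jsonpath"]}
        row_count = max((len(v) for v in columns.values()), default=0)
        for i in range(row_count):
            row = {}
            for name, values in columns.items():
                if len(values) == 1:
                    row[name] = values[0]      # single match: repeat on every row
                elif i < len(values):
                    row[name] = values[i]
                else:
                    row[name] = None           # no match for this row
            rows.append(row)
    return rows


message = json.loads("""
{
  "jsonpath": [
    {"key": "author", "type": "string", "value": "$.store.book.author"},
    {"key": "category", "type": "string", "value": "$.store.book.category"},
    {"key": "price", "type": "float", "value": "$.store.book.price"},
    {"key": "title", "type": "string", "value": "$.store.book.title"},
    {"key": "dt", "type": "integer", "value": "$.date"}
  ],
  "userdata": [
    {
      "store": {
        "book": [
          {"category": "reference", "author": "NigelRees", "title": "SayingsoftheCentury", "price": 8.95},
          {"category": "fiction", "author": "EvelynWaugh", "title": "SwordofHonour", "price": 12.99}
        ]
      },
      "date": 20190202
    }
  ]
}
""")

rows = rows_from_message(message)
for row in rows:
    print(row)
```

With the sample message, each book becomes one row, and the single `$.date` value is repeated on both rows.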

Can you offer any other suggestions?

nqwrtyyt #1

Can the json path be stored in the routine load task instead?
In real scenarios, the JSON messages in Kafka are often produced by existing systems as ready-made data. If the json path has to be added to every message, the data must be reprocessed, and the size of each message also increases.

I think the Kafka JSON message should be decoupled from the Doris columns.
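
As a rough illustration of the size concern, the following Python sketch compares the encoded size of one application record with the size of the same record wrapped in the proposed `jsonpath`/`userdata` envelope. The sample record and the helper `encoded_size` are assumptions made up for this example; real messages will differ.

```python
import json

# Hypothetical record, as it might already exist in Kafka.
record = {
    "store": {
        "book": [
            {"category": "reference", "author": "NigelRees",
             "title": "SayingsoftheCentury", "price": 8.95}
        ]
    },
    "date": 20190202,
}

# The mapping from the proposal, which would be repeated in every message.
jsonpath = [
    {"key": "author", "type": "string", "value": "$.store.book.author"},
    {"key": "category", "type": "string", "value": "$.store.book.category"},
    {"key": "price", "type": "float", "value": "$.store.book.price"},
    {"key": "title", "type": "string", "value": "$.store.book.title"},
    {"key": "dt", "type": "integer", "value": "$.date"},
]


def encoded_size(obj):
    """Size in bytes of the compact UTF-8 JSON encoding of obj."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


plain = encoded_size(record)
wrapped = encoded_size({"jsonpath": jsonpath, "userdata": [record]})
overhead = wrapped - plain
print(f"plain={plain} bytes, wrapped={wrapped} bytes, overhead={overhead} bytes per message")
```

Because the mapping is identical for every message, storing it once in the routine load task would avoid paying this overhead on each message.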

ilmyapht #2

I have implemented this feature.
PR: https://github.com/apache/incubator-doris/pull/3230

For example:

CREATE TABLE `test_json_price`(
  `category` varchar(32),
  `author` varchar(32),
  `title` varchar(32),
  `price` double SUM
) ENGINE=OLAP
AGGREGATE KEY(category, author, title)
DISTRIBUTED BY HASH(category, author, title) BUCKETS 32
PROPERTIES ("storage_type"="column","replication_num" = "1");

CREATE ROUTINE LOAD mediavad.test_json_price_label001 ON test_json_price
COLUMNS(category, author, title, price)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpath" = "{\"jsonpath\":[{\"column\":\"category\",\"value\":\"$.store.book.category\"},{\"column\":\"author\",\"value\":\"$.store.book.author\"},{\"column\":\"title\",\"value\":\"$.store.book.title\"},{\"column\":\"price\",\"value\":\"$.store.book.price\"}]}"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.125.110:9092,192.168.125.120:9092,192.168.125.130:9092",
    "kafka_topic" = "KafkaLoadJsonForDoris",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
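
The "jsonpath" property above is a JSON document embedded in a double-quoted SQL string, so every inner quote has to be escaped by hand. A small Python sketch like the following could generate that value from a column-to-path mapping. The helper `jsonpath_property` is hypothetical, and the backslash escaping simply mirrors the statement above; it is an assumption about how the string literal is parsed, not a documented rule.

```python
import json


def jsonpath_property(mapping):
    """Return the escaped value for the "jsonpath" property.

    mapping: dict of column name -> json-path expression, in column order.
    """
    spec = {"jsonpath": [{"column": col, "value": path} for col, path in mapping.items()]}
    compact = json.dumps(spec, separators=(",", ":"))
    # Escape backslashes first, then double quotes, for a double-quoted SQL literal.
    return compact.replace("\\", "\\\\").replace('"', '\\"')


mapping = {
    "category": "$.store.book.category",
    "author": "$.store.book.author",
    "title": "$.store.book.title",
    "price": "$.store.book.price",
}

escaped = jsonpath_property(mapping)
print(f'"jsonpath" = "{escaped}"')
```

For the mapping shown, the printed line matches the "jsonpath" property in the statement above.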
