How to convert Debezium messages to JSON format so they can be loaded into Redshift

koaltpgm · posted 2021-05-29 in Spark

I need help with something. I have built a data pipeline as described below:
MySQL --> Debezium --> Kafka --> Kafka Connect --> AWS S3.
S3 now holds the Debezium event messages in JSON format.
These now need to be loaded into Redshift as tables:
S3 --> Redshift (target DB) as rows.
Below I have shared a Debezium event message for one update event (the quantity of product id 102 was updated), and I just want a format such that, when I run a COPY command against the S3 files, the changes (create/update/delete) get loaded into the Redshift table.
Note: I have set "rotate.interval.ms": "3600000", so for each of our users one Debezium message file is created containing all the CRUD operations.
So we need a solution that converts every newly created file in S3 (each one a batch of Debezium message events) into a format we can run a COPY command against, so that it loads into Redshift. My main goal is to capture the CDC changes from MySQL and replicate them in Redshift.
Here is my S3 sink connector configuration (Kafka Connect S3 sink):

{
  "name": "s3-sink-db02",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "S3bucket",
    "name": "s3-sink-db02",
    "tasks.max": "3",
    "s3.region": "us-east-1",
    "aws.access_key_id": "accesskey",
    "aws.secret_access_key": "secretKey",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "locale": "en",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "topics.regex": "dbserver1.(.*)",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "path.format": "YYYY/MM/dd/HH",
    "partition.duration.ms": "3600000",
    "rotate.schedule.interval.ms": "3600000"
  }
}
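
For reference, here is a minimal sketch (not part of my current setup; property names taken from the Debezium documentation and should be verified against the Debezium version in use) of how Debezium's ExtractNewRecordState SMT could be added to the connector configuration above, so that each record written to S3 carries only the flattened row state instead of the full change envelope:

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"

With "delete.handling.mode" set to "rewrite", delete events are kept as regular records with a "__deleted" marker field instead of a null payload, which makes them easier to load and filter later.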

Debezium message:

{
  "schema": {
    "name": "dbserver1.inventory.orders.Envelope",
    "optional": false,
    "type": "struct",
    "fields": [
      {
        "field": "before",
        "name": "dbserver1.inventory.orders.Value",
        "optional": true,
        "type": "struct",
        "fields": [
          {
            "field": "order_number",
            "optional": false,
            "type": "int32"
          },
          {
            "field": "order_date",
            "name": "io.debezium.time.Date",
            "optional": false,
            "type": "int32",
            "version": 1
          },
          {
            "field": "purchaser",
            "optional": false,
            "type": "int32"
          },
          {
            "field": "quantity",
            "optional": false,
            "type": "int32"
          },
          {
            "field": "product_id",
            "optional": false,
            "type": "int32"
          }
        ]
      },
      {
        "field": "after",
        "name": "dbserver1.inventory.orders.Value",
        "optional": true,
        "type": "struct",
        "fields": [
          {
            "field": "order_number",
            "optional": false,
            "type": "int32"
          },
          {
            "field": "order_date",
            "name": "io.debezium.time.Date",
            "optional": false,
            "type": "int32",
            "version": 1
          },
          {
            "field": "purchaser",
            "optional": false,
            "type": "int32"
          },
          {
            "field": "quantity",
            "optional": false,
            "type": "int32"
          },
          {
            "field": "product_id",
            "optional": false,
            "type": "int32"
          }
        ]
      },
      {
        "field": "source",
        "name": "io.debezium.connector.mysql.Source",
        "optional": false,
        "type": "struct",
        "fields": [
          {
            "field": "version",
            "optional": false,
            "type": "string"
          },
          {
            "field": "connector",
            "optional": false,
            "type": "string"
          },
          {
            "field": "name",
            "optional": false,
            "type": "string"
          },
          {
            "field": "ts_ms",
            "optional": false,
            "type": "int64"
          },
          {
            "default": "false",
            "field": "snapshot",
            "name": "io.debezium.data.Enum",
            "optional": true,
            "type": "string",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            }
          },
          {
            "field": "db",
            "optional": false,
            "type": "string"
          },
          {
            "field": "table",
            "optional": true,
            "type": "string"
          },
          {
            "field": "server_id",
            "optional": false,
            "type": "int64"
          },
          {
            "field": "gtid",
            "optional": true,
            "type": "string"
          },
          {
            "field": "file",
            "optional": false,
            "type": "string"
          },
          {
            "field": "pos",
            "optional": false,
            "type": "int64"
          },
          {
            "field": "row",
            "optional": false,
            "type": "int32"
          },
          {
            "field": "thread",
            "optional": true,
            "type": "int64"
          },
          {
            "field": "query",
            "optional": true,
            "type": "string"
          }
        ]
      },
      {
        "field": "op",
        "optional": false,
        "type": "string"
      },
      {
        "field": "ts_ms",
        "optional": true,
        "type": "int64"
      }
    ]
  },
  "payload": {
    "op": "u",
    "before": {
      "order_date": 16816,
      "quantity": 1,
      "purchaser": 1001,
      "order_number": 10001,
      "product_id": 102
    },
    "after":**{
  "order_date": 16816,
  "quantity": 6,
  "purchaser": 1001,
  "order_number": 10001,
  "product_id": 102
},
"source": {
    "query": null,
    "thread": 4,
    "server_id": 223344,
    "version": "1.0.3.Final",
    "file": "mysql-bin.000007",
    "connector": "mysql",
    "pos": 354,
    "name": "dbserver1",
    "gtid": null,
    "row": 0,
    "ts_ms": 1591620600000,
    "snapshot": "false",
    "db": "inventory",
    "table": "orders"
  },
  "ts_ms": 1591620602204
}
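
For illustration, this is the kind of COPY I would like to be able to run against these files. The table name, IAM role, and S3 locations below are placeholders, and the exact prefix depends on the partitioner settings above. A Redshift JSONPaths file can reach into the envelope shown above:

    {
      "jsonpaths": [
        "$.payload.op",
        "$.payload.ts_ms",
        "$.payload.after.order_number",
        "$.payload.after.order_date",
        "$.payload.after.purchaser",
        "$.payload.after.quantity",
        "$.payload.after.product_id"
      ]
    }

    copy orders_cdc_staging
    from 's3://S3bucket/topics/dbserver1.inventory.orders/2021/05/29/10/'
    iam_role 'arn:aws:iam::111111111111:role/RedshiftCopyRole'
    json 's3://S3bucket/jsonpaths/orders_jsonpaths.json'
    gzip;

Two caveats: for delete events "after" is null, so such a row only carries "op", and COPY only appends, so applying the updates and deletes to the target table still needs a staging table plus a merge (delete-and-insert) step.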

llmtgqce1#

I will polish this answer when I find time. It comes from one of our production repos, which uses the https://github.com/goibibo/dataplatform_utils library.

import com.goibibo.dp.utils.{KfUtils09, SparkKafkaUtils09}
import com.goibibo.dp.utils.SparkKafkaUtils09._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}
import org.slf4j.{Logger, LoggerFactory}
import org.json4s.jackson.Serialization
import org.json4s._
import org.json4s.jackson.Serialization.{read, write}
import scala.collection.JavaConverters._

object KafkaToRedShift {

  val spark: SparkSession = getSparkSession("KafkaToRedShift") /* Implement this */
  val logger: Logger = LoggerFactory.getLogger(KafkaToRedShift.getClass)

  def createOffsetRange(kafkaBrokers: String, topics: Seq[String],
                        consumerGroup: String, maxMessagesPerPartition: Option[Int],
                        readFrom: String = READ_FROM_COMMITTED): (Seq[OffsetRange], KOffsets, Boolean) = {

    var isReadRequired = false
    val kafkaConfig = KfUtils09.createKafkaConfig(kafkaBrokers, consumerGroup)
    val topicsNames = topics.asJava
    val earliestOffsets: Map[TopicPartition, Long] = KfUtils09.getEarliestOffsets(topicsNames, kafkaConfig).get
    val latestOffsets: Map[TopicPartition, Long] = KfUtils09.getLatestOffsets(topicsNames, kafkaConfig).get
    val committedOffsets: Map[TopicPartition, Long] = KfUtils09.getCommittedOffsets(topicsNames, kafkaConfig).get
    val fromOffsets =
      if (READ_FROM_EARLIEST.equals(readFrom)) earliestOffsets
      else if (READ_FROM_LATEST.equals(readFrom)) latestOffsets
      else committedOffsets
    val offsetRanges: List[OffsetRange] = latestOffsets.toList.map(pairTopicPartitionAndOffset => {
      val (tp, untilOffset) = pairTopicPartitionAndOffset
      val totalMessagesInPartition = untilOffset - fromOffsets(tp)
      logger.info(s"${tp.topic} ${tp.partition} earliestOffsets =  $earliestOffsets committedOffsets = ${committedOffsets(tp)} fromOffsets = ${fromOffsets(tp)} untilOffset = $untilOffset")
      logger.info(s"${tp.topic} ${tp.partition} totalMessagesInPartition = $totalMessagesInPartition ")
      val newUntilOffset = if (maxMessagesPerPartition.isDefined) {
        if (totalMessagesInPartition > maxMessagesPerPartition.get) {
          logger.info(s"${tp.topic} ${tp.partition} totalMessagesInPartition = $totalMessagesInPartition higher than maxMessagesPerPartition = $maxMessagesPerPartition")
          val newUntilOffset = fromOffsets(tp) + maxMessagesPerPartition.get
          logger.info(s"${tp.topic} ${tp.partition} new untilOffset = $newUntilOffset")
          newUntilOffset
        } else {
          untilOffset
        }
      } else {
        untilOffset
      }
      if (newUntilOffset > fromOffsets(tp)) {
        isReadRequired = true
      }
      OffsetRange.create(tp.topic, tp.partition, fromOffsets(tp), newUntilOffset)
    })

    val latestOffsetsN = offsetRanges.map(o => (new TopicPartition(o.topic, o.partition), o.untilOffset)).toMap
    (offsetRanges, latestOffsetsN, isReadRequired)
  }

  def main(args: Array[String]): Unit = {

    implicit val formats: DefaultFormats.type =  org.json4s.DefaultFormats
    val kafkaBrokers: String = ConfigUtils.bootstrapServers
    val topics: Seq[String] = Seq(ConfigUtils.readTopic)
    val consumerGroup: String = ConfigUtils.kShiftGroup

    if (args.isEmpty) {
      throw new IllegalArgumentException("please provide filename")
    }

    val fileName = args.head
    logger.info(s"Found file name in argument: $fileName")
    val configStr = readFromFile(fileName) /* Implement this */
    val conf: DatabaseConfig = Serialization.read[DatabaseConfig](configStr) /* DatabaseConfig: job config case class, not shown here */

    val sql: String = conf.sql.get
    val fullTableName = conf.tableName.split('.')
    val tableSchema = fullTableName.head
    val table: String = fullTableName.tail.head
    val source = conf.source
    var batchpush = false
    val lockFileName = s"${topics}_file.lck"

    try {
      acquireLock(lockFileName) /* Implement this */
      implicit val formats = Serialization.formats(NoTypeHints)

      val (offsets, latestOffsets, isReadRequired) = createOffsetRange(kafkaBrokers, topics, consumerGroup, None)

      if (isReadRequired) {

        val fromOffset: Map[String, Map[String, Long]] = offsets.map { o =>
          (o.topic, o.topicPartition.partition(), o.fromOffset)
        }.groupBy(_._1).mapValues(value => value.map(t => (t._2.toString, t._3)).toMap)

        if(fromOffset.forall(o=> o._2.forall(t=> t._2==0))){
          batchpush = true
        }

        val fromOffsetStr: String = write(fromOffset)

        val toOffset: Map[String, Map[String, Long]] = offsets.map { o =>
          (o.topic, o.topicPartition.partition(), o.untilOffset)
        }.groupBy(_._1).mapValues(value => value.map(t => (t._2.toString, t._3)).toMap)

        val toOffsetStr: String = write(toOffset)

        val df: DataFrame = spark.
          read.
          format("kafka").
          option("kafka.bootstrap.servers", kafkaBrokers).
          option("subscribe", topics.mkString(",")).
          option("startingOffsets", fromOffsetStr).
          option("endingOffsets", toOffsetStr).
          load().cache

        df.createOrReplaceTempView("raw_data")

        val tfm = if(source == "mysql") {
          logger.info(s"Execute data transformation query: $sql")
          spark.sql(sql)
        } else {
          spark.sql(
            """
                with a as(
                    select cast(key as string) key, cast(value as string) value,
                            timestamp as ingestion_time,topic,partition,offset
                from raw_data
                ),
                b as
                (
                    select *,row_number() over(partition by key order by topic,partition,offset desc ) r from a
                )
                select * from b where r = 1
                """).drop("r").createOrReplaceTempView("dedup_data")
          // create or replace temp view casted_data as
          logger.info(s"Execute data transformation query: $sql")
          spark.sql(sql)
        }

        val columnNames = tfm.schema.map(_.name).map(c => s""" "$c" """).mkString(",")
        tfm.createOrReplaceTempView("tfm")

        val spectrumTableName = s"misc.${table}_realtime_tmp"
        spark.sql(s"drop table if exists $spectrumTableName")
        spark.table("tfm").repartition(3).write.format("parquet").mode("overwrite").saveAsTable(spectrumTableName)

        val redshiftSql: String =
          s"""
             |create temp table ${table}_realtime_staging
             |        distkey(key)
             |      sortkey(key)
             |      as
             |      select * from misc_e.${table}_realtime_tmp;
             |
             |      CREATE TABLE if not exists $tableSchema.${table}_realtime (like ${table}_realtime_staging);
             |
             |      delete from $tableSchema.${table}_realtime
             |      where key in (select key from ${table}_realtime_staging where key is not null);
             |
             |      insert into $tableSchema.${table}_realtime($columnNames)
             |      select $columnNames from ${table}_realtime_staging where raw_json != 'Deleted';
             |
             |      select 1;
         """.stripMargin

        logger.info(s"Excecuting Below Query in Redshift: $redshiftSql")
        RedshiftUtils.executeRedshiftSql(redshiftSql) /* Implement this */

        if (!SparkKafkaUtils09.commitOffsets(kafkaBrokers, consumerGroup, latestOffsets)) {
          logger.warn("Commit Offsets failed")
        } else {
          logger.info("Commit Offsets successful!")
        }

      }

    } catch {
      case e: Exception => {
        val stackTrace = org.apache.commons.lang.exception.ExceptionUtils.getStackTrace(e)
        logger.info(s"Releasing lock due to exception.. $stackTrace")
        releaseLock(lockFileName) /* Implement this */
        throw new Exception("exiting due to exception..", e)
      }

    } finally {
      //release the lock
      releaseLock(lockFileName) /* Implement this */
    }

    logger.info("All Done!!!")
    System.exit(0)
  }
}
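
For what it's worth, main expects a single argument: the path of a JSON file that is deserialized into the job's DatabaseConfig (not shown here). Judging by the field accesses above it needs at least a schema-qualified tableName, a source, and the transformation sql, and the Redshift block assumes the transformed data exposes key and raw_json columns. A hypothetical example:

    {
      "tableName": "public.orders",
      "source": "mysql",
      "sql": "select cast(key as string) as key, cast(value as string) as raw_json from raw_data"
    }

The overall flow is: compute a bounded offset range per partition, read that range as a batch DataFrame from Kafka, transform it (and, for non-MySQL sources, de-duplicate to the latest record per key), write the result as a Parquet table that Redshift reads through an external schema, then stage it in a temp table and delete-and-insert into <schema>.<table>_realtime. Offsets are committed only after the Redshift statements succeed, so a failed run is simply reprocessed.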
