使用kafka connect和table.whitelist强制转换数字字段

4uqofj5v  于 2021-06-15  发布在  Cassandra
关注(0)|答案(0)|浏览(215)

我为kafka connect confluent 5.0创建了一个源连接器和一个接收器连接器,将两个sqlserver表推送到我的datalake
以下是我的sqlserver表架构:

CREATE TABLE MYBASE.dbo.TABLE1 (
id_field int IDENTITY(1,1) NOT NULL,
my_numericfield numeric(24,6) NULL,
time_field smalldatetime NULL,
CONSTRAINT PK_CBMARQ_F_COMPTEGA PRIMARY KEY (id_field)
) GO

我的Cassandra模式:

create table TEST-TABLE1(my_numericfield decimal, id_field int, time_field timestamp, PRIMARY KEY (id_field));

以下是源配置,带有一个白名单参数:

{
"config":
{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://localhost:1433;database=MYBASE",
    "connection.user": "admin",
    "connection.password": "password",
    "table.whitelist": "TABLE1, TABLE2",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "time_field",
    "incrementing.column.name": "id_field",
    "validate.non.null": "false",
    "topic.prefix": "TEST-",
    "tasks.max": "8",
    "numeric.mapping":"best_fit"
},
"name": "sqlserver-MYBASE-test"
}

这是我的Flume接头:

{
"name": "s3-sink-MYBASE",
"config":
{
    "topics": "TEST-TABLE1, TEST_TABLE2",
    "topics.dir": "DATABASE_FULL",
    "s3.part.size": 5242880,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "tasks.max": 8,
    "schema.compatibility": "NONE",
    "s3.region": "eu-central-1",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "s3.bucket.name": "mydatalake",
    "flush.size": 1,
    "transforms":"InsertSourceDetails",  
    "transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertSourceDetails.static.field":"DATABASE",
    "transforms.InsertSourceDetails.static.value":"MYBASE"
}
}

问题是,有些字段在sqlserver中是数字类型的,kafka在到达datalake时将它们转换为二进制
以下是schema\u注册表结果:

{"type": "record",
"name": "TEST-TABLE1",
"fields": [
{
  "name": "my_numericfield",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 6,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "6"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
},
{
  "name": "id_field",
  "type": "int"
},
{
  "name": "cbCreateur",
  "type": [
    "null",
    "string"
  ],
  "default": null
},
{
  "name": "time_field",
  "type": [
    "null",
    {
      "type": "long",
      "connect.version": 1,
      "connect.name": "org.apache.kafka.connect.data.Timestamp",
      "logicalType": "timestamp-millis"
    }
  ],
  "default": null
},
],
"connect.name": "TEST-TABLE1"}

以下是spark脚本和结果:

...: from pyspark.sql.functions import col 
...: AWS_ID='xxxxxxxxxxxxxxxxx'
...: AWS_KEY='xxxxxxxxxxxxxxxxx/'
...: sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ID)
...: sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_KEY)
...: sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
...: spark.conf.set('spark.cassandra.connection.host', 'localhost')
...: spark.conf.set('spark.cassandra.connection.port', 9042)
...: spark.conf.set('spark.cassandra.auth.username', 'cassandra')
...: spark.conf.set('spark.cassandra.auth.password', 'cassandra')
...: 
...: 

   :    F_TEST-TABLE1 = spark.read.format('com.databricks.spark.avro').load('s3a://mydatalake/DATABASE_FULL/TEST-TABLE1').drop('partition')
...:    DF_TEST-TABLE1 = F_TEST-TABLE1.toDF(*[c.lower() for c in TEST-TABLE1.columns])
...: 
...: 

: DF_TEST-TABLE1.printSchema()
root
 |-- my_numericfield: binary (nullable = true)
 |-- id_field: integer (nullable = true)
 |-- time_field: long (nullable = true)

: DF_TEST-TABLE1.createTempView("event")

: spark.sql("select * from event").show(1, False)
+----------------+--------+--------------+
||my_numericfield|id_field|time_field    |
+----------------+-----------+-----------+
|[00]            | 5      |1542733800000 |
+----------------+--------+--------------+
only showing top 1 row

: DF_TEST-TABLE1.write.format('org.apache.spark.sql.cassandra').options(keyspace='sage_full', table='f_test-table1').option('confirm.truncate', True).save(mode='overwrite')
18/11/22 08:29:05 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 3)
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [B@6d0d5743 of type class [B to java.lang.BigDecimal.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:190)

我正在尝试动态地强制转换字段以匹配数字类型(即float),但是如果事先不知道字段名,我就找不到方法来执行此操作
使用白名单参数,连接器处理两个表,连接器配置中没有字段描述
有没有一种方法可以动态地对所有的数字字段进行转换?
谢谢你的帮助

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题