org.apache.kafka.connect.data.Struct.getStruct()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(18.8k)|赞(0)|评价(0)|浏览(162)

本文整理了Java中org.apache.kafka.connect.data.Struct.getStruct()方法的一些代码示例,展示了Struct.getStruct()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Struct.getStruct()方法的具体详情如下:
包路径:org.apache.kafka.connect.data.Struct
类名称:Struct
方法名:getStruct

Struct.getStruct介绍

[英]Equivalent to calling #get(String) and casting the result to a Struct.
[中]相当于调用#get(String)并将结果强制转换为Struct类型。

代码示例

代码示例来源:origin: confluentinc/ksql

/**
 * Checks that a deserialization-error message built with an empty record
 * exposes a null value in its record field.
 */
@Test
public void shouldSetNullRecordToNull() {
 // When: the message is produced with no record attached
 final Struct logMessage = (Struct) ProcessingLogMessageFactory
   .deserializationErrorMsg(error, Optional.empty())
   .get()
   .value();

 // Then: the nested error struct holds null for the record field
 final Object loggedRecord = logMessage
   .getStruct(DESERIALIZATION_ERROR)
   .get(DESERIALIZATION_ERROR_FIELD_RECORD);
 assertThat(loggedRecord, is(nullValue()));
}

代码示例来源:origin: debezium/debezium

/**
 * Asserts the geometry columns found in the {@code after} state of a change event.
 * Rows with id 1 and id 2 are checked against fixed SRID / WKB values; any other id
 * only has to be non-null.
 *
 * @param value the change-event envelope
 */
private void assertGeomRecord(Struct value) {
  final Struct row = value.getStruct(Envelope.FieldName.AFTER);
  final Integer id = row.getInt32("id");
  Testing.debug(row);
  assertThat(id).isNotNull();

  switch (id) {
    case 1: {
      // INSERT INTO dbz_507_geometry VALUES (1, ST_GeomFromText('POINT(1 1)', 4326), ST_GeomFromText('LINESTRING(0 0, 1 1)', 3187), ST_GeomFromText('POLYGON((0 0, 1 1, 1 0, 0 0))'), ST_GeomFromText('GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(0 0, 1 1))', 4326));
      final Struct geom = row.getStruct("geom");
      assertThat(geom.getInt32("srid")).isEqualTo(4326);
      assertThat(DatatypeConverter.printHexBinary(geom.getBytes("wkb")))
          .isEqualTo("0101000000000000000000F03F000000000000F03F");

      final Struct linestring = row.getStruct("linestring");
      assertThat(linestring.getInt32("srid")).isEqualTo(3187);
      assertThat(DatatypeConverter.printHexBinary(linestring.getBytes("wkb")))
          .isEqualTo("01020000000200000000000000000000000000000000000000000000000000F03F000000000000F03F");

      // The polygon was inserted without an SRID argument, so none is expected.
      final Struct polygon = row.getStruct("polygon");
      assertThat(polygon.getInt32("srid")).isEqualTo(null);
      assertThat(DatatypeConverter.printHexBinary(polygon.getBytes("wkb")))
          .isEqualTo("0103000000010000000400000000000000000000000000000000000000000000000000F03F000000000000F03F000000000000F03F000000000000000000000000000000000000000000000000");

      final Struct collection = row.getStruct("collection");
      assertThat(collection.getInt32("srid")).isEqualTo(4326);
      assertThat(DatatypeConverter.printHexBinary(collection.getBytes("wkb")))
          .isEqualTo("0107000000020000000101000000000000000000F03F000000000000F03F01020000000200000000000000000000000000000000000000000000000000F03F000000000000F03F");
      break;
    }
    case 2: {
      // INSERT INTO dbz_507_geometry VALUES (2, ST_GeomFromText('LINESTRING(0 0, 1 1)'), NULL, NULL, NULL);
      final Struct geom = row.getStruct("geom");
      assertThat(geom.getInt32("srid")).isEqualTo(null);
      assertThat(DatatypeConverter.printHexBinary(geom.getBytes("wkb")))
          .isEqualTo("01020000000200000000000000000000000000000000000000000000000000F03F000000000000F03F");

      // The remaining geometry columns were inserted as NULL.
      assertThat(row.getStruct("linestring")).isNull();
      assertThat(row.getStruct("polygon")).isNull();
      assertThat(row.getStruct("collection")).isNull();
      break;
    }
    default:
      // No value expectations for other rows.
      break;
  }
}

代码示例来源:origin: debezium/debezium

/**
 * Reads the {@code after} struct out of a record's envelope.
 *
 * @param record the source record whose value is cast to an envelope {@link Struct}
 * @return the struct stored under {@code Envelope.FieldName.AFTER}, or {@code null}
 *         when the envelope schema has no such field
 */
protected static Struct valueFor(SourceRecord record) {
  final Struct envelope = (Struct) record.value();
  final Field after = envelope.schema().field(Envelope.FieldName.AFTER);
  return after == null ? null : envelope.getStruct(after.name());
}

代码示例来源:origin: debezium/debezium

/**
 * Reads the {@code source} struct out of a record's envelope.
 *
 * @param record the source record whose value is cast to an envelope {@link Struct}
 * @return the struct stored under {@code Envelope.FieldName.SOURCE}, or {@code null}
 *         when the envelope schema has no such field
 */
protected static Struct sourceFor(SourceRecord record) {
    final Struct envelope = (Struct) record.value();
    final Field sourceField = envelope.schema().field(Envelope.FieldName.SOURCE);
    if (sourceField == null) {
        // The schema does not declare a source field, so there is nothing to read.
        return null;
    }
    return envelope.getStruct(sourceField.name());
}
}

代码示例来源:origin: debezium/debezium

/**
 * Checks one envelope field of a record against the expected per-column schema and values.
 *
 * @param expectedSchemaAndValuesByColumn the expectations to run against the field's struct,
 *        or {@code null} when the field itself is expected to be null
 * @param record the record whose value is the envelope
 * @param envelopeFieldName the name of the envelope field to inspect
 */
protected void assertRecordSchemaAndValues(List<SchemaAndValueField> expectedSchemaAndValuesByColumn,
                      SourceRecord record,
                      String envelopeFieldName) {
  final Struct envelope = (Struct) record.value();
  final Struct content = envelope.getStruct(envelopeFieldName);

  if (expectedSchemaAndValuesByColumn == null) {
    // No expectations means the envelope field must be absent.
    assertThat(content).isNull();
    return;
  }

  assertNotNull("expected there to be content in Envelope under " + envelopeFieldName, content);
  for (SchemaAndValueField expected : expectedSchemaAndValuesByColumn) {
    expected.assertFor(content);
  }
}

代码示例来源:origin: debezium/debezium

/**
 * Verifies the initial-sync marker of a record. A record whose source offset carries the
 * marker must also carry a {@code true} flag in its source struct; a record without the
 * marker is accepted only once, tracked through {@code foundLast}.
 *
 * @param record the record to check
 * @param foundLast set to {@code true} when an unmarked record is seen; must be
 *        {@code false} beforehand
 */
protected void verifyFromInitialSync(SourceRecord record, AtomicBoolean foundLast) {
  final boolean markedInOffset = record.sourceOffset().containsKey(SourceInfo.INITIAL_SYNC);
  if (!markedInOffset) {
    // Only the last record in the initial sync should be marked as not being part of the initial sync ...
    assertThat(foundLast.getAndSet(true)).isFalse();
    return;
  }

  assertThat(record.sourceOffset().containsKey(SourceInfo.INITIAL_SYNC)).isTrue();
  final Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
  assertThat(source.getBoolean(SourceInfo.INITIAL_SYNC)).isTrue();
}

代码示例来源:origin: debezium/debezium

/**
 * Compares the {@code point} column of a change event with the expected coordinates and
 * SRID stored in the same row, both through the struct fields and through the decoded WKB.
 *
 * @param value the change-event envelope
 */
private void assertPoint(Struct value) {
  final Struct row = value.getStruct(Envelope.FieldName.AFTER);
  final Integer id = row.getInt32("id");
  Testing.debug(row);
  assertThat(id).isNotNull();

  final Double expectedX = row.getFloat64("expected_x");
  final Double expectedY = row.getFloat64("expected_y");
  final Integer expectedSrid = row.getInt32("expected_srid");

  final Struct point = row.getStruct("point");
  if (point == null) {
    // A null geometry is only acceptable when no coordinates were expected.
    if (expectedX != null) {
      Assert.fail("Got a null geometry but didn't expect to");
    }
    return;
  }

  // Validate the values exposed as struct fields
  final Double actualX = point.getFloat64("x");
  final Double actualY = point.getFloat64("y");
  final Integer actualSrid = point.getInt32("srid");
  databaseDifferences.geometryAssertPoints(expectedX, expectedY, actualX, actualY);
  assertThat(actualSrid).isEqualTo(expectedSrid);

  // Validate the same coordinates decoded from the WKB payload
  final byte[] wkb = (byte[]) point.get("wkb");
  final Point decoded = (Point) WkbGeometryReader.readGeometry(new ByteReader(wkb));
  databaseDifferences.geometryAssertPoints(expectedX, expectedY, decoded.getX(), decoded.getY());
}

代码示例来源:origin: confluentinc/ksql

struct.get(ProcessingLogMessageFactory.TYPE),
  equalTo(MessageType.DESERIALIZATION_ERROR.ordinal()));
final Struct deserializationError = struct.getStruct(DESERIALIZATION_ERROR);
assertThat(
  deserializationError.get(DESERIALIZATION_ERROR_FIELD_MESSAGE),

代码示例来源:origin: debezium/debezium

/**
 * Verifies that a record carries no initial-sync marker, neither as a key in its source
 * offset nor as a value in its source struct.
 *
 * @param record the record to check
 */
protected void verifyNotFromInitialSync(SourceRecord record) {
  final boolean markedInOffset = record.sourceOffset().containsKey(SourceInfo.INITIAL_SYNC);
  assertThat(markedInOffset).isFalse();

  final Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
  assertThat(source.getBoolean(SourceInfo.INITIAL_SYNC)).isNull();
}

代码示例来源:origin: debezium/debezium

/**
 * Asserts that the {@code source} struct of a record names the given database, schema and table.
 *
 * @param record the record whose value must be a {@link Struct}
 * @param db the expected value of the {@code db} field
 * @param schema the expected value of the {@code schema} field
 * @param table the expected value of the {@code table} field
 */
protected void assertSourceInfo(SourceRecord record, String db, String schema, String table) {
  final Object payload = record.value();
  assertTrue(payload instanceof Struct);

  final Struct source = ((Struct) payload).getStruct("source");
  assertEquals(db, source.getString("db"));
  assertEquals(schema, source.getString("schema"));
  assertEquals(table, source.getString("table"));
}

代码示例来源:origin: debezium/debezium

/**
 * Asserts that the {@code source} struct of a record has non-null
 * {@code db}, {@code schema} and {@code table} fields.
 *
 * @param record the record whose value must be a {@link Struct}
 */
protected void assertSourceInfo(SourceRecord record) {
  final Object payload = record.value();
  assertTrue(payload instanceof Struct);

  final Struct source = ((Struct) payload).getStruct("source");
  assertNotNull(source.getString("db"));
  assertNotNull(source.getString("schema"));
  assertNotNull(source.getString("table"));
}

代码示例来源:origin: debezium/debezium

/**
 * Snapshots a table whose name contains {@code |} and {@code @} and checks that the record
 * arrives on a topic where those characters are replaced by underscores, while the
 * {@code source.table} field keeps the original table name.
 */
@Test
@FixFor("DBZ-878")
public void shouldReplaceInvalidTopicNameCharacters() throws Exception {
  // Create and populate the oddly named table.
  final String createAndFill = SETUP_TABLES_STMT +
      "CREATE TABLE s1.\"dbz_878_some|test@data\" (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
      "INSERT INTO s1.\"dbz_878_some|test@data\" (aa) VALUES (123);";
  TestHelper.execute(createAndFill);

  // Start the connector restricted to that one table.
  start(PostgresConnector.class,
      TestHelper.defaultConfig()
          .with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
          .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
          .with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s1")
          .with(PostgresConnectorConfig.TABLE_WHITELIST, "s1\\.dbz_878_some\\|test@data")
          .build());
  assertConnectorIsRunning();

  // Exactly one record is expected on the sanitized topic.
  final SourceRecords consumed = consumeRecordsByTopic(1);
  final List<SourceRecord> onTopic = consumed.recordsForTopic(topicName("s1.dbz_878_some_test_data"));
  assertThat(onTopic.size()).isEqualTo(1);

  final SourceRecord snapshotRecord = onTopic.get(0);
  VerifyRecord.isValidRead(snapshotRecord, PK_FIELD, 1);

  final Struct source = ((Struct) snapshotRecord.value()).getStruct("source");
  assertThat(source.getString("table")).isEqualTo("dbz_878_some|test@data");
}

代码示例来源:origin: debezium/debezium

/**
 * Feeds a single insert event ({@code op = "i"}) to the record makers and checks the key,
 * value and source struct of the one record that is produced.
 */
@Test
public void shouldGenerateRecordForInsertEvent() throws InterruptedException {
  // Build the insert event for collection rs0/dbA/c1.
  final ObjectId objId = new ObjectId();
  final Document inserted = new Document().append("_id", objId).append("name", "Sally");
  final BsonTimestamp eventTime = new BsonTimestamp(1000, 1);
  final Document event = new Document().append("o", inserted)
                  .append("ns", "dbA.c1")
                  .append("ts", eventTime)
                  .append("h", Long.valueOf(12345678L))
                  .append("op", "i");

  final CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
  recordMakers.forCollection(collectionId).recordEvent(event, 1002);

  assertThat(produced.size()).isEqualTo(1);
  final SourceRecord record = produced.get(0);
  final Struct key = (Struct) record.key();
  final Struct value = (Struct) record.value();

  // Key: schema identity and serialized object id.
  assertThat(key.schema()).isSameAs(record.keySchema());
  assertThat(key.get("id")).isEqualTo("{ \"$oid\" : \"" + objId + "\"}");

  // Value: schema identity, document JSON, operation code and timestamp.
  // NOTE(review): the original carried a disabled check that FieldName.BEFORE is null; it stays disabled.
  assertThat(value.schema()).isSameAs(record.valueSchema());
  assertThat(value.getString(FieldName.AFTER)).isEqualTo(inserted.toJson(WRITER_SETTINGS));
  assertThat(value.getString(FieldName.OPERATION)).isEqualTo(Operation.CREATE.code());
  assertThat(value.getInt64(FieldName.TIMESTAMP)).isEqualTo(1002L);

  // Source: must match the last offset struct recorded for the collection.
  final Struct actualSource = value.getStruct(FieldName.SOURCE);
  final Struct expectedSource = source.lastOffsetStruct("rs0", collectionId);
  assertThat(actualSource).isEqualTo(expectedSource);
}

代码示例来源:origin: debezium/debezium

/**
 * Asserts schema and values of the BIGINT test columns: {@code c1} and {@code c2} must use
 * the Connect {@code Decimal} schema with scale 0, {@code c3} must use INT64. Values are
 * checked for rows with id 1, 2 and 3 only.
 *
 * @param value the change-event envelope
 */
private void assertBigintUnsignedPrecise(Struct value) {
  final Struct row = value.getStruct(Envelope.FieldName.AFTER);
  final Integer id = row.getInt32("id");
  assertThat(id).isNotNull();

  // Unsigned BIGINT columns are expected as org.apache.kafka.connect.data.Decimal with scale 0.
  final Schema rowSchema = row.schema();
  assertThat(rowSchema.field("c1").schema()).isEqualTo(Decimal.builder(0).schema());
  assertThat(rowSchema.field("c2").schema()).isEqualTo(Decimal.builder(0).schema());
  // The signed BIGINT column is expected as INT64.
  assertThat(rowSchema.field("c3").schema()).isEqualTo(Schema.INT64_SCHEMA);

  // Pick the expected values for this row; other ids have no value expectations.
  BigDecimal expectedUnsigned;
  long expectedSigned;
  switch (id) {
    case 1:
      expectedUnsigned = new BigDecimal("18446744073709551615");
      expectedSigned = 9223372036854775807L;
      break;
    case 2:
      expectedUnsigned = new BigDecimal("14446744073709551615");
      expectedSigned = -1223372036854775807L;
      break;
    case 3:
      expectedUnsigned = new BigDecimal("0");
      expectedSigned = -9223372036854775808L;
      break;
    default:
      return;
  }

  assertThat(row.get("c1")).isEqualTo(expectedUnsigned);
  assertThat(row.get("c2")).isEqualTo(expectedUnsigned);
  assertThat(row.getInt64("c3")).isEqualTo(expectedSigned);
}

代码示例来源:origin: debezium/debezium

/**
 * Feeds a single delete event ({@code op = "d"}) to record makers constructed with
 * {@code false} as their last argument and checks that exactly one record is produced,
 * with null {@code after} and {@code patch} fields and the DELETE operation code.
 */
@Test
@FixFor("DBZ-582")
public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {
  // NOTE(review): the trailing 'false' presumably disables tombstone emission - confirm against RecordMakers.
  RecordMakers recordMakers = new RecordMakers(filters, source, topicSelector, produced::add, false);
  BsonTimestamp ts = new BsonTimestamp(1000, 1);
  CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
  ObjectId objId = new ObjectId();
  Document obj = new Document("_id", objId);
  Document event = new Document().append("o", obj)
      .append("ns", "dbA.c1")
      .append("ts", ts)
      // Long.valueOf replaces the deprecated 'new Long(...)' boxing constructor.
      .append("h", Long.valueOf(12345678L))
      .append("op", "d");
  RecordsForCollection records = recordMakers.forCollection(collectionId);
  records.recordEvent(event, 1002);

  // Only the delete record itself is expected.
  assertThat(produced.size()).isEqualTo(1);
  SourceRecord record = produced.get(0);
  Struct key = (Struct) record.key();
  Struct value = (Struct) record.value();

  // Key: schema identity and serialized object id.
  assertThat(key.schema()).isSameAs(record.keySchema());
  assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId));

  // Value: a delete carries neither a document nor a patch.
  assertThat(value.schema()).isSameAs(record.valueSchema());
  assertThat(value.getString(FieldName.AFTER)).isNull();
  assertThat(value.getString("patch")).isNull();
  assertThat(value.getString(FieldName.OPERATION)).isEqualTo(Operation.DELETE.code());
  assertThat(value.getInt64(FieldName.TIMESTAMP)).isEqualTo(1002L);

  // Source: must match the last offset struct recorded for the collection.
  Struct actualSource = value.getStruct(FieldName.SOURCE);
  Struct expectedSource = source.lastOffsetStruct("rs0", collectionId);
  assertThat(actualSource).isEqualTo(expectedSource);
}

代码示例来源:origin: debezium/debezium

// Excerpt: validate the insert record, whose key field "id" is expected to be 1.
VerifyRecord.isValidInsert(insert, "id", 1);
// Read the table name from the record's "source" struct.
String sourceTable = ((Struct)insert.value()).getStruct("source").getString("table");
// The source struct must report the table name with its '|' and '@' characters intact.
assertThat(sourceTable).isEqualTo("dbz_878_some|test@data");

代码示例来源:origin: debezium/debezium

/**
 * Asserts schema and values of the SMALLINT test columns: {@code c1} and {@code c2} must
 * use INT32, {@code c3} must use INT16. Values are checked for rows with id 1, 2 and 3 only.
 *
 * @param value the change-event envelope
 */
private void assertSmallUnsigned(Struct value) {
  final Struct row = value.getStruct(Envelope.FieldName.AFTER);
  final Integer id = row.getInt32("id");
  assertThat(id).isNotNull();

  // Unsigned SMALLINT columns are expected as INT32.
  final Schema rowSchema = row.schema();
  assertThat(rowSchema.field("c1").schema()).isEqualTo(Schema.INT32_SCHEMA);
  assertThat(rowSchema.field("c2").schema()).isEqualTo(Schema.INT32_SCHEMA);
  // The signed SMALLINT column is expected as INT16.
  assertThat(rowSchema.field("c3").schema()).isEqualTo(Schema.INT16_SCHEMA);

  // Pick the expected values for this row; other ids have no value expectations.
  int expectedUnsigned;
  short expectedSigned;
  switch (id) {
    case 1:
      expectedUnsigned = 65535;
      expectedSigned = (short) 32767;
      break;
    case 2:
      expectedUnsigned = 45535;
      expectedSigned = (short) -12767;
      break;
    case 3:
      expectedUnsigned = 0;
      expectedSigned = (short) -32768;
      break;
    default:
      return;
  }

  assertThat(row.getInt32("c1")).isEqualTo(expectedUnsigned);
  assertThat(row.getInt32("c2")).isEqualTo(expectedUnsigned);
  assertThat(row.getInt16("c3")).isEqualTo(expectedSigned);
}

代码示例来源:origin: debezium/debezium

/**
 * Asserts schema and values of the MEDIUMINT test columns: {@code c1}, {@code c2} and
 * {@code c3} must all use INT32. Values are checked for rows with id 1, 2 and 3 only.
 *
 * @param value the change-event envelope
 */
private void assertMediumUnsigned(Struct value) {
  final Struct row = value.getStruct(Envelope.FieldName.AFTER);
  final Integer id = row.getInt32("id");
  assertThat(id).isNotNull();

  // Unsigned MEDIUMINT columns are expected as INT32.
  final Schema rowSchema = row.schema();
  assertThat(rowSchema.field("c1").schema()).isEqualTo(Schema.INT32_SCHEMA);
  assertThat(rowSchema.field("c2").schema()).isEqualTo(Schema.INT32_SCHEMA);
  // The signed MEDIUMINT column is expected as INT32 as well.
  assertThat(rowSchema.field("c3").schema()).isEqualTo(Schema.INT32_SCHEMA);

  // Validate the candidate values; ids other than 1, 2 and 3 have no value expectations.
  final int rowId = id;
  if (rowId == 1) {
    assertThat(row.getInt32("c1")).isEqualTo(16777215);
    assertThat(row.getInt32("c2")).isEqualTo(16777215);
    assertThat(row.getInt32("c3")).isEqualTo(8388607);
  } else if (rowId == 2) {
    assertThat(row.getInt32("c1")).isEqualTo(10777215);
    assertThat(row.getInt32("c2")).isEqualTo(10777215);
    assertThat(row.getInt32("c3")).isEqualTo(-6388607);
  } else if (rowId == 3) {
    assertThat(row.getInt32("c1")).isEqualTo(0);
    assertThat(row.getInt32("c2")).isEqualTo(0);
    assertThat(row.getInt32("c3")).isEqualTo(-8388608);
  }
}

代码示例来源:origin: debezium/debezium

/**
 * Asserts schema and values of the BIGINT test columns when all three are expected as
 * INT64. Values are checked for rows with id 1, 2 and 3 only.
 *
 * @param value the change-event envelope
 */
private void assertBigintUnsignedLong(Struct value) {
  final Struct row = value.getStruct(Envelope.FieldName.AFTER);
  final Integer id = row.getInt32("id");
  assertThat(id).isNotNull();

  // INT64 is expected for the unsigned columns because Long mode was forced for BIGINT UNSIGNED.
  final Schema rowSchema = row.schema();
  assertThat(rowSchema.field("c1").schema()).isEqualTo(Schema.INT64_SCHEMA);
  assertThat(rowSchema.field("c2").schema()).isEqualTo(Schema.INT64_SCHEMA);
  // The signed BIGINT column is expected as INT64 too.
  assertThat(rowSchema.field("c3").schema()).isEqualTo(Schema.INT64_SCHEMA);

  // Pick the expected values for this row; other ids have no value expectations.
  // Note the expected loss in precision: BIGINT UNSIGNED cannot always be represented
  // by a long, hence the negative expectations for c1 and c2.
  long expectedUnsigned;
  long expectedSigned;
  switch (id) {
    case 1:
      expectedUnsigned = -1L;
      expectedSigned = 9223372036854775807L;
      break;
    case 2:
      expectedUnsigned = -4000000000000000001L;
      expectedSigned = -1223372036854775807L;
      break;
    case 3:
      expectedUnsigned = 0L;
      expectedSigned = -9223372036854775808L;
      break;
    default:
      return;
  }

  assertThat(row.getInt64("c1")).isEqualTo(expectedUnsigned);
  assertThat(row.getInt64("c2")).isEqualTo(expectedUnsigned);
  assertThat(row.getInt64("c3")).isEqualTo(expectedSigned);
}

代码示例来源:origin: debezium/debezium

/**
 * Asserts schema and values of the TINYINT test columns: {@code c1}, {@code c2} and
 * {@code c3} must all use INT16. Values are checked for rows with id 1, 2 and 3 only.
 *
 * @param value the change-event envelope
 */
private void assertTinyintUnsigned(Struct value) {
  final Struct row = value.getStruct(Envelope.FieldName.AFTER);
  final Integer id = row.getInt32("id");
  assertThat(id).isNotNull();

  // Unsigned TINYINT columns are expected as INT16.
  final Schema rowSchema = row.schema();
  assertThat(rowSchema.field("c1").schema()).isEqualTo(Schema.INT16_SCHEMA);
  assertThat(rowSchema.field("c2").schema()).isEqualTo(Schema.INT16_SCHEMA);
  // The signed TINYINT column is expected as INT16 as well.
  // Note: the recommended mapping of Signed TINYINT is Short which is 16-bit. http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
  assertThat(rowSchema.field("c3").schema()).isEqualTo(Schema.INT16_SCHEMA);

  // Validate the candidate values; ids other than 1, 2 and 3 have no value expectations.
  final int rowId = id;
  if (rowId == 1) {
    assertThat(row.getInt16("c1")).isEqualTo((short) 255);
    assertThat(row.getInt16("c2")).isEqualTo((short) 255);
    assertThat(row.getInt16("c3")).isEqualTo((short) 127);
  } else if (rowId == 2) {
    assertThat(row.getInt16("c1")).isEqualTo((short) 155);
    assertThat(row.getInt16("c2")).isEqualTo((short) 155);
    assertThat(row.getInt16("c3")).isEqualTo((short) -100);
  } else if (rowId == 3) {
    assertThat(row.getInt16("c1")).isEqualTo((short) 0);
    assertThat(row.getInt16("c2")).isEqualTo((short) 0);
    assertThat(row.getInt16("c3")).isEqualTo((short) -128);
  }
}

相关文章