org.apache.kafka.clients.admin.AdminClient.deleteRecords()方法的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(2.4k)|赞(0)|评价(0)|浏览(222)

本文整理了Java中org.apache.kafka.clients.admin.AdminClient.deleteRecords()方法的一些代码示例,展示了AdminClient.deleteRecords()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。AdminClient.deleteRecords()方法的具体详情如下:
包路径:org.apache.kafka.clients.admin.AdminClient
类名称:AdminClient
方法名:deleteRecords

AdminClient.deleteRecords介绍

[英]Delete records whose offset is smaller than the given offset of the corresponding partition. This is a convenience method for #deleteRecords(Map,DeleteRecordsOptions) with default options. See the overload for more details. This operation is supported by brokers with version 0.11.0.0 or higher.
[中]删除偏移量小于相应分区给定偏移量的记录。这是带有默认选项的#deleteRecords(映射、DeleteRecordsOptions)的便捷方法。有关更多详细信息,请参阅重载。版本为0.11.0.0或更高版本的代理支持此操作。

代码示例

代码示例来源:origin: apache/kafka

/**
 * Delete records whose offset is smaller than the given offset of the corresponding partition.
 *
 * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
 * See the overload for more details.
 *
 * This operation is supported by brokers with version 0.11.0.0 or higher.
 *
 * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
 * @return                      The DeleteRecordsResult.
 */
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
  return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}

代码示例来源:origin: apache/kafka

recordsToDelete.put(myTopicPartition4, RecordsToDelete.beforeOffset(10L));
DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete);

代码示例来源:origin: org.apache.kafka/kafka-streams

void maybePurgeCommitedRecords() {
  // we do not check any possible exceptions since none of them are fatal
  // that should cause the application to fail, and we will try delete with
  // newer offsets anyways.
  if (deleteRecordsResult == null || deleteRecordsResult.all().isDone()) {
    if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
      log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks());
    }
    final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
    for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) {
      recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
    }
    deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
    log.trace("Sent delete-records request: {}", recordsToDelete);
  }
}

相关文章