本文整理了Java中org.apache.kafka.clients.admin.AdminClient.deleteRecords()
方法的一些代码示例,展示了AdminClient.deleteRecords()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。AdminClient.deleteRecords()
方法的具体详情如下:
包路径:org.apache.kafka.clients.admin.AdminClient
类名称:AdminClient
方法名:deleteRecords
[英]Delete records whose offset is smaller than the given offset of the corresponding partition. This is a convenience method for #deleteRecords(Map,DeleteRecordsOptions) with default options. See the overload for more details. This operation is supported by brokers with version 0.11.0.0 or higher.
[中]删除偏移量小于相应分区给定偏移量的记录。这是带有默认选项的#deleteRecords(映射、DeleteRecordsOptions)的便捷方法。有关更多详细信息,请参阅重载。版本为0.11.0.0或更高版本的代理支持此操作。
代码示例来源:origin: apache/kafka
/**
* Delete records whose offset is smaller than the given offset of the corresponding partition.
*
* This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
* @return The DeleteRecordsResult.
*/
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}
代码示例来源:origin: apache/kafka
recordsToDelete.put(myTopicPartition4, RecordsToDelete.beforeOffset(10L));
DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete);
代码示例来源:origin: org.apache.kafka/kafka-streams
void maybePurgeCommitedRecords() {
// we do not check any possible exceptions since none of them are fatal
// that should cause the application to fail, and we will try delete with
// newer offsets anyways.
if (deleteRecordsResult == null || deleteRecordsResult.all().isDone()) {
if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks());
}
final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) {
recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
}
deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
log.trace("Sent delete-records request: {}", recordsToDelete);
}
}
内容来源于网络,如有侵权,请联系作者删除!