This article collects Java code examples for the org.apache.spark.executor.TaskMetrics.shuffleWriteMetrics() method and shows how TaskMetrics.shuffleWriteMetrics() is used in practice. The examples are extracted from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, so they make a useful reference. Details of the TaskMetrics.shuffleWriteMetrics() method:

Package: org.apache.spark.executor
Class: TaskMetrics
Method: shuffleWriteMetrics
Description: returns the task's ShuffleWriteMetrics, which records the bytes written, records written, and time spent writing shuffle output (the upstream listing provides no further documentation).
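A minimal sketch of how this method is typically consumed from a driver-side SparkListener (the ShuffleWriteLogger class is a hypothetical example; it assumes the Spark 2.x API, where shuffleWriteMetrics() always returns a non-null ShuffleWriteMetrics):

  import org.apache.spark.executor.ShuffleWriteMetrics;
  import org.apache.spark.executor.TaskMetrics;
  import org.apache.spark.scheduler.SparkListener;
  import org.apache.spark.scheduler.SparkListenerTaskEnd;

  // Hypothetical listener that logs shuffle write metrics as each task ends.
  public class ShuffleWriteLogger extends SparkListener {
    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
      TaskMetrics metrics = taskEnd.taskMetrics();
      if (metrics == null) {
        return; // metrics can be absent, e.g. for failed tasks
      }
      ShuffleWriteMetrics write = metrics.shuffleWriteMetrics();
      System.out.printf("task %d: %d shuffle bytes, %d records written%n",
          taskEnd.taskInfo().taskId(), write.bytesWritten(), write.recordsWritten());
    }
  }

Register it with sparkContext.addSparkListener(new ShuffleWriteLogger()) before running jobs.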
Code example source: apache/hive

  public ShuffleWriteMetrics(TaskMetrics metrics) {
    this(metrics.shuffleWriteMetrics().shuffleBytesWritten(),
        metrics.shuffleWriteMetrics().shuffleWriteTime(),
        metrics.shuffleWriteMetrics().shuffleRecordsWritten());
  }
Code example source: apache/hive

  private static ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) {
    return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
  }
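Note the null check in optionalShuffleWriteMetrics: in the Spark line that Hive compiled against here, a task that performed no shuffle write reported no ShuffleWriteMetrics at all, so callers had to guard against null (in Spark 2.x and later the object is always present and simply holds zeros). A defensive-access helper in the same spirit (safeShuffleBytesWritten is a hypothetical name; it assumes the older shuffleBytesWritten() accessor used in the Hive excerpt above):

  // Hypothetical helper: read shuffle bytes written, tolerating absent metrics.
  static long safeShuffleBytesWritten(TaskMetrics metrics) {
    if (metrics == null || metrics.shuffleWriteMetrics() == null) {
      return 0L; // task performed no shuffle write
    }
    return metrics.shuffleWriteMetrics().shuffleBytesWritten();
  }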
Code example source: org.apache.spark/spark-core, spark-core_2.10, spark-core_2.11 (identical across all three)

  private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
    memoryManager.limit(UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE * 16);
    final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
    final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
    for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) {
      dataToWrite.add(new Tuple2<>(i, i));
    }
    writer.write(dataToWrite.iterator());
    writer.stop(true);
    readRecordsFromFile();
    assertSpillFilesWereCleanedUp();
    ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
    assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
    assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
    assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
    assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
  }
Code example source: org.apache.spark/spark-core, spark-core_2.10, spark-core_2.11 (identical across all three)

  @Test
  public void writeEnoughDataToTriggerSpill() throws Exception {
    memoryManager.limit(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES);
    final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
    final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
    final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 10];
    for (int i = 0; i < 10 + 1; i++) {
      dataToWrite.add(new Tuple2<>(i, bigByteArray));
    }
    writer.write(dataToWrite.iterator());
    assertEquals(2, spillFilesCreated.size());
    writer.stop(true);
    readRecordsFromFile();
    assertSpillFilesWereCleanedUp();
    ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
    assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
    assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
    assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
    assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
  }
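Taken together, the two spill tests pin down the metric semantics: shuffleWriteMetrics().bytesWritten() counts only the final merged output file, while intermediate spill traffic is reported separately through taskMetrics.diskBytesSpilled() and taskMetrics.memoryBytesSpilled(). A small derived-metric sketch built on exactly the accessors these tests assert on (the spillRatio helper itself is a hypothetical example):

  // Hypothetical helper: how much was spilled to disk relative to the final output.
  static double spillRatio(TaskMetrics taskMetrics) {
    long written = taskMetrics.shuffleWriteMetrics().bytesWritten();
    return written == 0L ? 0.0 : (double) taskMetrics.diskBytesSpilled() / written;
  }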
Code example source: org.apache.spark/spark-core, spark-core_2.10, spark-core_2.11 (identical across all three)

  BypassMergeSortShuffleWriter(
      BlockManager blockManager,
      IndexShuffleBlockResolver shuffleBlockResolver,
      BypassMergeSortShuffleHandle<K, V> handle,
      int mapId,
      TaskContext taskContext,
      SparkConf conf) {
    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
    this.blockManager = blockManager;
    final ShuffleDependency<K, V, V> dep = handle.dependency();
    this.mapId = mapId;
    this.shuffleId = dep.shuffleId();
    this.partitioner = dep.partitioner();
    this.numPartitions = partitioner.numPartitions();
    this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
    this.serializer = dep.serializer();
    this.shuffleBlockResolver = shuffleBlockResolver;
  }
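This constructor shows the producer side of the API: the shuffle writer fetches the task's ShuffleWriteMetrics once from taskContext.taskMetrics() at construction time and then updates that shared accumulator as it writes, which is why the values later read through TaskMetrics.shuffleWriteMetrics() reflect everything the writer did.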
Code example source: org.apache.spark/spark-core, spark-core_2.10, spark-core_2.11 (identical across all three)

  @Test
  public void writeWithoutSpilling() throws Exception {
    // In this example, each partition should have exactly one record:
    final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
    for (int i = 0; i < NUM_PARTITITONS; i++) {
      dataToWrite.add(new Tuple2<>(i, i));
    }
    final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
    writer.write(dataToWrite.iterator());
    final Option<MapStatus> mapStatus = writer.stop(true);
    assertTrue(mapStatus.isDefined());
    assertTrue(mergedOutputFile.exists());
    long sumOfPartitionSizes = 0;
    for (long size : partitionSizesInMergedFile) {
      // All partitions should be the same size:
      assertEquals(partitionSizesInMergedFile[0], size);
      sumOfPartitionSizes += size;
    }
    assertEquals(mergedOutputFile.length(), sumOfPartitionSizes);
    assertEquals(
        HashMultiset.create(dataToWrite),
        HashMultiset.create(readRecordsFromFile()));
    assertSpillFilesWereCleanedUp();
    ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
    assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
    assertEquals(0, taskMetrics.diskBytesSpilled());
    assertEquals(0, taskMetrics.memoryBytesSpilled());
    assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
  }
Code example source: org.apache.spark/spark-core_2.10

  // Fragment: field initialization inside a shuffle writer's constructor (excerpt).
  this.serializer = dep.serializer().newInstance();
  this.partitioner = dep.partitioner();
  this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
  this.taskContext = taskContext;
  this.sparkConf = sparkConf;
Code example source: org.apache.spark/spark-core, spark-core_2.10, spark-core_2.11 (identical across all three)

  @Test
  public void writeEmptyIterator() throws Exception {
    final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
    writer.write(Iterators.emptyIterator());
    final Option<MapStatus> mapStatus = writer.stop(true);
    assertTrue(mapStatus.isDefined());
    assertTrue(mergedOutputFile.exists());
    assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
    assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
    assertEquals(0, taskMetrics.shuffleWriteMetrics().bytesWritten());
    assertEquals(0, taskMetrics.diskBytesSpilled());
    assertEquals(0, taskMetrics.memoryBytesSpilled());
  }
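The empty-iterator test makes a useful boundary case explicit: even when a task writes nothing, shuffleWriteMetrics() still returns a live metrics object whose counters are simply zero, so callers on modern Spark do not need the null guard seen in the apache/hive excerpt at the top of this article.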