Usage of the org.apache.spark.executor.TaskMetrics.diskBytesSpilled() method, with code examples

This article collects Java code examples of the org.apache.spark.executor.TaskMetrics.diskBytesSpilled() method and shows how it is used in practice. The snippets were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. The details of TaskMetrics.diskBytesSpilled() are as follows:
Package path: org.apache.spark.executor.TaskMetrics
Class name: TaskMetrics
Method name: diskBytesSpilled

About TaskMetrics.diskBytesSpilled

Returns the total number of bytes this task has spilled to disk, i.e. the on-disk size of data written out when execution memory ran low. Its in-memory counterpart is memoryBytesSpilled(), which reports the size of the same spilled data as it was held in memory.
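Before the collected snippets, here is a minimal driver-side sketch of how the counter can be read in an application: a SparkListener is registered and each finished task's spill counters are printed from its TaskMetrics. This is only an illustration, not code from the projects below; the class name, the log message, and the toy shuffle job are assumptions, while SparkListener.onTaskEnd, SparkListenerTaskEnd.taskMetrics(), and the diskBytesSpilled()/memoryBytesSpilled() getters are standard Spark APIs.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

public class DiskSpillListenerExample {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setAppName("disk-spill-listener").setMaster("local[2]");
  JavaSparkContext jsc = new JavaSparkContext(conf);

  // Driver-side listener: onTaskEnd carries the completed task's TaskMetrics.
  jsc.sc().addSparkListener(new SparkListener() {
   @Override
   public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
    TaskMetrics metrics = taskEnd.taskMetrics();
    if (metrics != null) {
     // diskBytesSpilled(): on-disk size of the data this task spilled;
     // memoryBytesSpilled(): in-memory size of the same spilled data.
     System.out.println("task " + taskEnd.taskInfo().taskId()
       + " spilled " + metrics.diskBytesSpilled() + " bytes to disk ("
       + metrics.memoryBytesSpilled() + " bytes in memory)");
    }
   }
  });

  // A small shuffle job; with realistic data volumes and limited executor memory,
  // the spill counters above become non-zero.
  jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2)
    .groupBy(x -> x % 2)
    .count();

  jsc.stop();
 }
}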

Code examples

Code example source (origin): org.apache.spark/spark-core_2.10 (the same snippet also appears verbatim in spark-core_2.11 and spark-core)

private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
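 // Cap execution memory and write one record more than the initial sort buffer holds,
 // so the buffer must expand and the writer is forced to spill to disk.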
 memoryManager.limit(UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE * 16);
 final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
 final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
 for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) {
  dataToWrite.add(new Tuple2<>(i, i));
 }
 writer.write(dataToWrite.iterator());
 writer.stop(true);
 readRecordsFromFile();
 assertSpillFilesWereCleanedUp();
 ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
 assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
 assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
 assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
 assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
 assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}

Code example source (origin): apache/hive

public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
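 // Copy Spark's task-level counters, including diskBytesSpilled, into Hive's own Metrics holder;
 // the CPU-time counters are converted from nanoseconds to milliseconds.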
 this(
  metrics.executorDeserializeTime(),
  TimeUnit.NANOSECONDS.toMillis(metrics.executorDeserializeCpuTime()),
  metrics.executorRunTime(),
  TimeUnit.NANOSECONDS.toMillis(metrics.executorCpuTime()),
  metrics.resultSize(),
  metrics.jvmGCTime(),
  metrics.resultSerializationTime(),
  metrics.memoryBytesSpilled(),
  metrics.diskBytesSpilled(),
  taskInfo.duration(),
  optionalInputMetric(metrics),
  optionalShuffleReadMetric(metrics),
  optionalShuffleWriteMetrics(metrics),
  optionalOutputMetrics(metrics));
}

Code example source (origin): org.apache.spark/spark-core_2.11 (the same snippet also appears verbatim in spark-core_2.10 and spark-core)

@Test
public void writeEnoughDataToTriggerSpill() throws Exception {
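 // Cap execution memory at a single maximum-size page; eleven records of ~1/10 page each
 // overflow it, so the writer has to spill before producing the merged output file.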
 memoryManager.limit(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES);
 final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
 final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
 final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 10];
 for (int i = 0; i < 10 + 1; i++) {
  dataToWrite.add(new Tuple2<>(i, bigByteArray));
 }
 writer.write(dataToWrite.iterator());
 assertEquals(2, spillFilesCreated.size());
 writer.stop(true);
 readRecordsFromFile();
 assertSpillFilesWereCleanedUp();
 ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
 assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
 assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
 assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
 assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L));
 assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}

Code example source (origin): org.apache.spark/spark-core (the same snippet also appears verbatim in spark-core_2.11 and spark-core_2.10)

@Test
public void writeWithoutSpilling() throws Exception {
 // In this example, each partition should have exactly one record:
 final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
 for (int i = 0; i < NUM_PARTITITONS; i++) {
  dataToWrite.add(new Tuple2<>(i, i));
 }
 final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
 writer.write(dataToWrite.iterator());
 final Option<MapStatus> mapStatus = writer.stop(true);
 assertTrue(mapStatus.isDefined());
 assertTrue(mergedOutputFile.exists());
 long sumOfPartitionSizes = 0;
 for (long size: partitionSizesInMergedFile) {
  // All partitions should be the same size:
  assertEquals(partitionSizesInMergedFile[0], size);
  sumOfPartitionSizes += size;
 }
 assertEquals(mergedOutputFile.length(), sumOfPartitionSizes);
 assertEquals(
  HashMultiset.create(dataToWrite),
  HashMultiset.create(readRecordsFromFile()));
 assertSpillFilesWereCleanedUp();
 ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
 assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
 assertEquals(0, taskMetrics.diskBytesSpilled());
 assertEquals(0, taskMetrics.memoryBytesSpilled());
 assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}

Code example source (origin): org.apache.spark/spark-core (the same snippet also appears verbatim in spark-core_2.11)

@Test
public void testDiskSpilledBytes() throws Exception {
 final UnsafeExternalSorter sorter = newSorter();
 long[] record = new long[100];
 int recordSize = record.length * 8;
 int n = (int) pageSizeBytes / recordSize * 3;
 for (int i = 0; i < n; i++) {
  record[0] = (long) i;
  sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, false);
 }
 // We will have at-least 2 memory pages allocated because of rounding happening due to
 // integer division of pageSizeBytes and recordSize.
 assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
 assertTrue(taskContext.taskMetrics().diskBytesSpilled() == 0);
 UnsafeExternalSorter.SpillableIterator iter =
     (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
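 // Explicitly spilling the sorted iterator writes its in-memory records to disk,
 // which is reflected in the task's diskBytesSpilled counter.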
 assertTrue(iter.spill() > 0);
 assertTrue(taskContext.taskMetrics().diskBytesSpilled() > 0);
 assertEquals(0, iter.spill());
 // Even if we did not spill second time, the disk spilled bytes should still be non-zero
 assertTrue(taskContext.taskMetrics().diskBytesSpilled() > 0);
 sorter.cleanupResources();
 assertSpillFilesWereCleanedUp();
}

Code example source (origin): org.apache.spark/spark-core_2.11 (the same snippet also appears verbatim in spark-core_2.10 and spark-core)

@Test
public void writeEmptyIterator() throws Exception {
 final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
 writer.write(Iterators.emptyIterator());
 final Option<MapStatus> mapStatus = writer.stop(true);
 assertTrue(mapStatus.isDefined());
 assertTrue(mergedOutputFile.exists());
 assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
 assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
 assertEquals(0, taskMetrics.shuffleWriteMetrics().bytesWritten());
 assertEquals(0, taskMetrics.diskBytesSpilled());
 assertEquals(0, taskMetrics.memoryBytesSpilled());
}

Code example source (origin): com.github.hyukjinkwon/spark-client (the same snippet also appears verbatim in org.spark-project.hive/spark-client)

public Metrics(TaskMetrics metrics) {
 this(
  metrics.executorDeserializeTime(),
  metrics.executorRunTime(),
  metrics.resultSize(),
  metrics.jvmGCTime(),
  metrics.resultSerializationTime(),
  metrics.memoryBytesSpilled(),
  metrics.diskBytesSpilled(),
  optionalInputMetric(metrics),
  optionalShuffleReadMetric(metrics),
  optionalShuffleWriteMetrics(metrics));
}
