Usage and Code Examples of the org.apache.spark.SparkContext Class

Reposted by x33g5p2x on 2022-01-30

This article collects a number of Java code examples for the org.apache.spark.SparkContext class and shows how the class is used in practice. The examples were extracted from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as a useful reference. Details of the SparkContext class:
Package: org.apache.spark
Class name: SparkContext

SparkContext Overview

SparkContext is the main entry point for Spark functionality: it represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext should be active per JVM, and it must be stopped with stop() before a new one is created. In Java code it is usually wrapped by JavaSparkContext, as most of the examples below show.
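
As a minimal sketch (the class name and the local master URL below are placeholders chosen for illustration, not taken from any of the projects cited later), creating and stopping a SparkContext from Java typically looks like this:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkContextQuickStart {
  public static void main(String[] args) {
    // "local[*]" runs Spark inside this JVM; replace with your cluster's master URL
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("quick-start");
    SparkContext sc = new SparkContext(conf);
    try {
      // Java code usually works through the JavaSparkContext wrapper
      JavaSparkContext jsc = new JavaSparkContext(sc);
      long count = jsc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
      System.out.println("count = " + count);
    } finally {
      sc.stop(); // release cluster resources; only one active SparkContext per JVM
    }
  }
}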

Code Examples

Code example origin: apache/kylin

public static void modifySparkHadoopConfiguration(SparkContext sc) throws Exception {
  sc.hadoopConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
  sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
  sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
  sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
}

Code example origin: org.apache.spark/spark-core_2.11

@Test
 public void scalaSparkContext() {
  List<String> jars = List$.MODULE$.empty();
  Map<String, String> environment = Map$.MODULE$.empty();

  new SparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
  new SparkContext("local", "name", new SparkConf()).stop();
  new SparkContext("local", "name").stop();
  new SparkContext("local", "name", "sparkHome").stop();
  new SparkContext("local", "name", "sparkHome", jars).stop();
  new SparkContext("local", "name", "sparkHome", jars, environment).stop();
 }
}

Code example origin: apache/kylin

// NOTE: this excerpt is truncated at its source; kryoClassArray is assembled earlier in
// the original method and ends with the entry shown on the next line
Class.forName("scala.collection.mutable.WrappedArray$ofRef") };
SparkConf conf = new SparkConf().setAppName("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
  KylinSparkJobListener jobListener = new KylinSparkJobListener();
  sc.sc().addSparkListener(jobListener);
  HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
  final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
  // ... remainder of the try block omitted in the original excerpt
}
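
Because the Kylin excerpt above is truncated at its source, the following self-contained sketch shows the same Kryo configuration pattern in isolation; the application name, master URL, and printed values are placeholders, not Kylin's actual code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class KryoSetupSketch {
  public static void main(String[] args) throws Exception {
    // With spark.kryo.registrationRequired=true, classes serialized by Kryo must be registered up front
    Class<?>[] kryoClassArray = new Class<?>[] {
        Class.forName("scala.collection.mutable.WrappedArray$ofRef")
    };
    SparkConf conf = new SparkConf()
        .setMaster("local[*]")            // local master, for illustration only
        .setAppName("kryo-setup-sketch")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrationRequired", "true")
        .registerKryoClasses(kryoClassArray);
    // JavaSparkContext is Closeable, so try-with-resources stops it automatically
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      System.out.println("application id: " + sc.sc().applicationId());
      System.out.println("dfs.replication: " + sc.hadoopConfiguration().get("dfs.replication"));
    }
  }
}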

Code example origin: org.apache.spark/spark-core_2.11

public static void main(String[] args) throws Exception {
 assertNotEquals(0, args.length);
 assertEquals(args[0], "hello");
 new SparkContext().stop();
 synchronized (LOCK) {
  LOCK.notifyAll();
 }
}

Code example origin: scipr-lab/dizk

// The original excerpt repeats this configuration block several times; it is shown once here.
spark.sparkContext().conf().set("spark.files.overwrite", "true");
spark.sparkContext().conf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
spark.sparkContext().conf().registerKryoClasses(SparkUtils.zksparkClasses());
sc = new JavaSparkContext(spark.sparkContext());
final Configuration config = new Configuration(numExecutors,
    numCores,
    // ... remaining constructor arguments truncated in the original excerpt

Code example origin: jgperrin/net.jgp.labs.spark

private void start() {
  SparkConf conf = new SparkConf().setAppName("Concurrency Lab 001")
    .setMaster(Config.MASTER);
  JavaSparkContext sc = new JavaSparkContext(conf);
  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

  conf = spark.sparkContext().conf();
  System.out.println(conf.get("hello"));

  Dataset<Row> df = spark.sql("SELECT * from myView");
  df.show();
 }
}

Code example origin: apache/hive

SparkConf conf = new SparkConf();
String serverAddress = null;
int serverPort = -1;
// mapConf is declared earlier in the original method; this excerpt is condensed
for (Tuple2<String, String> e : conf.getAll()) {
 mapConf.put(e._1(), e._2());
 LOG.debug("Remote Spark Driver configured with: " + e._1() + "=" + e._2());
}
JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(new ClientListener());
synchronized (jcLock) {
 jc = new JobContextImpl(sc, localTmpDir);
 // ... remainder of the original method omitted in this excerpt
}

Code example origin: apache/hive

private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
  throws FileNotFoundException, MalformedURLException {
 String regJar = null;
 // the registrator jar should already be in CP when not in test mode
 if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
  String kryoReg = sparkConf.get("spark.kryo.registrator", "");
  if (SparkClientUtilities.HIVE_KRYO_REG_NAME.equals(kryoReg)) {
   regJar = SparkClientUtilities.findKryoRegistratorJar(hiveConf);
   SparkClientUtilities.addJarToContextLoader(new File(regJar));
  }
 }
 sc = new JavaSparkContext(sparkConf);
 if (regJar != null) {
  sc.addJar(regJar);
 }
 jobMetricsListener = new JobMetricsListener();
 sc.sc().addSparkListener(jobMetricsListener);
}

Code example origin: org.apache.spark/spark-core_2.11

@Test
public void foreachPartition() {
 LongAccumulator accum = sc.sc().longAccumulator();
 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
 rdd.foreachPartition(iter -> {
  while (iter.hasNext()) {
   iter.next();
   accum.add(1);
  }
 });
 assertEquals(2, accum.value().intValue());
}

Code example origin: apache/tinkerpop

private LinkedBlockingQueue<Kryo> initialize(final Configuration configuration) {
    // DCL is safe in this case due to volatility
    if (!INITIALIZED) {
      synchronized (UnshadedKryoShimService.class) {
        if (!INITIALIZED) {
          // so we don't get a WARN that a new configuration is being created within an active context
          final SparkConf sparkConf = null == Spark.getContext() ? new SparkConf() : Spark.getContext().getConf().clone();
          configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, configuration.getProperty(key).toString()));
          final KryoSerializer serializer = new KryoSerializer(sparkConf);
          // Setup a pool backed by our spark.serializer instance
          // Reuse Gryo poolsize for Kryo poolsize (no need to copy this to SparkConf)
          KRYOS.clear();
          final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
          for (int i = 0; i < poolSize; i++) {
            KRYOS.add(serializer.newKryo());
          }
          INITIALIZED = true;
        }
      }
    }

    return KRYOS;
  }
}

Code example origin: uber/marmaray

/**
 * Creates JavaSparkContext if it hasn't been created yet, or returns the instance. {@link #addSchema(Schema)} and
 * {@link #addSchemas(Collection)} must not be called once the JavaSparkContext has been created
 * @return the JavaSparkContext that will be used to execute the JobDags
 */
public JavaSparkContext getOrCreateSparkContext() {
  if (!this.sparkContext.isPresent()) {
    this.sparkContext = Optional.of(new JavaSparkContext(
        SparkUtil.getSparkConf(
          this.appName, Optional.of(this.schemas), this.serializationClasses, this.conf)));
    this.sparkContext.get().sc().addSparkListener(new SparkEventListener());
    // Adding hadoop configuration to default
    this.sparkContext.get().sc().hadoopConfiguration().addResource(
      new HadoopConfiguration(conf).getHadoopConf());
    this.appId = this.sparkContext.get().sc().applicationId();
  }
  return this.sparkContext.get();
}
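
Independent of the Marmaray-specific helpers above, the lazy get-or-create pattern described in the Javadoc can be sketched as follows; the holder class, field name, and master URL are hypothetical:

import java.util.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical holder that creates the context once and reuses it afterwards
public class SparkContextHolder {
  private Optional<JavaSparkContext> sparkContext = Optional.empty();

  public synchronized JavaSparkContext getOrCreateSparkContext() {
    if (!this.sparkContext.isPresent()) {
      SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("holder-sketch");
      this.sparkContext = Optional.of(new JavaSparkContext(conf));
      // listeners and extra Hadoop resources would be registered here, before first use
    }
    return this.sparkContext.get();
  }
}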

Code example origin: com.holdenkarau/spark-testing-base_2.11

@Before
public void runBefore() {
 initialized = (_sc != null);
 if (!initialized) {
  _sc = new SparkContext(conf());
  _jsc = new JavaSparkContext(_sc);
  beforeAllTestCasesHook();
 }
}

Code example origin: Impetus/Kundera

@Override
public void initialize(Map<String, Object> puProperties)
{
  reader = new SparkEntityReader(kunderaMetadata);
  setExternalProperties(puProperties);
  initializePropertyReader();
  PersistenceUnitMetadata pum = kunderaMetadata.getApplicationMetadata().getPersistenceUnitMetadata(
      getPersistenceUnit());
  sparkconf = new SparkConf(true);
  configureClientProperties(pum);
  sparkContext = new SparkContext(sparkconf);
  sqlContext = new HiveContext(sparkContext);
}

Code example origin: io.snappydata/snappydata-core

public static synchronized SnappySharedState create(SparkContext sparkContext)
  throws SparkException {
 // force in-memory catalog to avoid initializing hive for SnappyData
 final String catalogImpl = sparkContext.conf().get(CATALOG_IMPLEMENTATION, null);
 // there is a small thread-safety issue: if multiple threads concurrently
 // initialize a normal SparkSession vs a SnappySession, the former can
 // end up with the in-memory catalog too
 sparkContext.conf().set(CATALOG_IMPLEMENTATION, "in-memory");
 createListenerAndUI(sparkContext);
 final SnappySharedState sharedState = new SnappySharedState(sparkContext);
 // reset the catalog implementation to original
 if (catalogImpl != null) {
  sparkContext.conf().set(CATALOG_IMPLEMENTATION, catalogImpl);
 } else {
  sparkContext.conf().remove(CATALOG_IMPLEMENTATION);
 }
 return sharedState;
}

Code example origin: apache/hive

@Override
public int getDefaultParallelism() throws Exception {
 return sc.sc().defaultParallelism();
}

Code example origin: apache/hive

@Override
public int getExecutorCount() {
 return sc.sc().getExecutorMemoryStatus().size();
}

Code example origin: apache/hive

@Override
public void cleanup() {
 jobMetricsListener.cleanup(jobId);
 if (cachedRDDIds != null) {
  for (Integer cachedRDDId: cachedRDDIds) {
   sparkContext.sc().unpersistRDD(cachedRDDId, false);
  }
 }
}

Code example origin: org.apache.spark/spark-sql

@Test
public void testForeach() {
 LongAccumulator accum = jsc.sc().longAccumulator();
 List<String> data = Arrays.asList("a", "b", "c");
 Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
 ds.foreach((ForeachFunction<String>) s -> accum.add(1));
 Assert.assertEquals(3, accum.value().intValue());
}

Code example origin: apache/hive

@Override
public SparkConf getSparkConf() {
 return sc.sc().conf();
}

Code example origin: cloudera-labs/envelope

@Test
public void testDefaultStreamingConfiguration() {
 Config config = ConfigFactory.empty();
 Contexts.initialize(config, Contexts.ExecutionMode.STREAMING);
 SparkConf sparkConf = Contexts.getSparkSession().sparkContext().getConf();
 assertTrue(sparkConf.contains("spark.dynamicAllocation.enabled"));
 assertTrue(sparkConf.contains("spark.sql.shuffle.partitions"));
 assertEquals(sparkConf.get("spark.sql.catalogImplementation"), "hive");
}
