Usage and code examples of the org.apache.spark.sql.DataFrameWriter.insertInto() method


This article collects a number of Java code examples for the org.apache.spark.sql.DataFrameWriter.insertInto() method, showing how insertInto() is used in practice. The examples come from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and should be useful as references. Details of DataFrameWriter.insertInto() are as follows:
Package path: org.apache.spark.sql.DataFrameWriter
Class name: DataFrameWriter
Method name: insertInto

About DataFrameWriter.insertInto

insertInto() inserts the content of the DataFrame into the specified table. The target table must already exist; unlike saveAsTable(), insertInto() resolves columns by position rather than by name, so the DataFrame's column order and types must match the table schema.
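
A minimal usage sketch, assuming a Hive-enabled SparkSession; the table name events and its schema are invented for illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class InsertIntoExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("insertInto example")
        .enableHiveSupport()
        .getOrCreate();

    // insertInto() requires the target table to exist already.
    spark.sql("CREATE TABLE IF NOT EXISTS events (id BIGINT, name STRING)");

    Dataset<Row> df = spark.sql("SELECT 1L AS id, 'a' AS name");

    // Columns are matched by position, not by name, so the DataFrame's
    // column order must follow the table schema.
    df.write().insertInto("events");
  }
}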

Code examples

Code example source: Impetus/Kundera

@Override
  public void saveDataFrame(DataFrame dataFrame, Class<?> entityClazz, Map<String, Object> properties)
  {
    dataFrame.sqlContext().sql("use " + (String) properties.get(KEYSPACE));
    dataFrame.write().insertInto((String) properties.get(TABLE));
  }
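
For comparison, the "use <keyspace>" statement above can be avoided by passing a database-qualified table name to insertInto(); a minimal sketch with a hypothetical my_db.events table:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class QualifiedInsert {
  // Passing "db.table" avoids the separate "use <db>" statement.
  static void save(Dataset<Row> df) {
    df.write().insertInto("my_db.events"); // hypothetical database.table
  }
}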

Code example source: Impetus/Kundera

@Override
public boolean persist(List listEntity, EntityMetadata m, SparkClient sparkClient)
{
  try
  {
    Seq s = scala.collection.JavaConversions.asScalaBuffer(listEntity).toList();
    ClassTag tag = scala.reflect.ClassTag$.MODULE$.apply(m.getEntityClazz());
    JavaRDD personRDD = sparkClient.sparkContext.parallelize(s, 1, tag).toJavaRDD();
    DataFrame df = sparkClient.sqlContext.createDataFrame(personRDD, m.getEntityClazz());
    sparkClient.sqlContext.sql("use " + m.getSchema());
    if (logger.isDebugEnabled())
    {
      logger.debug("Below are the registered tables with the hive context:");
      sparkClient.sqlContext.sql("show tables").show();
    }
    df.write().insertInto(m.getTableName());
    return true;
  }
  catch (Exception e)
  {
    throw new KunderaException("Cannot persist object(s)", e);
  }
}

Code example source: cerner/bunsen (also com.cerner.bunsen/bunsen-core)

/**
 * Writes mapping records to a table. This method ensures the columns and partitions are mapped
 * properly, working around a column-ordering problem similar to the one described <a
 * href="http://stackoverflow.com/questions/35313077/pyspark-order-of-column-on-write-to-mysql-with-jdbc">here</a>.
 *
 * @param mappings a dataset of mapping records
 * @param tableName the table to write them to
 */
private static void writeMappingsToTable(Dataset<Mapping> mappings,
  String tableName) {
 // Note the last two columns here must be the partitioned-by columns
 // in order and in lower case for Spark to properly match
 // them to the partitions.
 Dataset<Row> orderedColumnDataset =
   mappings.select("sourceValueSet",
     "targetValueSet",
     "sourceSystem",
     "sourceValue",
     "targetSystem",
     "targetValue",
     "equivalence",
     "conceptmapuri",
     "conceptmapversion");
 orderedColumnDataset
   .write()
   .insertInto(tableName);
}
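
The column-ordering comment above reflects insertInto()'s position-based resolution. A minimal sketch of the same pattern against a partitioned table; the table name, columns, and data here are invented for illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionedInsert {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .enableHiveSupport()
        .getOrCreate();

    // Hypothetical table: two data columns, partitioned by (uri, version).
    spark.sql("CREATE TABLE IF NOT EXISTS mappings "
        + "(sourceValue STRING, targetValue STRING) "
        + "PARTITIONED BY (uri STRING, version STRING)");

    // Dynamic partition inserts into Hive tables typically need this setting.
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict");

    Dataset<Row> df = spark.sql("SELECT 'a' AS sourceValue, 'b' AS targetValue, "
        + "'uri:x' AS uri, '1' AS version");

    // insertInto() matches columns by position, so the partition columns
    // must be selected last, mirroring the table definition.
    df.select("sourceValue", "targetValue", "uri", "version")
        .write()
        .insertInto("mappings");
  }
}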

Code example source: com.cerner.bunsen/bunsen-core (also cerner/bunsen)

/**
 * Writes ancestor records to a table. This method ensures the columns and partitions are mapped
 * properly, working around a column-ordering problem similar to the one described <a
 * href="http://stackoverflow.com/questions/35313077/pyspark-order-of-column-on-write-to-mysql-with-jdbc">here</a>.
 *
 * @param ancestors a dataset of ancestor records
 * @param tableName the table to write them to
 */
private static void writeAncestorsToTable(Dataset<Ancestor> ancestors, String tableName) {
 Dataset<Row> orderedColumnDataset = ancestors.select("descendantSystem",
   "descendantValue",
   "ancestorSystem",
   "ancestorValue",
   "uri",
   "version");
 orderedColumnDataset.write()
   .mode(SaveMode.ErrorIfExists)
   .insertInto(tableName);
}

Code example source: com.cerner.bunsen/bunsen-core (also cerner/bunsen)

/**
  * Writes value records to a table. This method ensures the columns and partitions are mapped
  * properly, working around a column-ordering problem similar to the one described <a
  * href="http://stackoverflow.com/questions/35313077/pyspark-order-of-column-on-write-to-mysql-with-jdbc">here</a>.
  *
  * @param values a dataset of value records
  * @param tableName the table to write them to
  */
 private static void writeValuesToTable(Dataset<Value> values, String tableName) {

  // Note the last two columns here must be the partitioned-by columns in order and in lower case
  // for Spark to properly match them to the partitions
  Dataset<Row> orderColumnDataset = values.select("system",
    "version",
    "value",
    "valueseturi",
    "valuesetversion");

  orderColumnDataset.write()
    .mode(SaveMode.ErrorIfExists)
    .insertInto(tableName);
 }

Code example source: cloudera-labs/envelope

@Override
public void applyBulkMutations(List<Tuple2<MutationType, Dataset<Row>>> planned) {    
 for (Tuple2<MutationType, Dataset<Row>> plan : planned) {
  MutationType mutationType = plan._1();
  Dataset<Row> mutation = (doesAlignColumns) ? alignColumns(plan._2()) : plan._2();
  DataFrameWriter<Row> writer = mutation.write();
  if (partitionColumns != null) {
   writer = writer.partitionBy(partitionColumns);
  }
  if (options != null) {
   writer = writer.options(options);
  }
  switch (mutationType) {
   case INSERT:
    writer = writer.mode(SaveMode.Append);
    break;
   case OVERWRITE:
    writer = writer.mode(SaveMode.Overwrite);
    break;
   default:
    throw new RuntimeException("Hive output does not support mutation type: " + mutationType);
  }
  writer.insertInto(tableName);
 }
}
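
As the switch above shows, the SaveMode set on the writer decides what insertInto() does with existing data. A minimal sketch; the dataset and table name are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class InsertModes {
  // Append keeps existing rows and adds the new ones.
  static void append(Dataset<Row> df, String table) {
    df.write().mode(SaveMode.Append).insertInto(table);
  }

  // Overwrite replaces the table contents (or only the matching partitions
  // when dynamic partition overwrite is enabled).
  static void overwrite(Dataset<Row> df, String table) {
    df.write().mode(SaveMode.Overwrite).insertInto(table);
  }
}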

Code example source: com.cerner.bunsen/bunsen-core (also cerner/bunsen)

.insertInto(conceptMapTable);

Code example source: cerner/bunsen (also com.cerner.bunsen/bunsen-core)

.insertInto(valueSetTable);
