Usage of the org.apache.spark.sql.DataFrame class, with code examples


This article collects a number of Java code examples for the org.apache.spark.sql.DataFrame class and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, extracted from selected projects, so they carry reasonable reference value and should be helpful. Details of the DataFrame class are as follows:
Package path: org.apache.spark.sql.DataFrame
Class name: DataFrame

Introduction to DataFrame

DataFrame is Spark SQL's distributed collection of data organized into named columns, comparable to a table in a relational database. It was the main structured-data abstraction of the Spark 1.x Java API; in Spark 2.x it was folded into Dataset&lt;Row&gt;.
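For readers new to the API, here is a minimal, self-contained sketch (not taken from any of the projects quoted below) of creating and querying a DataFrame with the Spark 1.x Java API; the class name, the people.json path, and the local master setting are placeholder assumptions.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class DataFrameQuickStart {
  public static void main(String[] args) {
    // Local Spark context; the app name and master are placeholder values.
    SparkConf conf = new SparkConf().setAppName("DataFrameQuickStart").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // Load a JSON file into a DataFrame (people.json is a hypothetical path) and inspect its schema.
    DataFrame people = sqlContext.read().json("people.json");
    people.printSchema();

    // Register a temporary table and query it with Spark SQL.
    people.registerTempTable("people");
    DataFrame adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18");
    for (Row row : adults.collect()) {
      System.out.println(row);
    }

    sc.stop();
  }
}

In Spark 2.x and later the same pattern is written with SparkSession and Dataset&lt;Row&gt;, but the examples in this article target the 1.x API shown above.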

Code examples

Code example source: databricks/learning-spark

SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlCtx = new SQLContext(sc);

// Load tweets from a JSON file and register them as a temporary table.
DataFrame input = sqlCtx.jsonFile(inputFile);
input.printSchema();
input.registerTempTable("tweets");

// Query the ten most-retweeted tweets and print their text.
DataFrame topTweets = sqlCtx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10");
Row[] result = topTweets.collect();
for (Row row : result) {
  System.out.println(row.get(0));
}

// Convert the query result into a plain RDD of tweet texts.
JavaRDD<String> topTweetText = topTweets.toJavaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return row.getString(0);
  }
});

// Create a DataFrame from a list of JavaBeans and register it as a table.
JavaRDD<HappyPerson> happyPeopleRDD = sc.parallelize(peopleList);
DataFrame happyPeopleSchemaRDD = sqlCtx.applySchema(happyPeopleRDD, HappyPerson.class);
happyPeopleSchemaRDD.registerTempTable("happy_people");

// Register a Java UDF and call it from SQL.
sqlCtx.udf().register("stringLengthJava", new UDF1<String, Integer>() {
  @Override
  public Integer call(String str) throws Exception {
    return str.length();
  }
}, DataTypes.IntegerType);
DataFrame tweetLength = sqlCtx.sql("SELECT stringLengthJava('text') FROM tweets LIMIT 10");
Row[] lengths = tweetLength.collect();
for (Row row : lengths) {
  System.out.println(row.get(0));
}

Code example source: Impetus/Kundera

@Override
public void saveDataFrame(DataFrame dataFrame, Class<?> entityClazz, Map<String, Object> properties)
{
  // Switch to the target keyspace, then append the DataFrame's rows into the target table.
  dataFrame.sqlContext().sql("use " + (String) properties.get(KEYSPACE));
  dataFrame.write().insertInto((String) properties.get(TABLE));
}

Code example source: Impetus/Kundera

/**
 * Gets the data frame size.
 * 
 * @param dataFrame
 *            the data frame
 * @return the data frame size
 */
public int getDataFrameSize(DataFrame dataFrame)
{
  long l = dataFrame != null ? dataFrame.count() : 0;
  if (l < Integer.MIN_VALUE || l > Integer.MAX_VALUE)
  {
    logger.error(l + " cannot be cast to int without changing its value.");
    return 0;
  }
  return (int) l;
}

Code example source: phuonglh/vn.vitk

// Excerpt from a classifier training routine; the surrounding method code is omitted.
this.sqlContext = new SQLContext(jsc);
trainingData.cache();
trainingData.show(false);

// The number of labels is one more than the maximum label value in the training data.
trainingData.registerTempTable("dfTable");
Row row = sqlContext.sql("SELECT MAX(label) as maxValue from dfTable").first();
int numLabels = (int) row.getDouble(0);
numLabels++;

// Evaluate the model's predictions against the gold labels.
DataFrame predictionAndLabel = result.select("prediction", "label");
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator().setMetricName("precision");
if (verbose) {
  System.out.println("N = " + trainingData.count());
  System.out.println("D = " + vocabSize);
  System.out.println("K = " + numLabels);
}

Code example source: phuonglh/vn.vitk

// Extract labeled contexts from the tagged dataset and turn them into a DataFrame.
MarkovOrder order = MarkovOrder.values()[(Integer) params.getOrDefault(params.getMarkovOrder()) - 1];
ContextExtractor contextExtractor = new ContextExtractor(order, Constants.REGEXP_FILE);
JavaRDD<LabeledContext> contexts = contextExtractor.extract(dataset.javaRDD());
DataFrame df = dataset.sqlContext().createDataFrame(contexts, LabeledContext.class);

// Build a tag dictionary mapping each word to the set of labels it can take.
JavaRDD<Row> wt = df.select("word", "label").javaRDD();
JavaPairRDD<String, Set<Integer>> tagDictionary = wt.mapToPair(new PairFunction<Row, String, Set<Integer>>() {
  private static final long serialVersionUID = 5865372074294028547L;
  // ... (the pair function body is omitted in this excerpt)
});

// The number of labels is the maximum label value plus one.
df.registerTempTable("dft");
Row row = df.sqlContext().sql("SELECT MAX(label) as maxValue FROM dft").first();
this.numLabels = (int) row.getDouble(0) + 1;
JavaRDD<Row> rows = df.sqlContext().sql("SELECT label, features FROM dft").toJavaRDD();

Code example source: KeithSSmith/spark-compaction

public void compact(String inputPath, String outputPath) throws IOException {
  this.setCompressionAndSerializationOptions(inputPath, outputPath);
  this.outputCompressionProperties(this.outputCompression);
  
  // Defining Spark Context with a generic Spark Configuration.
  SparkConf sparkConf = new SparkConf().setAppName("Spark Compaction");
  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  
  if (this.outputSerialization.equals(TEXT)) {
    JavaRDD<String> textFile = sc.textFile(this.concatInputPath(inputPath));
    textFile.coalesce(this.splitSize).saveAsTextFile(outputPath);
  } else if (this.outputSerialization.equals(PARQUET)) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
    parquetFile.coalesce(this.splitSize).write().parquet(outputPath);
  } else if (this.outputSerialization.equals(AVRO)) {
    // For this to work the files must end in .avro
    // Another issue is that when using compression the compression codec extension is not being added to the file name.
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
    avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(outputPath);
  } else {
    System.out.println("Did not match any serialization type: text, parquet, or avro.  Recieved: " +
        this.outputSerialization);
  }
}

Code example source: phuonglh/vn.vitk

// Build a DataFrame of whitespace contexts, run the fitted model over it,
// and broadcast the resulting predictions.
DataFrame df0 = (new SQLContext(jsc)).createDataFrame(jrdd, WhitespaceContext.class);
DataFrame df1 = model.transform(df0);
prediction = jsc.broadcast(df1.select("prediction").collect());
if (df1.count() > 0) {
  output = s.map(new WhitespaceClassificationFunction());
}

Code example source: phuonglh/vn.vitk

new StructField("dependency", DataTypes.StringType, false, Metadata.empty())
});
SQLContext sqlContext = new SQLContext(jsc);
DataFrame df = sqlContext.createDataFrame(rows, schema);
  df.select("dependency").write().text(outputFileName);
else 
  df.repartition(1).write().json(outputFileName);

Code example source: phuonglh/vn.vitk

/**
 * Tags a list of sequences and returns a list of tag sequences.
 * @param sentences
 * @return a list of tagged sequences.
 */
public List<String> tag(List<String> sentences) {
  List<Row> rows = new LinkedList<Row>();
  for (String sentence : sentences) {
    rows.add(RowFactory.create(sentence));
  }
  StructType schema = new StructType(new StructField[]{
    new StructField("sentence", DataTypes.StringType, false, Metadata.empty())	
  });
  SQLContext sqlContext = new SQLContext(jsc);
  DataFrame input = sqlContext.createDataFrame(rows, schema);
  if (cmmModel != null) {
    DataFrame output = cmmModel.transform(input).repartition(1);
    return output.javaRDD().map(new RowToStringFunction(1)).collect();
  } else {
    System.err.println("Tagging model is null. You need to create or load a model first.");
    return null;
  }
}

Code example source: sectong/SparkToParquet

JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());
SQLContext sqlContext = new SQLContext(sc);
// Inside the streaming loop: convert each batch of access-log beans to a DataFrame
// and append it to a Parquet dataset partitioned by ipAddress, method, and responseCode.
DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append)
    .parquet(Flags.getInstance().getParquetFile());

Code example source: oeljeklaus-you/UserActionAnalyzePlatform

DataTypes.createStructField("pay_product_ids", DataTypes.StringType, true)));
DataFrame df = sqlContext.createDataFrame(rowsRDD, schema);
df.registerTempTable("user_visit_action");  
for(Row _row : df.take(1)) {
  System.out.println(_row);  
    DataTypes.createStructField("sex", DataTypes.StringType, true)));
DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2);
for(Row _row : df2.take(1)) {
  System.out.println(_row);  
df2.registerTempTable("user_info");

Code example source: phuonglh/vn.vitk

/**
 * Creates an n-gram data frame from text lines.
 * @param lines
 * @return an n-gram data frame.
 */
DataFrame createNGramDataFrame(JavaRDD<String> lines) {
  JavaRDD<Row> rows = lines.map(new Function<String, Row>(){
    private static final long serialVersionUID = -4332903997027358601L;
    
    @Override
    public Row call(String line) throws Exception {
      return RowFactory.create(Arrays.asList(line.split("\\s+")));
    }
  });
  StructType schema = new StructType(new StructField[] {
      new StructField("words",
          DataTypes.createArrayType(DataTypes.StringType), false,
          Metadata.empty()) });
  DataFrame wordDF = new SQLContext(jsc).createDataFrame(rows, schema);
  // build a bigram language model
  NGram transformer = new NGram().setInputCol("words")
      .setOutputCol("ngrams").setN(2);
  DataFrame ngramDF = transformer.transform(wordDF);
  ngramDF.show(10, false);
  return ngramDF;
}

Code example source: oeljeklaus-you/UserActionAnalyzePlatform

/**
 * Gets the user action data within the specified date range.
 * @param sc the SQL context
 * @param taskParam the task parameters (JSON)
 * @return a JavaRDD of the matching rows
 */
private static JavaRDD<Row> getActionRDD(SQLContext sc, JSONObject taskParam)
{
  String startTime = ParamUtils.getParam(taskParam, Constants.PARAM_STARTTIME);
  String endTime = ParamUtils.getParam(taskParam, Constants.PARAM_ENDTIME);
  String sql = "select * from user_visit_action where date>='" + startTime + "' and date<='" + endTime + "'";
  DataFrame df = sc.sql(sql);
  return df.javaRDD();
}

Code example source: stackoverflow.com

SQLContext sqlcontext=new SQLContext(context);
DataFrame outDataFrame=sqlcontext.createDataFrame(finalOutPutRDD, WebHttpOutPutVO.class);
Properties prop = new java.util.Properties();
prop.setProperty("database", "Web_Session");
prop.setProperty("user", "user");
prop.setProperty("password", "pwd@123");
prop.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver");
outDataFrame.write().mode(org.apache.spark.sql.SaveMode.Append).jdbc("jdbc:sqlserver://<Host>:1433", "test_table", prop);

Code example source: com.cloudera.livy/livy-test-lib

@Override
public List<String> call(JobContext jc) throws Exception {
 InputStream source = getClass().getResourceAsStream("/testweet.json");
 // Save the resource as a file in HDFS (or the local tmp dir when using a local filesystem).
 URI input;
 File local = File.createTempFile("tweets", ".json", jc.getLocalTmpDir());
 Files.copy(source, local.toPath(), StandardCopyOption.REPLACE_EXISTING);
 FileSystem fs = FileSystem.get(jc.sc().sc().hadoopConfiguration());
 if ("file".equals(fs.getUri().getScheme())) {
  input = local.toURI();
 } else {
  String uuid = UUID.randomUUID().toString();
  Path target = new Path("/tmp/" + uuid + "-tweets.json");
  fs.copyFromLocalFile(new Path(local.toURI()), target);
  input = target.toUri();
 }
 SQLContext sqlctx = useHiveContext ? jc.hivectx() : jc.sqlctx();
 sqlctx.jsonFile(input.toString()).registerTempTable("tweets");
 List<String> tweetList = new ArrayList<>();
 Row[] result =
  (Row[])(sqlctx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
   .collect());
 for (Row r : result) {
   tweetList.add(r.toString());
 }
 return tweetList;
}

Code example source: Impetus/Kundera

/**
 * Register table for json.
 * 
 * @param tableName
 *            the table name
 * @param dataSourcePath
 *            the data source path
 * @param sqlContext
 *            the sql context
 */
private void registerTableForJson(String tableName, String dataSourcePath, HiveContext sqlContext)
{
  sqlContext.jsonFile(dataSourcePath).registerTempTable(tableName);
}

Code example source: phuonglh/vn.vitk

DataFrame df = sqlContext.createDataFrame(jrdd, WhitespaceContext.class);
df.show(false);
System.out.println("N = " + df.count());
df.groupBy("label").count().show();

// 'predictions' is the DataFrame produced by the fitted model (not shown in this excerpt).
predictions.show();
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator().setMetricName("precision");
double accuracy = evaluator.evaluate(predictions);

Code example source: phuonglh/vn.vitk

@Override
public DataFrame transform(DataFrame dataset) {
  JavaRDD<Row> output = dataset.javaRDD().map(new DecodeFunction());
  StructType schema = new StructType(new StructField[]{
    new StructField("sentence", DataTypes.StringType, false, Metadata.empty()),
    new StructField("prediction", DataTypes.StringType, false, Metadata.empty())
  });
  return dataset.sqlContext().createDataFrame(output, schema);
}

Code example source: stackoverflow.com

// Build two small DataFrames that share a schema, then keep only the rows of df1
// whose path matches a row of df2 via a left-semi join. The schema, sqlContext,
// javaSparkContext, and the pathContains join condition are defined earlier in the
// original answer and are not repeated here.
List<Row> dataForFirstDF = new ArrayList<>();
dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc"));
dataForFirstDF.add(RowFactory.create("x/y/z", "xyz"));
DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema);
df1.show();

List<Row> dataForSecondDF = new ArrayList<>();
dataForSecondDF.add(RowFactory.create("a/a/a", "qwe"));
dataForSecondDF.add(RowFactory.create("a/b/c", "qwe"));
DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema);

DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi");
result.show();

Code example source: Quetzal-RDF/quetzal

public static void main(String[] args)
{
  // SparkConf conf = new SparkConf().setAppName("App-mt").setMaster("local[2]");
  // SparkConf conf = new SparkConf().setAppName("App-mt").setMaster("spark://Kavithas-MBP.home:7077");
  SparkConf conf = new SparkConf().setAppName("App-mt").setMaster("spark://kavithas-mbp.watson.ibm.com:7077");

  JavaSparkContext sc = new JavaSparkContext(conf);

  HiveContext sqlContext = new HiveContext(sc.sc());
  DataFrame urls = sqlContext.read().json("/tmp/urls.json");
  urls.registerTempTable("urls");
  DataFrame temp = sqlContext.sql("select * from urls");
  temp.show();

  // Register a UDTF packaged in an external jar and call it from SQL.
  sqlContext.sql("add jar /tmp/quetzal.jar");
  sqlContext.sql("create temporary function webservice as 'com.ibm.research.rdf.store.utilities.WebServiceGetUDTF'");
  DataFrame drugs = sqlContext.sql("select webservice(\"drug,id,action\", \"url\", \"\", \"GET\", \"xs=http://www.w3.org/2001/XMLSchema\", \"//row\",\"drug\",\"./drug\","
      + " \"<string>\", \"id\", \"./id\",\"<string>\", \"action\", \"./action\", \"<string>\", url) as (drug, drug_typ, id, id_typ, action, action_typ) from urls");
  drugs.show();
  System.out.println("Num rows:" + drugs.count());
}
