I want to use Kryo serialization in a Spark job.
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class SerializeTest {

        public static class Toto implements Serializable {
            private static final long serialVersionUID = 6369241181075151871L;

            private String a;

            public String getA() {
                return a;
            }

            public void setA(String a) {
                this.a = a;
            }
        }

        // Pairs every Toto with the count 1.
        private static final PairFunction<Toto, Toto, Integer> WRITABLE_CONVERTOR = new PairFunction<Toto, Toto, Integer>() {
            private static final long serialVersionUID = -7119334882912691587L;

            @Override
            public Tuple2<Toto, Integer> call(Toto input) throws Exception {
                return new Tuple2<Toto, Integer>(input, 1);
            }
        };

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SerializeTest");
            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            conf.registerKryoClasses(new Class<?>[]{Toto[].class});
            JavaSparkContext context = new JavaSparkContext(conf);

            List<Toto> list = new ArrayList<Toto>();
            list.add(new Toto());

            JavaRDD<Toto> cursor = context.parallelize(list, list.size());
            JavaPairRDD<Toto, Integer> writable = cursor.mapToPair(WRITABLE_CONVERTOR);
            // args[0] is the output path of the sequence file.
            writable.saveAsHadoopFile(args[0], Toto.class, Integer.class, SequenceFileOutputFormat.class);
            context.close();
        }
    }
But I get this error:
    java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
        at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1179)
        at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1094)
        at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
        at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
        at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    15/09/21 17:49:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
        at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1179)
        at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1094)
        at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
        at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
        at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Thanks.
1 Answer
This error is unrelated to Spark or Kryo.
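When you use a Hadoop output format, you need to make sure that your keys and values are `Writable` instances. Hadoop does not use Java serialization by default (and you would not want it to, because it is very inefficient). You can check the `io.serializations` property in your configuration: it lists the serializers in use, including `org.apache.hadoop.io.serializer.WritableSerialization`.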
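To give a feel for the overhead of Java serialization, here is a small sketch that compares the bytes produced by `ObjectOutputStream` with the bytes produced when only the field itself is written, which is roughly what a `Writable`-style `write()` does. The class `SerializationSizeSketch` and its nested `Toto` are hypothetical stand-ins, not the classes from the question, and the numbers it prints apply only to this toy object.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSizeSketch {

    // Hypothetical stand-in for the question's Toto class.
    static class Toto implements Serializable {
        private static final long serialVersionUID = 1L;
        final String a;

        Toto(String a) {
            this.a = a;
        }
    }

    // Bytes produced by default Java serialization, which also writes
    // a stream header and a description of the class.
    static int javaSerializedSize(Toto toto) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(toto);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    // Bytes produced when only the field value is written.
    static int fieldOnlySize(Toto toto) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(toto.a);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        Toto toto = new Toto("hello");
        System.out.println("Java serialization: " + javaSerializedSize(toto) + " bytes");
        System.out.println("Field only:         " + fieldOnlySize(toto) + " bytes");
    }
}
```

The gap comes mostly from the class metadata that Java serialization emits alongside the data. How much it matters in a real job depends on the objects involved.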
To fix this, your `Toto` class must implement `Writable`. The same problem applies to `Integer`: use `IntWritable` instead.
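A minimal sketch of what a `Writable` version of `Toto` could look like follows. So that it compiles without Hadoop on the classpath, it declares a local stand-in interface with the same two methods as `org.apache.hadoop.io.Writable`. That stand-in, the wrapper class `TotoWritableSketch`, and the `roundTrip` helper are assumptions made for illustration only. In the actual job you would delete the stand-in and import Hadoop's interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class TotoWritableSketch {

    // Stand-in with the same two methods as org.apache.hadoop.io.Writable,
    // declared here only so the sketch runs without Hadoop.
    // In the real job, remove it and import Hadoop's Writable instead.
    interface Writable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    static class Toto implements Writable {
        private String a;

        // A no-arg constructor is needed so an empty instance can be
        // created before readFields() fills it in.
        Toto() {
        }

        String getA() {
            return a;
        }

        void setA(String a) {
            this.a = a;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // The presence flag lets a null field survive the round trip.
            out.writeBoolean(a != null);
            if (a != null) {
                out.writeUTF(a);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Fields are read in exactly the order write() emitted them.
            a = in.readBoolean() ? in.readUTF() : null;
        }
    }

    // Writes a Toto to a byte array and reads it back into a fresh instance.
    static Toto roundTrip(Toto original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            Toto copy = new Toto();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Toto toto = new Toto();
        toto.setA("hello");
        System.out.println(roundTrip(toto).getA()); // prints "hello"
    }
}
```

With a class like this in place, the job from the question would presumably build `Tuple2<Toto, IntWritable>` pairs using `new IntWritable(1)` and pass `IntWritable.class` instead of `Integer.class` to `saveAsHadoopFile`. The null flag matters here because the question adds a `new Toto()` whose field `a` is never set.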