Spark Streaming Troubleshooting Summary


If a class is missing on the executors, you will see:

18/12/11 16:24:29 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)

Fix this by calling conf.setJars. Note that the jars must be placed somewhere every node in the cluster can access, for example HDFS:

conf.setJars(new String[] {
	"hdfs://master:9000/home/jars/kafka-clients-2.0.0.jar",
	"hdfs://master:9000/home/jars/spark-streaming-kafka-0-10_2.11-2.4.0.jar"
});
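
For context, here is a minimal, self-contained driver sketch showing where setJars fits into a Kafka 0-10 direct-stream job. The jar paths and the topic name ("test") come from this article; the master URL, broker address, group id, and batch interval are assumptions to adjust for your cluster.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class KafkaStreamDemo {
	public static void main(String[] args) throws InterruptedException {
		SparkConf conf = new SparkConf()
			.setAppName("KafkaStreamDemo")
			.setMaster("spark://master:7077"); // assumption: standalone master
		// Ship the Kafka jars to the executors; the paths must be cluster-visible (HDFS here).
		conf.setJars(new String[] {
			"hdfs://master:9000/home/jars/kafka-clients-2.0.0.jar",
			"hdfs://master:9000/home/jars/spark-streaming-kafka-0-10_2.11-2.4.0.jar"
		});

		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

		Map<String, Object> kafkaParams = new HashMap<>();
		kafkaParams.put("bootstrap.servers", "master:9092"); // assumption: broker address
		kafkaParams.put("key.deserializer", StringDeserializer.class);
		kafkaParams.put("value.deserializer", StringDeserializer.class);
		kafkaParams.put("group.id", "demo-group"); // assumption: consumer group
		kafkaParams.put("auto.offset.reset", "earliest");
		kafkaParams.put("enable.auto.commit", false);

		JavaInputDStream<ConsumerRecord<String, String>> stream =
			KafkaUtils.createDirectStream(
				jssc,
				LocationStrategies.PreferConsistent(),
				ConsumerStrategies.<String, String>Subscribe(
					Arrays.asList("test"), kafkaParams));

		// print() ships records back to the driver, which is exactly what
		// triggers the serialization error shown next.
		stream.print();

		jssc.start();
		jssc.awaitTermination();
	}
}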

With the jars in place the ClassNotFoundException goes away, but the job then fails with:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
	- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 28, CreateTime = 1544517476606, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d d d))
	- element of array (index: 0)
	- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)

This happens because the ConsumerRecord class does not implement the Serializable interface, so Spark's default Java serialization cannot ship records back to the driver. Configure Spark to use the Kryo serializer instead:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
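
Continuing the sketch above, there is also a common alternative to the Kryo setting: since only the ConsumerRecord class itself is the problem, mapping each record to a plain field before anything that ships records to the driver avoids the error entirely. Option B below would replace the stream.print() call in the earlier sketch; JavaDStream (from org.apache.spark.streaming.api.java) is the only new import.

// Option A (this article's fix): serialize task results with Kryo, which
// does not require ConsumerRecord to implement java.io.Serializable.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

// Option B: extract plain values first, so no ConsumerRecord object ever
// has to leave the executors.
JavaDStream<String> values = stream.map(ConsumerRecord::value);
values.print();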