Issue
I had a task to create a POC for Apache Spark with Spring Boot. I created a controller to fetch my data through an API:
@PostMapping(path = "/memberData")
public Map<String, Profile> processData(@RequestBody Member member) {
    logger.info("Processing data for member List: {}", member);
    return service.processData(member);
}
It should return a Map with profile.getName() as its key and the Profile object as its value, both of which are part of the Member object. I then implemented the service for the controller:
@Autowired
JavaSparkContext sc;
...
public Map<String, Profile> processData(Member member) {
    logger.info("Processing data for member List: {}", member);
    JavaRDD<Profile> profile = sc.parallelize(Collections.singletonList(member.getProfile()), 3);
    return profile.mapToPair(p -> new Tuple2<>(p.getName(), p)).collectAsMap();
}
The JavaSparkContext was declared in a config class in a different sub-package:
@Bean
public SparkConf conf() {
    return new SparkConf().setAppName(appName).setMaster(masterUri);
}

@Bean
public JavaSparkContext sc() {
    return new JavaSparkContext(conf());
}
The application builds perfectly, but when I try to hit the URL with my data, i.e. a Member object:
{
  "id": "62d121c30cc723640fd03cc2",
  "email": "[email protected]",
  "username": "allison88",
  "profile": {
    "name": "Allison Mcleod",
    "company": "Ziore",
    "dob": "1988-06-19",
    "address": "71 Glendale Court, Hobucken, Pennsylvania",
    "about": "Duis dolor ex ipsum id eu eiusmod pariatur culpa anim commodo cupidatat elit aliquip dolore. Nulla laboris nostrud ea eiusmod exercitation est."
  },
  "roles": [
    "guest",
    "owner"
  ],
  "createdAt": "2020-07-20T20:40:51.840Z",
  "updatedAt": "2020-07-21T20:40:51.840Z"
}
I get the following error, which says the object is not serializable:
2022-07-18 09:52:22.887 INFO 13108 --- [nio-8080-exec-2] com.leofierus.service.WordCountService : Processing data for member List: Member(id=62d121c30cc723640fd03cc2, [email protected], username=allison88, profile=Profile(name=Allison Mcleod, company=Ziore, dob=1988-06-19, address=71 Glendale Court, Hobucken, Pennsylvania, about=Duis dolor ex ipsum id eu eiusmod pariatur culpa anim commodo cupidatat elit aliquip dolore. Nulla laboris nostrud ea eiusmod exercitation est.), roles=[guest, owner], createdAt=2020-07-21 02:10:51.84, updatedAt=2020-07-22 02:10:51.84)
2022-07-18 09:52:23.416 INFO 13108 --- [nio-8080-exec-2] org.apache.spark.SparkContext : Starting job: count at WordCountService.java:33
2022-07-18 09:52:23.455 INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler : Got job 0 (count at WordCountService.java:33) with 3 output partitions
2022-07-18 09:52:23.455 INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler : Final stage: ResultStage 0 (count at WordCountService.java:33)
2022-07-18 09:52:23.455 INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler : Parents of final stage: List()
2022-07-18 09:52:23.463 INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler : Missing parents: List()
2022-07-18 09:52:23.471 INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler : Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize at WordCountService.java:32), which has no missing parents
2022-07-18 09:52:23.631 INFO 13108 --- [uler-event-loop] o.a.spark.storage.memory.MemoryStore : Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 2.1 GB)
2022-07-18 09:52:23.730 INFO 13108 --- [uler-event-loop] o.a.spark.storage.memory.MemoryStore : Block broadcast_0_piece0 stored as bytes in memory (estimated size 1342.0 B, free 2.1 GB)
2022-07-18 09:52:23.730 INFO 13108 --- [er-event-loop-0] o.apache.spark.storage.BlockManagerInfo : Added broadcast_0_piece0 in memory on MalharP:54693 (size: 1342.0 B, free: 2.1 GB)
2022-07-18 09:52:23.730 INFO 13108 --- [uler-event-loop] org.apache.spark.SparkContext : Created broadcast 0 from broadcast at DAGScheduler.scala:1161
2022-07-18 09:52:23.762 INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler : Submitting 3 missing tasks from ResultStage 0 (ParallelCollectionRDD[0] at parallelize at WordCountService.java:32) (first 15 tasks are for partitions Vector(0, 1, 2))
2022-07-18 09:52:23.770 INFO 13108 --- [uler-event-loop] o.a.spark.scheduler.TaskSchedulerImpl : Adding task set 0.0 with 3 tasks
2022-07-18 09:52:24.098 INFO 13108 --- [er-event-loop-1] o.apache.spark.scheduler.TaskSetManager : Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7347 bytes)
2022-07-18 09:52:24.146 INFO 13108 --- [rker for task 0] org.apache.spark.executor.Executor : Running task 0.0 in stage 0.0 (TID 0)
2022-07-18 09:52:24.274 INFO 13108 --- [rker for task 0] org.apache.spark.executor.Executor : Finished task 0.0 in stage 0.0 (TID 0). 709 bytes result sent to driver
2022-07-18 09:52:24.282 INFO 13108 --- [er-event-loop-1] o.apache.spark.scheduler.TaskSetManager : Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7347 bytes)
2022-07-18 09:52:24.282 INFO 13108 --- [rker for task 1] org.apache.spark.executor.Executor : Running task 1.0 in stage 0.0 (TID 1)
2022-07-18 09:52:24.299 INFO 13108 --- [rker for task 1] org.apache.spark.executor.Executor : Finished task 1.0 in stage 0.0 (TID 1). 623 bytes result sent to driver
2022-07-18 09:52:24.306 ERROR 13108 --- [er-event-loop-1] org.apache.spark.util.Utils : Exception encountered
java.io.NotSerializableException: com.leofierus.model.Profile
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) ~[na:na]
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
at java.base/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:442) ~[na:na]
at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$writeObject$1(ParallelCollectionRDD.scala:59) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349) ~[na:na]
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:486) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.Option.map(Option.scala:146) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:315) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15(TaskSchedulerImpl.scala:413) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15$adapted(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) ~[scala-library-2.12.7.jar:na]
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13$adapted(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:70) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
2022-07-18 09:52:24.314 ERROR 13108 --- [er-event-loop-1] org.apache.spark.util.Utils : Exception encountered
java.io.NotSerializableException: com.leofierus.model.Profile
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) ~[na:na]
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553) ~[na:na]
at java.base/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:442) ~[na:na]
at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$writeObject$1(ParallelCollectionRDD.scala:59) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[na:na]
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349) ~[na:na]
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:486) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.Option.map(Option.scala:146) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:315) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15(TaskSchedulerImpl.scala:413) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15$adapted(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) ~[scala-library-2.12.7.jar:na]
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13$adapted(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:70) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
2022-07-18 09:52:24.314 ERROR 13108 --- [er-event-loop-1] o.apache.spark.scheduler.TaskSetManager : Failed to serialize task 2, not attempting to retry it.
java.io.NotSerializableException: com.leofierus.model.Profile
Serialization stack:
- object not serializable (class: com.leofierus.model.Profile, value: Profile(name=Allison Mcleod, company=Ziore, dob=1988-06-19, address=71 Glendale Court, Hobucken, Pennsylvania, about=Duis dolor ex ipsum id eu eiusmod pariatur culpa anim commodo cupidatat elit aliquip dolore. Nulla laboris nostrud ea eiusmod exercitation est.))
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(Profile(name=Allison Mcleod, company=Ziore, dob=1988-06-19, address=71 Glendale Court, Hobucken, Pennsylvania, about=Duis dolor ex ipsum id eu eiusmod pariatur culpa anim commodo cupidatat elit aliquip dolore. Nulla laboris nostrud ea eiusmod exercitation est.)))
- writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
- object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@693)
- field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
- object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 2))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:486) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.Option.map(Option.scala:146) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:315) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15(TaskSchedulerImpl.scala:413) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$15$adapted(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) ~[scala-library-2.12.7.jar:na]
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13(TaskSchedulerImpl.scala:409) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$13$adapted(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[scala-library-2.12.7.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.12.7.jar:na]
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:70) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:117) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:102) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
2022-07-18 09:52:24.322 INFO 13108 --- [er-event-loop-1] o.a.spark.scheduler.TaskSchedulerImpl : Removed TaskSet 0.0, whose tasks have all completed, from pool
2022-07-18 09:52:24.322 ERROR 13108 --- [er-event-loop-1] o.a.spark.scheduler.TaskSchedulerImpl : Resource offer failed, task set TaskSet_0.0 was not serializable
2022-07-18 09:52:24.322 INFO 13108 --- [result-getter-1] o.apache.spark.scheduler.TaskSetManager : Finished task 1.0 in stage 0.0 (TID 1) in 40 ms on localhost (executor driver) (1/3)
2022-07-18 09:52:24.330 INFO 13108 --- [result-getter-1] o.a.spark.scheduler.TaskSchedulerImpl : Removed TaskSet 0.0, whose tasks have all completed, from pool
2022-07-18 09:52:24.330 INFO 13108 --- [uler-event-loop] o.a.spark.scheduler.TaskSchedulerImpl : Cancelling stage 0
2022-07-18 09:52:24.330 INFO 13108 --- [uler-event-loop] o.a.spark.scheduler.TaskSchedulerImpl : Killing all running tasks in stage 0: Stage cancelled
2022-07-18 09:52:24.330 INFO 13108 --- [result-getter-0] o.apache.spark.scheduler.TaskSetManager : Finished task 0.0 in stage 0.0 (TID 0) in 469 ms on localhost (executor driver) (2/3)
2022-07-18 09:52:24.330 INFO 13108 --- [result-getter-0] o.a.spark.scheduler.TaskSchedulerImpl : Removed TaskSet 0.0, whose tasks have all completed, from pool
2022-07-18 09:52:24.330 INFO 13108 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler : ResultStage 0 (count at WordCountService.java:33) failed in 0.835 s due to Job aborted due to stage failure: Failed to serialize task 2, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: com.leofierus.model.Profile
Can someone help me with this? I am very new to Apache Spark and am building this POC to see whether it fits the current project's requirements, which would need thousands of such data objects to be parsed at once.
Solution
Add implements Serializable to the classes that Spark has to serialize.
Here, Profile is a sub-object of Member, so you need to make both classes serializable for your code to run. I am illustrating this for the Member class below:
public class Member implements Serializable {
    ...
}
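Profile is the class named in the NotSerializableException, so it needs the same change. A minimal sketch, assuming the fields shown in the JSON payload above (constructors, getters, and setters are omitted; the explicit serialVersionUID is optional but helps keep serialized forms compatible across recompilations):

public class Profile implements Serializable {
    // Optional: pin the serial version so recompiled classes stay compatible
    private static final long serialVersionUID = 1L;

    private String name;
    private String company;
    private String dob;
    private String address;
    private String about;
    // constructors, getters, and setters omitted
}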
This allows your object's state to be serialized to a byte stream, so that the byte stream can later be reverted back into a copy of the object.
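For illustration, this round trip is essentially what Spark's default Java serializer attempts when it ships your task partitions to executors. A self-contained sketch, assuming Profile has a no-arg constructor and a getter/setter for name:

import java.io.*;

public class SerializationDemo {
    public static void main(String[] args) throws Exception {
        Profile original = new Profile();   // assumes a no-arg constructor
        original.setName("Allison Mcleod"); // assumes a setter for name

        // Write the object's state to a byte stream; this is the step that
        // throws NotSerializableException when Profile is not Serializable.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }

        // Revert the byte stream back into a copy of the object.
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            Profile copy = (Profile) in.readObject();
            System.out.println(copy.getName()); // prints "Allison Mcleod"
        }
    }
}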