I am a beginner in Spark with Scala. Given rdd1 and rdd2:
val rdd1 = sc.parallelize(Seq(
  ("key1", 1),
  ("key2", 6),
  ("key3", 5)))
val rdd2 = sc.parallelize(Seq(
  ("key1", 5),
  ("key2", 6),
  ("key4", 7)))
I need to identify which values were deleted, which were inserted and which were modified, and add a column indicating this status. I am expecting a result of the form:
(key1,1,5,"modif")
(key2,6,6, " ")
(key3,5, null, "del")
(key4,null,7,"insert")
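One way to produce rows of that shape, offered here as an alternative to cogroup, is a full outer join: Spark's RDD.fullOuterJoin pairs each key with an Option from each side, and None can then play the role of null in the expected output. The block below is a minimal sketch on plain Scala collections, so it runs without a SparkContext. DiffSketch and localFullOuterJoin are hypothetical names that only imitate fullOuterJoin, assuming each key occurs at most once per side; the new-side value of key1 is assumed to be 5, as the expected row (key1,1,5,"modif") implies.

```scala
object DiffSketch {
  // Status labels taken from the expected output in the question.
  def status(oldV: Option[Int], newV: Option[Int]): String = (oldV, newV) match {
    case (Some(_), None)              => "del"    // key only on the old side
    case (None, Some(_))              => "insert" // key only on the new side
    case (Some(a), Some(b)) if a != b => "modif"  // key on both sides, value changed
    case _                            => " "      // key on both sides, value unchanged
  }

  // Hypothetical local stand-in for RDD.fullOuterJoin, assuming each key
  // occurs at most once per side: every key from either side appears once,
  // with None marking the side where it is missing.
  def localFullOuterJoin(left: Map[String, Int],
                         right: Map[String, Int]): Seq[(String, (Option[Int], Option[Int]))] =
    (left.keySet ++ right.keySet).toSeq.sorted.map(k => (k, (left.get(k), right.get(k))))

  def main(args: Array[String]): Unit = {
    val before = Map("key1" -> 1, "key2" -> 6, "key3" -> 5)
    // Assumption: key1 becomes 5 on the new side so that one row is "modif".
    val after = Map("key1" -> 5, "key2" -> 6, "key4" -> 7)
    val rows = localFullOuterJoin(before, after).map { case (k, (o, n)) => (k, o, n, status(o, n)) }
    rows.foreach(println)
  }
}
```

On the real RDDs the same status function could be applied as rdd1.fullOuterJoin(rdd2).map { case (k, (o, n)) => (k, o, n, DiffSketch.status(o, n)) }. If a key can repeat within one RDD, a join emits one row per pair of values for that key, so deduplicating first may be necessary.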
Here is my code:
val grouped = rdd1.cogroup(rdd2)

def f(x: (String, (Iterable[Int], Iterable[Int]))) = {
  if (x._2._1 == null && x._2._2 != null) x + "insert"
  if (x._2._1 != null && x._2._1 == null) x + "suppression"
  else x + "modif"
}
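A likely first cause is the null checks: cogroup represents a side that lacks a key as an empty Iterable, not as null, so x._2._1 == null is never true. The sketch below imitates that shape with plain Scala collections on a trimmed copy of the sample data, to show that the missing side of key3 is empty rather than null. localCogroup is a hypothetical helper written for this illustration, not a Spark API.

```scala
object CogroupShape {
  // Hypothetical local stand-in for RDD.cogroup: every key from either side
  // appears once, paired with ALL of its values from each side. A side with
  // no values for the key gets an EMPTY Iterable, never null.
  def localCogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Iterable[V], Iterable[W])] =
    (left.map(_._1) ++ right.map(_._1)).distinct.map { k =>
      val values: (Iterable[V], Iterable[W]) =
        (left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2))
      k -> values
    }.toMap

  def main(args: Array[String]): Unit = {
    val grouped = localCogroup(
      Seq(("key2", 6), ("key3", 5)),
      Seq(("key2", 6), ("key4", 7)))
    val (oldVals, newVals) = grouped("key3")
    println(oldVals)         // List(5)
    println(newVals == null) // false: a "== null" test never detects the missing side
    println(newVals.isEmpty) // true: emptiness is what marks the missing side
  }
}
```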
But it never seems to enter the first two if branches: the result contains "modif" for every element.
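Besides the null checks (cogroup gives empty Iterables, not null), a few other things in f appear to explain the symptom: the second condition tests x._2._1 twice instead of x._2._2, so it is always false; the first if has no else, so its value is computed and discarded and only the final if/else decides the result; and x + "insert" converts the whole tuple to a String and appends text rather than adding a field. A possible rewrite is sketched below, assuming each key has at most one value per side. It checks emptiness with isEmpty, chains the branches with else if, and returns a new 4-tuple with Option standing in for null. FixedF is only a wrapper name for the example, and "del" replaces "suppression" to match the expected output.

```scala
object FixedF {
  // Same input shape as the question's f: one element of rdd1.cogroup(rdd2).
  // Returns (key, old value, new value, status); None stands in for null.
  // Assumes at most one value per key on each side, so headOption suffices.
  def f(x: (String, (Iterable[Int], Iterable[Int]))): (String, Option[Int], Option[Int], String) = {
    val (key, (olds, news)) = x
    val oldV = olds.headOption
    val newV = news.headOption
    val status =
      if (olds.isEmpty && news.nonEmpty) "insert"   // present only in rdd2
      else if (olds.nonEmpty && news.isEmpty) "del" // present only in rdd1
      else if (oldV != newV) "modif"                // present in both, value changed
      else " "                                      // present in both, unchanged
    (key, oldV, newV, status)
  }

  def main(args: Array[String]): Unit = {
    println(f(("key3", (List(5), Nil))))
    println(f(("key4", (Nil, List(7)))))
  }
}
```

With Spark it would be applied to the cogrouped RDD, for example rdd1.cogroup(rdd2).map(FixedF.f). Note that the original snippet defines f but never calls it on grouped.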
Thank you for your help!