Scenario
Usually when we use Spark to import data from an external system, we want to report to the client how many rows were imported successfully and how many rows in the original input were invalid.
Solution: Using Scala Either
When we use map to parse the data, we can use Scala's Either: the left side holds the invalid data together with its error message, while the right side holds the valid data to be processed and stored later.
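For example, a minimal sketch of the idea outside Spark (the input strings are made up):

val parsed = Seq("1", "two", "3").map { s =>
  try Right(s.toInt)
  catch { case e: Throwable => Left((s, e.getMessage)) }
}
val invalidCount = parsed.count(_.isLeft) // 1: "two" fails to parse
val validCount = parsed.count(_.isRight)  // 2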
SparkService
Here we use dataSet.rdd.map, not dataSet.map.
If we call dataSet.map and use Either, it fails with this exception:
java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable with scala.util.Either[(String, String),xx.EventModel] found
This seems to be because in the current latest Scala 2.11 (http://www.scala-lang.org/api/2.11.x/index.html#scala.util.Either), Either doesn't implement the Serializable trait:
sealed abstract class Either[+A, +B] extends AnyRef
In the upcoming Scala 2.12 (http://www.scala-lang.org/api/2.12.x/scala/util/Either.html), it does:
sealed abstract class Either[+A, +B] extends Product with Serializable
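To make the distinction concrete, a sketch (csvInput and the toEither parsing function stand in for the code shown in SparkService below):

// fails with the NoClassDefFoundError above: Dataset.map goes through
// Spark's Encoder machinery, which cannot handle the Either result type
// val parsed = csvInput.map(row => toEither(row))
// the RDD API relies on plain closure serialization instead, so this works
val parsed = csvInput.rdd.map(row => toEither(row))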
In order to use Java 8 types such as Optional and java.time in the REST API, use jackson-datatype-jdk8 and jackson-datatype-jsr310.
To use a Scala case class with Jackson, add @BeanProperty to each field.
Check the Jackson-related tips.
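A minimal sketch of those two tips together (ImportSummary is a made-up DTO):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import scala.beans.BeanProperty

case class ImportSummary(@BeanProperty validCount: Long,
                         @BeanProperty startTime: java.time.ZonedDateTime)

val mapper = new ObjectMapper()
mapper.registerModule(new Jdk8Module())     // java.util.Optional and friends
mapper.registerModule(new JavaTimeModule()) // java.time types
val json = mapper.writeValueAsString(ImportSummary(42L, java.time.ZonedDateTime.now))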
Please check Using Lombok to Simplify the Code for the SpringContextBridge implementation.
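The linked post has the full implementation; for context, a bare-bones version of the usual pattern looks roughly like this (the names match the calls used in the code below):

import org.springframework.context.{ApplicationContext, ApplicationContextAware}
import org.springframework.stereotype.Component

@Component
class SpringContextBridge extends ApplicationContextAware {
  // Spring calls this on startup and hands over the context
  override def setApplicationContext(ctx: ApplicationContext): Unit =
    SpringContextBridge.context = ctx
}

object SpringContextBridge {
  @volatile private var context: ApplicationContext = _
  // driver and executor code can then look beans up statically
  def getBean[T](clazz: Class[T]): T = context.getBean(clazz)
}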
// Imports assumed by this snippet; EventModel, Util and EventRepo are
// application classes not shown here.
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.Date
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import scala.beans.BeanProperty
import scala.collection.JavaConverters._

@Service
class SparkService {
  // loggers are not serializable, so keep the field @transient
  @transient lazy val logger = LoggerFactory.getLogger(getClass)

  def save(csvFile: String): DataImportResult = {
    val startDate = ZonedDateTime.now
    val spark = SpringContextBridge.getBean(classOf[SparkSession])
    val csvInput = spark.read
      .option("header", "true")
      .option("inferSchema", false)
      .option("ignoreLeadingWhiteSpace", true)
      .option("ignoreTrailingWhiteSpace", true)
      .csv(csvFile)
    // parse every row into Either: Left((rawRow, errorMessage)) on failure,
    // Right(event) on success
    val newRdd = csvInput.rdd
      .map { row =>
        try {
          val event = new EventModel()
          event.setId(row.getString(0))
          event.setEventDate(Util.ParseDate(row.getString(1)))
          // ...
          event.setUpdateDate(new Date())
          Right(event)
        } catch {
          case e: Throwable => Left((row.mkString(","), e.getMessage))
        }
      }
    // cache: the RDD is traversed more than once below (count, take, save)
    newRdd.cache()
    val failedRdd = newRdd.filter(_.isLeft).map(_.left.get)
    val failedCount = failedRdd.count()
    // keep only a sample of the invalid rows for the report
    val errorData = failedRdd.take(10)
    val successRdd = newRdd.filter(_.isRight).map(_.right.get)
    successRdd.cache()
    successRdd // ... other processing
      .foreachPartition { it =>
        // look the repository up on the executor: Spring beans are not serializable
        val repo = SpringContextBridge.getBean(classOf[EventRepo])
        it.grouped(1000).foreach { x => repo.saveWithoutCommit(x.toIterable.asJava) }
      }
    SpringContextBridge.getBean(classOf[EventRepo]).hardCommit()
    val validDataCount = successRdd.count()
    val result = DataImportResult(startDate, ZonedDateTime.now,
      startDate.until(ZonedDateTime.now, ChronoUnit.MILLIS),
      validDataCount, failedCount, errorData)
    logger.info(result.toString())
    result
  }
}
case class DataImportResult(
  @BeanProperty startTime: ZonedDateTime, @BeanProperty endTime: ZonedDateTime, @BeanProperty timeTakenMill: Long,
  @BeanProperty validCount: Long, @BeanProperty failedCount: Long, @BeanProperty errorData: Array[(String, String)])
object MyEncoders {
  implicit def eventEncoder: org.apache.spark.sql.Encoder[EventModel] =
    org.apache.spark.sql.Encoders.bean(classOf[EventModel])
}
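Presumably this bean Encoder is declared so the parsed events can be lifted back into the typed Dataset API when needed; a hypothetical usage sketch:

import org.apache.spark.sql.Dataset
import MyEncoders._
// with the implicit bean Encoder in scope, the RDD of valid events
// can become a Dataset[EventModel] again
val eventDs: Dataset[EventModel] = spark.createDataset(successRdd)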
SparkConfiguration
@Configuration
class SparkConfiguration {
  val logger = LoggerFactory.getLogger(getClass)

  @Bean
  def sparkSessionConfiguration = {
    val spark = SparkSession
      .builder().master("spark://master:port")
      .appName("My Spark App")
      .config("some.config", "some.value")
      .getOrCreate()
    logger.info(s"Created SparkSession: ${spark}")
    spark
  }
}
Maven pom.xml
Add dependencies: spark-core_2.11, spark-sql_2.11 and org.psnively:spring_scala_3-2-14_2.11.
To mix Scala and Java sources, add net.alchim31.maven:scala-maven-plugin, as sketched below.
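A rough sketch of the relevant pom.xml pieces; every version below is a placeholder to pin to your own build, and the spring-scala coordinates are an assumption:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version> <!-- placeholder version -->
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version> <!-- placeholder version -->
  </dependency>
  <dependency>
    <groupId>org.psnively</groupId>
    <artifactId>spring-scala_2.11</artifactId>
    <version>3.2.14.RELEASE</version> <!-- assumed coordinates -->
  </dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version> <!-- placeholder version -->
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>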