Spark ETL: Using Either to handle invalid data


Scenario
Usually when we use Spark to import data from an external system, we want to report to the client how many rows were imported successfully and how many rows in the original input were invalid.

Solution: Using Scala Either
When we use map to parse the data, we can use Scala's Either: Left holds the invalid raw data together with its error message, and Right holds the valid data that will be processed and stored later.
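A minimal sketch of this convention outside of Spark (a date column stands in for the real EventModel parsing, so the types here are only illustrative):

import java.time.LocalDate

// Sketch: Left carries (rawInput, errorMessage), Right carries the parsed value.
def parseDate(raw: String): Either[(String, String), LocalDate] =
  try Right(LocalDate.parse(raw))
  catch { case e: Throwable => Left((raw, e.getMessage)) }

// parseDate("2017-01-31") => Right(2017-01-31)
// parseDate("not-a-date") => Left(("not-a-date", "Text 'not-a-date' could not be parsed ..."))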

SparkService
Note that here we use dataSet.rdd.map, not dataSet.map.

If we call dataSet.map and return an Either, it fails with this exception:
java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable with scala.util.Either[(String, String),xx.EventModel] found

This seems to be because in the current Scala 2.11 (http://www.scala-lang.org/api/2.11.x/index.html#scala.util.Either), Either does not implement the Serializable trait:

sealed abstract class Either[+A, +B] extends AnyRef

In the upcoming Scala 2.12 (http://www.scala-lang.org/api/2.12.x/scala/util/Either.html), it does:
sealed abstract class Either[+A, +B] extends Product with Serializable
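If you do want to stay in the Dataset API on Scala 2.11, one possible workaround (not what this post uses) is to hand Spark an explicit Kryo-based encoder for the Either type instead of letting it try to derive one:

import org.apache.spark.sql.{Encoder, Encoders}

// Possible workaround (not used in this post): a Kryo-based encoder lets a Dataset
// hold Either values on Scala 2.11, stored as an opaque binary column. Annotate the
// element type explicitly so the compiler does not widen it to "Product with Serializable".
implicit val eitherEncoder: Encoder[Either[(String, String), EventModel]] =
  Encoders.kryo[Either[(String, String), EventModel]]
// val parsed: Dataset[Either[(String, String), EventModel]] = dataSet.map(row => ...)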

To use Java 8 types such as Optional and java.time in the REST API, add jackson-datatype-jdk8 and jackson-datatype-jsr310.
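For example, the ObjectMapper used by the REST layer registers the two modules from those artifacts (a minimal sketch):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule

// Sketch: register the JDK 8 and JSR-310 modules so Optional and java.time types
// (e.g. the ZonedDateTime fields in DataImportResult) serialize cleanly.
val mapper = new ObjectMapper()
mapper.registerModule(new Jdk8Module())
mapper.registerModule(new JavaTimeModule())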

To use a Scala case class with Jackson, add @BeanProperty to each field.
See the related Jackson tips post.
Please check "Using Lombok to Simplify the Code" for the SpringContextBridge implementation.
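For reference, here is a minimal Scala sketch of the SpringContextBridge idea (the linked post has the actual implementation; the names below are assumptions):

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationContext
import org.springframework.stereotype.Component

// Sketch: capture the ApplicationContext once so code inside Spark closures can
// look up Spring beans by type instead of serializing them into the closure.
object SpringContextBridge {
  @volatile private var context: ApplicationContext = _
  def setContext(ctx: ApplicationContext): Unit = context = ctx
  def getBean[T](clazz: Class[T]): T = context.getBean(clazz)
}

@Component
class SpringContextBridgeInitializer @Autowired() (ctx: ApplicationContext) {
  SpringContextBridge.setContext(ctx)
}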
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.Date

import scala.beans.BeanProperty
import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class SparkService {
  @transient lazy val logger = LoggerFactory.getLogger(getClass)

  def save(csvFile: String): DataImportResult = {
    val startDate = ZonedDateTime.now
    val spark = SpringContextBridge.getBean(classOf[SparkSession])
    val csvInput = spark.read
      .option("header", "true")
      .option("inferSchema", false)
      .option("ignoreLeadingWhiteSpace", true)
      .option("ignoreTrailingWhiteSpace", true)
      .csv(csvFile)

    // Parse every row: Right(event) for valid rows, Left((rawRow, errorMessage)) for invalid ones.
    val newRdd = csvInput.rdd
      .map { row =>
        try {
          val event = new EventModel()
          event.setId(row.getString(0))
          event.setEventDate(Util.ParseDate(row.getString(1)))
          // ...
          event.setUpdateDate(new Date())
          Right(event)
        } catch {
          case e: Throwable => Left((row.toSeq.map(_.toString).toString, e.getMessage))
        }
      }
    newRdd.cache()

    val failedRdd = newRdd.filter(_.isLeft).map(_.left.get)
    val failedCount = failedRdd.count()
    val errorData = failedRdd.take(10)

    val successRdd = newRdd.filter(_.isRight).map(_.right.get)
    successRdd.cache()
    successRdd // ... other processing
      .foreachPartition { it =>
        val repo = SpringContextBridge.getBean(classOf[EventRepo])
        it.grouped(1000).foreach { batch => repo.saveWithoutCommit(batch.toIterable.asJava) }
      }

    SpringContextBridge.getBean(classOf[EventRepo]).hardCommit()

    val validDataCount = successRdd.count()
    val result = DataImportResult(startDate, ZonedDateTime.now,
      startDate.until(ZonedDateTime.now, ChronoUnit.MILLIS),
      validDataCount, failedCount, errorData)
    logger.info(result.toString())
    result
  }
}
case class DataImportResult(
  @BeanProperty startTime: ZonedDateTime, @BeanProperty endTime: ZonedDateTime, @BeanProperty timeTakenMill: Long,
  @BeanProperty validCount: Long, @BeanProperty failedCount: Long, @BeanProperty errorData: Array[(String, String)])

object MyEncoders {
  implicit def eventEncoder: org.apache.spark.sql.Encoder[EventModel] =
    org.apache.spark.sql.Encoders.bean(classOf[EventModel])
}
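With that encoder in implicit scope, the valid rows can be exposed as a typed Dataset when the rest of the pipeline needs one (a sketch, reusing spark and successRdd from the save method above):

import MyEncoders._

// Sketch: build a Dataset[EventModel] from the RDD of valid rows using the bean encoder.
val eventDs = spark.createDataset(successRdd)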
SparkConfiguration
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.{Bean, Configuration}

@Configuration
class SparkConfiguration {
  val logger = LoggerFactory.getLogger(getClass)

  @Bean
  def sparkSessionConfiguration = {
    val spark = SparkSession
      .builder().master("spark://master:port")
      .appName("My Spark App")
      .config("some.config", "some.value")
      .getOrCreate()
    logger.info(s"Created SparkSession: ${spark}")
    spark
  }
}
Maven pom.xml
Add dependencies: spark-core_2.11, spark-sql_2.11, and org.psnively:spring_scala_3-2-14_2.11.
To mix Scala and Java sources in the same module, add net.alchim31.maven:scala-maven-plugin.
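The relevant pom.xml entries look roughly like this (a sketch; version properties are placeholders, and the spring_scala dependency above goes into the same dependencies block):

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <!-- plus the org.psnively spring_scala dependency listed above -->
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>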
