Spark ETL: Using Either to handle invalid data

Scenario
When we use Spark to import data from an external system, we usually want to report to the client how many rows were imported successfully and how many rows in the original input were invalid.

Solution: Using Scala Either
When we parse the data with map, we can return a Scala Either: Left holds the invalid row together with its error message, and Right holds the valid data, which is processed and stored later.
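
The idea can be sketched in plain Scala, without Spark. This is only an illustration: the Event class, the parse function and the sample input are hypothetical and are not part of the real import code.

```scala
import scala.util.control.NonFatal

object EitherParsingSketch {
  // Hypothetical record type standing in for the real model class.
  final case class Event(id: String, count: Int)

  // Left carries (raw input, error message); Right carries the parsed record.
  def parse(line: String): Either[(String, String), Event] =
    try {
      val Array(id, count) = line.split(",")
      Right(Event(id.trim, count.trim.toInt))
    } catch {
      case NonFatal(e) => Left((line, String.valueOf(e.getMessage)))
    }

  // Made-up sample input: two valid lines and two invalid ones.
  val input = Seq("a,1", "b,two", "c,3", "broken")
  val parsed = input.map(parse)

  // Split the results into the two groups we want to report on.
  val failures = parsed.collect { case Left(error) => error }
  val successes = parsed.collect { case Right(event) => event }
}
```

Here failures keeps the offending input next to its error message, so a sample of it can be returned to the client along with both counts.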

SparkService
Here we use dataSet.rdd.map, not dataSet.map.

If we call dataSet.map and return an Either, it fails with the exception:
java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable with scala.util.Either[(String, String),xx.EventModel] found

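This seems to be because in the current Scala 2.11 release (http://www.scala-lang.org/api/2.11.x/index.html#scala.util.Either), Either doesn't implement the Serializable trait, while Left and Right are case classes, so the common type inferred for them is the compound type shown in the error:

sealed abstract class Either[+A, +B] extends AnyRef

In the upcoming 2.12 (http://www.scala-lang.org/api/2.12.x/scala/util/Either.html), it does: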
sealed abstract class Either[+A, +B] extends Product with Serializable
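
The type inference involved can be seen in plain Scala, without Spark. In this minimal sketch, parsePositive and its inputs are hypothetical. Declaring the result type explicitly keeps the element type as Either instead of the compound type from the error message. It only illustrates the inference and does not by itself make Dataset.map work with Either.

```scala
object EitherTypeAscriptionSketch {
  // Hypothetical parser used only for illustration.
  // Without the declared return type, `if (...) Right(...) else Left(...)` is inferred
  // on Scala 2.11 as Product with Serializable with Either[String, Int], because
  // Left and Right are case classes while Either itself only extends AnyRef.
  def parsePositive(s: String): Either[String, Int] =
    if (s.nonEmpty && s.forall(_.isDigit)) Right(s.toInt)
    else Left(s"not a number: $s")

  // The declared type keeps the element type as plain Either[String, Int].
  val results: Seq[Either[String, Int]] = Seq("1", "x", "42").map(parsePositive)
}
```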

To use Java 8 types such as Optional and java.time in the REST API, add jackson-datatype-jdk8 and jackson-datatype-jsr310.
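
As a rough sketch, the two modules can be registered on an ObjectMapper by hand as shown below. This assumes both artifacts are on the classpath. The object name and sample values are made up, and the application may configure its mapper differently (for example through Spring).

```scala
import java.time.{ZoneOffset, ZonedDateTime}
import java.util.Optional

import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule

object JacksonModulesSketch {
  val mapper: ObjectMapper = new ObjectMapper()
    .registerModule(new Jdk8Module())     // java.util.Optional and friends
    .registerModule(new JavaTimeModule()) // java.time types
    // Write dates as ISO-8601 strings rather than numeric timestamps.
    .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)

  // Sample values chosen only for illustration.
  val optionalJson: String = mapper.writeValueAsString(Optional.of("abc"))
  val dateJson: String =
    mapper.writeValueAsString(ZonedDateTime.of(2017, 1, 2, 3, 4, 5, 0, ZoneOffset.UTC))
}
```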

To use a Scala case class with Jackson, add @BeanProperty to each field.
Check the Jackson-related tips.
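
A small sketch of what @BeanProperty does, assuming Scala 2 as used in this post. ImportSummary is a hypothetical class: the annotation makes the compiler generate JavaBean-style getters, which is what Jackson's default bean introspection looks for.

```scala
import scala.beans.BeanProperty

object BeanPropertySketch {
  // Hypothetical result type; @BeanProperty adds getValidCount() and getFailedCount().
  final case class ImportSummary(
    @BeanProperty validCount: Long,
    @BeanProperty failedCount: Long)

  val summary = ImportSummary(validCount = 98L, failedCount = 2L)

  // The generated getter returns the same value as the Scala accessor.
  val viaGetter: Long = summary.getValidCount()
}
```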
See Using Lombok to Simplify the Code for the SpringContextBridge implementation.
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.Date

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class SparkService {
  @transient lazy val logger = LoggerFactory.getLogger(getClass)

  def save(csvFile: String): DataImportResult = {
    val startDate = ZonedDateTime.now
    val spark = SpringContextBridge.getBean(classOf[SparkSession])
    val csvInput = spark.read
      .option("header", "true")
      .option("inferSchema", false)
      .option("ignoreLeadingWhiteSpace", true)
      .option("ignoreTrailingWhiteSpace", true)
      .csv(csvFile)

    // Declare the element type explicitly so it is Either[...] rather than
    // Product with Serializable with Either[...].
    val newRdd = csvInput.rdd
      .map[Either[(String, String), EventModel]] { row =>
        try {
          val event = new EventModel()
          event.setId(row.getString(0))
          event.setEventDate(Util.ParseDate(row.getString(1)))
          // ...
          event.setUpdateDate(new Date())
          Right(event)
        } catch {
          // Left keeps the raw row and the error message for the report.
          case NonFatal(e) => Left((row.mkString(","), e.getMessage))
        }
      }
    // Cached because the RDD is traversed twice: once for failures, once for successes.
    newRdd.cache()

    val failedRdd = newRdd.collect { case Left(error) => error }
    val failedCount = failedRdd.count()
    val errorData = failedRdd.take(10) // sample of invalid rows for the report

    val successRdd = newRdd.collect { case Right(event) => event }
    successRdd.cache()
    successRdd // ... other processing
      .foreachPartition { it =>
        val repo = SpringContextBridge.getBean(classOf[EventRepo])
        // Save in batches of 1000 and commit once at the end.
        it.grouped(1000).foreach { x => repo.saveWithoutCommit(x.toIterable.asJava) }
      }

    SpringContextBridge.getBean(classOf[EventRepo]).hardCommit()

    val validDataCount = successRdd.count()
    val endDate = ZonedDateTime.now
    val result = DataImportResult(startDate, endDate, startDate.until(endDate, ChronoUnit.MILLIS), validDataCount, failedCount, errorData)
    logger.info(result.toString)
    result
  }
}
case class DataImportResult(
  @BeanProperty startTime: ZonedDateTime, @BeanProperty endTime: ZonedDateTime, @BeanProperty timeTakenMill: Long,
  @BeanProperty validCount: Long, @BeanProperty failedCount: Long, @BeanProperty errorData: Array[(String, String)])

object MyEncoders {
  // Bean encoder for the Java-style EventModel class.
  implicit def eventEncoder: org.apache.spark.sql.Encoder[EventModel] =
    org.apache.spark.sql.Encoders.bean(classOf[EventModel])
}
SparkConfiguration
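import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.{Bean, Configuration}

@Configuration
class SparkConfiguration {
  val logger = LoggerFactory.getLogger(getClass)

  // Exposes a single shared SparkSession as a Spring bean.
  @Bean
  def sparkSessionConfiguration: SparkSession = {
    val spark = SparkSession
      .builder().master("spark://master:port") // placeholder master URL
      .appName("My Spark App")
      .config("some.config", "some.value")
      .getOrCreate()
    logger.info(s"Created SparkSession: $spark")
    spark
  }
}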
Maven pom.xml
Add the dependencies spark-core_2.11, spark-sql_2.11 and org.psnively:spring_scala_3-2-14_2.11.
To compile mixed Scala and Java sources, add net.alchim31.maven:scala-maven-plugin.