Spark ETL: Using Either to handle invalid data

Usually when we use Spark to import data from an external system, we want to report to the client how many rows were imported successfully and how many rows in the original input were invalid.

Solution: Using Scala Either
When we use map to parse the data, we can use Scala's Either: Left holds the invalid data and its error message, and Right holds the valid data, which is processed and stored later.
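
The Left/Right split described above can be tried out without Spark. The following is only a sketch in plain Java: the Either class is a minimal hypothetical stand-in for scala.util.Either, and parsing a row as an integer stands in for building a real model object.

```java
import java.util.Arrays;
import java.util.List;

class EitherImportSketch {
    /** Minimal stand-in for scala.util.Either: holds either a left or a right value. */
    static final class Either<L, R> {
        final L left;
        final R right;
        final boolean isRight;

        private Either(L left, R right, boolean isRight) {
            this.left = left;
            this.right = right;
            this.isRight = isRight;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null, false);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value, true);
        }
    }

    /** Parses one raw row; a failure keeps the raw row and the error message. */
    static Either<String[], Integer> parse(String row) {
        try {
            return Either.right(Integer.parseInt(row.trim()));
        } catch (NumberFormatException e) {
            return Either.left(new String[] {row, e.getMessage()});
        }
    }

    /** Returns {validCount, invalidCount} for the given rows. */
    static long[] count(List<String> rows) {
        long valid = rows.stream().map(EitherImportSketch::parse).filter(e -> e.isRight).count();
        return new long[] {valid, rows.size() - valid};
    }

    public static void main(String[] args) {
        long[] counts = count(Arrays.asList("1", "oops", "3"));
        System.out.println("valid=" + counts[0] + ", invalid=" + counts[1]);
    }
}
```

Keeping the raw row next to the error message is what later makes it possible to show the client a sample of the rejected input.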

Here we use not

If we tried to call and use Either, it will fail with exception:
java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable with scala.util.Either[(String, String),xx.EventModel] found

This seems to be because in the current Scala release, Either doesn't implement the Serializable trait:

sealed abstract class Either[+A, +B] extends AnyRef

In the upcoming 2.12, it does:
sealed abstract class Either[+A, +B] extends Product with Serializable

To use Java 8 types such as Optional and java.time in a REST API, use jackson-datatype-jdk8 and jackson-datatype-jsr310.

To use a Scala case class with Jackson, add @BeanProperty to each field.
Check Jackson-related tips.
Please check Using Lombok to Simplify the Code for the SpringContextBridge implementation.

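import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.Date

import scala.collection.JavaConverters._

import org.apache.spark.rdd.RDD

@transient lazy val logger = LoggerFactory.getLogger(getClass)

def save(csvFile: String): DataImportResult = {
  val startDate = ZonedDateTime.now()
  val spark = SpringContextBridge.getBean(classOf[SparkSession])
  val csvInput = spark.read
    .option("header", "true")
    .option("inferSchema", false)
    .option("ignoreLeadingWhiteSpace", true)
    .option("ignoreTrailingWhiteSpace", true)
    .csv(csvFile)

  // Left holds (raw row, error message); Right holds the parsed event.
  // The explicit type keeps Scala from inferring Product with Serializable with Either[...].
  val newRdd: RDD[Either[(String, String), EventModel]] = csvInput.rdd
    .map { row =>
      try {
        val event = new EventModel()
        // ...
        event.setUpdateDate(new Date())
        Right(event)
      } catch {
        // Assumption: the raw row is rendered with Row.mkString; the original expression is garbled.
        case e: Throwable => Left((row.mkString(","), e.getMessage))
      }
    }

  val failedRdd = newRdd.filter(_.isLeft).map(_.left.get)
  val failedCount = failedRdd.count()
  val errorData = failedRdd.take(10)

  val successRdd = newRdd.filter(_.isRight).map(_.right.get)
  successRdd //... other process
    .foreachPartition { it =>
      val repo = SpringContextBridge.getBean(classOf[EventRepo])
      it.grouped(1000).foreach { x => repo.saveWithoutCommit(x.toIterable.asJava) }
    }

  val validDataCount = successRdd.count
  val endDate = ZonedDateTime.now()
  DataImportResult(startDate, endDate, startDate.until(endDate, ChronoUnit.MILLIS), validDataCount, failedCount, errorData)
}
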
case class DataImportResult(
  @BeanProperty startTime: ZonedDateTime, @BeanProperty endTime: ZonedDateTime, @BeanProperty timeTakenMill: Long,
  @BeanProperty validCount: Long, @BeanProperty failedCount: Long, @BeanProperty errorData: Array[(String, String)])

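object MyEncoders {
  implicit def eventEncoder: org.apache.spark.sql.Encoder[EventModel] =
    org.apache.spark.sql.Encoders.bean(classOf[EventModel])
}

// Assumption: registered as a Spring bean, since save() looks up SparkSession via SpringContextBridge.
@Configuration
class SparkConfiguration {
  val logger = LoggerFactory.getLogger(getClass)

  @Bean
  def sparkSessionConfiguration: SparkSession = {
    val spark = SparkSession
      .builder()
      .appName("My Spark App")
      .config("some.config", "some.value")
      .getOrCreate()
    logger.info(s"Created SparkSession: ${spark}")
    spark
  }
}
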
Maven pom.xml
Add dependencies: spark-core_2.11, spark-sql_2.11 and org.psnively:spring_scala_3-2-14_2.11
To mix Scala and Java in the same project, add net.alchim31.maven:scala-maven-plugin.

InitializationError When Use Spring Test + JUnit

The Problem
When we tried to use the Spring unit testing framework with JUnit in our project, a simple test failed with the error: "initializationError[Runner: JUnit 4]"

I imported the SpringJUnitIntro project into Eclipse, and it worked.
But when I copied all the classes into our project, the test failed with initializationError. So this had to be a configuration issue in our project.

A Google search didn't turn up anything useful.

Then I ran the test in debug mode; it stopped at:
Thread [main] (Class load: SpringJUnit4ClassRunner)
owns: Object  (id=24)
Class.getDeclaredConstructors0(boolean) line: not available [native method]
Class.privateGetDeclaredConstructors(boolean) line: 2671
Class.getConstructor0(Class[], int) line: 3075
Class.getConstructor(Class...) line: 1825
AnnotatedBuilder.buildRunner(Class, Class) line: 29
AnnotatedBuilder.runnerForClass(Class) line: 21
AnnotatedBuilder(RunnerBuilder).safeRunnerForClass(Class) line: 59 //\\
AllDefaultPossibilitiesBuilder.runnerForClass(Class) line: 26
AllDefaultPossibilitiesBuilder(RunnerBuilder).safeRunnerForClass(Class) line: 59
ClassRequest.getRunner() line: 26
FilterRequest.getRunner() line: 31
JUnit4TestLoader.createFilteredTest(Class, String, String[]) line: 77
JUnit4TestLoader.createTest(Class, String, String[], RemoteTestRunner) line: 68
JUnit4TestLoader.loadTests(Class[], String, String[], RemoteTestRunner) line: 43
RemoteTestRunner.runTests(String[], String, TestExecution) line: 444
RemoteTestRunner.runTests(TestExecution) line: 678
RemoteTestRunner.run() line: 382
RemoteTestRunner.main(String[]) line: 192

Then I used F7 (step return) until I reached AnnotatedBuilder(RunnerBuilder).safeRunnerForClass(Class) line: 61, where the exception is caught.

The Throwable e is a java.lang.ExceptionInInitializerError. I ran "e.printStackTrace();" in the Eclipse Display view and found the root cause of the issue:
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Caused by: java.lang.IllegalStateException: SpringJUnit4ClassRunner requires JUnit 4.12 or higher.
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.<clinit>(
... 18 more


static {
  if (!ClassUtils.isPresent("org.junit.internal.Throwables", SpringJUnit4ClassRunner.class.getClassLoader())) {
    throw new IllegalStateException("SpringJUnit4ClassRunner requires JUnit 4.12 or higher.");
  }

  withRulesMethod = ReflectionUtils.findMethod(SpringJUnit4ClassRunner.class, "withRules",
      FrameworkMethod.class, Object.class, Statement.class);
  if (withRulesMethod == null) {
    throw new IllegalStateException("SpringJUnit4ClassRunner requires JUnit 4.12 or higher.");
  }
  // ...
}

In org.junit.internal.runners.ErrorReportingRunner.describeCause(Throwable), the inner exception is lost; only the fixed name "initializationError" is reported:

  private Description describeCause(Throwable child) {
      return Description.createTestDescription(fTestClass,
              "initializationError");
  }

After upgrading JUnit from 4.11 to 4.12, everything works fine.
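
The failure mode above (a static initializer that rejects an old dependency, surfacing only as a wrapped error) can be reproduced in a few lines. This is only a sketch: NeedsNewerLibrary and the probed class name are hypothetical, and isPresent is a simplified stand-in for Spring's ClassUtils.isPresent.

```java
class InitializerErrorSketch {
    private static Throwable cached;

    /** Returns true if the named class can be loaded from this class's class loader. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, InitializerErrorSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Hypothetical class whose static initializer rejects an outdated dependency. */
    static class NeedsNewerLibrary {
        static {
            if (!isPresent("com.example.hypothetical.OnlyInNewerVersion")) {
                throw new IllegalStateException("NeedsNewerLibrary requires a newer library version.");
            }
        }
    }

    /** Walks the cause chain down to the innermost exception. */
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /**
     * Triggers the failing initializer once and returns its root cause.
     * The result is cached because a class whose initializer failed cannot be
     * initialized again in the same JVM; later attempts throw NoClassDefFoundError.
     */
    static synchronized Throwable triggerAndUnwrap() {
        if (cached == null) {
            try {
                new NeedsNewerLibrary();
            } catch (ExceptionInInitializerError e) {
                cached = rootCause(e);
            }
        }
        return cached;
    }

    public static void main(String[] args) {
        System.out.println(triggerAndUnwrap());
    }
}
```

The useful message only appears after unwrapping the ExceptionInInitializerError, which is why calling e.printStackTrace() in the debugger revealed what the test runner's summary did not.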

Lesson Learned
Try running in debug mode
Use the latest version of a library
Version conflicts are a common issue in Java.
-- Using different versions of the same library
-- Using a lower version than supported:
-- Check the minimum version of X that is supported
Example: the Spring test framework requires JUnit 4.12 or higher.
-- Check which version of X the library supports
-- Check the project readme
Example: spark-csv is for Spark 1.x; it has been inlined in Apache Spark 2.x
-- Check the project dependency file: pom.xml, build.sbt, build.gradle etc.
Example: spark-solr still uses Spark 1.5 and Solr 6.x.

Skills for developers:
What's more important is how to debug and find the root cause.

Using Zookeeper to Choose Task Leader

The Scenario
We deploy our application on multiple servers; sometimes old-version and new-version servers coexist for several days or weeks.

We run different kinds of cron jobs in our application. For each cron task, we want to make sure it runs on only one node, and always on a new-version server.

There are different approaches to this problem, but as we already use ZooKeeper, it's natural to use ZooKeeper to select the task leader.

The only difference from a standard leader election is that when a new-version server is added, the old server that is the task leader releases ownership so that the new-version server can take it. We also want to choose the leader for each cron task randomly, so that the load is evenly distributed across all new-version nodes.

The Solution
When the application starts, it registers itself with ZooKeeper by creating an ephemeral node under the path /myapps/workers.

The WorkerNode contains information such as the node's version, IP address, id (a UUID), and a list of random ints used to order nodes for each task.
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.util.ComparableVersion;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.experimental.Accessors;

@Data
@Accessors(chain = true)
public class WorkerNode implements Serializable {
    private static final Random random = new Random();
    private static final long serialVersionUID = 1L;
    private String id;
    private String version;
    // so a leader can be randomly chosen
    private List<Integer> randomInts = Lists.newArrayList(random.nextInt(), random.nextInt(), random.nextInt(),
            random.nextInt(), random.nextInt());
    private String ipaddress;

    public static class WorkerNodeComparator implements Comparator<WorkerNode> {
        private final int randomPos;

        public WorkerNodeComparator(final int randomPos) {
            this.randomPos = randomPos;
        }

        // Orders by version, then by the random int at this task's position, then by IP address.
        @Override
        public int compare(final WorkerNode o1, final WorkerNode o2) {
            final int pos = randomPos % Math.min(o1.randomInts.size(), o2.randomInts.size());
            return ComparisonChain.start().compare(o1.version, o2.version, new VersionComparator())
                    .compare(o1.randomInts.get(pos), o2.randomInts.get(pos)).compare(o1.ipaddress, o2.ipaddress)
                    .result();
        }
    }

    public static class VersionComparator implements Comparator<String> {
        @Override
        public int compare(final String v1, final String v2) {
            return new ComparableVersion(v1).compareTo(new ComparableVersion(v2));
        }
    }
}

For each task, every node tries to create an ephemeral node at /myapps/leaders/taskname; whoever wins becomes the task leader.

When the leader detects that a new node has been added, it calls comparator.compare(newWorkerNode, myWorkerNode). If the new node's version is higher, or the version is the same but its random int is bigger, the leader releases its ownership.
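
To make the ordering concrete, here is a small sketch in plain Java. It is not the project's code: Node is a simplified stand-in for WorkerNode, compareVersions is a hypothetical replacement for Hadoop's ComparableVersion that only handles numeric dotted versions, and the id is used as the final tie-breaker instead of the IP address.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class LeaderOrderingSketch {
    /** Simplified worker: id, dotted numeric version, and random ints drawn at startup. */
    static final class Node {
        final String id;
        final String version;
        final List<Integer> randomInts;

        Node(String id, String version, Integer... randomInts) {
            this.id = id;
            this.version = version;
            this.randomInts = Arrays.asList(randomInts);
        }
    }

    /** Stand-in for ComparableVersion; handles purely numeric dotted versions only. */
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    /** Orders by version first, then by the random int at the task's position, then by id. */
    static Comparator<Node> forTask(int randomPos) {
        return (o1, o2) -> {
            int byVersion = compareVersions(o1.version, o2.version);
            if (byVersion != 0) {
                return byVersion;
            }
            int pos = randomPos % Math.min(o1.randomInts.size(), o2.randomInts.size());
            int byRandom = Integer.compare(o1.randomInts.get(pos), o2.randomInts.get(pos));
            return byRandom != 0 ? byRandom : o1.id.compareTo(o2.id);
        };
    }

    /** The node that should end up holding leadership for the task at randomPos. */
    static Node preferredLeader(List<Node> nodes, int randomPos) {
        return Collections.max(nodes, forTask(randomPos));
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("old", "1.9.0", 99, 99),
                new Node("newA", "1.10.0", 5, 1),
                new Node("newB", "1.10.0", 2, 7));
        System.out.println(preferredLeader(nodes, 0).id); // newA
        System.out.println(preferredLeader(nodes, 1).id); // newB
    }
}
```

Because each task uses a different position in the random list, two tasks can prefer different new-version nodes, which is how the load gets spread.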

We use the Apache Curator LeaderSelector recipe.

public class TaskLeader implements Closeable, LeaderSelectorListener {
    private static final Logger logger = LoggerFactory.getLogger(TaskLeader.class);
    private final CuratorFramework client;
    private final LeaderSelector leaderSelector;
    private CountDownLatch closeLatch;
    private PathChildrenCache workersCache;

    private final String taskName;
    private final String workersPath;
    private final WorkerNode myWorkerNode;

    public TaskLeader(final CuratorFramework client, final String workersPath, final String taskName, final WorkerNode myWorkerNode, final WorkerNodeComparator comparator) {
        this.client = client;
        this.taskName = taskName;
        this.myWorkerNode = myWorkerNode;
        this.workersPath = workersPath;
        this.comparator = comparator;
        this.leaderSelector =
                new LeaderSelector(this.client, ZookeeperService.ZK_TASK_LEADER + "/" + taskName, this);
    }

    public void runForMaster() {
        logger.info("Starting master selection: " + myWorkerNode.getId());
        // Assumption: the election is started here; the original call is missing from the listing.
        leaderSelector.start();
    }

    PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener() {
        @Override
        public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) {
            logger.info("workers event: " + event);
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                final WorkerNode newWorkerNode = (WorkerNode) SerializationUtils.deserialize(event.getData().getData());
                logger.info("added new WorkerNode: " + newWorkerNode);
                if (comparator.compare(newWorkerNode, myWorkerNode) > 0) {
                    logger.info("Release my leadership, as new WorkerNode with newer version comes up");
                    // Assumption: leadership is released via close(), which lets takeLeadership() return.
                    close();
                }
            }
        }
    };

    UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
        @Override
        public void unhandledError(final String message, final Throwable e) {
            logger.error("Unrecoverable error: " + message, e);
        }
    };

    private WorkerNodeComparator comparator;

    public boolean isLeader() {
        return leaderSelector.hasLeadership();
    }

    public Participant getLeader() {
        try {
            return leaderSelector.getLeader();
        } catch (final Exception e) {
            throw new BusinessException(ErrorCode.INTERNAL_ERROR, e, "Can't get leader");
        }
    }

    @Override
    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
        logger.info("ConnectionState: " + newState);
        // The handling for most states is missing from the listing; only the visible parts are kept.
        switch (newState) {
            case CONNECTED:
            case RECONNECTED:
                break;
            case SUSPENDED:
                logger.warn("Session suspended");
                break;
            case LOST:
            case READ_ONLY:
                break;
        }
    }

    @Override
    public void takeLeadership(final CuratorFramework client) throws Exception {
        logger.info("I took the leadership for task " + taskName);
        this.workersCache = new PathChildrenCache(this.client, workersPath, true);
        closeLatch = new CountDownLatch(1);
        // Assumption: the cache listener is registered and started here.
        workersCache.getListenable().addListener(workersCacheListener);
        workersCache.start();
        // Leadership is held until this method returns, so block until close() releases the latch.
        closeLatch.await();
    }

    @Override
    public void close() {
        try {
            logger.info("I gave up my leadership for task " + taskName);
            if (workersCache != null) {
                workersCache.close();
            }
            if (closeLatch != null) {
                closeLatch.countDown();
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to close", e);
        }
    }
}

When adding a new task, we just need to create a new task leader as below. Then, when the cron job fires, we only need to check whether this node is the leader with taskLeader.isLeader(): if true, run the job; otherwise do nothing.
// Assumption: @Configuration and the plain @Bean annotations are restored; only @Bean(name = "xTaskLeader") survives in the listing.
// zkHost, applicationProfile and LOGGER are fields of this class whose declarations are not shown.
@Configuration
public class AppConfig {
    @Bean
    public CuratorFramework curatorFramework() {
        final CuratorFramework framework = CuratorFrameworkFactory.builder().connectString(zkHost)
                .connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(new RetryNTimes(3, 500)).build();
        // Assumption: the client is started here; Curator requires start() before first use.
        framework.start();
        return framework;
    }

    @Bean(name = "xTaskLeader")
    public TaskLeader XTaskLeader() {
        final TaskLeader leader =
                new TaskLeader(curatorFramework(), getWorkerNodeWorkerPath(), "xTaskLeader", myNode(),
                        new WorkerNodeComparator(1));
        // Assumption: the election is kicked off when the bean is created.
        leader.runForMaster();
        return leader;
    }

    @Bean
    public WorkerNode myNode() {
        String ipAddress;
        try {
            ipAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (final UnknownHostException e1) {
            LOGGER.error("failed to get ipaddress", e1);
            ipAddress = "Unknown IP";
        }
        final WorkerNode workerNode =
                new WorkerNode().setVersion(applicationProfile.getVersion()).setIpaddress(ipAddress).setId(UUID.randomUUID().toString());

        String myPath;
        try {
            myPath = curatorFramework().create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(getWorkerNodeWorkerPath() + "/" + workerNode.getId(), SerializationUtils.serialize(workerNode));
            LOGGER.info("Create zk path: " + myPath);
        } catch (final Exception e) {
            // TODO change this
            LOGGER.error("Failed to create zk path", e);
        }
        return workerNode;
    }

    private String getWorkerNodeWorkerPath() {
        return ZookeeperService.getZkWorkersPath();
    }
}

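
The "run only if leader" check described above can be kept out of the job bodies with a small wrapper. This is a hypothetical sketch in plain Java: onlyOnLeader is not part of the original code, and the BooleanSupplier stands in for taskLeader::isLeader.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class LeaderGuardedJobSketch {
    /** Wraps a job so that it only runs when the leadership check passes at trigger time. */
    static Runnable onlyOnLeader(BooleanSupplier isLeader, Runnable job) {
        return () -> {
            if (isLeader.getAsBoolean()) {
                job.run();
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        boolean[] leader = {false};
        Runnable cronJob = onlyOnLeader(() -> leader[0], runs::incrementAndGet);

        cronJob.run();      // not the leader: skipped
        leader[0] = true;
        cronJob.run();      // leader: runs
        System.out.println("runs=" + runs.get());
    }
}
```

The check is evaluated every time the job fires, so a node that loses or gains leadership between two triggers behaves correctly on the next one.
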
ZookeeperService - get all active nodes
public class ZookeeperService {
    private CuratorFramework client;

    private static final String ZK_ROOT_FOLDER = "/myapps";
    private static final String ZK_WORKERS = ZK_ROOT_FOLDER + "/workers";

    public static final String ZK_TASK_LEADER = ZK_ROOT_FOLDER + "/leaders";

    private TaskLeader xTaskLeader;

    public static String getZkWorkersPath() {
        return ZK_WORKERS;
    }

    public Map<String, WorkerNode> getAllWorkerNodes() {
        return getAllWorkerNodes(getZkWorkersPath());
    }

    // Reads every child of the workers path and deserializes it back into a WorkerNode, keyed by id.
    private Map<String, WorkerNode> getAllWorkerNodes(final String zkWorkersPath) {
        try {
            final Map<String, WorkerNode> map = new HashMap<>();
            final List<String> children = client.getChildren().forPath(zkWorkersPath);

            if (children != null) {
                for (final String child : children) {
                    final String childPath = zkWorkersPath + "/" + child;
                    final WorkerNode workerNode =
                            (WorkerNode) SerializationUtils.deserialize(client.getData().forPath(childPath));
                    map.put(workerNode.getId(), workerNode);
                }
            }

            return map;
        } catch (final Exception e) {
            throw new BusinessException(ErrorCode.INTERNAL_ERROR, e, "failed to get workers from zk");
        }
    }
}


Java Logging Tips/Tricks - 2016

Don't log sensitive information
Include as much dynamic info as possible
Paste logging in PR if applicable
Make sure there is enough logging to help troubleshoot issues in production later

log.warn("hello: {}, {}", "A", "B", new RuntimeException("wtf"));
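- will print the exception's stack trace: SLF4J treats a trailing Throwable argument that has no matching {} placeholder as the exception to log.
See org.slf4j.helpers.MessageFormatter.arrayFormat(String, Object[]):
Throwable throwableCandidate = getThrowableCandidate(argArray);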

<configuration debug="true">

Log method and line number
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n

Use %M to print the method name in debug environments only: logback has to compute caller data for %M and %line, which is slow.

MDC - Mapped Diagnostic Context
%X{X-RequestId}, %X{sessionId}
MDC.put("X-RequestId", requestId)
-- Don't forget to call MDC.clear() at the beginning and end of the request.

Use MDC in thread pools
public class MdcThreadPoolExecutor extends ThreadPoolTaskExecutor {
    public MdcThreadPoolExecutor() { super(); }
    public void execute(final Runnable command) {
        super.execute(wrap(command, MDC.getCopyOfContextMap()));
    }
    // Runs the task with the submitter's MDC, then restores the worker thread's previous MDC.
    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return new Runnable() {
            public void run() {
                final Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

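
The capture-and-restore pattern used by the wrapper above can be tried out without SLF4J or Spring. In this sketch a plain ThreadLocal map stands in for MDC; the class and method names are made up for the demonstration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationSketch {
    /** Stand-in for MDC: a per-thread map of diagnostic values. */
    static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

    /** Runs the task with the captured context, then restores whatever the worker had before. */
    static Runnable wrap(Runnable task, Map<String, String> captured) {
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            if (captured == null) {
                CONTEXT.remove();
            } else {
                CONTEXT.set(new HashMap<>(captured));
            }
            try {
                task.run();
            } finally {
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    /** Sets a request id on the caller thread and reports what a pooled worker thread sees. */
    static String requestIdSeenByWorker(String requestId, boolean wrapped) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set(Collections.singletonMap("X-RequestId", requestId));
            String[] seen = new String[1];
            Runnable task = () -> {
                Map<String, String> ctx = CONTEXT.get();
                seen[0] = ctx == null ? null : ctx.get("X-RequestId");
            };
            pool.submit(wrapped ? wrap(task, CONTEXT.get()) : task).get();
            return seen[0];
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(requestIdSeenByWorker("req-1", false)); // null
        System.out.println(requestIdSeenByWorker("req-1", true));  // req-1
    }
}
```

Without the wrapper the worker thread sees no request id at all, which is exactly the symptom of log lines losing their %X{X-RequestId} value once work moves to a thread pool.
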
Duplicated log - fix: additivity="false"
Only log stats into app-stats.log

- Don't log to app.log and stdout(in dev)

Async logging

<root level="INFO">
  <appender-ref ref="dailyrolling" />
  <appender-ref ref="STDOUT" />
</root>
<logger name="common.statistic" level="INFO" additivity="false">
  <appender-ref ref="statsRolling" />
</logger>
<appender name="statsRolling-async" class="ch.qos.logback.classic.AsyncAppender">
  <appender-ref ref="statsRolling" />
</appender>
<if condition='property("env").equals("local") || property("env").equals("docker")'>
  <then>
    <root level="WARN">
      <appender-ref ref="dailyrolling-async" />
    </root>
    <logger name="" level="INFO">
      <appender-ref ref="dailyrolling-async" />
    </logger>
    <logger name="" level="INFO">
      <appender-ref ref="statsRolling-async" />
    </logger>
  </then>
</if>

Parameterized Logging
logger.error("xx {} yy {} xx {}", x, y, z);



Lazy Logging (Log4j 2 Supplier overload: the lambda only runs if TRACE is enabled)
logger.trace("Some long-running operation returned {}", () -> expensiveOperation());
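
The effect of lazy logging is easy to verify with the JDK's own java.util.logging, which also accepts a Supplier. This is only a sketch: it uses java.util.logging rather than the logging library from the line above, and the logger name and counter are made up for the demonstration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

class LazyLoggingSketch {
    static final AtomicInteger CALLS = new AtomicInteger();
    // Held in a static field so the configured logger is not garbage-collected.
    static final Logger LOGGER = Logger.getLogger("lazy.logging.sketch");

    static String expensiveOperation() {
        CALLS.incrementAndGet();
        return "result";
    }

    /** Logs one FINEST message lazily and reports how often the expensive call ran. */
    static int callsWhenLoggerLevelIs(Level level) {
        LOGGER.setUseParentHandlers(false); // keep the console quiet for the demo
        LOGGER.setLevel(level);
        CALLS.set(0);
        LOGGER.finest(() -> "Some long-running operation returned " + expensiveOperation());
        return CALLS.get();
    }

    public static void main(String[] args) {
        System.out.println(callsWhenLoggerLevelIs(Level.INFO));   // 0: message never built
        System.out.println(callsWhenLoggerLevelIs(Level.FINEST)); // 1: message built once
    }
}
```

With plain string concatenation the expensive call would run in both cases; the supplier defers it until the level check has passed.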

Logging level configuration
Logging Cassandra Query CQL
Set log level of com.datastax.driver.core.RequestHandler = TRACE
Programmatic configuration of slf4j/logback

splunk - logging best practice
Use clear key-value pairs

To clear a log without deleting the file
cp /dev/null log
> log

