Spark ETL: Using Either to handle invalid data


Scenario
Usually when we use Spark to import data from an external system, we want to report to the client how many rows were imported successfully and how many rows in the original input were invalid.

Solution: Using Scala Either
When we use map to parse the data, we can use Scala Either: Left holds the invalid data and its error message, Right holds the valid data that will be processed and stored later.

SparkService
Note that we use dataSet.rdd.map, not dataSet.map.

If we call dataSet.map and return an Either, it fails with this exception:
java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable with scala.util.Either[(String, String),xx.EventModel] found

This is because in the current Scala version (2.11, http://www.scala-lang.org/api/2.11.x/index.html#scala.util.Either), Either does not extend Serializable:

sealed abstract class Either[+A, +B] extends AnyRef

In the upcoming Scala 2.12 (http://www.scala-lang.org/api/2.12.x/scala/util/Either.html), it does:
sealed abstract class Either[+A, +B] extends Product with Serializable

To use Java 8 types such as Optional and java.time in the REST API, use jackson-datatype-jdk8 and jackson-datatype-jsr310.
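
A minimal sketch of registering these two modules, here wired as a Spring @Bean (the bean-based wiring is an assumption):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

@Bean
public ObjectMapper objectMapper() {
    final ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new Jdk8Module());      // Optional, OptionalInt, ...
    mapper.registerModule(new JavaTimeModule());  // java.time types such as ZonedDateTime
    return mapper;
}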

To use a Scala case class with Jackson, add @BeanProperty to its fields (see DataImportResult below).
Check the Jackson-related tips post for more.
Please check "Using Lombok to Simplify the Code" for the SpringContextBridge implementation; a sketch is included below.
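
The SparkService code below resolves Spring beans through a static SpringContextBridge, because Spark executes the closures outside Spring's control. A minimal sketch of such a bridge (the real implementation is in the referenced post; this version is an assumption):

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringContextBridge implements ApplicationContextAware {
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(final ApplicationContext applicationContext) {
        context = applicationContext; // saved once at startup, read statically later
    }

    public static <T> T getBean(final Class<T> clazz) {
        return context.getBean(clazz);
    }
}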
@Service
class SparkService {

@transient lazy val logger = LoggerFactory.getLogger(getClass)

def save(csvFile: String): DataImportResult = {
  val startDate = ZonedDateTime.now
  val spark = SpringContextBridge.getBean(classOf[SparkSession])
  val csvInput = spark.read
    .option("header", "true")
    .option("inferschema", false)
    .option("ignoreLeadingWhiteSpace", true)
    .option("ignoreTrailingWhiteSpace ", true)
    .csv(csvFile)

  val newRdd = csvInput.rdd
    .map { row =>
      try {
        val event = new EventModel()
        event.setId(row.getString(0))
        event.setEventDate(Util.ParseDate(row.getString(1)))
        // ...
        event.setUpdateDate(new Date())
        Right(event)
      } catch {
        case e: Throwable => Left((row.mkString(","), e.getMessage))
      }
    }
  newRdd.cache()
  val failedRdd = newRdd.filter(_.isLeft).map(_.left.get)

  val failedCount = failedRdd.count()
  val errorData = failedRdd.take(10)

  val successRdd = newRdd.filter(_.isRight).map(_.right.get)
  successRdd.cache()
  successRdd // ... other processing
    .foreachPartition { it =>
      val repo = SpringContextBridge.getBean(classOf[EventRepo])
      it.grouped(1000).foreach { x => repo.saveWithoutCommit(x.toIterable.asJava) }
    }

  SpringContextBridge.getBean(classOf[EventRepo]).hardCommit()

  val validDataCount = successRdd.count()
  val endDate = ZonedDateTime.now
  val result = DataImportResult(startDate, endDate, startDate.until(endDate, ChronoUnit.MILLIS), validDataCount, failedCount, errorData)
  logger.info(result.toString())
  result
}
}
case class DataImportResult(
  @BeanProperty startTime: ZonedDateTime, @BeanProperty endTime: ZonedDateTime, @BeanProperty timeTakenMill: Long,
  @BeanProperty validCount: Long, @BeanProperty failedCount: Long, @BeanProperty errorData: Array[(String, String)])

object MyEncoders {
  implicit def eventEncoder: org.apache.spark.sql.Encoder[EventModel] =
    org.apache.spark.sql.Encoders.bean(classOf[EventModel])
}
SparkConfiguration
@Configuration
class SparkConfiguration {
  val logger = LoggerFactory.getLogger(getClass)

  @Bean
  def sparkSessionConfiguration: SparkSession = {
    val spark = SparkSession
      .builder().master("spark://master:port")
      .appName("My Spark App")
      .config("some.config", "some.value")
      .getOrCreate()
    logger.info(s"Created SparkSession: ${spark}")
    spark
  }
}
Maven pom.xml
Add the dependencies spark-core_2.11, spark-sql_2.11 and org.psnively:spring_scala_3-2-14_2.11.
To mix Scala and Java sources in one project, add the net.alchim31.maven:scala-maven-plugin.
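
A sketch of the relevant pom.xml pieces; version numbers are illustrative:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
  </dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>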

InitializationError When Use Spring Test + JUnit


The Problem
When we tried to use the Spring test framework with JUnit in our project, a simple test failed with the error: "initializationError[Runner: JUnit 4]".

I imported the SpringJUnitIntro sample project into Eclipse, and it worked.
But when I copied the same classes into our project, they failed with initializationError, so this had to be a configuration issue in our project.

A Google search didn't turn up anything useful.

Then I ran the test in debug mode; it stopped at:
Thread [main] (Class load: SpringJUnit4ClassRunner)
owns: Object  (id=24)
Class.getDeclaredConstructors0(boolean) line: not available [native method]
Class.privateGetDeclaredConstructors(boolean) line: 2671
Class.getConstructor0(Class[], int) line: 3075
Class.getConstructor(Class...) line: 1825
AnnotatedBuilder.buildRunner(Class, Class) line: 29
AnnotatedBuilder.runnerForClass(Class) line: 21
AnnotatedBuilder(RunnerBuilder).safeRunnerForClass(Class) line: 59 //\\
AllDefaultPossibilitiesBuilder.runnerForClass(Class) line: 26
AllDefaultPossibilitiesBuilder(RunnerBuilder).safeRunnerForClass(Class) line: 59
ClassRequest.getRunner() line: 26
FilterRequest.getRunner() line: 31
JUnit4TestLoader.createFilteredTest(Class, String, String[]) line: 77
JUnit4TestLoader.createTest(Class, String, String[], RemoteTestRunner) line: 68
JUnit4TestLoader.loadTests(Class[], String, String[], RemoteTestRunner) line: 43
RemoteTestRunner.runTests(String[], String, TestExecution) line: 444
RemoteTestRunner.runTests(TestExecution) line: 678
RemoteTestRunner.run() line: 382
RemoteTestRunner.main(String[]) line: 192

Then I used F7 (Step Return) to return to AnnotatedBuilder(RunnerBuilder).safeRunnerForClass(Class) line 61, where the exception is caught.

The caught Throwable e was a java.lang.ExceptionInInitializerError. Running "e.printStackTrace();" in Eclipse's Display view revealed the root cause:
java.lang.ExceptionInInitializerError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  ...
Caused by: java.lang.IllegalStateException: SpringJUnit4ClassRunner requires JUnit 4.12 or higher.
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.<clinit>(SpringJUnit4ClassRunner.java:102)
... 18 more

org.springframework.test.context.junit4.SpringJUnit4ClassRunner:

static {
  if (!ClassUtils.isPresent("org.junit.internal.Throwables", SpringJUnit4ClassRunner.class.getClassLoader())) {
    throw new IllegalStateException("SpringJUnit4ClassRunner requires JUnit 4.12 or higher.");
  }

  withRulesMethod = ReflectionUtils.findMethod(SpringJUnit4ClassRunner.class, "withRules",
      FrameworkMethod.class, Object.class, Statement.class);
  if (withRulesMethod == null) {
    throw new IllegalStateException("SpringJUnit4ClassRunner requires JUnit 4.12 or higher.");
  }
  ReflectionUtils.makeAccessible(withRulesMethod);
}

In org.junit.internal.runners.ErrorReportingRunner.describeCause(Throwable), the inner exception is lost:

  private Description describeCause(Throwable child) {
      return Description.createTestDescription(fTestClass,
              "initializationError");
  }
After upgrading JUnit from 4.11 to 4.12, everything worked fine.
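
Assuming the project uses Maven (as in the sections above), the upgrade is just:

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.12</version>
  <scope>test</scope>
</dependency>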

Lessons Learned
- Try running in debug mode.
- Use the latest version of a library.
- Version conflicts are a common issue in Java, typically caused by:
-- using different versions of the same library;
-- using a lower version than the one supported.
- Check the minimum version of X that a library supports. Example: the Spring test framework requires JUnit 4.12 or higher.
- Check which version of X a library supports:
-- check the project readme. Example: spark-csv is for Spark 1.x; it has been inlined into Apache Spark 2.x;
-- check the project dependency file (pom.xml, build.sbt, build.gradle, etc.). Example: spark-solr still uses Spark 1.5 with Solr 6.x.

Skills for developers:
What matters most is how to debug and find the root cause.

Using Zookeeper to Choose Task Leader


The Scenario
We deploy our application on multiple servers, and sometimes old-version and new-version servers coexist for several days or weeks.

We run several kinds of cron jobs in the application. For each cron task, we want to make sure it runs on only one node, and always on a new-version server.

There are different approaches to this problem, but since we already use ZooKeeper, it's natural to use ZooKeeper to select the task leader.

The twist is that when a new-version server joins, the old server that currently holds task leadership releases its ownership so that the new-version server can take over. We also want to choose a leader for each cron task randomly, so the load is evenly distributed across all new-version nodes.

The Solution
When the application starts, it registers itself with ZooKeeper by creating an ephemeral node under the path /myapps/workers.

WorkerNode
WorkerNode contains information such as the node's version, IP address, id (a UUID), and a list of random ints used to randomize leader selection per task.
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

import javax.validation.constraints.NotNull;

import org.apache.hadoop.util.ComparableVersion;

import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;

import lombok.Data;
import lombok.experimental.Accessors;
@Accessors(chain = true)
@Data
public class WorkerNode implements Serializable {
    private static final Random random = new Random();
    private static final long serialVersionUID = 1L;
    @NotNull
    private String id;
    @NotNull
    private String version;
    // so a leader can be randomly chosen
    @NotNull
    private List<Integer> randomInts = Lists.newArrayList(random.nextInt(), random.nextInt(), random.nextInt(),
            random.nextInt(), random.nextInt());
    @NotNull
    private String ipaddress;

    public static class WorkerNodeComparator implements Comparator<WorkerNode> {
        private final int randomPos;

        public WorkerNodeComparator(final int randomPos) {
            this.randomPos = randomPos;
        }

        @Override
        public int compare(final WorkerNode o1, final WorkerNode o2) {
            final int pos = randomPos % Math.min(o1.randomInts.size(), o2.randomInts.size());
            return ComparisonChain.start().compare(o1.version, o2.version, new VersionComparator())
                    .compare(o1.randomInts.get(pos), o2.randomInts.get(pos)).compare(o1.ipaddress, o2.ipaddress)
                    .compare(o1.id, o2.id).result();
        }
    }
    public static class VersionComparator implements Comparator<String> {
        @Override
        public int compare(final String v1, final String v2) {
            return new ComparableVersion(v1).compareTo(new ComparableVersion(v2));
        }
    }
}
TaskLeader
For each task, every node tries to create an ephemeral node at /myapps/leaders/taskname; whoever wins becomes the task leader.

When the leader detects that a new worker node has been added, it calls comparator.compare(newNode, myNode); if the new node's version is higher, or the version is the same but its random int is bigger, the current leader releases its ownership.

We use the Apache Curator LeaderSelector recipe.

public class TaskLeader implements Closeable, LeaderSelectorListener {
    private static final Logger logger = LoggerFactory.getLogger(TaskLeader.class);
    private final CuratorFramework client;
    private final LeaderSelector leaderSelector;
    private CountDownLatch closeLatch;
    private PathChildrenCache workersCache;

    private final String taskName;
    private final String workersPath;
    private final WorkerNode myWorkerNode;
    private final WorkerNodeComparator comparator;

    public TaskLeader(final CuratorFramework client, final String workersPath, final String taskName, final WorkerNode myWorkerNode, final WorkerNodeComparator comparator) {
        this.client = client;
        this.taskName = taskName;
        this.myWorkerNode = myWorkerNode;
        this.workersPath = workersPath;
        this.comparator = comparator;
        this.leaderSelector =
                new LeaderSelector(this.client, ZookeeperService.ZK_TASK_LEADER + "/" + taskName, this);
    }

    public void runForMaster() {
        logger.info("Starting master selection: " + myWorkerNode.getId());
        client.getUnhandledErrorListenable().addListener(errorsListener);
        leaderSelector.setId(myWorkerNode.getId());
        leaderSelector.autoRequeue();
        leaderSelector.start();
    }

    PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener() {
        @Override
        public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) {
            logger.info("workers event: " + event);
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                final WorkerNode newWorkerNode = (WorkerNode) SerializationUtils.deserialize(event.getData().getData());
                logger.info("added new WorkerNode: " + newWorkerNode);
                if (comparator.compare(newWorkerNode, myWorkerNode) > 0) {
                    logger.info("Release my leadership, as new WorkerNode with newer version comes up");
                    close();
                }
            }
        }
    };

    UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
        @Override
        public void unhandledError(final String message, final Throwable e) {
            logger.error("Unrecoverable error: " + message, e);
            close();
        }
    };

    public boolean isLeader() {
        return leaderSelector.hasLeadership();
    }

    public Participant getLeader() {
        try {
            return leaderSelector.getLeader();
        } catch (final Exception e) {
            throw new BusinessException(ErrorCode.INTERNAL_ERROR, e, "Can't get leader");
        }
    }


    @Override
    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
        logger.info("ConnectionState: " + newState);
        switch (newState) {
            case CONNECTED:
                break;
            case RECONNECTED:
                break;
            case SUSPENDED:
                logger.warn("Session suspended");
                break;
            case LOST:
                close();
                break;
            case READ_ONLY:
                break;
        }
    }

    @Override
    public void takeLeadership(final CuratorFramework client) throws Exception {
        logger.info("I took the leadership for task " + taskName);
        this.workersCache = new PathChildrenCache(this.client, workersPath, true);
        workersCache.getListenable().addListener(workersCacheListener);
        workersCache.start();
        closeLatch = new CountDownLatch(1);
        closeLatch.await();
    }

    public void close() {
        try {
            logger.info("I gave up my leadership for task " + taskName);
            if (workersCache != null) {
                workersCache.getListenable().removeListener(workersCacheListener);
                workersCache.close();
            }
            if (closeLatch != null) {
                closeLatch.countDown();
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to close", e);
        }
    }
}
Configuration
To add a new task, we just create a new TaskLeader bean as below. Then when the cron job fires, we check whether this node is the task leader with taskLeader.isLeader(): if true, run the job; otherwise do nothing (see the sketch after the configuration).
@Configuration
public class AppConfig {
    @Bean
    public CuratorFramework curatorFramework() {
        final CuratorFramework framework = CuratorFrameworkFactory.builder().connectString(zkHost)
                .connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(new RetryNTimes(3, 500)).build();
        framework.start();
        return framework;
    }
    @Bean(name = "xTaskLeader")
    public TaskLeader xTaskLeader() {
        final TaskLeader leader =
                new TaskLeader(curatorFramework(), getWorkerNodeWorkerPath(), "xTaskLeader", myNode(),
                        new WorkerNodeComparator(1));
        leader.runForMaster();
        return leader;
    }

    @Bean
    public WorkerNode myNode() {
        String ipAddress;
        try {
            ipAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (final UnknownHostException e1) {
            LOGGER.error("failed to get ipaddress", e1);
            ipAddress = "Unknown IP";
        }
        final WorkerNode workerNode =
                new WorkerNode().setVersion(applicationProfile.getVersion()).setIpaddress(ipAddress).setId(UUID.randomUUID().toString());

        String myPath;
        try {
            myPath = curatorFramework().create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(getWorkerNodeWorkerPath() + "/" + workerNode.getId(), SerializationUtils.serialize(workerNode));
            LOGGER.info("Create zk path: " + myPath);
        } catch (final Exception e) {
            // TODO change this
            LOGGER.error("Failed to create zk path", e);
        }
        return workerNode;
    }

    private String getWorkerNodeWorkerPath() {
        return ZookeeperService.getZkWorkersPath();
    }
}
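
A minimal sketch of guarding a Spring @Scheduled job with the task leader (class name and cron expression are illustrative; requires @EnableScheduling):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class XTaskJob {
    @Autowired
    @Qualifier("xTaskLeader")
    private TaskLeader taskLeader;

    // runs at the top of every hour; only the elected leader does the work
    @Scheduled(cron = "0 0 * * * *")
    public void runXTask() {
        if (!taskLeader.isLeader()) {
            return; // another node owns this task
        }
        // ... do the actual work
    }
}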
ZookeeperService - get all active nodes
@Service
public class ZookeeperService {
    @Autowired
    private CuratorFramework client;

    private static final String ZK_ROOT_FOLDER = "/myapps";
    private static final String ZK_WORKERS = ZK_ROOT_FOLDER + "/workers";

    public static final String ZK_TASK_LEADER = ZK_ROOT_FOLDER + "/leaders";

    @Autowired
    @Qualifier("xTaskLeader")
    private TaskLeader xTaskLeader;

    public static String getZkWorkersPath() {
        return ZK_WORKERS;
    }

    public Map<String, WorkerNode> getAllWorkerNodes() {
        return getAllWorkerNodes(getZkWorkersPath());
    }

    private Map<String, WorkerNode> getAllWorkerNodes(final String zkWorkersPath) {
        try {
            final Map<String, WorkerNode> map = new HashMap<>();
            final List<String> children = client.getChildren().forPath(zkWorkersPath);

            if (children != null) {
                for (final String child : children) {
                    final String childPath = zkWorkersPath + "/" + child;
                    final WorkerNode workerNode =
                            (WorkerNode) SerializationUtils.deserialize(client.getData().forPath(childPath));
                    map.put(workerNode.getId(), workerNode);
                }
            }

            return map;
        } catch (final Exception e) {
            throw new BusinessException(ErrorCode.INTERNAL_ERROR, e, "failed to get workers from zk");
        }
    }
}


Java Logging Tips/Tricks - 2016


- Don't log sensitive information.
- Include as much dynamic info as possible.
- Paste relevant logging output in the PR if applicable.
- Make sure there is enough logging to help troubleshoot production issues later.

slf4j
log.warn("hello: {}, {}", "A", "B", new RuntimeException("wtf"));
- This will print the exception's stack trace: when the last argument is a Throwable and no placeholder is left for it, slf4j treats it as the exception. See org.slf4j.helpers.MessageFormatter.arrayFormat(String, Object[]):
Throwable throwableCandidate = getThrowableCandidate(argArray);

Logback
-Dlogback.debug=true
<configuration debug="true">

Log method and line number
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n

Pattern
Use %M to print the method name, but only in debug/dev environments: generating caller data is expensive.
%class{1}

MDC - Mapped Diagnostic Context
%X{X-RequestId}, %X{sessionId}
MDC.put("X-RequestId", requestId)
-- Don't forget to MDC.clear() at the beginning and end of the request, e.g. in a servlet filter like the sketch below.
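
A minimal sketch of such a filter (the header and key names match the pattern above; the filter class itself is illustrative):

import java.io.IOException;

import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;

import org.slf4j.MDC;

public class MdcFilter implements Filter {
    @Override
    public void doFilter(final ServletRequest req, final ServletResponse res, final FilterChain chain)
            throws IOException, ServletException {
        MDC.clear(); // defend against leftovers from a recycled pool thread
        try {
            MDC.put("X-RequestId", ((HttpServletRequest) req).getHeader("X-RequestId"));
            chain.doFilter(req, res);
        } finally {
            MDC.clear();
        }
    }

    @Override
    public void init(final FilterConfig filterConfig) {}

    @Override
    public void destroy() {}
}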

Use MDC in thread pools
@SuppressWarnings("serial")
public class MdcThreadPoolExecutor extends ThreadPoolTaskExecutor {
    public MdcThreadPoolExecutor() {
        super();
    }
    @Override
    public void execute(final Runnable command) {
        super.execute(wrap(command, MDC.getCopyOfContextMap()));
    }
    /** Wraps the Runnable so it runs with the submitter's MDC context, restoring the worker thread's previous context afterwards. */
    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return new Runnable() {
            @Override
            public void run() {
                final Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
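
A sketch of exposing it as the executor for @Async tasks (bean name and pool sizes are illustrative):

@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
    final MdcThreadPoolExecutor executor = new MdcThreadPoolExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.initialize();
    return executor;
}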
Duplicated log output - fix with additivity="false"
Example: log stats only into app-stats.log

- Don't also write them to app.log and stdout (in dev)

Async logging

<root level="INFO">
  <appender-ref ref="dailyrolling" />
  <appender-ref ref="STDOUT" />
</root>
<logger name="common.statistic" level="INFO" additivity="false">
  <appender-ref ref="statsRolling" />
</logger>
<appender name="statsRolling-async" class="ch.qos.logback.classic.AsyncAppender">
  <queueSize>1000</queueSize>
  <discardingThreshold>0</discardingThreshold>
  <appender-ref ref="statsRolling" />
</appender>
<if condition='property("env").equals("local") || property("env").equals("docker")'>
  <then>
  </then>
  <else>
    <root level="WARN">
      <appender-ref ref="dailyrolling-async" />
    </root>
    <logger name="com.xyz.services" level="INFO">
      <appender-ref ref="dailyrolling-async" />
    </logger>
    <logger name="com.xyz.statistic" level="INFO">
      <appender-ref ref="statsRolling-async" />
    </logger>
  </else>
</if>
Parameterized Logging
logger.error("xx {} yy {} xx {}", x, y, z);

SiftingAppender
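
SiftingAppender separates log output by a runtime key (by default an MDC key). A minimal sketch; key and file names are illustrative:

<appender name="SIFT" class="ch.qos.logback.classic.sift.SiftingAppender">
  <discriminator>
    <key>sessionId</key>
    <defaultValue>unknown</defaultValue>
  </discriminator>
  <sift>
    <appender name="FILE-${sessionId}" class="ch.qos.logback.core.FileAppender">
      <file>logs/${sessionId}.log</file>
      <encoder>
        <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
      </encoder>
    </appender>
  </sift>
</appender>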

Log4j
-Dlog4j.debug

Log4j2
https://logging.apache.org/log4j/2.0/faq.html#troubleshooting
-DLog4jDefaultStatusLevel=DEBUG
-Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE
Lazy Logging (Log4j2 2.4+ accepts lambdas, so the expensive call only runs if the level is enabled)
logger.trace("Some long-running operation returned {}", () -> expensiveOperation());

Logging level configuration
Logging Cassandra CQL queries
Set the log level of com.datastax.driver.core.RequestHandler to TRACE
Programmatic configuration of slf4j/logback
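
A minimal sketch of changing a logger's level programmatically; the cast is safe only when Logback is the slf4j binding. This is one way to switch the Cassandra driver logger above to TRACE at runtime:

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

final Logger logger = (Logger) LoggerFactory.getLogger("com.datastax.driver.core.RequestHandler");
logger.setLevel(Level.TRACE);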

Splunk - logging best practice
Use clear key-value pairs
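
For example, a log line like the following (field names illustrative) is easy to search and aggregate in Splunk:

logger.info("action=csvImport status=success validCount={} failedCount={} timeTakenMill={}", validCount, failedCount, timeTaken);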

To clear a log file without deleting it:
cp /dev/null log
> log
