Cassandra Tips




Export Keyspace Schema:
./cqlsh -e "USE keyspace_name; DESC KEYSPACE" > schema.cql

Get the (approximate) row count of a table

./nodetool cfstats qosdb

This prints per-table statistics for the qosdb keyspace; the "Number of keys (estimate)" field is an approximate row count (cfstats reads table metadata rather than scanning the data, so it is fast but not exact).
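If an exact count is needed and the table is small, a plain CQL count works too (this scans the whole table, so avoid it on big tables; table_name below is a placeholder):

./cqlsh -e "SELECT COUNT(*) FROM qosdb.table_name;"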



Spark Optimization Tips


Batch Operations
If we need to call an external resource (a database, Solr, etc.) from a Spark job, make the calls in batches instead of once per record, as in the sketch below.
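A minimal sketch in plain Java (Guava assumed; lookupBatch is a hypothetical stand-in for the real Solr/DB client call):

import java.util.List;
import com.google.common.collect.Lists;

// One external round trip per batch of 100 ids instead of one per id.
void enrichInBatches(List<String> ids) {
    for (List<String> batch : Lists.partition(ids, 100)) {  // Guava: splits the list into sublist views
        lookupBatch(batch);  // hypothetical: a single call that handles the whole batch
    }
}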

Memory: Lazily Load & Split Data in a Partition
For operations like mapPartitions, foreachPartition, etc., if each partition is big, don't call Lists.newArrayList(Iterator<String> t), as this loads the whole partition into memory at once - especially bad if you later load more data per record.
Instead, call Guava's Iterators.partition(t, PARTITION_SIZE) to split the partition; it evaluates lazily and yields one small list at a time, as in the sketch below.
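A minimal sketch with the Spark Java API (rdd, PARTITION_SIZE and process(...) are placeholders):

import java.util.Iterator;
import java.util.List;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;

rdd.foreachPartition((Iterator<String> t) -> {
    // lazy: only one chunk of PARTITION_SIZE records is materialized at a time
    UnmodifiableIterator<List<String>> partitions = Iterators.partition(t, PARTITION_SIZE);
    while (partitions.hasNext()) {
        process(partitions.next());  // hypothetical per-chunk work, e.g. one batched Solr call
    }
});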

Call Async to Run Operations on Different RDDs in Parallel?
1. On unrelated RDDs: if your Spark cluster can actually run multiple jobs in parallel (the RDDs are relatively small, so no single job occupies all the nodes), async actions (countAsync, foreachAsync, etc.) help; otherwise they may not.
2. On related RDDs: if the shared RDD has not been materialized yet, don't do it.
For example:
a = sc.textFile(...)  // plus other transformations
a.cache()
b = a.union(cRdd)     // plus other transformations
a.countAsync()
b.countAsync()
Because cache() only marks a for caching and nothing is materialized until an action runs, both async jobs can start before the cache is populated: the file is read twice and the transformations on a are executed twice. Materialize a with a blocking action first, as in the sketch below.
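A minimal sketch of the fix with the Java API (the path and cRdd are placeholders; exception handling omitted):

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<String> a = sc.textFile("...").cache();  // "..." stands for the real input path
a.count();                            // blocking action: materializes a and fills the cache once
JavaRDD<String> b = a.union(cRdd);    // cRdd: some other existing RDD
JavaFutureAction<Long> countA = a.countAsync();  // both jobs now read a from the cache
JavaFutureAction<Long> countB = b.countAsync();
System.out.println(countA.get() + " / " + countB.get());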

Combine Operations, Reduce Operations, or Skip Them Entirely
For example, if you need to call foreachPartition or mapPartitions and also need to know the size of the RDD:
don't call rdd.count - that launches a whole extra job over the data. Instead, use an accumulator and add each partition's size to it inside the existing foreachPartition, as in the sketch below.
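A minimal sketch with the Spark 1.x Java API (process(...) is a placeholder for the real per-record work):

import java.util.Iterator;
import org.apache.spark.Accumulator;

final Accumulator<Integer> size = sc.accumulator(0);
rdd.foreachPartition((Iterator<String> it) -> {
    int n = 0;
    while (it.hasNext()) {
        process(it.next());  // the existing per-record work
        n++;
    }
    size.add(n);  // one add per partition; no separate count job
});
int total = size.value();  // read on the driver only after the action has finished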

Coalesce if needed: after filtering out most of the data, coalesce to fewer partitions so later stages don't run lots of nearly empty tasks (sketch below).
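A minimal sketch (the predicate and the target partition count are made up for illustration):

import org.apache.spark.api.java.JavaRDD;

JavaRDD<String> kept = lines.filter(l -> l.contains("ERROR"));  // keeps only a small fraction of rows
JavaRDD<String> compact = kept.coalesce(10);  // e.g. 1000 near-empty partitions -> 10 fuller ones, without a full shuffle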

Filter Data Early
Don't call collect unnecessarily.
Keep the data in RDDs; don't pull big datasets into the driver node (sketch below).
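A minimal sketch of both tips together (path and predicate are placeholders):

import java.util.List;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<String> errors = sc.textFile("...").filter(l -> l.contains("ERROR"));  // filter before any shuffle happens
List<String> sample = errors.take(20);  // take() a small sample instead of collect()ing everything into the driver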

Use the Spark UI to Monitor
Shuffle writes, number of partitions, memory usage, when actions start/stop, and whether actions actually run in parallel.

Use the Event History to Compare Performance After a Change

Spark Tips: Misc


Remote Debug Code Running in Spark Shell
export SPARK_SUBMIT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=9999"
./spark-shell
With suspend=y, the JVM waits for a debugger to attach on port 9999 before the shell starts.

Change spark.ui.port
./spark-shell --conf spark.ui.port=12345

Disable/Change Log Level in Spark Shell
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

Spark Event Logs and History Server
-- so we can save the event data from each run and compare them.
spark.eventLog.enabled=true
spark.eventLog.dir=/tmp/spark-events

spark-home/sbin/start-history-server.sh /tmp/spark-events

Examples
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)

counts.saveAsTextFile("hdfs://...")

Eclipse: Breakpoint in AspectJ Ignored


The Problem
Breakpoints set in an aspect class don't work in Eclipse.

The Solution
For better performance, the AspectJ compiler by default inlines aspect code into the target methods, which is why breakpoints in aspect classes are never hit.

To fix it, install the Eclipse AspectJ Development Tools (AJDT).
AJDT can also visualize how aspects affect the classes in a project.

Then right-click the project -> Properties -> AspectJ Compiler -> Possible Optimizations: select noinline.

Apache Spark: Using AspectJ, Fake-S3 and Local Files to Save Time and Money


We use Amazon S3 to store files and run our Spark application in AWS.
Integration tests are slow because they have to call the S3 API to locate files and fetch them remotely.

So, to run integration tests faster and cut costs, we can use fake-s3 and local files instead.

Fake-S3 and How to Install/Run It
See https://github.com/jubos/fake-s3 and the article "Saving Time and Money with Fake S3".

High-Level Idea
The first time we run the integration test with a specific set of parameters, we really call S3, but we save the fetched files locally and put them into fake-s3, then change the file path from s3n:// to a local path, so the rest of the program works against local files.

After that, running the integration test with the same parameters doesn't touch Amazon S3 at all; it uses the local fake-s3 instead.

Low-Level Implementation
We don't want to change the program code to add this kind of logic - it only matters when we run local integration tests.

Using AspectJ to change behavior for local integration tests only
In our main spring-context.xml:
<import resource="classpath:config/${env}/spring-context.xml"/>
In config/local/spring-context.xml, define an aspect that will do the following things:
<bean id="usingFakeS3Aspect"
 class="commons.util.aspect.UsingFakeS3Aspect">
 <property name="retriesNumber" value="2" />
</bean>

<aop:config>
 <aop:aspect id="fakeS3Aspect" ref="usingFakeS3Aspect">
  <aop:pointcut id="pointCutS3PostConstructor"
   expression="execution(* utils.S3Utils.postConstructor(..)) and target(s3Util)" />
  <aop:after method="afterPostConstructor" pointcut-ref="pointCutS3PostConstructor" />

  <aop:pointcut id="pointCutSetLogFilePath"
   expression="execution(* config.QosEventsContextImpl.setLogFilePath(*)) and target(context) and args(logFilePath)" />
  <aop:around method="useLocalFSInsteadOfS3n" pointcut-ref="pointCutSetLogFilePath" />
 </aop:aspect>
</aop:config>
1. After the S3Utils post-constructor, if test.run.useFakeS3 == true, point the S3 client at fake-s3:
s3.setEndpoint("http://localhost:4567");
s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
2. Around context.setLogFilePath, if test.run.useLocalFS == true: if the local files don't exist, get them from Amazon S3, save them locally, and put them into fake-s3; then change the file path from s3n:// to a local path.
Implementation Code
public class UsingFakeS3Aspect {
    private static final Logger LOGGER = LoggerFactory.getLogger(UsingFakeS3Aspect.class);
    private S3Utils s3Utils;
    //@After("execution(* utils.S3Utils.postConstructor(..)) && target(s3Util)")
    public void afterPostConstructor(S3Utils s3Util) {
        this.s3Utils = s3Util;
        boolean useFakeS3 = "true".equals(System.getProperty("test.run.useFakeS3", "false"));
        if (useFakeS3) {
            LOGGER.info("Using fake-s3");
            AwsBucketConfig s3 = s3Util.getS3Config();
            s3.setEndpoint("http://localhost:4567");
            s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
        }
    }

    // see http://stackoverflow.com/questions/4312224/aspectj-overwrite-an-argument-of-a-method
    //@Around("execution( * ContextImpl.setLogFilePath(*) ) && target(context) && args(logFilePath)")
    public void useLocalFSInsteadOfS3n(final ProceedingJoinPoint pjp, Context context, String logFilePath)
            throws Throwable {
        boolean useLocalFS = "true".equals(System.getProperty("test.run.useLocalFS"));

        String newLogFilePath = logFilePath;
        if (useLocalFS && logFilePath != null && logFilePath.startsWith("s3n://")) {
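            // "first time" = fake-s3 not enabled yet (test.run.useFakeS3 is false/unset):
            // that run still talks to real S3 and seeds fake-s3 from the downloaded files.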
            boolean isFirstTime = "false".equals(System.getProperty("test.run.useFakeS3", "false"));
            newLogFilePath = copyFromAWSToFakeS3AndUsingLocalFiles(context.getFilesToIngest(), isFirstTime);
        }
        pjp.proceed(new Object[] {context, newLogFilePath});
        // pjp.proceed(new Object[] {newLogFilePath});
    }

    private String copyFromAWSToFakeS3AndUsingLocalFiles(Set<String> filesToIngest, boolean isFirstTime) {
        String logFilePath;
        StringBuilder sb = new StringBuilder();
        AmazonS3Client fakeS3 = S3Utils.createFakeS3();
        s3Utils.createfakeBucketIfNotExists(fakeS3);
        // check whether each file already exists locally; if not, fetch it from AWS and seed fake-s3
        for (String fileToIngest : filesToIngest) {
            File localFile = new File(LOCAL_S3_ROOT, fileToIngest);
            if (!localFile.exists()) {
                s3Utils.saveAWSFileToLocal(fileToIngest, localFile);
                s3Utils.saveLocalFileToFakeS3(localFile, fakeS3);
            } else {
                if (isFirstTime) {
                    // only save local file to fake-s3 once - the first time.
                    s3Utils.saveLocalFileToFakeS3(localFile, fakeS3);
                }
            }
            sb.append(localFile.getAbsolutePath()).append(",");
        }

        if (sb.length() > 0) {
            sb.setLength(sb.length() - 1);
        }
        logFilePath = sb.toString();
        return logFilePath;
    }

    public static final String LOCAL_S3_ROOT = "/some-path";
    // Another approach - but this causes setLogFilePath to be called again with the changed parameter:
    // @After("execution( * ContextImpl.setLogFilePath(*) ) && target(context)")
    public void useLocalFSInsteadOfS3nAnotherApproach(Context context) {
        String logFilePath = context.getLogFilePath();
        boolean useLocalFS = "true".equals(System.getProperty("test.run.useLocalFS"));

        if (useLocalFS && logFilePath != null && logFilePath.startsWith("s3n://")) {
            boolean isFirstTime = "false".equals(System.getProperty("test.run.useFakeS3", "false"));
            context.setLogFilePath(copyFromAWSToFakeS3AndUsingLocalFiles(context.getFilesToIngest(), isFirstTime));
            LOGGER.info("Using local FS instead of S3.");
        }
    }    
}
