Apache Spark: Using AspectJ, Fake-S3 and Local Files to Save Money and Cost

We are using Amazon S3 to store files and running Spark application in AWS. 
When run integration test, it's slow as it need call S3 apt to locate files and get files remotely.

So in order to run integration test faster and save cost, we can use fake-s3 and local files instead.

Fake-S3  && How to install/run it
Check https://github.com/jubos/fake-s3
Saving Time and Money with Fake S3

High Level Idea
During first time when we run integration test with specific parameters, we actually call S3, but get and save them locally and put them into fake-s3, change the filePath from s3n:// to local file path. So the rest program will interact with local files.

After this, when run integration test with same parameters, we don't call Amazon S3 at all, instead, we use local fake-s3.

Low Level Implementation
We don't want to change program code to add this kind of logic - as this is only related when we are running local integration test.

Using AspectJ to change behavior for local integration test only
In our main spring-context.xml:
$lt;import resource="classpath:config/${env}/spring-context.xml"/$gt;
in config/local/spring-context.xml, create AspectJ that will do the following things:
$lt;bean id="usingFakeS3Aspect"
 $lt;property name="retriesNumber" value="2" /$gt;

 $lt;aop:aspect id="akeS3Aspect" ref="usingFakeS3Aspect"$gt;
  $lt;aop:pointcut id="pointCutS3PostConstructor"
   expression="execution(* utils.S3Utils.postConstructor(..)) && target(s3Util)" /$gt;
  $lt;aop:after method="afterPostConstructor" pointcut-ref="pointCutS3PostConstructor" /$gt;

  $lt;aop:pointcut id="pointCutSetLogFilePath"
   expression="execution( * config.QosEventsContextImpl.setLogFilePath(*)) && target(context) && args(logFilePath)" /$gt;
  $lt;aop:around method="useLocalFSInsteadOfS3n" pointcut-ref="pointCutSetLogFilePath" /$gt;
1.  After S3Utils post constructor, if test.run.useFakeS3==true, use fake-s3:
s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
2. Around context.setFilePath, if test.run.useLocalFS=true, if local files doesn't exist, get them from Amazon S3, save to local and put to fake-s3, then change file path from s3n:// to local file path.
Implementation Code
public class UsingFakeS3Aspect {
    private static final Logger LOGGER = LoggerFactory.getLogger(UsingFakeS3Aspect.class);
    private S3Utils s3Utils;
    //@After("execution(* utils.S3Utils.postConstructor(..)) && target(s3Util)")
    public void afterPostConstructor(S3Utils s3Util) {
        this.s3Utils = s3Util;
        boolean useFakeS3 = "true".equals(System.getProperty("test.run.useFakeS3", "false"));
        if (useFakeS3) {
            LOGGER.info("Using fake-s3");
            AwsBucketConfig s3 = s3Util.getS3Config();
            s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));

    // see http://stackoverflow.com/questions/4312224/aspectj-overwrite-an-argument-of-a-method
    //@Around("execution( * ContextImpl.setLogFilePath(*) ) && target(context) && args(logFilePath)")
    public void useLocalFSInsteadOfS3n(final ProceedingJoinPoint pjp, Context context, String logFilePath)
            throws Throwable {
        boolean useLocalFS = "true".equals(System.getProperty("test.run.useLocalFS"));

        String newLogFilePath = logFilePath;
        if (useLocalFS && logFilePath != null && logFilePath.startsWith("s3n://")) {
            boolean isFirstTime = "false".equals(System.getProperty("test.run.useFakeS3", "false"));
            newLogFilePath = copyFromAWSToFakeS3AndUsingLocalFiles(context.getFilesToIngest(), isFirstTime);
        pjp.proceed(new Object[] {context, newLogFilePath});
        // pjp.proceed(new Object[] {newLogFilePath});

    private String copyFromAWSToFakeS3AndUsingLocalFiles(Set$lt;String$gt; filesToIngest, boolean isFirstTime) {
        String logFilePath;
        StringBuilder sb = new StringBuilder();
        AmazonS3Client fakeS3 = S3Utils.createFakeS3();
        // check whether this file exists in fake-s3, if not create it
        for (String fileToIngest : filesToIngest) {
            File localFile = new File(LOCAL_S3_ROOT, fileToIngest);
            if (!localFile.exists()) {
                s3Utils.saveAWSFileToLocal(fileToIngest, localFile);
                s3Utils.saveLocalFileToFakeS3(localFile, fakeS3);
            } else {
                if (isFirstTime) {
                    // only save local file to fake-s3 once - the first time.
                    s3Utils.saveLocalFileToFakeS3(localFile, fakeS3);
            sb.append(new File(LOCAL_S3_ROOT, fileToIngest).getAbsolutePath()).append(",");

        if (sb.length() $gt; 0) {
            sb.setLength(sb.length() - 1);
        logFilePath = sb.toString();
        return logFilePath;

    public static final String LOCAL_S3_ROOT = "/some-path";
    // Another approach: this would cause setLogFilePath called again with changed parameters.
    // @After("execution( * ContextImpl.setLogFilePath(*) ) && target(context)")
    public void useLocalFSInsteadOfS3nAnotherApproach(Context context) {
        String logFilePath = context.getLogFilePath();
        boolean useLocalFS = "true".equals(System.getProperty("test.run.useLocalFS"));

        if (useLocalFS && logFilePath != null && logFilePath.startsWith("s3n://")) {
            boolean isFirstTime = "false".equals(System.getProperty("test.run.useFakeS3", "false"));
            context.setLogFilePath(copyFromAWSToFakeS3AndUsingLocalFiles(context.getFilesToIngest(), isFirstTime));
            LOGGER.info("Using local FS instead of S3.");


