Part 2: Run Time-Consuming Solr Queries Faster: Use Guava CacheBuilder to Cache Responses

The Problem
In our web application, the very first request to the Solr server is a stats query. When the index holds more than 50 million documents, this first stats query may take one, two, or more minutes, because Solr has to load millions of documents and terms first.

Subsequent stats queries run faster because Solr has loaded that data into its caches, but they still take 5 to 15 seconds or more, since a stats query is compute-intensive and there is a lot of data.

We need to make these queries faster so that the web GUI is more responsive.
Main Steps
1. Automatically run the queries X minutes after startup or after a commit, once no further updates have arrived, so that the first real stats query runs faster
2. Use Guava CacheBuilder to cache Solr responses
This article describes step 2.

Task: Use Guava CacheBuilder to Cache Solr Response
We would like to store the responses of time-consuming requests in a cache, so that later identical requests are much faster.

The Implementation
CacheManager
CacheManager is the key class of the implementation. It keeps a two-level map: the key of the outer ConcurrentHashMap is the SolrCore, and its value is an inner ConcurrentHashMap. The key of the inner ConcurrentHashMap is a cache type (cache tag), such as CACHE_TAG_SOLR_REQUEST for Solr requests, and its value is a Guava Cache.
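The two-level lookup can be modeled with the JDK alone. This is a minimal sketch, assuming Java 8+ (`computeIfAbsent`); the class name `TwoLevelCacheRegistry` is hypothetical, a `String` stands in for `SolrCore`, and a plain `ConcurrentHashMap` stands in for the Guava `Cache`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical JDK-only model of CacheManager's two-level map:
// outer key = core (a String stands in for SolrCore),
// inner key = cache tag, value = the per-tag cache (a map stands in for Guava's Cache).
class TwoLevelCacheRegistry {
  private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Object, Object>>> caches =
      new ConcurrentHashMap<>();

  // computeIfAbsent creates each level at most once per key, so concurrent
  // callers asking for the same core and tag receive the same cache instance.
  ConcurrentMap<Object, Object> getCache(String core, String cacheTag) {
    return caches
        .computeIfAbsent(core, c -> new ConcurrentHashMap<>())
        .computeIfAbsent(cacheTag, t -> new ConcurrentHashMap<>());
  }

  // Clears every cache that belongs to one core, leaving other cores untouched.
  void invalidateAll(String core) {
    ConcurrentMap<String, ConcurrentMap<Object, Object>> coreCaches = caches.get(core);
    if (coreCaches != null) {
      coreCaches.values().forEach(ConcurrentMap::clear);
    }
  }
}
```

Keying the outer map by core means a commit on one core only needs to clear that core's caches.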

By default, the cache is built with CacheBuilder.newBuilder().concurrencyLevel(16).expireAfterAccess(10, TimeUnit.MINUTES).softValues().recordStats().build(). To use a different configuration, pass a Guava cache spec as a system property, for example -DcacheSpec=concurrencyLevel=10,expireAfterAccess=5m,softValues.

Responses are added to the cache asynchronously on a thread pool, so the request thread does not wait for the cache write.
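The asynchronous write can be sketched without Solr or Guava. This is a minimal sketch; the class `AsyncCacheWriter` and its methods are hypothetical, and a `ConcurrentHashMap` stands in for the Guava cache.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the request thread hands the cache write to a
// background executor and returns immediately.
class AsyncCacheWriter {
  private final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();
  private final ExecutorService executor = Executors.newCachedThreadPool();

  void addToCache(String key, Object response) {
    // ConcurrentHashMap (like Guava's Cache) rejects null keys and values;
    // checking here avoids an exception that would be lost inside the pool thread.
    if (key == null || response == null) {
      return;
    }
    executor.execute(() -> cache.put(key, response));
  }

  Object getIfPresent(String key) {
    return cache.get(key);
  }

  // Waits for pending writes; used here only to make the example deterministic.
  void shutdownAndAwait() {
    executor.shutdown();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The trade-off is that an identical request arriving immediately after the first one may still miss the cache until the background write completes.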
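import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// CacheStatsOpMXBean is the author's JMX interface for exposing cache
// statistics; it is not shown in this article and its methods are omitted.
public class CacheManager implements CacheStatsOpMXBean {
  protected static final Logger logger = LoggerFactory
      .getLogger(CacheManager.class);
  public static final String CACHE_TAG_SOLR_REQUEST = "CACHE_TAG_SOLR_REQUEST";
  // SolrCore -> (cache tag -> Guava Cache)
  @SuppressWarnings("rawtypes")
  private final ConcurrentHashMap<SolrCore,ConcurrentHashMap<String,Cache>> cacheMap = new ConcurrentHashMap<SolrCore,ConcurrentHashMap<String,Cache>>();
  
  // volatile is required for double-checked locking to be safe
  private static volatile CacheManager instance = null;
  private final ExecutorService executors;
  
  private static String cacheSpec;
  
  private CacheManager() {
    cacheSpec = System.getProperty("cacheSpec");
    executors = Executors.newCachedThreadPool();
  }
  
  public static CacheManager getInstance() {
    if (instance == null) {
      synchronized (CacheManager.class) {
        if (instance == null) {
          instance = new CacheManager();
        }
      }
    }
    return instance;
  }
  
  private <K,V> Cache<K,V> newCache() {
    Cache<K,V> result = null;
    if (cacheSpec != null && !cacheSpec.trim().isEmpty()) {
      try {
        result = CacheBuilder.from(cacheSpec).build();
      } catch (Exception e) {
        logger.error("Invalid cacheSpec: " + cacheSpec, e);
      }
    }
    if (result == null) {
      // default cache
      result = CacheBuilder.newBuilder().concurrencyLevel(16)
          .expireAfterAccess(10, TimeUnit.MINUTES).softValues()
          .recordStats().build();
    }
    return result;
  }
  
  @SuppressWarnings({"rawtypes", "unchecked"})
  public <K,V> Cache<K,V> getCache(SolrCore core, String cacheTag) {
    ConcurrentHashMap<String,Cache> coreCache = cacheMap.get(core);
    if (coreCache == null) {
      cacheMap.putIfAbsent(core, new ConcurrentHashMap<String,Cache>());
      coreCache = cacheMap.get(core);
    }
    Cache cache = coreCache.get(cacheTag);
    if (cache == null) {
      // only build a new cache on a miss; putIfAbsent keeps the first one if threads race
      coreCache.putIfAbsent(cacheTag, newCache());
      cache = coreCache.get(cacheTag);
    }
    return cache;
  }
  
  @SuppressWarnings("rawtypes")
  public void invalidateAll(SolrCore core) {
    ConcurrentHashMap<String,Cache> coreCache = cacheMap.get(core);
    if (coreCache != null) {
      for (Cache cache : coreCache.values()) {
        cache.invalidateAll();
      }
    }
  }

  public void addToCache(final SolrCore core, final String cacheTag,
      final CacheKeySolrQueryRequest cacheKey, final Object rspObj) {
    // Guava caches reject null keys and values; skip instead of failing in the pool thread
    if (cacheKey == null || rspObj == null) return;
    executors.submit(new Runnable() {
      @Override
      public void run() {
        Cache<CacheKeySolrQueryRequest,Object> cache = getCache(core, cacheTag);
        cache.put(cacheKey, rspObj);
      }
    });
  }
}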
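Double-checked locking is only safe when the instance field is volatile. A common alternative is the initialization-on-demand holder idiom, sketched below with a hypothetical `LazySingleton` class; this is a design alternative, not what the code above uses.

```java
// Hypothetical alternative to double-checked locking: the JVM initializes
// Holder (and therefore INSTANCE) lazily and thread-safely the first time
// getInstance() touches it, so no volatile field or synchronized block is needed.
final class LazySingleton {
  private LazySingleton() {
    // expensive setup (reading system properties, creating thread pools) goes here
  }

  private static final class Holder {
    static final LazySingleton INSTANCE = new LazySingleton();
  }

  static LazySingleton getInstance() {
    return Holder.INSTANCE;
  }
}
```

Class initialization is guaranteed by the JVM to run once and be visible to all threads, which is what makes this idiom safe without explicit locking.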
CacheKeySolrQueryRequest
We can't use SolrQueryRequest itself as the key of the Guava cache, because it doesn't implement hashCode and equals: two requests with the same Solr query would have different hash codes, and equals would return false.
So we extract the request parameters into a Map<String,String[]> and implement hashCode and equals on that map. Neither the order of entries in the map nor the order of values in each String[] array matters.

We could also use deepHashCode and deepEquals from java-util.
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest;

public class CacheKeySolrQueryRequest implements Serializable {
  
  private static final long serialVersionUID = 1L;
  Map<String,String[]> paramsMap;
  String url;
  
  private CacheKeySolrQueryRequest(SolrQueryRequest request) {
    this.paramsMap = SolrParams.toMultiMap(request.getParams().toNamedList());
    // remove params that don't affect the result
    paramsMap.remove(CommonParams.TIME_ALLOWED);
    Object contextUrl = request.getContext().get("url");
    if (contextUrl != null) {
      this.url = contextUrl.toString();
    }
  }
  
  // Returns null for requests with content streams (e.g. a POSTed body); those are not cached.
  public static CacheKeySolrQueryRequest create(SolrQueryRequest request) {
    CacheKeySolrQueryRequest result = null;
    if (request.getContentStreams() == null
        || !request.getContentStreams().iterator().hasNext()) {
      result = new CacheKeySolrQueryRequest(request);
    }
    return result;
  }

  @Override
  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((url == null) ? 0 : url.hashCode());
    if (paramsMap != null) {
      // Sum the per-entry and per-value hashes so that neither the map order
      // nor the order inside each String[] changes the result; this keeps
      // hashCode consistent with equals, which ignores both orders.
      int mapHashCode = 0;
      for (Entry<String,String[]> entry : paramsMap.entrySet()) {
        int keyHash = (entry.getKey() == null ? 0 : entry.getKey().hashCode());
        int valuesHash = 0;
        if (entry.getValue() != null) {
          for (String value : entry.getValue()) {
            valuesHash += (value == null ? 0 : value.hashCode());
          }
        }
        mapHashCode += keyHash ^ valuesHash;
      }
      result = prime * result + mapHashCode;
    }
    return result;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) return true;
    if (obj == null) return false;
    if (getClass() != obj.getClass()) return false;
    CacheKeySolrQueryRequest other = (CacheKeySolrQueryRequest) obj;
    if (url == null) {
      if (other.url != null) return false;
    } else if (!url.equals(other.url)) return false;
    
    if (paramsMap == null) {
      return other.paramsMap == null;
    }
    if (other.paramsMap == null || paramsMap.size() != other.paramsMap.size()) return false;
    // the order of entries in the map doesn't matter
    for (Entry<String,String[]> entry : paramsMap.entrySet()) {
      if (!other.paramsMap.containsKey(entry.getKey())) return false;
      String[] otherValues = other.paramsMap.get(entry.getKey());
      if (!haveSameElements(entry.getValue(), otherValues)) return false;
    }
    return true;
  }
  
  // helper class, so we don't have to do a whole lot of autoboxing
  private static class Count {
    public int count = 0;
  }
  // from: http://stackoverflow.com/questions/13501142/java-arraylist-how-can-i-tell-if-two-lists-are-equal-order-not-mattering
  // true if both arrays contain the same elements with the same multiplicities, in any order
  private static boolean haveSameElements(String[] list1, String[] list2) {
    if (list1 == list2) return true;
    if (list1 == null || list2 == null || list1.length != list2.length) return false;
    HashMap<String,Count> counts = new HashMap<String,Count>();

    for (String item : list1) {
      if (!counts.containsKey(item)) counts.put(item, new Count());
      counts.get(item).count += 1;
    }
    for (String item : list2) {
      // if the map doesn't contain the item here, this item wasn't in list1
      if (!counts.containsKey(item)) return false;
      counts.get(item).count -= 1;
    }
    for (Map.Entry<String,Count> entry : counts.entrySet()) {
      if (entry.getValue().count != 0) return false;
    }
    return true;
  }
}
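A simpler alternative to hand-written hashCode and equals is to normalize the parameters once when the key is created and then reuse the JDK's Map and List equality. This is a minimal sketch; the class `NormalizedParamsKey` is hypothetical, and it assumes no parameter name or value is null (TreeMap and sorting reject nulls).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical order-insensitive cache key: parameters are copied into a
// TreeMap (sorted by name) and each value array into a sorted List, so the
// built-in Map/List equals and hashCode can be reused directly.
final class NormalizedParamsKey {
  private final Map<String, List<String>> params = new TreeMap<>();

  NormalizedParamsKey(Map<String, String[]> rawParams) {
    for (Map.Entry<String, String[]> e : rawParams.entrySet()) {
      List<String> values = new ArrayList<>(Arrays.asList(e.getValue()));
      Collections.sort(values);  // value order no longer matters; duplicates are kept
      params.put(e.getKey(), Collections.unmodifiableList(values));
    }
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof NormalizedParamsKey
        && params.equals(((NormalizedParamsKey) o).params);
  }

  @Override
  public int hashCode() {
    return params.hashCode();
  }
}
```

Sorting costs O(n log n) once per request, but it makes hashCode and equals consistent by construction.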
ResponseCachedSearchHandler
If useCache is true, ResponseCachedSearchHandler first tries to load the response from the cache; if the response is already cached, it returns it directly. Otherwise it runs the request and, if the execution time is longer than minExecuteTime, puts the response into the cache. By default minExecuteTime is -1, which means every response is cached.
We can change the value of minExecuteTime so that Solr only caches a response when the request takes longer than the specified minimum time, in milliseconds.
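The lookup, execute, time, and conditionally-cache flow can be modeled without Solr. This is a minimal sketch; `TimedResponseCache` is hypothetical, a `String` key stands in for `CacheKeySolrQueryRequest`, and a `Supplier` stands in for executing the Solr request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the handler's flow: look up the cache first; on a
// miss, run the request, time it, and cache the result only if it took
// longer than minExecuteTimeMs (-1 means "always cache").
class TimedResponseCache {
  private final Map<String, Object> cache = new ConcurrentHashMap<>();

  Object handle(String key, long minExecuteTimeMs, Supplier<Object> executeRequest) {
    Object cached = cache.get(key);
    if (cached != null) {
      return cached;  // cache hit: skip the expensive query entirely
    }
    long start = System.nanoTime();
    Object response = executeRequest.get();
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    if (response != null && elapsedMs > minExecuteTimeMs) {
      cache.put(key, response);
    }
    return response;
  }

  boolean isCached(String key) {
    return cache.containsKey(key);
  }
}
```

A positive threshold keeps cheap queries out of the cache, leaving its memory for the expensive stats and facet responses.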

Before returning a cached response, we have to call oldRsp.setReturnFields(new SolrReturnFields(oldReq)); this sets which fields to return based on the fl parameter of the request. Otherwise Solr returns all fields, as if no fl parameter had been set.

Subclasses can extend ResponseCachedSearchHandler: override isUseCache() to decide whether Solr should cache the response, and override beforeReturnFromCache() to do something before the cached response is returned to the client.
import java.util.concurrent.TimeUnit;

import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrReturnFields;

import com.google.common.cache.Cache;

public class ResponseCachedSearchHandler extends SearchHandler {
  protected static final String PARAM_USE_CACHE = "useCache",
      PARAM_MIN_EXECUTE_TIME = "minExecuteTime";
  
  protected boolean defUseCache = false;
  protected int defMinExecuteTime = -1;

  @Override
  @SuppressWarnings("rawtypes")
  public void init(NamedList args) {
    super.init(args);
    // "defaults" is null when the handler has no <lst name="defaults"> section
    if (defaults != null) {
      defUseCache = defaults.getBool(PARAM_USE_CACHE, false);
      defMinExecuteTime = defaults.getInt(PARAM_MIN_EXECUTE_TIME, -1);
    }
  }

  @Override
  public void handleRequestBody(SolrQueryRequest oldReq,
      SolrQueryResponse oldRsp) throws Exception {
    boolean useCache = isUseCache(oldReq);
    CacheKeySolrQueryRequest cacheKey = null;
    if (useCache) {
      Cache<CacheKeySolrQueryRequest,Object> cache = CacheManager
          .getInstance().getCache(oldReq.getCore(),
              CacheManager.CACHE_TAG_SOLR_REQUEST);
      
      // null for requests with content streams, which are never cached
      cacheKey = CacheKeySolrQueryRequest.create(oldReq);
      if (cacheKey != null) {
        Object cachedRsp = cache.getIfPresent(cacheKey);
        if (cachedRsp != null) {
          NamedList<Object> valuesNL = oldRsp.getValues();
          valuesNL.add("response", cachedRsp);
          // SolrReturnFields applies the request's fl parameter.
          oldRsp.setReturnFields(new SolrReturnFields(oldReq));
          beforeReturnFromCache(oldReq, oldRsp);
          return;
        }
      }
    }
    // Time the real execution; System.nanoTime() avoids depending on the
    // Guava-version-specific Stopwatch API.
    long start = System.nanoTime();
    executeRequest(oldReq, oldRsp);
    long executeTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    beforeReturnNoCache(oldReq, oldRsp);
    addRspToCache(oldReq, oldRsp, useCache, cacheKey, executeTime);
  }
  
  protected void addRspToCache(SolrQueryRequest oldReq,
      SolrQueryResponse oldRsp, boolean useCache,
      CacheKeySolrQueryRequest cacheKey, long executeTime) {
    long minExecuteTime = oldReq.getParams().getInt(PARAM_MIN_EXECUTE_TIME,
        defMinExecuteTime);
    if (useCache && cacheKey != null && executeTime > minExecuteTime) {
      NamedList<Object> valuesNL = oldRsp.getValues();
      Object rspObj = valuesNL.get("response");
      CacheManager.getInstance().addToCache(oldReq.getCore(),
          CacheManager.CACHE_TAG_SOLR_REQUEST, cacheKey, rspObj);
    }
  }
  
  /**
   * Subclasses can override this to check whether the request is a stats query etc.
   */
  protected boolean isUseCache(SolrQueryRequest oldReq) {
    return oldReq.getParams().getBool(PARAM_USE_CACHE, defUseCache);
  }
  
  protected void beforeReturnNoCache(SolrQueryRequest oldReq,
      SolrQueryResponse oldRsp) {}

  protected void beforeReturnFromCache(SolrQueryRequest oldReq,
      SolrQueryResponse oldRsp) {}
      
  /**
   * By default, delegate to SearchHandler.handleRequestBody.
   */
  protected void executeRequest(SolrQueryRequest oldReq,
      SolrQueryResponse oldRsp) throws Exception {
    super.handleRequestBody(oldReq, oldRsp);
  }
}
CacheStatsFacetRequestHandler
CacheStatsFacetRequestHandler extends ResponseCachedSearchHandler and overrides isUseCache(), so Solr only caches the responses of stats and facet requests. We change the default request handler to use CacheStatsFacetRequestHandler:
<requestHandler name="/select" class="CacheStatsFacetRequestHandler" default="true">
  <!-- omitted -->
</requestHandler>
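import org.apache.solr.common.params.FacetParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.StatsParams;
import org.apache.solr.request.SolrQueryRequest;

public class CacheStatsFacetRequestHandler extends ResponseCachedSearchHandler {
  // cache only when useCache is on AND the request is a stats or facet query
  @Override
  protected boolean isUseCache(SolrQueryRequest oldReq) {
    boolean useCache = super.isUseCache(oldReq);
    if (useCache) {
      SolrParams params = oldReq.getParams();
      useCache = params.getBool(StatsParams.STATS, false)
          || params.getBool(FacetParams.FACET, false);
    }
    return useCache;
  }
}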
InvalidateCacheProcessorFactory
We need to invalidate the caches after a Solr commit. To do so, we add InvalidateCacheProcessorFactory to the default processor chain and to every other updateRequestProcessorChain.
<updateRequestProcessorChain name="defaultChain" default="true">
  <processor class="solr.LogUpdateProcessorFactory" />
  <processor class="solr.RunUpdateProcessorFactory" />
  <processor class="InvalidateCacheProcessorFactory" />
  <processor class="AutoRunQueriesProcessorFactory" />
</updateRequestProcessorChain>
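import java.io.IOException;

import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;

public class InvalidateCacheProcessorFactory extends
    UpdateRequestProcessorFactory {
  @Override
  public UpdateRequestProcessor getInstance(SolrQueryRequest req,
      SolrQueryResponse rsp, UpdateRequestProcessor next) {
    return new InvalidateCacheProcessor(next);
  }

  private static class InvalidateCacheProcessor extends
      UpdateRequestProcessor {
    public InvalidateCacheProcessor(UpdateRequestProcessor next) {
      super(next);
    }

    @Override
    public void processCommit(CommitUpdateCommand cmd) throws IOException {
      // forward the commit down the chain first, then drop the now-stale responses
      super.processCommit(cmd);
      CacheManager.getInstance().invalidateAll(cmd.getReq().getCore());
    }
  }
}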
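The invalidation order can be modeled without Solr as a small chain of responsibility. This is a minimal sketch; the classes `CommitProcessor`, `RunCommitProcessor`, and `InvalidateCacheOnCommit` are hypothetical stand-ins for `UpdateRequestProcessor`, `RunUpdateProcessor`, and the processor above, and a plain `Map` stands in for the cache.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, Solr-free model of an update processor chain. Each processor
// forwards processCommit() to the next one, as UpdateRequestProcessor does.
abstract class CommitProcessor {
  private final CommitProcessor next;

  CommitProcessor(CommitProcessor next) {
    this.next = next;
  }

  void processCommit() {
    if (next != null) {
      next.processCommit();
    }
  }
}

// Stands in for RunUpdateProcessor: performs the commit, then forwards it.
class RunCommitProcessor extends CommitProcessor {
  private final List<String> events;

  RunCommitProcessor(List<String> events, CommitProcessor next) {
    super(next);
    this.events = events;
  }

  @Override
  void processCommit() {
    events.add("commit");
    super.processCommit();
  }
}

// Stands in for InvalidateCacheProcessor: forwards first, then clears the
// cache, so stale responses are dropped after the commit has been applied.
class InvalidateCacheOnCommit extends CommitProcessor {
  private final Map<?, ?> cache;
  private final List<String> events;

  InvalidateCacheOnCommit(Map<?, ?> cache, List<String> events, CommitProcessor next) {
    super(next);
    this.cache = cache;
    this.events = events;
  }

  @Override
  void processCommit() {
    super.processCommit();
    cache.clear();
    events.add("invalidate");
  }
}
```

Placing the invalidating processor after the run processor ensures the cache is cleared only once the new data is committed.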