Solr: Use JSON (Gson) Streaming to Reduce Memory Usage


My Solr application runs on the user's laptop, with max memory set to 512 MB. It pulls JSON data from a remote proxy that talks to a remote Solr server: 100 documents at a time, committing after every 20 batches.

Our code used to read the whole JSON response into memory and then call UpdateRequestProcessor.processAdd(AddUpdateCommand) to add each document to the local Solr.

Recently it started throwing OutOfMemoryError. After analyzing the heap dump file with Eclipse Memory Analyzer (MAT), I found that the cause is the data returned from the remote proxy being too large: one document is 50-60 KB on average, but some documents are huge, and 100 of them can add up to 60 MB. This is a rare case, but when it happens the application throws OutOfMemoryError and stops working.

To fix this and reduce memory usage on the client side, I took several measures:

1. Restart the application when OutOfMemoryError happens.
2. Run a thread that monitors free memory: at a certain threshold (40% free), run GC; below 30%, decrease the fetch size (100 to 50, then to 25) and the commit interval (every 20 batches, then every 10); below 50 MB of free memory, restart the application. See the sketch after this list.
3. Enable autoSoftCommit and autoCommit, and reduce the Solr cache sizes.
4. Use streaming JSON. This is the topic of this article: read documents one by one from the HTTP input stream and put them on a queue, instead of reading the whole big response into memory. Another thread is responsible for writing the documents to the local Solr.
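Here is a minimal sketch of the memory monitor from step 2, assuming it runs on a scheduled thread; the thresholds follow the description above, and reduceFetchSizeAndCommitInterval() and restartApplication() are hypothetical hooks into the rest of the application:

// A minimal sketch of the memory monitor in step 2; the two callbacks are hypothetical.
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(new Runnable() {
  public void run() {
    Runtime rt = Runtime.getRuntime();
    long used = rt.totalMemory() - rt.freeMemory();
    long available = rt.maxMemory() - used;
    double freeRatio = (double) available / rt.maxMemory();
    if (available < 50L * 1024 * 1024) {
      restartApplication();               // hypothetical: restart the whole application
    } else if (freeRatio < 0.3) {
      reduceFetchSizeAndCommitInterval(); // hypothetical: 100 -> 50 -> 25, every 20 -> every 10
      System.gc();
    } else if (freeRatio < 0.4) {
      System.gc();
    }
  }
}, 0, 10, TimeUnit.SECONDS);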

The same streaming approach applies if the feed is XML: we can use StAX or SAX to read documents one by one.
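For the XML case, a minimal StAX sketch; the element names "doc", "id" and "binaryDoc" are assumptions for illustration:

// Pull one <doc> element at a time instead of loading the whole XML response
// into memory; element names are assumptions for illustration.
private static void readDocsFromXml(SolrQueryRequest request, InputStream in)
    throws Exception {
  XMLStreamReader reader = XMLInputFactory.newInstance()
      .createXMLStreamReader(in, "UTF-8");
  String id = null, binaryDoc = null, currentField = null;
  while (reader.hasNext()) {
    int event = reader.next();
    if (event == XMLStreamConstants.START_ELEMENT) {
      String name = reader.getLocalName();
      if ("id".equals(name) || "binaryDoc".equals(name)) {
        currentField = name;
      }
    } else if (event == XMLStreamConstants.CHARACTERS && currentField != null) {
      // note: very long text may arrive as several CHARACTERS events;
      // a real implementation would accumulate them
      if ("id".equals(currentField)) {
        id = reader.getText();
      } else {
        binaryDoc = reader.getText();
      }
      currentField = null;
    } else if (event == XMLStreamConstants.END_ELEMENT
        && "doc".equals(reader.getLocalName())) {
      // hand the finished document off, same as the JSON path below
      CVSyncDataImporter.getInstance().importData(request, id, binaryDoc);
      id = null;
      binaryDoc = null;
    }
  }
  reader.close();
}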

I use Gson; for details on how to use Gson streaming to read and write JSON, please read Gson Streaming.

The code to read documents one by one from the HTTP stream. It uses the Gson streaming API, plus Java Executors and Futures to wait until all import tasks have finished, i.e. all docs are imported:
/**
 * Use the Gson streaming API to read documents one by one to reduce memory usage
 */
private static ImportedResult handleResponse(SolrQueryRequest request,
    InputStream in, int fetchSize) throws UnsupportedEncodingException,
    IOException {
  ImportedResult importedResult = new ImportedResult();
  JsonReader reader = null;
  List<Future<Void>> futures = new ArrayList<Future<Void>>();
  
  try {
    reader = new JsonReader(new InputStreamReader(in, "UTF-8"));
    reader.beginObject();
    String str = reader.nextName();
    reader.beginObject();
    int fetchedSize = 0;
    int numFound = -1, start = -1;
    while (reader.hasNext()) {
      str = reader.nextName();
      if ("numFound".equals(str)) {
        numFound = Integer.valueOf(reader.nextString());
      } else if ("start".equals(str)) {
        start = Integer.valueOf(reader.nextString());
      } else if ("docs".equals(str)) {
        reader.beginArray();
        // read documents
        while (reader.hasNext()) {
          fetchedSize++;
          // collect the Future of each submitted import task so we can
          // wait for all of them to finish before reporting the result
          futures.add(readOneDoc(request, reader));
        }
        
        reader.endArray();
      }
    }
    
    reader.endObject(); // close the inner object
    reader.endObject(); // close the outer JSON object
    waitComplete(futures);
    importedResult.setFetched(fetchedSize);
    importedResult.setHasMore((fetchedSize + start) < numFound);
    importedResult.setImportedData((fetchedSize != 0));
    return importedResult;
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
}

private static java.util.concurrent.Future<Void> readOneDoc(
    SolrQueryRequest request, JsonReader reader) throws IOException {
  String str;
  reader.beginObject();
  String id = null, binaryDoc = null;
  while (reader.hasNext()) {
    str = reader.nextName();
    
    if ("id".equals(str)) {
      id = reader.nextString();
    } else if ("binaryDoc".equals(str)) {
      binaryDoc = reader.nextString();
    }
  }
  reader.endObject();
  return CVSyncDataImporter.getInstance().importData(request, id,
      binaryDoc);
}
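waitComplete is not shown above; a minimal sketch, assuming it simply blocks until every submitted import task finishes:

// Block until every import task submitted to the executor has finished
// (or failed); failures are only logged here.
private static void waitComplete(List<Future<Void>> futures) {
  for (Future<Void> future : futures) {
    try {
      future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      break;
    } catch (ExecutionException e) {
      logger.error("Import task failed", e);
    }
  }
}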
The code to write documents to the local Solr:
public Future<Void> importData(SolrQueryRequest request, String id,
    String binaryDoc) {
  if (id == null) {
    throw new IllegalArgumentException("id can't be null.");
  }
  if (binaryDoc == null) {
    throw new IllegalArgumentException("binaryDoc can't be null.");
  }
  SolrDataImporter task = new SolrDataImporter(request, id, binaryDoc);
  return executor.submit(task);
}
private static SolrInputDocument convertToSolrDoc(String id,
    String binaryDoc) throws IOException {
  byte[] bindata = Base64.base64ToByteArray(binaryDoc);
  // readZippedFile (not shown) decompresses the binary payload back into a SolrInputDocument
  SolrInputDocument resultDoc = (SolrInputDocument) readZippedFile(bindata);
  resultDoc.setField("id", id);
  return resultDoc;
}

private class SolrDataImporter implements Callable<Void> {
  private SolrQueryRequest request;
  private String id, binaryDoc;

  SolrDataImporter(SolrQueryRequest request, String id, String binaryDoc) {
    this.request = request;
    this.id = id;
    this.binaryDoc = binaryDoc;
  }

  @Override
  public Void call() {
    try {
      UpdateRequestProcessorChain updateChain = request.getCore()
          .getUpdateProcessingChain("mychain");
      SolrInputDocument toSolrServerSolrDoc = convertToSolrDoc(id,
          binaryDoc);
      binaryDoc = null;
      AddUpdateCommand command = new AddUpdateCommand(request);
      command.solrDoc = toSolrServerSolrDoc;
      SolrQueryResponse response = new SolrQueryResponse();
      UpdateRequestProcessor processor = updateChain.createProcessor(request,
          response);
      processor.processAdd(command);
    } catch (Exception e) {
      logger.error("Exception happened when importdata, id: "
          + id, e);
    }
    return null;
  }
}
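The executor behind importData is not shown above; a small fixed-size pool is enough here, since it limits how many documents are being converted and indexed at the same time and so also caps memory usage. A minimal sketch (the pool size of 4 is an assumption):

// A small fixed pool bounds the number of documents held in memory for
// indexing at any moment. The pool size is an assumption.
private final ExecutorService executor = Executors.newFixedThreadPool(4);

// Shut the pool down once the import is finished so the JVM can exit cleanly.
public void shutdown() {
  executor.shutdown();
}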
