Creating Custom Solr Type to Stream Large Text Field

The Problem
In my project, we run some queries against a remote Solr server and return the combined response to the client. (We can't use SolrJ, because the request goes through several proxy applications that add extra functions.) Some text fields are very large, so we would like to reduce their size and transfer much less content.
Steps We Have Done
To reduce the content size, the remote Solr server uses ZipOutputStream and Base64OutputStream to zip the string and Base64-encode it. This reduces the size by more than 85%: an original 134 MB string is compressed to 16 MB.
Read more: Java: Use ZIP Stream and Base64 Stream to Compress Large String Data
On the client side, when it receives the zipped Base64 string, it first Base64-decodes it, uncompresses it, and then adds it as a field to Solr.
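
The zip and Base64 round trip described above can be sketched with JDK classes only. This is a minimal illustration, not the code used in the project: java.util.Base64 stands in for the Base64OutputStream/Base64InputStream classes named in this post, and the class name ZipBase64RoundTrip and the entry name "content" are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper class, for illustration only.
final class ZipBase64RoundTrip {

  // Zips the text into a single entry and Base64-encodes the zip bytes.
  static String compress(String text) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (OutputStream b64 = Base64.getEncoder().wrap(sink);
         ZipOutputStream zip = new ZipOutputStream(b64)) {
      zip.putNextEntry(new ZipEntry("content")); // entry name is arbitrary
      zip.write(text.getBytes(StandardCharsets.UTF_8));
      zip.closeEntry();
    }
    // Base64 output is plain ASCII, so a single-byte charset is safe here.
    return new String(sink.toByteArray(), StandardCharsets.ISO_8859_1);
  }

  // Base64-decodes and unzips entirely in memory (the approach that does not scale).
  static String decompress(String encoded) throws IOException {
    InputStream b64 = Base64.getDecoder().wrap(
        new ByteArrayInputStream(encoded.getBytes(StandardCharsets.ISO_8859_1)));
    try (ZipInputStream zip = new ZipInputStream(b64)) {
      if (zip.getNextEntry() == null) {
        throw new IOException("zip payload has no entry");
      }
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[8192];
      int n;
      while ((n = zip.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 10000; i++) {
      sb.append("some repetitive text field content. ");
    }
    String text = sb.toString();
    String encoded = compress(text);
    System.out.println("original chars: " + text.length());
    System.out.println("encoded chars:  " + encoded.length());
    System.out.println("round trip ok:  " + decompress(encoded).equals(text));
  }
}
```

Note that decompress holds the whole uncompressed text in memory at once, which is exactly what becomes a problem for very large fields.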

But if we do all of this in memory, the huge 134 MB unzipped string is loaded into memory, which causes an OutOfMemoryError in the application. Obviously this is not desired.

Instead, we want to use streams (Base64InputStream and ZipInputStream) to unzip it and write the original string to a temp file. When Solr adds this field to the index, it can use a Reader to read from the temp file and delete the temp file once it's done.
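
The streaming idea can be sketched as follows, again with JDK classes only. It is an illustration under assumptions: java.util.Base64 replaces Base64InputStream, and the class name StreamToTempFile, the temp-file prefix and suffix, and the zipAndEncode helper (present only to produce demo input) are hypothetical.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Base64;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper class, for illustration only.
final class StreamToTempFile {

  // Streams Base64 -> unzip -> temp file; only a small copy buffer is held in memory.
  static Path unzipToTempFile(InputStream base64Zip) throws IOException {
    Path tmp = Files.createTempFile("zippedcontent", ".txt"); // made-up prefix/suffix
    try (ZipInputStream zip = new ZipInputStream(Base64.getDecoder().wrap(base64Zip))) {
      if (zip.getNextEntry() == null) {
        throw new IOException("zip payload has no entry");
      }
      // The temp file already exists, so it has to be replaced.
      Files.copy(zip, tmp, StandardCopyOption.REPLACE_EXISTING);
      return tmp;
    } catch (IOException e) {
      Files.deleteIfExists(tmp); // do not leave a partial file behind
      throw e;
    }
  }

  // Demo-only helper: produces the zipped, Base64-encoded bytes a server would send.
  static byte[] zipAndEncode(String text) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (ZipOutputStream zip = new ZipOutputStream(Base64.getEncoder().wrap(sink))) {
      zip.putNextEntry(new ZipEntry("content"));
      zip.write(text.getBytes(StandardCharsets.UTF_8));
      zip.closeEntry();
    }
    return sink.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = zipAndEncode("a very large text field, in practice");
    Path tmp = unzipToTempFile(new ByteArrayInputStream(payload));
    // The indexing side only needs a Reader over the temp file.
    try (BufferedReader reader = Files.newBufferedReader(tmp, StandardCharsets.UTF_8)) {
      System.out.println(reader.readLine());
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}
```

The sketch reads from an InputStream, so the uncompressed text never exists as one String; only the consumer of the Reader sees it, a chunk at a time.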

In Lucene, we can pass a Reader to the Field constructor; Lucene consumes the reader and closes it when it's done.
A field created this way can only be indexed, not stored. This is fine for us, as the field is only used for search.

Solr doesn't expose this function, but we can easily extend Solr to define a custom field which accepts a Reader.

The Solution
Custom Solr Field Type: FileTextField
FileTextField extends solr.schema.TextField. The value added to a FileTextField can be a string or a FileHolder (a wrapper that supplies a Reader). For a FileHolder, createField creates a Lucene field with the Reader as parameter: Field f = new org.apache.lucene.document.TextField(name, reader); Lucene will consume the reader and close it after it's done.
FileTextField has one configuration parameter: deleteFile. If true, the file is deleted after Lucene has read it and written it to the index. If false, the file is kept. The encoding has to be set in FileHolder's constructor, which avoids the problem of writing and reading the file with different encodings.
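import java.io.IOException;
import java.io.Reader;
import java.util.Map;

import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexableField;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.schema.TextField;

public class FileTextField extends TextField {
  // set from the deleteFile attribute of the field type
  private boolean deleteFile;

  @Override
  protected void init(IndexSchema schema, Map<String,String> args) {
    // remove our own parameter before the parent class validates the rest
    String str = args.remove("deleteFile");
    if (str != null) {
      deleteFile = Boolean.parseBoolean(str);
    }
    super.init(schema, args);
  }

  @Override
  public IndexableField createField(SchemaField field, Object value, float boost) {
    if (!field.indexed() && !field.stored()) {
      if (log.isTraceEnabled()) log.trace("Ignoring unindexed/unstored field: "
          + field);
      return null;
    }
    if (value instanceof FileHolder) {
      // file-backed value: hand Lucene a Reader instead of a String
      FileHolder fileHolder = (FileHolder) value;
      fileHolder.setDeleteFile(deleteFile);
      try {
        Reader reader = fileHolder.getReader();
        return createFileTextField(field.getName(), reader, boost);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    } else {
      // plain string value: behave like a normal TextField
      return super.createField(field, value, boost);
    }
  }

  // Creates an indexed, unstored Lucene field that reads its content from the Reader.
  public Field createFileTextField(String name, Reader fr,
      float boost) {
    Field f = new org.apache.lucene.document.TextField(name, fr);
    f.setBoost(boost);
    return f;
  }
}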
FileHolder 
To pass the file to the field, we create a wrapper class, FileHolder. Its getReader method returns an InputStreamReader. If deleteFile is true, the reader's close method deletes the file after closing the stream.
// Assumed to be nested in FileTextField (hence static), so it can use the same log.
public static class FileHolder {
  private String filePath;
  private boolean deleteFile;
  private String encoding;

  public FileHolder(File file, String encoding) {
    this.filePath = file.getAbsolutePath();
    this.encoding = encoding;
  }

  public void setDeleteFile(boolean deleteFile) {
    this.deleteFile = deleteFile;
  }

  // @return an InputStreamReader; if deleteFile is true, the file is deleted when the reader is closed.
  public Reader getReader() throws IOException {
    InputStreamReader reader = new InputStreamReader(new FileInputStream(
        filePath), encoding) {
      @Override
      public void close() throws IOException {
        super.close();
        if (deleteFile) {
          boolean deleted = new File(filePath).delete();
          if (!deleted) {
            log.error("Unable to delete " + filePath);
          }
        }
      }
    };
    return reader;
  }
}
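
The delete-on-close behavior can be tried in isolation, without Solr or Lucene on the classpath. The following is a small standalone sketch of the same idea; the class DeleteOnCloseDemo and its method names are made up for this example and are not part of FileHolder.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical demo class, for illustration only.
final class DeleteOnCloseDemo {

  // Returns a reader over the file that removes the file once the reader is closed.
  static Reader openSelfDeletingReader(final File file) throws IOException {
    return new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8) {
      @Override
      public void close() throws IOException {
        try {
          super.close();
        } finally {
          // No-op if the file is already gone, so closing twice is harmless.
          Files.deleteIfExists(file.toPath());
        }
      }
    };
  }

  // Drains the reader into a String without closing it.
  static String readAll(Reader reader) throws IOException {
    StringBuilder sb = new StringBuilder();
    char[] buf = new char[4096];
    int n;
    while ((n = reader.read(buf)) != -1) {
      sb.append(buf, 0, n);
    }
    return sb.toString();
  }

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("fileholder-demo", ".txt");
    Files.write(tmp.toPath(), "content to index".getBytes(StandardCharsets.UTF_8));

    Reader reader = openSelfDeletingReader(tmp);
    System.out.println("read: " + readAll(reader));
    System.out.println("exists before close: " + tmp.exists());
    reader.close();
    System.out.println("exists after close:  " + tmp.exists());
  }
}
```

Because whoever consumes the reader is also the one that closes it, the cleanup happens at the right moment without the caller having to track the temp file.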
Define FileTextField field in schema
FileTextField is similar to Solr's TextField: we can define a tokenizer and filters for index and query. It accepts one additional parameter: deleteFile. The value of stored can't be true.
Read Document from Stream and add FileTextField into Solr
The following code uses Gson's streaming JsonReader to read one document. We can determine the size of zippedcontent from the size field. If it's too large, we write the uncompressed string to a temporary file and add a FileHolder instance to the content field.
private static void readOneDoc(JsonReader reader, SolrQueryRequest request)
    throws IOException {
  reader.beginObject();
  Object unzippedcontent = null;
  boolean useFileText = false;
  SolrInputDocument solrDoc = new SolrInputDocument();
  while (reader.hasNext()) {
    String name = reader.nextName();
    if ("size".equals(name)) {
      // size has to come before zippedcontent in the JSON for this check to take effect
      long size = Long.parseLong(reader.nextString());
      if (size > size_LIMIT_FILETEXT) {
        useFileText = true;
      }
    } else if ("zippedcontent".equals(name) && reader.peek() != JsonToken.NULL) {
      if (useFileText) {
        // unzippedcontent is a FileHolder
        unzippedcontent = unzipValueToTmpFile(reader);
      } else {
        // decoded and uncompressed string
        unzippedcontent = unzipValueDirectly(reader);
      }
    } else {
      // skip unknown fields, in case the server side code changes
      reader.skipValue();
    }
  }
  reader.endObject();
  if (unzippedcontent != null) {
    // either a String or a FileHolder; FileTextField handles both
    solrDoc.addField("content", unzippedcontent);
  }
  // add it to Solr
  UpdateRequestProcessorChain updateChain = request.getCore()
      .getUpdateProcessingChain("/update");
  AddUpdateCommand command = new AddUpdateCommand(request);
  command.solrDoc = solrDoc;
  UpdateRequestProcessor processor = updateChain.createProcessor(request, new SolrQueryResponse());
  processor.processAdd(command);
}

private static String unzipValueDirectly(JsonReader reader)
    throws IOException {
  String value = reader.nextString();
  ZipInputStream zi = null;
  try {
    Base64InputStream base64is = new Base64InputStream(new ByteArrayInputStream(
        value.getBytes("UTF-8")));
    zi = new ZipInputStream(base64is);
    zi.getNextEntry();
    // decode explicitly as UTF-8 instead of relying on the platform default charset
    return IOUtils.toString(zi, "UTF-8");
  } finally {
    IOUtils.closeQuietly(zi);
  }
}

private static FileHolder unzipValueToTmpFile(JsonReader reader) throws IOException {
  File tmpFile = File.createTempFile(TMP_FILE_PREFIX_ZIPPEDCONTENT, TMP_FILE_PREFIX_SUFFIX);
  String value = reader.nextString();
  ZipInputStream zi = null;
  OutputStreamWriter osw = null;

  try {
    Base64InputStream base64is = new Base64InputStream(new ByteArrayInputStream(
        value.getBytes("UTF-8")));
    zi = new ZipInputStream(base64is);
    zi.getNextEntry();
    osw = new OutputStreamWriter(new FileOutputStream(tmpFile), "UTF-8");
    // stream the unzipped content to the temp file, reading it as UTF-8
    IOUtils.copy(zi, osw, "UTF-8");
    zi.closeEntry();
  } finally {
    IOUtils.closeQuietly(osw);
    // closing the zip stream also closes the wrapped Base64 stream
    IOUtils.closeQuietly(zi);
  }
  // FileHolder's constructor takes the File itself, not its path
  return new FileHolder(tmpFile, "UTF-8");
}
Conclusion
  • Use ZipOutputStream/ZipInputStream and Base64OutputStream/Base64InputStream to compress the large text. This reduces the size of the text by about 85%, which reduces the time needed to transfer the request/response.
  • To reduce memory usage on the client side:
  •   Use a streaming API (Gson streaming or XML StAX) to read documents one by one.
  •   Define a custom Solr field type, FileTextField, which accepts a FileHolder as value. FileTextField eventually passes a reader to Lucene, which uses it to read the content and add it to the index.
  •   When the text field is too big, first uncompress it to a temp file, create a FileHolder instance, then set the FileHolder instance as the field value.