Solr: Exporting Large Data Sets (Millions of Rows) to a CSV File

The Problem
The task is to dump all data whose access time is older than 5 years to a CSV file, from several Solr servers deployed on virtual machines with limited resources: only 4 GB of memory each.
There are 2.8 million items that match the query.

First Approach That Doesn't Work
The first approach is to fetch 1000 rows at a time and repeat until all data is read: the query looks like start=X&rows=1000. This seems easy and should just work.
But when I ran it, all Solr servers froze, and the job still hadn't finished after 3 hours.
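To make the failure mode concrete, here is a sketch of the requests the naive loop issues. The host name and handler path are illustrative; q, start, and rows are real Solr query parameters:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the naive start/rows paging loop. Each request just bumps
// `start` by the page size; for 2.8 million docs that is 2800 requests,
// and the deepest ones force Solr to sort millions of documents apiece.
public class NaivePaging {

  static List<String> pagedQueries(String selectUrl, int totalDocs, int pageSize) {
    List<String> queries = new ArrayList<>();
    for (int start = 0; start < totalDocs; start += pageSize) {
      queries.add(selectUrl + "?q=*:*&start=" + start + "&rows=" + pageSize);
    }
    return queries;
  }

  public static void main(String[] args) {
    List<String> queries = pagedQueries("http://solr1:8080/solr/select", 2_800_000, 1000);
    System.out.println(queries.size());                  // 2800
    System.out.println(queries.get(queries.size() - 1)); // the page that hurts most
  }
}
```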

This is caused by a long-standing problem in Solr: deep pagination.
Simply put, when we ask for rows 1,000,000 to 1,001,000, Solr has to load 1,001,000 sorted documents from the index, then return the last 1000.
The problem gets even worse in SolrCloud or distributed search mode, as every shard has to sort its own top 1,001,000 documents and send them all to one destination Solr server, which then iterates over all of them to pick the final 1000.
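A rough back-of-the-envelope calculation of that cost, assuming the 3-shard setup used in the example query later in this post:

```java
public class DeepPagingCost {

  // Each shard must collect and sort its own top (start + rows) documents,
  // because any of them could fall into the requested global page.
  static long docsSortedPerShard(long start, long rows) {
    return start + rows;
  }

  // The coordinating node then merges every shard's candidates
  // before it can pick the final page of `rows` documents.
  static long docsMergedAtCoordinator(long start, long rows, int shards) {
    return docsSortedPerShard(start, rows) * shards;
  }

  public static void main(String[] args) {
    System.out.println(docsSortedPerShard(1_000_000, 1_000));         // 1001000
    System.out.println(docsMergedAtCoordinator(1_000_000, 1_000, 3)); // 3003000
  }
}
```

So one page of 1000 documents near the 1,000,000 mark makes three shards sort about a million documents each, and the coordinator merge about three million.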

Luckily, this problem is fixed by SOLR-5463 in the upcoming Solr 4.7.
Please read more detail at Solr Deep Pagination Problem Fixed in SOLR-5463.

Final Solution
I checked out the latest branch_4x from Solr SVN and ran the following commands to build a new solr.war:
ant get-maven-poms
cd maven-build
mvn -DskipTests install
Then I defined a custom Solr request handler: CSVExportToFileHandler. It supports all Solr query params, such as q, fq, rows, shards, etc. The one limitation is that start must be 0. It also supports all parameters of CSVResponseWriter, such as csv.header, csv.separator, csv.null.
Main ideas
1. Use the cursor feature introduced in SOLR-5463.
Add cursorMark=* to the first query, parse nextCursorMark from the response, and use it as the cursorMark value in the subsequent request. When the returned nextCursorMark equals the cursorMark we sent, there is no more data to read.
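The handshake can be sketched in isolation like this. Fetcher and Page are hypothetical stand-ins for a real Solr request and response (in the handler, each fetch is a query against the /select handler); only the cursorMark/nextCursorMark loop is the point:

```java
import java.util.ArrayList;
import java.util.List;

public class CursorLoop {

  // Minimal stand-in for one page of results plus the cursor to resume from.
  static class Page {
    final List<String> docs;
    final String nextCursorMark;
    Page(List<String> docs, String nextCursorMark) {
      this.docs = docs;
      this.nextCursorMark = nextCursorMark;
    }
  }

  interface Fetcher {
    Page fetchPage(String cursorMark, int rows);
  }

  // Keep querying until Solr echoes back the same cursorMark we sent,
  // which signals that every matching document has been returned.
  static List<String> fetchAll(Fetcher solr, int rows) {
    List<String> all = new ArrayList<>();
    String cursorMark = "*"; // the first request always starts with cursorMark=*
    while (true) {
      Page page = solr.fetchPage(cursorMark, rows);
      all.addAll(page.docs);
      if (page.nextCursorMark.equals(cursorMark)) {
        break; // no more data to read
      }
      cursorMark = page.nextCursorMark;
    }
    return all;
  }

  public static void main(String[] args) {
    List<String> data = List.of("doc1", "doc2", "doc3");
    // Fake fetcher: uses the numeric offset itself as the cursor token.
    Fetcher fake = (cursorMark, rows) -> {
      int from = cursorMark.equals("*") ? 0 : Integer.parseInt(cursorMark);
      int to = Math.min(from + rows, data.size());
      String next = (to == from) ? cursorMark : String.valueOf(to);
      return new Page(data.subList(from, to), next);
    };
    System.out.println(fetchAll(fake, 2)); // [doc1, doc2, doc3]
  }
}
```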

2. Execute the query and disk write in parallel.
To make the dump run faster, I execute the query and the disk write operation in parallel: while one batch is being written to disk, the next batch is already being queried. The disk write runs in a separate thread created by Executors.newSingleThreadExecutor(), so writes still happen one at a time and in order.
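A minimal sketch of this pattern; the batches here stand in for query results (in the real handler each batch comes from a Solr query and each write goes through the CSV response writer):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelWrite {

  // A single-threaded executor guarantees batches are written one at a time,
  // in submission order, while the submitting thread is free to fetch the
  // next batch concurrently.
  static List<String> fetchAndWrite(List<List<String>> batches)
      throws InterruptedException {
    List<String> written = new CopyOnWriteArrayList<>();
    ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
    try {
      for (List<String> batch : batches) {
        // submit the write and immediately move on to the next "query"
        ioExecutor.submit(() -> written.addAll(batch));
      }
    } finally {
      // wait for the IO thread to drain all pending writes
      ioExecutor.shutdown();
      ioExecutor.awaitTermination(1, TimeUnit.MINUTES);
    }
    return written;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(fetchAndWrite(
        List.of(List.of("row1", "row2"), List.of("row3")))); // [row1, row2, row3]
  }
}
```

A single-threaded executor is the simplest way to overlap fetch and write while keeping the output file ordered, since all write tasks run sequentially on one thread.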

The file will be dumped into a data folder on the Solr server.

The query looks like this:
http://solr1:8080/solr/exportcsvfile?q=accesstime:[* TO NOW-5YEAR/DAY]&sort=accesstime desc, contentid asc &shards=solr1:8080/solr,solr2:8080/solr,solr3:8080/solr&overwrite=true&parallel=true&fileName=over5yearsold.csv
The response of the dump operation looks like this:
<response>
  <long name="readRows">2821608</long>
  <bool name="success">true</bool>
  <str name="msg">File saved to: C:\lifelongprogrammer\solr\data-folder\over5yearsold.csv</str>
  <long name="timeTaken">2092575</long>
</response>
The configuration in solrconfig.xml:
<requestHandler name="/exportcsvfile" class="org.codeexample.lifelongprogrammer.solr.handler.CSVExportToFileHandler">
  <lst name="defaults">
    <str name="dataFolder">data-folder</str>
    <bool name="parallel">false</bool>
    <double name="freeMemoryRatioLimit">0.2</double>
  </lst>
</requestHandler>
The Code
Please read Using Object Pool Design Pattern to Reduce Garbage Collection in Solr for the source code of PooledLocalSolrQueryRequest.
package org.codeexample.lifelongprogrammer.solr.handler;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.QueryResponseWriter;
import org.apache.solr.response.ResultContext;
import org.apache.solr.response.SolrQueryResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Stopwatch;

// PooledLocalSolrQueryRequest comes from the earlier post linked above;
// import it from wherever you placed that class.
public class CSVExportToFileHandler extends RequestHandlerBase {
  protected static final Logger logger = LoggerFactory
      .getLogger(CSVExportToFileHandler.class);
  private static final Charset UTF8 = Charset.forName("UTF-8");
  
  private static final int ROWS_MAX_ONE_TIME = 1000;
  
  private static final String PARAM_DATA_FOLDER = "dataFolder",
      PARAM_FILE_NAME = "fileName", PARAM_OVERWRITE = "overwrite",
      PARAM_IO_PARALLEL = "parallel",
      PARAM_FREE_MEMORY_RATIO_LIMIT = "freeMemoryRatioLimit";
  private String dataFolder;
  private boolean defaultIOParallel;
  private Double freeMemoryRatioLimit = null;
  
  @SuppressWarnings("rawtypes")
  @Override
  public void init(NamedList args) {
    super.init(args);
    
    if (defaults != null) {
      dataFolder = defaults.get(PARAM_DATA_FOLDER);
      if (StringUtils.isBlank(dataFolder)) {
        throw new IllegalArgumentException("No dataFolder is set!");
      }
      String str = defaults.get(PARAM_IO_PARALLEL);
      if (StringUtils.isNotBlank(str)) {
        defaultIOParallel = Boolean.parseBoolean(str);
      }
      str = defaults.get(PARAM_FREE_MEMORY_RATIO_LIMIT);
      if (StringUtils.isNotBlank(str)) {
        freeMemoryRatioLimit = Double.parseDouble(str);
      }
    }
  }
  
  @Override
  public void handleRequestBody(SolrQueryRequest oldReq,
      SolrQueryResponse oldRsp) throws Exception {
    Stopwatch stopwatch = new Stopwatch().start();
    
    SolrParams params = oldReq.getParams();
    boolean success = true;
    File destFile = getDestFile(oldReq, params);
    Writer writer = null;
    
    try {
      FileOutputStream fos = new FileOutputStream(destFile);
      writer = new OutputStreamWriter(fos, UTF8);
      Integer rows = params.getInt(CommonParams.ROWS);
      if (rows != null && rows < ROWS_MAX_ONE_TIME) {
        ModifiableSolrParams newParams = new ModifiableSolrParams(
            oldReq.getParams());
        newParams.set(CommonParams.WT, "csv");
        exeAndwriteRspToFie(
            PooledLocalSolrQueryRequest.obtain(oldReq.getCore(), newParams),
            writer, false, null);
      } else {
        long readRows = exeAndwriteRspToFileMultileSteps(oldReq, rows, writer);
        oldRsp.add("readRows", readRows);
      }
      oldRsp.add("success", success);
      oldRsp.add("msg", "File saved to: " + destFile);
    } catch (Exception e) {
      success = false;
      oldRsp.add("success", success);
      oldRsp.setException(e);
      logger.error("Exception happened when handles " + params, e);
    } finally {
      PooledLocalSolrQueryRequest.closeAll();
      long timeTaken = stopwatch.elapsed(TimeUnit.MILLISECONDS);
      logger.info("Export done, success: " + success + ", timeTaken: "
          + timeTaken + ", file saved to: " + destFile);
      oldRsp.add("timeTaken", timeTaken);
      IOUtils.closeQuietly(writer);
    }
  }
  
  @SuppressWarnings("unchecked")
  public long exeAndwriteRspToFileMultileSteps(SolrQueryRequest oldReq,
      Integer rows, Writer writer) throws FileNotFoundException, IOException,
      InterruptedException {
    SolrParams oldParams = oldReq.getParams();
    int start = 0;
    if (oldParams.getInt(CommonParams.START) != null) {
      start = oldParams.getInt(CommonParams.START);
    }
    if (start != 0) {
      throw new IllegalArgumentException(
          "Start must be 0, as Cursor functionality requires start=0");
    }
    // if rows is set, not null, track read rows
    int maxRows = -1;
    boolean trackReadRows = false;
    if (oldParams.getInt(CommonParams.ROWS) != null) {
      maxRows = oldParams.getInt(CommonParams.ROWS);
      trackReadRows = true;
    }
    long readRows = 0;
    boolean ioParallel = oldParams
        .getBool(PARAM_IO_PARALLEL, defaultIOParallel);
    ExecutorService ioExecutor = null;
    if (ioParallel) {
      ioExecutor = Executors.newSingleThreadExecutor();
    }
    Long numFound = null;
    boolean firstOp = true;
    int logTimes = 0;
    String cursorMark = "*";
    try {
      while (true) {
        ModifiableSolrParams newParams = newParams(oldParams, maxRows,
            trackReadRows, readRows, firstOp, cursorMark);
        firstOp = false;
        
        SolrQueryResponse newRsp = exeAndwriteRspToFie(
            PooledLocalSolrQueryRequest.obtain(oldReq.getCore(), newParams),
            writer, ioParallel, ioExecutor);
        
        NamedList<Object> valuesNL = newRsp.getValues();
        String nextCursorMark = (String) valuesNL.get("nextCursorMark");
        logger.info("New nextCursorMark: " + nextCursorMark);
        
        if (StringUtils.equals(nextCursorMark, cursorMark)) {
          // if same, means there is no data to read.
          break;
        }
        cursorMark = nextCursorMark;
        
        Object rspObj = valuesNL.get("response");
        if (rspObj == null) {
          throw new RuntimeException("response is null, " + valuesNL);
        }
        if (rspObj instanceof ResultContext) {
          ResultContext rc = (ResultContext) rspObj;
          if (numFound == null) {
            numFound = (long) rc.docs.matches();
            logger.info("numFound: " + numFound);
            if (maxRows == -1) {
              maxRows = numFound.intValue();
            }
          }
          readRows += rc.docs.size();
        } else if (rspObj instanceof SolrDocumentList) {
          SolrDocumentList docList = (SolrDocumentList) rspObj;
          if (numFound == null) {
            numFound = docList.getNumFound();
            logger.info("numFound: " + numFound);
            if (maxRows == -1) {
              maxRows = numFound.intValue();
            }
          }
          readRows += docList.size();
        } else {
          throw new RuntimeException("Unknown response type: "
              + rspObj.getClass());
        }
        
        if (readRows == maxRows) {
          break;
        } else if (readRows > maxRows) {
          throw new RuntimeException("Should not happen, want to get "
              + maxRows + ", but we have read " + readRows);
        }
        // log progress roughly 10 times during the export
        if (maxRows > 0 && readRows >= (long) maxRows / 10 * (logTimes + 1)) {
          logTimes++;
          logger.info("Handled " + readRows + " of " + maxRows + " rows");
        }
      }
      return readRows;
    } finally {
      if (ioParallel) {
        // wait for the IO thread to complete
        ioExecutor.shutdown();
        ioExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
      }
    }
  }
  
  public ModifiableSolrParams newParams(SolrParams oldParams, int maxRows,
      boolean trackReadRows, long readRows, boolean firstOp, String cursorMark) {
    ModifiableSolrParams newParams = new ModifiableSolrParams(oldParams);
    newParams.set(CommonParams.START, 0);
    if (!trackReadRows) {
      newParams.set(CommonParams.ROWS, ROWS_MAX_ONE_TIME);
    } else {
      if (maxRows == -1) {
        // for the first time
        newParams.set(CommonParams.ROWS, ROWS_MAX_ONE_TIME);
      } else {
        if (maxRows - readRows > ROWS_MAX_ONE_TIME) {
          newParams.set(CommonParams.ROWS, ROWS_MAX_ONE_TIME);
        } else {
          newParams.set(CommonParams.ROWS, "" + (maxRows - readRows));
        }
      }
    }
    newParams.set(CommonParams.WT, "csv");
    newParams.set("cursorMark", cursorMark);
    
    if (!firstOp) {
      // only output the CSV header in the first batch
      newParams.set("csv.header", false);
    }
    return newParams;
  }
  
  /**
   * IO operation is expensive, so we want to do query and IO write in parallel.
   * 
   * @param parallel
   *          whether to execute the query and IO in parallel; if true, it may
   *          use more memory.
   */
  public SolrQueryResponse exeAndwriteRspToFie(
      final PooledLocalSolrQueryRequest req, final Writer writer,
      boolean parallel, ExecutorService executor) throws FileNotFoundException,
      IOException {
    final SolrCore core = req.getCore();
    SolrRequestHandler handler = core.getRequestHandler("/select");
    final SolrQueryResponse newRsp = new SolrQueryResponse();
    handler.handleRequest(req, newRsp);
    Exception ex = newRsp.getException();
    if (ex != null) {
      throw new RuntimeException("Exception happened when handling: " + req, ex);
    }
    // Also consider memory usage
    if (parallel) {
      if (freeMemoryRatioLimit != null) {
        double curFreeMemoryRatio = getFreeMemoryRatio();
        parallel = curFreeMemoryRatio > freeMemoryRatioLimit;
        if (!parallel && logger.isDebugEnabled()) {
          logger.info("curFreeMemoryRatio: " + curFreeMemoryRatio
              + " is less than " + freeMemoryRatioLimit);
        }
      }
    }
    if (parallel) {
      executor.submit(new Runnable() {
        @Override
        public void run() {
          try {
            writeRspToFile(req, writer, core, newRsp);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      });
    } else {
      writeRspToFile(req, writer, core, newRsp);
    }
    return newRsp;
  }
  
  public static double getFreeMemoryRatio() {
    Runtime runtime = Runtime.getRuntime();
    long max = runtime.maxMemory();
    long current = runtime.totalMemory();
    long free = runtime.freeMemory();
    long available = max - current + free;
    return (double) (available) / (double) max;
  }
  
  public void writeRspToFile(PooledLocalSolrQueryRequest req, Writer writer,
      SolrCore core, SolrQueryResponse newRsp) throws IOException {
    QueryResponseWriter responseWriter = core.getQueryResponseWriter("csv");
    responseWriter.write(writer, req, newRsp);
    req.recycle();
  }
  
  public File getDestFile(SolrQueryRequest req, SolrParams params)
      throws SolrServerException {
    String fileName = params.get(PARAM_FILE_NAME);
    if (StringUtils.isBlank(fileName)) {
      throw new IllegalArgumentException("No fileName is set!");
    }
    
    if (!new File(dataFolder).isAbsolute()) {
      dataFolder = SolrResourceLoader.normalizeDir(req.getCore()
          .getCoreDescriptor().getCoreContainer().getSolrHome()
          + dataFolder);
    }
    if (!new File(dataFolder).exists()) {
      boolean created = new File(dataFolder).mkdir();
      if (!created) throw new SolrServerException("Unable to create folder: "
          + dataFolder);
    }
    
    File destFile = new File(dataFolder, fileName);
    if (destFile.exists()) {
      boolean overwrite = params.getBool(PARAM_OVERWRITE, false);
      if (overwrite) {
        boolean deleted = destFile.delete();
        if (!deleted) throw new RuntimeException("Failed to delete old file: "
            + destFile);
      } else {
        throw new IllegalArgumentException("File: " + destFile
            + " already exists.");
      }
    }
    return destFile;
  } 
}