Solr: Use Multiple Threads to Import Local stream Files

When importing data into Solr, a user can pass the stream.file="path" parameter several times to import multiple local files. But Solr's UpdateRequestHandler imports them one by one:
org.apache.solr.handler.ContentStreamHandlerBase.handleRequestBody(SolrQueryRequest, SolrQueryResponse)
for (ContentStream stream : streams) {
  documentLoader.load(req, rsp, stream, processor);
}
So, to speed up indexing, we can use multiple threads to import the files simultaneously.
Meanwhile, I want to extend UpdateRequestHandler with a stream.folder parameter, so that it imports all files under a folder, and with a stream.file.pattern parameter, so that it imports all files that match the pattern.

package org.codeexample.jeffery.solr;
/**
 * An {@link UpdateRequestHandler} that loads several content streams in
 * parallel instead of one after another.
 *
 * <p>
 * In addition to the content streams already attached to the request, two
 * request parameters contribute local files:
 * <ul>
 * <li>{@code stream.folder} - every regular file directly inside the folder;</li>
 * <li>{@code stream.file.pattern} - either a path to one file, or a folder
 * followed by a file name in which {@code *} matches any run of characters
 * (for example {@code c:\foldera\c*.csv}).</li>
 * </ul>
 * The {@code threads} parameter (request parameter, falling back to the
 * handler's {@code defaults} section, falling back to 10) bounds the size of
 * the thread pool.
 *
 * <p>
 * NOTE(review): both parameters make the server read paths chosen by the
 * caller; confirm this handler is only reachable by trusted clients.
 */
public class ThreadedUpdateRequestHandler extends UpdateRequestHandler {

 private static final String PARAM_THREAD_NUMBER = "threads";

 private static final String PARAM_STREAM_FOLDER = "stream.folder";
 private static final String PARAM_STREAM_FILE_PATTERN = "stream.file.pattern";

 private static final int DEFAULT_THREAD_NUMBER = 10;

 /**
  * Pool size used when the request carries no {@code threads} parameter.
  * Kept per instance so that differently configured handlers do not
  * overwrite each other's default.
  */
 private int defaultThreads = DEFAULT_THREAD_NUMBER;

 /**
  * Reads the optional {@code threads} entry from the {@code defaults}
  * section of the handler configuration.
  *
  * @param args handler configuration, may be {@code null}
  * @throws NumberFormatException if the configured value is not an integer
  */
 @SuppressWarnings("rawtypes")
 @Override
 public void init(NamedList args) {
  super.init(args);
  if (args == null) {
   return;
  }
  Object defaults = args.get("defaults");
  if (!(defaults instanceof NamedList)) {
   return;
  }
  Object configured = ((NamedList) defaults).get(PARAM_THREAD_NUMBER);
  if (configured != null) {
   defaultThreads = Integer.parseInt(configured.toString().trim());
  }
 }

 /**
  * Collects every content stream of the request (attached streams, folder
  * contents, pattern matches). Zero or one stream is delegated to the
  * regular single-threaded handler; two or more are loaded through a
  * thread pool.
  */
 @Override
 public void handleRequestBody(final SolrQueryRequest req,
   final SolrQueryResponse rsp) throws Exception {

  List<ContentStream> streams = new ArrayList<ContentStream>();

  handleReqStream(req, streams);
  // the two additional parameters: stream.folder and stream.file.pattern
  handleStreamFolders(req, streams);
  handleFilePatterns(req, streams);

  if (streams.size() < 2) {
   // No need to use a thread pool.
   SolrQueryRequestBase reqBase = (SolrQueryRequestBase) req;
   if (!streams.isEmpty()) {
    String contentType = req.getParams().get(
      CommonParams.STREAM_CONTENTTYPE);
    applyContentType(streams.get(0), contentType);
   }
   reqBase.setContentStreams(streams);
   super.handleRequestBody(req, rsp);
  } else {
   importStreamsMultiThreaded(req, rsp, streams);
  }
 }

 /**
  * Overrides the stream's content type with the request-level one, but only
  * when one was actually supplied, so a content type the stream already
  * carries is not replaced by {@code null}.
  */
 private static void applyContentType(ContentStream stream,
   String contentType) {
  if (contentType != null && stream instanceof ContentStreamBase) {
   ((ContentStreamBase) stream).setContentType(contentType);
  }
 }

 /**
  * Moves the streams attached to the request into {@code streams}; each
  * stream is removed from the request as it is copied.
  */
 private void handleReqStream(final SolrQueryRequest req,
   List<ContentStream> streams) {
  Iterable<ContentStream> attached = req.getContentStreams();
  if (attached == null) {
   return;
  }
  // NOTE(review): relies on the request's stream collection supporting
  // Iterator.remove() - confirm for the request implementations in use.
  Iterator<ContentStream> iterator = attached.iterator();
  while (iterator.hasNext()) {
   streams.add(iterator.next());
   iterator.remove();
  }
 }

 /**
  * Loads all {@code streams} through a fixed-size thread pool, waits for
  * completion, then applies the commit/rollback request parameters. A
  * {@code result} entry with start/end time and per-stream details is added
  * to the response; the first per-stream failure, if any, becomes the
  * response exception.
  *
  * @throws InterruptedException if interrupted while waiting for the workers
  * @throws IOException if commit, rollback or finishing the processor fails
  */
 private void importStreamsMultiThreaded(final SolrQueryRequest req,
   final SolrQueryResponse rsp, List<ContentStream> streams)
   throws InterruptedException, IOException {
  SolrParams params = req.getParams();

  final UpdateRequestProcessorChain processorChain = req
    .getCore()
    .getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));

  UpdateRequestProcessor processor = processorChain.createProcessor(req,
    rsp);
  ExecutorService executor = null;
  try {
   Map<String, Object> resultMap = new LinkedHashMap<String, Object>();
   resultMap.put("start_time", new Date());
   final List<Map<String, Object>> details = new ArrayList<Map<String, Object>>();

   try {
    int threads = determineThreadsNumber(params, streams.size());
    ThreadFactory threadFactory = new ThreadFactory() {
     @Override
     public Thread newThread(Runnable r) {
      return new Thread(r, "threadedReqeustHandler-"
        + new Date());
     }
    };
    executor = Executors.newFixedThreadPool(threads, threadFactory);
    String contentType = params
      .get(CommonParams.STREAM_CONTENTTYPE);

    for (ContentStream stream : streams) {
     applyContentType(stream, contentType);
     submitTask(req, rsp, processorChain, executor, stream,
       details);
    }

    executor.shutdown();

    boolean terminated = executor.awaitTermination(Long.MAX_VALUE,
      TimeUnit.SECONDS);
    if (!terminated) {
     throw new RuntimeException("Request takes too much time");
    }
    // Perhaps commit from the parameters
    RequestHandlerUtils.handleCommit(req, processor, params, false);
    RequestHandlerUtils.handleRollback(req, processor, params,
      false);
   } finally {
    resultMap.put("end_time", new Date());
    reportFirstFailure(rsp, details);
   }
   resultMap.put("details", details);
   rsp.add("result", resultMap);
  } finally {
   // isShutdown() is already true after shutdown(); isTerminated()
   // tells whether workers may still be running (e.g. after an
   // interrupt) and therefore need to be cancelled.
   if (executor != null && !executor.isTerminated()) {
    executor.shutdownNow();
   }
   // finish the request
   processor.finish();
  }
 }

 /**
  * Copies the first recorded per-stream exception onto the response and adds
  * a numeric {@code status} entry for it.
  */
 private void reportFirstFailure(SolrQueryResponse rsp,
   List<Map<String, Object>> details) {
  // Workers append under the same lock; they may still be running when
  // this is reached through an exceptional path.
  synchronized (details) {
   for (Map<String, Object> detail : details) {
    Exception ex = (Exception) detail.get("exception");
    if (ex == null) {
     continue;
    }
    rsp.setException(ex);
    if (ex instanceof SolrException) {
     rsp.add("status", ((SolrException) ex).code());
    } else {
     // numeric code, consistent with the branch above
     rsp.add("status", SolrException.ErrorCode.BAD_REQUEST.code);
    }
    return;
   }
  }
 }

 /**
  * Resolves the pool size: the {@code threads} request parameter, else the
  * configured default, capped by the number of streams and never below 1
  * (a fixed thread pool rejects a size of zero or less).
  */
 private int determineThreadsNumber(SolrParams params, int streamSize) {
  int requested = params.getInt(PARAM_THREAD_NUMBER, defaultThreads);
  return Math.max(1, Math.min(requested, streamSize));
 }

 /**
  * Adds a stream for every file selected by a {@code stream.file.pattern}
  * parameter. A value naming an existing file is used as is; otherwise the
  * last path element is treated as a {@code *} wildcard pattern matched
  * against the regular files of the parent folder. A value without a folder
  * part that is not an existing file is ignored.
  *
  * @throws RuntimeException if the folder part does not exist
  */
 private void handleFilePatterns(final SolrQueryRequest req,
   List<ContentStream> streams) {
  String[] filePatterns = req.getParams().getParams(
    PARAM_STREAM_FILE_PATTERN);
  if (filePatterns == null) {
   return;
  }
  for (String filePattern : filePatterns) {
   // it may point to a file
   File file = new File(filePattern);
   if (file.isFile()) {
    streams.add(new ContentStreamBase.FileStream(file));
    continue;
   }
   // only supports a wildcard in the last path element, such as
   // c:\foldera\c*.csv; File splits the path on whichever separators
   // the platform accepts.
   File folder = file.getParentFile();
   if (folder == null) {
    continue;
   }
   if (!folder.exists()) {
    throw new RuntimeException("Folder " + folder
      + " doesn't exists.");
   }

   final Pattern p = Pattern.compile(convertPattern(file.getName()));
   File[] files = folder.listFiles(new FilenameFilter() {
    @Override
    public boolean accept(File dir, String name) {
     // sub-directories cannot be opened as content streams
     return p.matcher(name).matches()
       && new File(dir, name).isFile();
    }
   });

   if (files != null) {
    for (File match : files) {
     streams.add(new ContentStreamBase.FileStream(match));
    }
   }
  }
 }

 /**
  * Adds a stream for every regular file directly inside each
  * {@code stream.folder}; sub-directories are skipped, and a value that is
  * not a listable directory contributes nothing.
  */
 private void handleStreamFolders(final SolrQueryRequest req,
   List<ContentStream> streams) {
  String[] folders = req.getParams().getParams(PARAM_STREAM_FOLDER);
  if (folders == null) {
   return;
  }
  for (String folderStr : folders) {
   File[] files = new File(folderStr).listFiles();
   if (files == null) {
    continue;
   }
   for (File file : files) {
    if (file.isFile()) {
     streams.add(new ContentStreamBase.FileStream(file));
    }
   }
  }
 }

 /**
  * Converts a file-name wildcard into a regular expression: every {@code *}
  * becomes {@code .*} and everything else is matched literally.
  */
 private String convertPattern(String pattern) {
  StringBuilder regex = new StringBuilder();
  int start = 0;
  int star;
  while ((star = pattern.indexOf('*', start)) >= 0) {
   if (star > start) {
    regex.append(Pattern.quote(pattern.substring(start, star)));
   }
   regex.append(".*");
   start = star + 1;
  }
  if (start < pattern.length()) {
   regex.append(Pattern.quote(pattern.substring(start)));
  }
  return regex.toString();
 }

 /**
  * Queues the load of one stream on {@code executor}. The task uses its own
  * update processor and appends one entry (start/end time, file name for
  * file streams, and the exception if the load failed) to
  * {@code rspResult}.
  */
 private void submitTask(final SolrQueryRequest req,
   final SolrQueryResponse rsp,
   final UpdateRequestProcessorChain processorChain,
   ExecutorService executor, final ContentStream stream,
   final List<Map<String, Object>> rspResult) {
  executor.execute(new Runnable() {
   @Override
   public void run() {
    Map<String, Object> map = new LinkedHashMap<String, Object>();
    map.put("start_time", new Date().toString());

    if (stream instanceof ContentStreamBase.FileStream) {
     map.put("Import File: ",
       ((ContentStreamBase.FileStream) stream).getName());
    }
    // Kept local: the response object is shared by all workers, so
    // reading the failure back from it would attribute one stream's
    // error to the others.
    Exception failure = null;
    try {
     UpdateRequestProcessor processor = null;
     try {
      processor = processorChain.createProcessor(req, rsp);

      ContentStreamLoader documentLoader = newLoader(req,
        processor);

      documentLoader.load(req, rsp, stream, processor);
     } finally {
      if (processor != null) {
       // finish the request
       processor.finish();
      }
     }
    } catch (Exception e) {
     failure = e;
    } finally {
     map.put("end_time", new Date().toString());
     if (failure != null) {
      map.put("exception", failure);
     }
     // the list is a plain ArrayList shared by all workers
     synchronized (rspResult) {
      rspResult.add(map);
     }
    }
   }
  });
 }
}
You can view the complete source code here:
https://github.com/jefferyyuan/solr.misc
Post a Comment

Labels

Java (159) Lucene-Solr (110) All (60) Interview (59) J2SE (53) Algorithm (37) Eclipse (35) Soft Skills (35) Code Example (31) Linux (26) JavaScript (23) Spring (22) Windows (22) Web Development (20) Tools (19) Nutch2 (18) Bugs (17) Debug (15) Defects (14) Text Mining (14) J2EE (13) Network (13) PowerShell (11) Chrome (9) Continuous Integration (9) How to (9) Learning code (9) Performance (9) UIMA (9) html (9) Design (8) Dynamic Languages (8) Http Client (8) Maven (8) Security (8) Trouble Shooting (8) bat (8) blogger (8) Big Data (7) Google (7) Guava (7) JSON (7) Problem Solving (7) ANT (6) Coding Skills (6) Database (6) Scala (6) Shell (6) css (6) Algorithm Series (5) Cache (5) IDE (5) Lesson Learned (5) Miscs (5) Programmer Skills (5) System Design (5) Tips (5) adsense (5) xml (5) AIX (4) Code Quality (4) GAE (4) Git (4) Good Programming Practices (4) Jackson (4) Memory Usage (4) OpenNLP (4) Project Managment (4) Python (4) Spark (4) Testing (4) ads (4) regular-expression (4) Android (3) Apache Spark (3) Become a Better You (3) Concurrency (3) Eclipse RCP (3) English (3) Firefox (3) Happy Hacking (3) IBM (3) J2SE Knowledge Series (3) JAX-RS (3) Jetty (3) Restful Web Service (3) Script (3) regex (3) seo (3) .Net (2) Android Studio (2) Apache (2) Apache Procrun (2) Architecture (2) Batch (2) Build (2) Building Scalable Web Sites (2) C# (2) C/C++ (2) CSV (2) Career (2) Cassandra (2) Distributed (2) Fiddler (2) Google Drive (2) Gson (2) Html Parser (2) Http (2) Image Tools (2) JQuery (2) Jersey (2) LDAP (2) Life (2) Logging (2) Software Issues (2) Storage (2) Text Search (2) xml parser (2) AOP (1) Application Design (1) AspectJ (1) Bit Operation (1) Chrome DevTools (1) Cloud (1) Codility (1) Data Mining (1) Data Structure (1) ExceptionUtils (1) Exif (1) Feature Request (1) FindBugs (1) Greasemonkey (1) HTML5 (1) Httpd (1) I18N (1) IBM Java Thread Dump Analyzer (1) JDK Source Code (1) JDK8 (1) JMX (1) Lazy Developer (1) Mac (1) Machine Learning (1) Mobile (1) My Plan 
for 2010 (1) Netbeans (1) Notes (1) Operating System (1) Perl (1) Problems (1) Product Architecture (1) Programming Life (1) Quality (1) Redhat (1) Redis (1) Review (1) RxJava (1) Solutions logs (1) Team Management (1) Thread Dump Analyzer (1) Troubleshooting (1) Visualization (1) boilerpipe (1) htm (1) ongoing (1) procrun (1) rss (1)

Popular Posts