Solr: Use Multiple Threads to Import Local stream Files


When importing data into Solr, users can pass the stream.file="path" parameter multiple times to import multiple local files. But Solr's UpdateRequestHandler imports them one by one:
org.apache.solr.handler.ContentStreamHandlerBase.handleRequestBody(SolrQueryRequest, SolrQueryResponse)
for (ContentStream stream : streams) {
  documentLoader.load(req, rsp, stream, processor);
}
So to speed up indexing, we can use multiple threads to import the files simultaneously.
Meanwhile, I want to extend UpdateRequestHandler with a stream.folder parameter, so it will import all files under one folder, and also with a stream.file.pattern parameter, so it will import all files that match the pattern.

package org.codeexample.jeffery.solr;
public class ThreadedUpdateRequestHandler extends UpdateRequestHandler {

 private static String PARAM_THREAD_NUMBER = "threads";

 private static String PARAM_STREAM_FOLDER = "stream.folder";
 private static String PARAM_STREAM_FILE_PATTERN = "stream.file.pattern";

 private static final int DEFAULT_THREAD_NUMBER = 10;
 private static int DEFAULT_THREADS = DEFAULT_THREAD_NUMBER;

 @SuppressWarnings("rawtypes")
 @Override
 public void init(NamedList args) {
  super.init(args);
  if (args != null) {
   NamedList namedList = ((NamedList) args.get("defaults"));
   if (namedList != null) {
    Object obj = namedList.get(PARAM_THREAD_NUMBER);
    if (obj != null) {
     DEFAULT_THREADS = Integer.parseInt(obj.toString());
    }
   }
  }
 }

 @Override
 public void handleRequestBody(final SolrQueryRequest req,
   final SolrQueryResponse rsp) throws Exception {

  List<ContentStream> streams = new ArrayList<ContentStream>();

  handleReqStream(req, streams);
  // here, we handle the new two parameters: stream.folder and
  // strem.filepattern
  handleStreamFolders(req, streams);
  handleFilePatterns(req, streams);
  if (streams.size() < 2) {
   // No need to use threadpool.
   SolrQueryRequestBase reqBase = (SolrQueryRequestBase) req;
   if (!streams.isEmpty()) {
    String contentType = req.getParams().get(
      CommonParams.STREAM_CONTENTTYPE);
    ContentStream stream = streams.get(0);
    if (stream instanceof ContentStreamBase) {
     ((ContentStreamBase) stream).setContentType(contentType);

    }
   }
   reqBase.setContentStreams(streams);
   super.handleRequestBody(req, rsp);
  } else {
   importStreamsMultiThreaded(req, rsp, streams);
  }
 }

 private void handleReqStream(final SolrQueryRequest req,
   List<ContentStream> streams) {
  Iterable<ContentStream> iterabler = req.getContentStreams();
  if (iterabler != null) {
   Iterator<ContentStream> iterator = iterabler.iterator();
   while (iterator.hasNext()) {
    streams.add(iterator.next());
    iterator.remove();
   }
  }
 }

 private ExecutorService importStreamsMultiThreaded(
   final SolrQueryRequest req, final SolrQueryResponse rsp,
   List<ContentStream> streams) throws InterruptedException,
   IOException {
  ExecutorService executor = null;
  SolrParams params = req.getParams();

  final UpdateRequestProcessorChain processorChain = req
    .getCore()
    .getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));

  UpdateRequestProcessor processor = processorChain.createProcessor(req,
    rsp);
  try {
   Map<String, Object> resultMap = new LinkedHashMap<String, Object>();

   resultMap.put("start_time", new Date());
   List<Map<String, Object>> details = new ArrayList<Map<String, Object>>();

   try {

    int threads = determineThreadsNumber(params, streams.size());
    ThreadFactory threadFactory = new ThreadFactory() {
     public Thread newThread(Runnable r) {
      return new Thread(r, "threadedReqeustHandler-"
        + new Date());
     }
    };
    executor = Executors.newFixedThreadPool(threads, threadFactory);
    String contentType = params
      .get(CommonParams.STREAM_CONTENTTYPE);

    Iterator<ContentStream> iterator = streams.iterator();
    while (iterator.hasNext()) {
     ContentStream stream = iterator.next();
     iterator.remove();
     if (stream instanceof ContentStreamBase) {
      ((ContentStreamBase) stream)
        .setContentType(contentType);

     }
     submitTask(req, rsp, processorChain, executor, stream,
       details);
    }

    executor.shutdown();

    boolean terminated = executor.awaitTermination(Long.MAX_VALUE,
      TimeUnit.SECONDS);
    if (!terminated) {
     throw new RuntimeException("Request takes too much time");
    }
    // Perhaps commit from the parameters
    RequestHandlerUtils.handleCommit(req, processor, params, false);
    RequestHandlerUtils.handleRollback(req, processor, params,
      false);
   } finally {
    resultMap.put("end_time", new Date());

    // check whether there is error in details
    for (Map<String, Object> map : details) {
     Exception ex = (Exception) map.get("exception");
     if (ex != null) {
      rsp.setException(ex);
      if (ex instanceof SolrException) {
       rsp.add("status", ((SolrException) ex).code());
      } else {
       rsp.add("status",
         SolrException.ErrorCode.BAD_REQUEST);
      }
      break;
     }
    }
   }
   resultMap.put("details", details);
   rsp.add("result", resultMap);
   return executor;
  } finally {
   if (executor != null && !executor.isShutdown()) {
    executor.shutdownNow();
   }
   // finish the request
   processor.finish();
  }
 }

 private int determineThreadsNumber(SolrParams params, int streamSize) {
  int threads = DEFAULT_THREADS;
  String str = params.get(PARAM_THREAD_NUMBER);
  if (str != null) {
   threads = Integer.parseInt(str);
  }

  if (streamSize < threads) {
   threads = streamSize;
  }
  return threads;
 }

 private void handleFilePatterns(final SolrQueryRequest req,
   List<ContentStream> streams) {
  String[] strs = req.getParams().getParams(PARAM_STREAM_FILE_PATTERN);
  if (strs != null) {
   for (String filePattern : strs) {
    // it may point to a file
    File file = new File(filePattern);
    if (file.isFile()) {
     streams.add(new ContentStreamBase.FileStream(file));
    } else {
     // only supports tail regular expression, such as
     // c:\foldera\c*.csv
     int lastIndex = filePattern.lastIndexOf(File.separator);
     if (lastIndex > -1) {
      File folder = new File(filePattern.substring(0,
        lastIndex));

      if (!folder.exists()) {
       throw new RuntimeException("Folder " + folder
         + " doesn't exists.");
      }

      String pattern = filePattern.substring(lastIndex + 1);
      pattern = convertPattern(pattern);
      final Pattern p = Pattern.compile(pattern);

      File[] files = folder.listFiles(new FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
        Matcher matcher = p.matcher(name);
        return matcher.matches();
       }
      });

      if (files != null) {
       for (File tmp : files) {
        streams.add(new ContentStreamBase.FileStream(
          tmp));
       }
      }
     }
    }
   }
  }
 }

 private void handleStreamFolders(final SolrQueryRequest req,
   List<ContentStream> streams) {
  String[] strs = req.getParams().getParams(PARAM_STREAM_FOLDER);
  if (strs != null) {
   for (String folderStr : strs) {

    File folder = new File(folderStr);

    File[] files = folder.listFiles();

    if (files != null) {
     for (File file : files) {
      streams.add(new ContentStreamBase.FileStream(file));
     }
    }
   }
  }
 }

 /**
  * replace * to .*, replace . to \.
  */
 private String convertPattern(String pattern) {
  pattern = pattern.replaceAll("\\.", "\\\\.");
  pattern = pattern.replaceAll("\\*", ".*");
  return pattern;
 }

 private void submitTask(final SolrQueryRequest req,
   final SolrQueryResponse rsp,
   final UpdateRequestProcessorChain processorChain,
   ExecutorService executor, final ContentStream stream,
   final List<Map<String, Object>> rspResult) {
  Thread thread = new Thread() {
   public void run() {
    Map<String, Object> map = new LinkedHashMap<String, Object>();
    map.put("start_time", new Date().toString());

    if (stream instanceof ContentStreamBase.FileStream) {
     map.put("Import File: ",
       ((ContentStreamBase.FileStream) stream).getName());
    }
    try {
     UpdateRequestProcessor processor = null;
     try {
      processor = processorChain.createProcessor(req, rsp);

      ContentStreamLoader documentLoader = newLoader(req,
        processor);

      documentLoader.load(req, rsp, stream, processor);
      System.err.println(rsp);

     } finally {
      if (processor != null) {
       // finish the request
       processor.finish();
      }
     }
    } catch (Exception e) {
     rsp.setException(e);
    } finally {
     map.put("end_time", new Date().toString());
     if (rsp.getException() != null) {
      map.put("exception", rsp.getException());
     }
     rspResult.add(map);
    }

   };
  };

  executor.execute(thread);
 }
}
You can view the complete source code here:
https://github.com/jefferyyuan/solr.misc

Labels

adsense (5) Algorithm (69) Algorithm Series (35) Android (7) ANT (6) bat (8) Big Data (7) Blogger (14) Bugs (6) Cache (5) Chrome (19) Code Example (29) Code Quality (7) Coding Skills (5) Database (7) Debug (16) Design (5) Dev Tips (63) Eclipse (32) Git (5) Google (33) Guava (7) How to (9) Http Client (8) IDE (7) Interview (88) J2EE (13) J2SE (49) Java (186) JavaScript (27) JSON (7) Learning code (9) Lesson Learned (6) Linux (26) Lucene-Solr (112) Mac (10) Maven (8) Network (9) Nutch2 (18) Performance (9) PowerShell (11) Problem Solving (11) Programmer Skills (6) regex (5) Scala (6) Security (9) Soft Skills (38) Spring (22) System Design (11) Testing (7) Text Mining (14) Tips (17) Tools (24) Troubleshooting (29) UIMA (9) Web Development (19) Windows (21) xml (5)