Solr supports distributed search with a syntax like: http://localhost:8080/solr/select?shards=localhost:8080/solr,localhost:9090/solr&indent=true&q=nexus7.
However, Solr doesn't support updating or uploading files to multiple cores in one request, but that is easy to add:
We add a `shards` parameter that lists the URLs of the target cores. A parameter of the form `shardN.parameter_name=parameter_value` sends `parameter_name=parameter_value` only to the N-th core in `shards` (numbered from 0). Parameters that do not start with `shardN.` are sent to all cores.
Example: upload all CSV files in folder1 to core1 and all CSV files in folder2 to core2:
http://localhost:8080/solr/cores?shards=http://localhost:8080/solr/collection1/,http://localhost:8080/solr/collection2/&url=/import/csv&shard0.stream.folder=folder1_path&shard1.stream.folder=folder2_path&stream.contentType=text/csv;charset=utf-8
Refer here for how to upload multiple local stream files using multiple threads, including support for stream.folder and stream.file.pattern.
Commit to core1 and core2 in one request:
http://localhost:8080/solr/cores?shards=http://localhost:8080/solr/collection1/,http://localhost:8080/solr/collection2/&url=/update&commit=true
Now we can update multiple cores in one request, which makes our scripts much simpler to write.
The code is below. You can also view the complete source code here: https://github.com/jefferyyuan/solr.misc
/**
 * Update handler that forwards one request to several Solr cores in parallel.
 *
 * <p>Request parameters:
 * <ul>
 *   <li>{@code shards} - comma-separated base URLs of the target cores (required, non-empty).</li>
 *   <li>{@code shardN.name=value} - sent as {@code name=value} only to the N-th core listed in
 *       {@code shards} (0-based).</li>
 *   <li>any other parameter - sent unchanged, with all of its values, to every core. The
 *       {@code url} parameter is additionally used as the request path on every core.</li>
 * </ul>
 *
 * <p>Only parameters are forwarded; the incoming request's content streams are not. The outcome
 * for each core is added to the response as {@code shard0}, {@code shard1}, ... in shard order:
 * a map holding the forwarded {@code params} plus either the core's {@code responseHeader} or an
 * {@code error} entry ({@code msg}, {@code trace}). A failing core does not fail the request.
 */
public class MultiCoreUpdateRequestHandler extends UpdateRequestHandler {
  private static final String PARAM_SHARDS = "shards";
  /** Prefix of core-specific parameters, e.g. {@code shard0.stream.folder}. */
  private static final String SHARD_PARAM_PREFIX = "shard";

  /**
   * Splits the request parameters per core and sends them to every core listed in {@code shards}.
   *
   * @param req the incoming request; only its parameters are used
   * @param rsp receives one {@code shardN} entry per target core
   * @throws RuntimeException if {@code shards} is missing or lists no core
   * @throws IllegalArgumentException if a {@code shardN.} parameter names a core not in {@code shards}
   * @throws InterruptedException if interrupted while waiting for the cores to answer
   */
  @Override
  public void handleRequestBody(final SolrQueryRequest req,
      final SolrQueryResponse rsp) throws Exception {
    SolrParams params = req.getParams();
    String shardsStr = params.get(PARAM_SHARDS);
    if (shardsStr == null) {
      throw new RuntimeException("No shards parameter found.");
    }
    List<String> shards = StrUtils.splitSmart(shardsStr, ',');
    if (shards.isEmpty()) {
      // Checked here because Executors.newFixedThreadPool(0) would fail with an unhelpful message.
      throw new RuntimeException("The shards parameter lists no cores.");
    }
    handleShards(shards, buildShardParams(params, shards.size()), rsp);
  }

  /**
   * Builds one parameter set per core: {@code shardN.name} goes to core N as {@code name}; every
   * other parameter except {@code shards} goes to all cores.
   *
   * @throws IllegalArgumentException if N is not smaller than {@code shardCount}
   */
  private static List<ModifiableSolrParams> buildShardParams(SolrParams params, int shardCount) {
    List<ModifiableSolrParams> shardParams = new ArrayList<ModifiableSolrParams>(shardCount);
    for (int i = 0; i < shardCount; i++) {
      shardParams.add(new ModifiableSolrParams());
    }
    Iterator<String> names = params.getParameterNamesIterator();
    while (names.hasNext()) {
      String paramName = names.next();
      if (paramName.equals(PARAM_SHARDS)) {
        continue;
      }
      // getParams keeps every value of a multi-valued parameter; get() would keep only the first.
      String[] values = params.getParams(paramName);
      int shardNumber = parseShardNumber(paramName);
      if (shardNumber < 0) {
        for (ModifiableSolrParams target : shardParams) {
          target.add(paramName, values);
        }
      } else if (shardNumber < shardCount) {
        // The prefix "shard" contains no '.', so the first '.' ends the "shardN" part.
        String targetName = paramName.substring(paramName.indexOf('.') + 1);
        shardParams.get(shardNumber).add(targetName, values);
      } else {
        throw new IllegalArgumentException("Parameter " + paramName + " addresses shard "
            + shardNumber + " but only " + shardCount + " core(s) are listed in '"
            + PARAM_SHARDS + "'.");
      }
    }
    return shardParams;
  }

  /**
   * Returns N when {@code paramName} has the form {@code shardN.rest} with N made of ASCII digits,
   * otherwise -1. An N too large for an int yields {@link Integer#MAX_VALUE} so that callers
   * report it as out of range instead of treating it as a common parameter.
   */
  private static int parseShardNumber(String paramName) {
    if (!paramName.startsWith(SHARD_PARAM_PREFIX)) {
      return -1;
    }
    int digitsStart = SHARD_PARAM_PREFIX.length();
    int dot = paramName.indexOf('.', digitsStart);
    if (dot <= digitsStart) {
      return -1; // no '.' at all, or no digits between "shard" and the '.'
    }
    for (int i = digitsStart; i < dot; i++) {
      char c = paramName.charAt(i);
      if (c < '0' || c > '9') {
        return -1; // e.g. "shards.tolerant" is a common parameter, not core-specific
      }
    }
    try {
      return Integer.parseInt(paramName.substring(digitsStart, dot));
    } catch (NumberFormatException tooLarge) {
      return Integer.MAX_VALUE;
    }
  }

  /**
   * Sends each core its parameter set in parallel, waits for all of them, and then adds the
   * per-core results to {@code rsp} from the calling thread, because the response object is not
   * meant to be mutated from several threads at once.
   */
  private void handleShards(final List<String> shards,
      final List<ModifiableSolrParams> shardParams, final SolrQueryResponse rsp)
      throws InterruptedException {
    List<java.util.concurrent.Callable<Map<String, Object>>> tasks =
        new ArrayList<java.util.concurrent.Callable<Map<String, Object>>>(shards.size());
    for (int i = 0; i < shards.size(); i++) {
      tasks.add(newShardTask(shards.get(i), shardParams.get(i)));
    }
    ExecutorService executor = Executors.newFixedThreadPool(shards.size());
    List<java.util.concurrent.Future<Map<String, Object>>> futures;
    try {
      // invokeAll blocks until every task has finished (or cancels the rest if interrupted).
      futures = executor.invokeAll(tasks);
    } finally {
      executor.shutdownNow();
    }
    for (int i = 0; i < futures.size(); i++) {
      Map<String, Object> result;
      try {
        result = futures.get(i).get();
      } catch (java.util.concurrent.ExecutionException e) {
        // Tasks turn every Exception into an "error" entry, so only an Error can end up here.
        throw new RuntimeException("Request to shard" + i + " failed", e.getCause());
      }
      rsp.add("shard" + i, result);
    }
  }

  /**
   * Creates a task that sends {@code params} to the core at {@code shardUrl}, using the
   * {@code url} parameter as the request path, and returns a map describing the outcome.
   */
  private static java.util.concurrent.Callable<Map<String, Object>> newShardTask(
      final String shardUrl, final ModifiableSolrParams params) {
    return new java.util.concurrent.Callable<Map<String, Object>>() {
      // getResponseHeader() may be declared with a raw NamedList depending on the SolrJ version.
      @SuppressWarnings("unchecked")
      @Override
      public Map<String, Object> call() {
        Map<String, Object> resultMap = new LinkedHashMap<String, Object>();
        resultMap.put("params", params.toNamedList());
        HttpSolrServer solr = null;
        try {
          solr = new HttpSolrServer(shardUrl);
          UpdateRequest request = new UpdateRequest(params.get("url"));
          request.setParams(params);
          UpdateResponse response = request.process(solr);
          NamedList<Object> header = response.getResponseHeader();
          resultMap.put("responseHeader", header);
        } catch (Exception e) {
          NamedList<Object> error = new NamedList<Object>();
          error.add("msg", e.getMessage());
          StringWriter sw = new StringWriter();
          e.printStackTrace(new PrintWriter(sw));
          error.add("trace", sw.toString());
          resultMap.put("error", error);
        } finally {
          if (solr != null) {
            solr.shutdown(); // release the per-request HTTP client
          }
        }
        return resultMap;
      }
    };
  }
}