Test Code to create collections
MiniSolrCloudCluster cluster = new MiniSolrCloudCluster(4 /*numServers*/, testBaseDir, solrXml, JettyConfig.builder().setContext("/solr").build());
cluster.createCollection(collectionName, 2/*numShards*/, 2/*replicationFactor*/, "cie-default", null);
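The same CREATE request can also be issued from a regular client. A minimal SolrJ sketch (Solr 5.x-style API; the ZooKeeper address and the names are assumptions taken from the test above):
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

CloudSolrClient client = new CloudSolrClient("localhost:9983"); // ZK host of the cluster (assumed)
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create();
create.setCollectionName("collectionName");
create.setConfigName("cie-default");
create.setNumShards(2);
create.setReplicationFactor(2);
create.process(client); // sends /admin/collections?action=CREATE, handled by CollectionsHandler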
CollectionsHandler
CollectionsHandler.handleRequestBody(SolrQueryRequest, SolrQueryResponse)
CollectionAction action = CollectionAction.get(a); // CollectionAction.CREATE(true)
CollectionOperation operation = CollectionOperation.get(action); // CollectionOperation.CREATE_OP(CREATE)
Map<String, Object> result = operation.call(req, rsp, this);
It returns a map like this:
{name=collectionName, fromApi=true, replicationFactor=2, collection.configName=configName, numShards=2, stateFormat=2}
ZkNodeProps props = new ZkNodeProps(result);
if (operation.sendToOCPQueue) handleResponse(operation.action.toLower(), props, rsp, operation.timeOut);
CollectionsHandler.handleResponse
QueueEvent event = coreContainer.getZkController().getOverseerCollectionQueue().offer(Utils.toJSON(m), timeout);
This uses DistributedQueue.offer(byte[] data, long timeout) to add a task node under /overseer/collection-queue-work (sequential qnr-<n> nodes).
It then uses a LatchWatcher to block until the Overseer has processed the task.
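A simplified sketch of this offer-and-wait pattern against a raw ZooKeeper handle (this is not Solr's DistributedQueue implementation; the node naming and the single latch are illustrative assumptions):
// zk is an org.apache.zookeeper.ZooKeeper handle; props/timeout as above
String taskPath = zk.create("/overseer/collection-queue-work/qn-", Utils.toJSON(props),
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
CountDownLatch latch = new CountDownLatch(1);               // LatchWatcher plays this role in Solr
zk.exists(taskPath.replace("/qn-", "/qnr-"), event -> latch.countDown());
latch.await(timeout, TimeUnit.MILLISECONDS);                // unblocks once the Overseer writes the response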
Overseer and OverseerCollectionProcessor
OverseerCollectionProcessor.processMessage(ZkNodeProps message, String operation /*create*/)
The message it receives looks like this:
{
  "name":"collectionName",
  "fromApi":"true",
  "replicationFactor":"2",
  "collection.configName":"configName",
  "numShards":"2",
  "stateFormat":"2",
  "operation":"create"
}
OverseerCollectionProcessor.createCollection(ClusterState, ZkNodeProps, NamedList)
ClusterStateMutator.getShardNames(numSlices, shardNames);
positionVsNodes = identifyNodes(clusterState, nodeList, message, shardNames, repFactor); // round-robin if rule not set
createConfNode(configName, collectionName, isLegacyCloud);
Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
// This message will be processed by ClusterStateUpdater
// wait for a while until we do see the collection in the clusterState
for (Map.Entry e : positionVsNodes.entrySet()) {
  // a CoreAdmin CREATE ShardRequest (sreq) is built for this replica position
  if (isLegacyCloud) {
    shardHandler.submit(sreq, sreq.shards[0], sreq.params);
  } else {
    coresToCreate.put(coreName, sreq);
  }
}
This sends an HTTP CoreAdmin call to each target node, which is handled by CoreAdminHandler.handleRequestBody.
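Roughly, each such request carries CoreAdmin CREATE parameters like the following (a sketch; the exact parameter set, core name and node URL are illustrative, not copied from OverseerCollectionProcessor):
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
params.set(CoreAdminParams.NAME, "collectionName_shard1_replica1");  // core name (illustrative)
params.set(CoreAdminParams.COLLECTION, "collectionName");
params.set(CoreAdminParams.SHARD, "shard1");
params.set("collection.configName", "configName");

ShardRequest sreq = new ShardRequest();
sreq.shards = new String[] {"http://node1:8983/solr"};               // target node (illustrative)
sreq.params = params;
shardHandler.submit(sreq, sreq.shards[0], sreq.params);              // ends up in CoreAdminHandler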
ClusterStateUpdater
// if there were any errors while processing
// the state queue, items would have been left in the
// work queue so let's process those first
byte[] data = workQueue.peek();
boolean hadWorkItems = data != null;
while (data != null) {
  final ZkNodeProps message = ZkNodeProps.load(data);
  clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
  workQueue.poll(); // poll-ing removes the element we got by peek-ing
  data = workQueue.peek();
}
ClusterStateUpdater.processQueueItem
zkWriteCommand = processMessage(clusterState, message, operation);
stats.success(operation);
clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
processMessage
case CREATE:
  return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message);
overseer.ClusterStateMutator.createCollection(ClusterState, ZkNodeProps)
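Reduced to its core, that mutator builds an empty DocCollection and wraps it in a ZkWriteCommand (a hedged sketch; the real method also derives the router, shard hash ranges and stateFormat from the message):
Map<String, Slice> slices = new LinkedHashMap<>();
for (String shardName : shardNames) {                   // e.g. "shard1", "shard2" for numShards=2
  slices.put(shardName, new Slice(shardName, new HashMap<>(), null));
}
Map<String, Object> collectionProps = new HashMap<>();
collectionProps.put("replicationFactor", "2");
DocCollection newCollection = new DocCollection("collectionName", slices, collectionProps, DocRouter.DEFAULT);
return new ZkWriteCommand("collectionName", newCollection); // later applied to ZK by ZkStateWriter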