Case Study
Case 1: Update request is first sent to a follower (can be any node)
I: The coordinator node receives the add request:
1. DistributedUpdateProcessor.processAdd(AddUpdateCommand)
DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
DISTRIB_FROM_COLLECTION is null
phase is NONE (the client did not set update.distrib)
DistributedUpdateProcessor.setupRequest(String, SolrInputDocument, String)
ClusterState cstate = zkController.getClusterState();
DocCollection coll = cstate.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
String shardId = slice.getName();
Determines which shard this doc should be stored in
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, shardId);
isLeader = leaderReplica.getName().equals(
req.getCore().getCoreDescriptor().getCloudDescriptor()
.getCoreNodeName());
Am I the leader of the shard that should store the doc? Here: false
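Step 1 boils down to two questions: which shard owns the id, and does this core lead that shard? Below is a minimal sketch under loudly stated assumptions: the Shard record, the two made-up hash ranges and the use of String.hashCode() (instead of the MurmurHash3 hash the compositeId router uses) are all illustrative, not Solr API.

```java
import java.util.List;

// Simplified model of the routing and leader check in setupRequest.
// Assumptions (hypothetical, not Solr's API): the Shard record, the made-up hash ranges,
// and String.hashCode() standing in for the MurmurHash3 hash of the compositeId router.
final class RoutingSketch {

    // A shard with an inclusive hash range and the core node name of its current leader.
    record Shard(String name, int min, int max, String leaderCoreNodeName) {
        boolean covers(int hash) {
            return hash >= min && hash <= max;
        }
    }

    // Counterpart of getTargetSlice: pick the shard whose range contains the id's hash.
    static Shard targetShard(String id, List<Shard> shards) {
        int hash = id.hashCode();
        for (Shard s : shards) {
            if (s.covers(hash)) {
                return s;
            }
        }
        throw new IllegalStateException("no shard covers hash " + hash);
    }

    // Counterpart of the isLeader comparison: does my core node lead the target shard?
    static boolean isLeader(Shard target, String myCoreNodeName) {
        return target.leaderCoreNodeName().equals(myCoreNodeName);
    }

    public static void main(String[] args) {
        // Two shards splitting the 32-bit hash space in half.
        List<Shard> shards = List.of(
                new Shard("shard1", Integer.MIN_VALUE, -1, "core_node1"),
                new Shard("shard2", 0, Integer.MAX_VALUE, "core_node2"));
        Shard target = targetShard("doc-42", shards);
        // core_node3 leads neither shard, so it plays the follower/coordinator role here.
        System.out.println(target.name() + ", am I (core_node3) the leader? "
                + isLeader(target, "core_node3"));
    }
}
```

Because the ranges cover the whole int space without overlap, every id lands on exactly one shard; only the leader comparison decides whether the node forwards.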
2. Forward to the leader that should store the doc
// I need to forward onto the leader...
nodes = new ArrayList<>(1); // a single entry, the leader of the target shard; forwardToLeader becomes true
3. DistributedUpdateProcessor.processAdd(AddUpdateCommand)
params.set(DISTRIB_UPDATE_PARAM,
(isLeader || isSubShardLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString())); // evaluates to TOLEADER here
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);
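The ternary above is the whole rule for the forwarded phase. A small sketch of it, assuming a hypothetical DistribPhase enum that mirrors the three phases in these notes (ForwardPhaseSketch and its methods are illustrative names, and the URL is made up):

```java
// Sketch of how the update.distrib value of a forwarded add is chosen in processAdd.
// DistribPhase below is a hypothetical stand-in mirroring the three phases in these notes;
// ForwardPhaseSketch and its methods are illustrative names, not Solr API.
final class ForwardPhaseSketch {

    enum DistribPhase { NONE, TOLEADER, FROMLEADER }

    // Same ternary as the params.set(DISTRIB_UPDATE_PARAM, ...) call:
    // a (sub-)shard leader pushes FROMLEADER, any other node sends TOLEADER.
    static DistribPhase phaseForForward(boolean isLeader, boolean isSubShardLeader) {
        return (isLeader || isSubShardLeader) ? DistribPhase.FROMLEADER : DistribPhase.TOLEADER;
    }

    // Query string attached to the forwarded request (left unencoded for readability).
    static String forwardQuery(boolean isLeader, boolean isSubShardLeader, String myCoreUrl) {
        return "update.distrib=" + phaseForForward(isLeader, isSubShardLeader)
                + "&distrib.from=" + myCoreUrl;
    }

    public static void main(String[] args) {
        // A non-leader coordinator, as in Case 1, step I.3 (URL is made up).
        System.out.println(forwardQuery(false, false, "http://host:8983/solr/coll_shard2_replica1/"));
    }
}
```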
II: The leader receives the request:
1. org.apache.solr.update.processor.UpdateRequestProcessorChain.createProcessor(SolrQueryRequest, SolrQueryResponse)
final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM); // TOLEADER
final boolean skipToDistrib = distribPhase != null; // true
// skip anything that doesn't have the marker interface - UpdateRequestProcessorFactory.RunAlways
Processors that come before the distributing processor in the chain are skipped unless they implement RunAlways.
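A minimal model of the skipToDistrib behaviour, assuming each factory can be reduced to a name plus two flags (the Factory record and the flag values in main are hypothetical; the real loop tests instanceof DistributingUpdateProcessorFactory and instanceof RunAlways). Like the real loop, it walks the chain backwards, so everything after the distributing processor is kept.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the skipToDistrib loop in UpdateRequestProcessorChain.createProcessor.
// Assumption: factories are reduced to a name plus two flags (hypothetical Factory record);
// the real code tests "instanceof DistributingUpdateProcessorFactory" and "instanceof RunAlways".
final class SkipToDistribSketch {

    record Factory(String name, boolean distributing, boolean runAlways) {}

    // Returns the names of the processors that would be created, in chain order.
    static List<String> activeProcessors(List<Factory> chain, String distribPhase) {
        boolean skipToDistrib = distribPhase != null;
        boolean afterDistrib = true; // we walk the chain backwards, so we start after distrib
        List<String> active = new ArrayList<>();
        for (int i = chain.size() - 1; i >= 0; i--) {
            Factory f = chain.get(i);
            if (skipToDistrib) {
                if (afterDistrib) {
                    if (f.distributing()) {
                        afterDistrib = false; // the distributing processor itself still runs
                    }
                } else if (!f.runAlways()) {
                    continue; // before distrib and without the RunAlways marker: skipped
                }
            }
            active.add(0, f.name()); // prepend so the result is in chain order
        }
        return active;
    }

    public static void main(String[] args) {
        List<Factory> chain = List.of(
                new Factory("logging", false, true),   // flags are made up for the example
                new Factory("enrich", false, false),
                new Factory("distrib", true, false),
                new Factory("run", false, false));
        System.out.println(activeProcessors(chain, null));        // coordinator: whole chain
        System.out.println(activeProcessors(chain, "TOLEADER"));  // forwarded: enrich skipped
    }
}
```

The effect is that document-transforming processors run once, on the coordinator, and are not re-applied on the leader or followers.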
org.apache.solr.update.processor.DistributedUpdateProcessor.processAdd(AddUpdateCommand)
DistribPhase phase = TOLEADER
String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION); // null
if (isLeader || isSubShardLeader) {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
}
nodes = the follower replicas of this shard
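A sketch of how the leader's follower list could be derived: every replica of the shard except the leader itself. The Replica record and State enum are hypothetical, and dropping DOWN replicas is our reading of the state filter in the real replica lookup, not a verified detail.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of "nodes = follower nodes" on the leader: every replica of the shard except the leader.
// Assumptions: the Replica record and State enum are hypothetical, and dropping DOWN replicas is
// our reading of the state filter in the real replica lookup, not a verified detail.
final class FollowerSelectionSketch {

    enum State { ACTIVE, RECOVERING, DOWN }

    record Replica(String coreNodeName, String coreUrl, State state) {}

    static List<String> followerUrls(List<Replica> shardReplicas, String leaderCoreNodeName) {
        return shardReplicas.stream()
                .filter(r -> !r.coreNodeName().equals(leaderCoreNodeName)) // never send to itself
                .filter(r -> r.state() != State.DOWN)                      // skip replicas marked down
                .map(Replica::coreUrl)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
                new Replica("core_node1", "http://a:8983/solr/c_shard1_replica1/", State.ACTIVE),
                new Replica("core_node2", "http://b:8983/solr/c_shard1_replica2/", State.RECOVERING),
                new Replica("core_node3", "http://c:8983/solr/c_shard1_replica3/", State.DOWN));
        System.out.println(followerUrls(replicas, "core_node1"));
    }
}
```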
2. The leader adds the doc locally first
boolean dropCmd = false;
if (!forwardToLeader) { // forwardToLeader is false, so this branch runs
dropCmd = versionAdd(cmd); // usually returns false
}
// inside versionAdd, the leader assigns a new _version_ and then calls:
doLocalAdd(cmd);
----
private void doLocalAdd(AddUpdateCommand cmd) throws IOException {
super.processAdd(cmd);
}
if (willDistrib) { // true: put back the copy taken before doLocalAdd, since local processing may modify cmd.solrDoc
cmd.solrDoc = clonedDoc;
}
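The clonedDoc dance exists so that followers receive the document as it was before local indexing touched it. A minimal sketch, assuming documents are plain maps with immutable values (so a shallow copy suffices here, whereas Solr deep-copies the SolrInputDocument) and a hypothetical localAdd callback standing in for doLocalAdd:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of the clonedDoc / willDistrib pattern around doLocalAdd.
// Assumptions: documents are plain maps with immutable values (so a shallow copy is enough here;
// Solr uses SolrInputDocument.deepCopy()), and localAdd is a hypothetical stand-in for doLocalAdd.
final class CloneBeforeLocalAddSketch {

    // Returns the document that cmd.solrDoc would point to after the local add.
    static Map<String, Object> addLocally(Map<String, Object> doc, boolean willDistrib,
                                          Consumer<Map<String, Object>> localAdd) {
        Map<String, Object> cloned = willDistrib ? new HashMap<>(doc) : null; // copy before indexing
        localAdd.accept(doc);                 // local processing may mutate doc
        return willDistrib ? cloned : doc;    // forward the untouched copy to followers
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>(Map.of("id", "1", "title", "hello"));
        // A local step that strips a field, standing in for any in-place modification.
        Map<String, Object> forwarded = addLocally(doc, true, d -> d.remove("title"));
        System.out.println("indexed locally: " + doc + ", forwarded: " + forwarded);
    }
}
```

When nothing will be distributed, no copy is made and the original object is returned, which is why the clone is guarded by willDistrib.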
3. The leader forwards the add request to its followers
update.distrib=FROMLEADER&distrib.from=http://192.168.1.12:8983/solr/pie_search_shard1_replica2/
params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM,
(isLeader || isSubShardLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
if (replicationTracker != null && minRf > 1)
params.set(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);
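The leader's forwarded params can be summarised as a small map. The keys are the string values behind DISTRIB_UPDATE_PARAM, DISTRIB_FROM and UpdateRequest.MIN_REPFACT; the class and method names are hypothetical, and filterParams() is not modelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the params the leader attaches when forwarding to its followers.
// The keys are the string values behind DISTRIB_UPDATE_PARAM ("update.distrib"),
// DISTRIB_FROM ("distrib.from") and UpdateRequest.MIN_REPFACT ("min_rf");
// the class and method are hypothetical, and filterParams() is not modelled.
final class LeaderForwardParamsSketch {

    static Map<String, String> paramsForFollowers(String leaderCoreUrl,
                                                  boolean hasReplicationTracker, int minRf) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("update.distrib", "FROMLEADER"); // the sender is a leader
        params.put("distrib.from", leaderCoreUrl);
        if (hasReplicationTracker && minRf > 1) {
            params.put("min_rf", String.valueOf(minRf)); // only when replication tracking was requested
        }
        return params;
    }

    public static void main(String[] args) {
        String leader = "http://192.168.1.12:8983/solr/pie_search_shard1_replica2/";
        System.out.println(paramsForFollowers(leader, false, 1));
        System.out.println(paramsForFollowers(leader, true, 2));
    }
}
```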
III: The followers receive the request:
1. org.apache.solr.update.processor.UpdateRequestProcessorChain.createProcessor(SolrQueryRequest, SolrQueryResponse)
final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM); // FROMLEADER
final boolean skipToDistrib = distribPhase != null; // true
2. org.apache.solr.update.processor.DistributedUpdateProcessor.processAdd(AddUpdateCommand)
if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) {
if (req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
// locally we think we are leader but the request says it came FROMLEADER
// that could indicate a problem, let the full logic below figure it out
} else {
isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
forwardToLeader = false;
return nodes;
}
}
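No target nodes are returned, so the follower does not forward the update any further.
if (!forwardToLeader) { // false, so the follower indexes the doc locally via versionAdd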
dropCmd = versionAdd(cmd);
}
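The follower's early exit reduces to a decision on three inputs. A sketch with a hypothetical Outcome enum, where SHORT_CIRCUIT stands for "isLeader = false, forwardToLeader = false, return with no target nodes":

```java
// Sketch of the FROMLEADER early exit in setupRequest, reduced to booleans.
// Assumption: the Outcome enum and decide() are hypothetical; SHORT_CIRCUIT stands for
// "isLeader = false, forwardToLeader = false, return with no target nodes".
final class FollowerShortCircuitSketch {

    enum Outcome { SHORT_CIRCUIT, FULL_LOGIC }

    static Outcome decide(String distribPhase, boolean couldBeSubShardLeader,
                          boolean locallyThinksItIsLeader) {
        if ("FROMLEADER".equals(distribPhase) && !couldBeSubShardLeader) {
            // A leader receiving FROMLEADER is suspicious, so it falls through to the full logic.
            return locallyThinksItIsLeader ? Outcome.FULL_LOGIC : Outcome.SHORT_CIRCUIT;
        }
        return Outcome.FULL_LOGIC;
    }

    public static void main(String[] args) {
        System.out.println(decide("FROMLEADER", false, false)); // an ordinary follower
    }
}
```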
Case 2: The update request is sent to the leader that should store this doc
DistributedUpdateProcessor.setupRequest(String, SolrInputDocument, String)
DistribPhase phase: NONE
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, shardId);
isLeader = leaderReplica.getName().equals(
req.getCore().getCoreDescriptor().getCloudDescriptor()
.getCoreNodeName());
isLeader is true
if (isLeader || isSubShardLeader) {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
}
nodes = the follower replicas of this shard
It forwards the request to its followers with these params:
update.distrib=FROMLEADER&distrib.from=the_leader_node_url
org.apache.solr.update.processor.DistributedUpdateProcessor.processAdd(AddUpdateCommand)
if (!forwardToLeader) { // forwardToLeader is false, so this branch runs
dropCmd = versionAdd(cmd);
}
At this stage, the leader adds the doc to its local index.
It does not forward the request to itself, so there is no update.distrib=TOLEADER hop.
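Cases 1 to 4 differ only in whether the node that first receives the add leads the target shard. A summary sketch of the update.distrib value on each hop; hops() is a hypothetical helper and "NONE" marks the client request, which carries no update.distrib at all.

```java
import java.util.ArrayList;
import java.util.List;

// Summary sketch of Cases 1 to 4: the update.distrib value carried on each hop of a single add,
// depending on whether the node that receives it from the client leads the target shard.
// hops() is a hypothetical helper; "NONE" marks the client request, which carries no update.distrib.
final class HopSequenceSketch {

    static List<String> hops(boolean receiverLeadsTargetShard, int followerCount) {
        List<String> phases = new ArrayList<>();
        phases.add("NONE");                      // client -> receiving (coordinator) node
        if (!receiverLeadsTargetShard) {
            phases.add("TOLEADER");              // coordinator -> leader of the target shard
        }
        for (int i = 0; i < followerCount; i++) {
            phases.add("FROMLEADER");            // leader -> each follower of the target shard
        }
        return phases;
    }

    public static void main(String[] args) {
        System.out.println("Case 2: " + hops(true, 2));   // the target shard's leader receives the add
        System.out.println("Case 1: " + hops(false, 2));  // any node that does not lead the target shard
    }
}
```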
Cases 3 and 4: The add request is sent to a leader that does not own this doc
The coordinator node forwards the add request to the leader of the shard that should store the doc, as in Case 1.
DistributedUpdateProcessor.setupRequest(String, SolrInputDocument, String)
ClusterState cstate = zkController.getClusterState();
DocCollection coll = cstate.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
String shardId = slice.getName();
Decides which shard this doc belongs to
Returns nodes: a single entry, the leader of the shard that should store the doc
org.apache.solr.update.processor.DistributedUpdateProcessor.processAdd(AddUpdateCommand)
update.distrib=TOLEADER&distrib.from=this_node_url
cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);
Case 5: Send multiple docs in one command to a follower
XMLLoader.processUpdate(SolrQueryRequest, UpdateRequestProcessor, XMLStreamReader)
while (true) {
// ... (excerpt) read the next parser event; currTag is the local name of the current start element ...
if ("doc".equals(currTag)) {
if(addCmd != null) {
log.trace("adding doc...");
addCmd.clear();
addCmd.solrDoc = readDoc(parser);
processor.processAdd(addCmd);
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected <doc> tag without an <add> tag surrounding it.");
}
}
}
It calls processAdd once per <doc>, so every document in the batch goes through setupRequest and the routing above on its own.
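A simplified stand-in for the XMLLoader loop, using the JDK's StAX API: one callback per <doc> inside <add>. This is not Solr's code. Fields are read as flat name/value strings, the callback replaces processor.processAdd(addCmd), and IllegalArgumentException replaces the BAD_REQUEST SolrException.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Simplified stand-in for XMLLoader.processUpdate (not Solr's code): one processAdd per <doc>.
final class MultiDocLoaderSketch {

    // Returns the number of docs handed to processAdd.
    static int load(String xml, Consumer<Map<String, String>> processAdd) {
        try {
            XMLStreamReader parser = XMLInputFactory.newInstance()
                    .createXMLStreamReader(new StringReader(xml));
            boolean inAdd = false;
            int docs = 0;
            try {
                while (parser.hasNext()) {
                    int event = parser.next();
                    if (event == XMLStreamConstants.START_ELEMENT) {
                        String tag = parser.getLocalName();
                        if ("add".equals(tag)) {
                            inAdd = true;
                        } else if ("doc".equals(tag)) {
                            if (!inAdd) { // mirrors the "Unexpected <doc> tag" check
                                throw new IllegalArgumentException(
                                        "Unexpected <doc> tag without an <add> tag surrounding it.");
                            }
                            processAdd.accept(readDoc(parser)); // one call per document
                            docs++;
                        }
                    } else if (event == XMLStreamConstants.END_ELEMENT
                            && "add".equals(parser.getLocalName())) {
                        inAdd = false;
                    }
                }
            } finally {
                parser.close();
            }
            return docs;
        } catch (XMLStreamException e) {
            throw new IllegalStateException("malformed update XML", e);
        }
    }

    // Reads <field name="...">value</field> children until the matching </doc>.
    private static Map<String, String> readDoc(XMLStreamReader parser) throws XMLStreamException {
        Map<String, String> doc = new LinkedHashMap<>();
        while (parser.hasNext()) {
            int event = parser.next();
            if (event == XMLStreamConstants.START_ELEMENT && "field".equals(parser.getLocalName())) {
                String name = parser.getAttributeValue(null, "name");
                doc.put(name, parser.getElementText()); // leaves the cursor on </field>
            } else if (event == XMLStreamConstants.END_ELEMENT && "doc".equals(parser.getLocalName())) {
                return doc;
            }
        }
        throw new IllegalStateException("unterminated <doc>");
    }

    public static void main(String[] args) {
        String xml = "<add><doc><field name=\"id\">1</field></doc>"
                + "<doc><field name=\"id\">2</field></doc></add>";
        int n = load(xml, doc -> System.out.println("processAdd " + doc));
        System.out.println(n + " docs");
    }
}
```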
Related Code
UpdateRequestProcessorFactory.RunAlways
UpdateRequestProcessorChain.createProcessor(SolrQueryRequest, SolrQueryResponse)
UpdateRequestProcessorChain.init(PluginInfo)
If the chain includes the RunUpdateProcessorFactory, but does not include an implementation of the DistributingUpdateProcessorFactory interface, then an instance of DistributedUpdateProcessorFactory will be injected immediately prior to the RunUpdateProcessorFactory.
if (0 <= runIndex && 0 == numDistrib) {
// by default, add distrib processor immediately before run
DistributedUpdateProcessorFactory distrib
= new DistributedUpdateProcessorFactory();
distrib.init(new NamedList());
list.add(runIndex, distrib);
}
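The injection rule above is a plain list insertion. A sketch, assuming factories are modelled as strings ("run" for RunUpdateProcessorFactory, "distrib" for any distributing factory); the real init inspects factory types.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the default injection in UpdateRequestProcessorChain.init.
// Assumption: factories are modelled as strings ("run" = RunUpdateProcessorFactory,
// "distrib" = any DistributingUpdateProcessorFactory); the real code inspects factory types.
final class ChainInjectionSketch {

    static List<String> withDefaultDistrib(List<String> chain) {
        List<String> list = new ArrayList<>(chain);
        int runIndex = list.indexOf("run");
        long numDistrib = list.stream().filter("distrib"::equals).count();
        if (0 <= runIndex && 0 == numDistrib) {
            list.add(runIndex, "distrib"); // immediately before run
        }
        return list;
    }

    public static void main(String[] args) {
        System.out.println(withDefaultDistrib(List.of("logging", "run")));
    }
}
```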
DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
boolean isOnCoordinateNode = (phase == null || phase == DistribPhase.NONE); // true only on the node that received the request from the client
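A stand-in for the two lines above: a missing update.distrib parses to NONE, and only then is the node the coordinator. The enum mirrors the phases in these notes; for unknown values this sketch lets valueOf() throw IllegalArgumentException, whereas Solr is assumed to report a bad request instead.

```java
// Stand-in for DistribPhase.parseParam plus the isOnCoordinateNode check.
// Assumptions: this enum mirrors the phases in these notes; unknown values make valueOf() throw
// IllegalArgumentException, whereas Solr is assumed to report a bad request instead.
final class CoordinatorCheckSketch {

    enum DistribPhase { NONE, TOLEADER, FROMLEADER }

    // A missing or blank update.distrib parameter means NONE.
    static DistribPhase parseParam(String param) {
        if (param == null || param.trim().isEmpty()) {
            return DistribPhase.NONE;
        }
        return DistribPhase.valueOf(param.trim());
    }

    // True only on the node that received the request straight from the client.
    static boolean isOnCoordinateNode(String updateDistribParam) {
        DistribPhase phase = parseParam(updateDistribParam);
        return phase == null || phase == DistribPhase.NONE; // null check kept to mirror the original line
    }

    public static void main(String[] args) {
        for (String p : new String[] {null, "TOLEADER", "FROMLEADER"}) {
            System.out.println(p + " -> coordinator? " + isOnCoordinateNode(p));
        }
    }
}
```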