How DistributedUpdateProcessor Works - Learning Solr Code

Case Study
Case 1: Update request is first sent to a follower (can be any node)
I: The coordinator node receives the add request:
DistributedUpdateProcessor.processAdd(AddUpdateCommand)

DistribPhase phase =
    DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));

DISTRIB_FROM_COLLECTION is null
phase is NONE

DistributedUpdateProcessor.setupRequest(String, SolrInputDocument, String)

ClusterState cstate = zkController.getClusterState();
DocCollection coll = cstate.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
String shardId = slice.getName();

This decides which shard the doc should be stored in.
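To make the shard lookup concrete, here is a minimal sketch of hash-range routing. It is not Solr's router: the class, the Range type, the two-shard layout, and the use of String.hashCode() are all assumptions made for illustration. As far as I understand, the default router hashes the doc id and picks the slice whose hash range contains that hash, which is the idea modelled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of hash-range routing; none of these types are Solr classes.
class ShardRoutingSketch {
    // Inclusive range of the 32-bit hash space owned by one shard.
    static final class Range {
        final int min;
        final int max;
        Range(int min, int max) { this.min = min; this.max = max; }
        boolean includes(int hash) { return hash >= min && hash <= max; }
    }

    // Two shards splitting the hash space in half (assumed layout).
    static Map<String, Range> twoShards() {
        Map<String, Range> shards = new LinkedHashMap<>();
        shards.put("shard1", new Range(Integer.MIN_VALUE, -1));
        shards.put("shard2", new Range(0, Integer.MAX_VALUE));
        return shards;
    }

    // Returns the name of the shard whose range contains the hash.
    static String shardForHash(int hash, Map<String, Range> shards) {
        for (Map.Entry<String, Range> e : shards.entrySet()) {
            if (e.getValue().includes(hash)) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("No shard covers hash " + hash);
    }

    // String.hashCode() is only a stand-in for the router's real hash function.
    static String targetShard(String id, Map<String, Range> shards) {
        return shardForHash(id.hashCode(), shards);
    }
}
```

Every node computes the same shard for the same id, which is why any node can act as coordinator.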

Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
    collection, shardId);
isLeader = leaderReplica.getName().equals(
    req.getCore().getCoreDescriptor().getCloudDescriptor()
        .getCoreNodeName());

This checks whether this node is the leader that should store the doc: false in this case.

2. Forward to the leader that should store the doc
// I need to forward onto the leader...
nodes = new ArrayList<>(1);

3. DistributedUpdateProcessor.processAdd(AddUpdateCommand)
params.set(DISTRIB_UPDATE_PARAM,
           (isLeader || isSubShardLeader ?
            DistribPhase.FROMLEADER.toString() :
            DistribPhase.TOLEADER.toString())); // ==> TOLEADER
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
    zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);
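The parameter selection above can be sketched in isolation. This is a simplified model, not the Solr code: the class and method names are hypothetical, and only the two parameter names (update.distrib and distrib.from) and the phase names come from the notes above.

```java
// Hypothetical stand-alone model of how the forwarded request is tagged.
class DistribParamSketch {
    enum DistribPhase { NONE, TOLEADER, FROMLEADER }

    // A leader (or sub-shard leader) tags the request FROMLEADER;
    // any other node tags it TOLEADER.
    static DistribPhase nextPhase(boolean isLeader, boolean isSubShardLeader) {
        return (isLeader || isSubShardLeader)
                ? DistribPhase.FROMLEADER
                : DistribPhase.TOLEADER;
    }

    // Builds the two parameters attached to the forwarded request.
    static String forwardParams(boolean isLeader, boolean isSubShardLeader, String fromCoreUrl) {
        return "update.distrib=" + nextPhase(isLeader, isSubShardLeader)
                + "&distrib.from=" + fromCoreUrl;
    }

    public static void main(String[] args) {
        // A non-leader coordinator forwarding to the shard leader.
        System.out.println(forwardParams(false, false, "http://host:8983/solr/core1/"));
    }
}
```

The receiving node reads update.distrib to learn which hop of the flow it is handling.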

II: The leader receives the request:
org.apache.solr.update.processor.UpdateRequestProcessorChain.createProcessor(SolrQueryRequest, SolrQueryResponse)
final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
distribPhase is TOLEADER, so skipToDistrib is true
// skip anything that doesn't have the marker interface - UpdateRequestProcessorFactory.RunAlways
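A rough sketch of the skipping rule may help here. It is an assumption-laden model rather than the real createProcessor: Factory, Named, AlwaysFactory and DistribFactory are hypothetical types, and the chain is walked front to back for readability. The point it illustrates is that when a distrib phase is present, factories placed before the distributing one are skipped unless they carry the RunAlways marker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of skipToDistrib; these are not Solr's interfaces.
class SkipToDistribSketch {
    interface Factory { String name(); }
    interface RunAlways { }     // marker: never skipped
    interface Distributing { }  // marker: the distributing factory

    static class Named implements Factory {
        private final String name;
        Named(String name) { this.name = name; }
        public String name() { return name; }
    }
    static class AlwaysFactory extends Named implements RunAlways {
        AlwaysFactory(String name) { super(name); }
    }
    static class DistribFactory extends Named implements Distributing {
        DistribFactory(String name) { super(name); }
    }

    // Returns the names of the factories that would build processors.
    static List<String> activeFactories(List<Factory> chain, String distribPhase) {
        boolean skipToDistrib = distribPhase != null;
        boolean reachedDistrib = false;
        List<String> active = new ArrayList<>();
        for (Factory f : chain) {
            if (f instanceof Distributing) {
                reachedDistrib = true;
            }
            // Before the distributing factory, only RunAlways factories survive.
            if (skipToDistrib && !reachedDistrib && !(f instanceof RunAlways)) {
                continue;
            }
            active.add(f.name());
        }
        return active;
    }
}
```

This is why work done by earlier processors on the coordinator is not repeated on the leader or the followers.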

org.apache.solr.update.processor.DistributedUpdateProcessor.processAdd(AddUpdateCommand)
DistribPhase phase = TOLEADER
String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION);
fromCollection is null
if (isLeader || isSubShardLeader) {
          // that means I want to forward onto my replicas...
          // so get the replicas...
          forwardToLeader = false;
}
nodes = follower nodes

2. The leader adds the doc locally first
boolean dropCmd = false;
if (!forwardToLeader) {    // forwardToLeader is false, so this runs
  dropCmd = versionAdd(cmd); // usually returns false
}

doLocalAdd(cmd);
----
private void doLocalAdd(AddUpdateCommand cmd) throws IOException {
  super.processAdd(cmd);
}

if (willDistrib) { // true
  cmd.solrDoc = clonedDoc;
}

3. The leader forwards the add request to its followers
update.distrib=FROMLEADER&distrib.from=http://192.168.1.12:8983/solr/pie_search_shard1_replica2/
params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM,
           (isLeader || isSubShardLeader ?
            DistribPhase.FROMLEADER.toString() :
            DistribPhase.TOLEADER.toString()));
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
    zkController.getBaseUrl(), req.getCore().getName()));

if (replicationTracker != null && minRf > 1)
  params.set(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));

cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);

III: The followers receive the request:
1. org.apache.solr.update.processor.UpdateRequestProcessorChain.createProcessor(SolrQueryRequest, SolrQueryResponse)
FROMLEADER
final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM); //FROMLEADER
final boolean skipToDistrib = distribPhase != null; // true

2. org.apache.solr.update.processor.DistributedUpdateProcessor.processAdd(AddUpdateCommand)
if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) {
  if (req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
    // locally we think we are leader but the request says it came FROMLEADER
    // that could indicate a problem, let the full logic below figure it out
  } else {
    isLeader = false;     // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
    forwardToLeader = false;
    return nodes;
  }
}

It returns empty nodes, so nothing is forwarded and the follower only adds the doc locally:
if (!forwardToLeader) {
  dropCmd = versionAdd(cmd);
}
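The three hops of Case 1 can be condensed into one decision function. This is a deliberately simplified sketch under stated assumptions: the class, enum and method names are hypothetical, sub-shard leaders are ignored, and so is the edge case where a node believes it is the leader but receives a FROMLEADER request.

```java
// Hypothetical summary of what a node does with an add request.
class UpdateFlowSketch {
    enum Phase { NONE, TOLEADER, FROMLEADER }
    enum Action { FORWARD_TO_LEADER, ADD_LOCALLY_THEN_FORWARD_TO_REPLICAS, ADD_LOCALLY_ONLY }

    static Action decide(Phase phase, boolean isLeaderOfTargetShard) {
        if (phase == Phase.FROMLEADER) {
            // Follower: the leader already versioned the doc, just index it.
            return Action.ADD_LOCALLY_ONLY;
        }
        if (isLeaderOfTargetShard) {
            // Leader: index locally first, then send FROMLEADER to the followers.
            return Action.ADD_LOCALLY_THEN_FORWARD_TO_REPLICAS;
        }
        // Any other node: pass the doc on with TOLEADER.
        return Action.FORWARD_TO_LEADER;
    }

    public static void main(String[] args) {
        System.out.println(decide(Phase.NONE, false));       // step I
        System.out.println(decide(Phase.TOLEADER, true));    // step II
        System.out.println(decide(Phase.FROMLEADER, false)); // step III
    }
}
```

The main method walks through steps I, II and III in order.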

Case 2: The update request is sent to leader which should store this doc
DistributedUpdateProcessor.setupRequest(String, SolrInputDocument, String)
DistribPhase phase: NONE

Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
    collection, shardId);
isLeader = leaderReplica.getName().equals(
    req.getCore().getCoreDescriptor().getCloudDescriptor()
        .getCoreNodeName());
isLeader is true

if (isLeader || isSubShardLeader) {
          // that means I want to forward onto my replicas...
          // so get the replicas...
          forwardToLeader = false;
}
nodes = followers

It will forward the request to its followers with params:
update.distrib=FROMLEADER&distrib.from=the_leader_node_url

org.apache.solr.update.processor.DistributedUpdateProcessor.processAdd(AddUpdateCommand)
if (!forwardToLeader) { // forwardToLeader is false, so this runs
  dropCmd = versionAdd(cmd);
}
The leader adds the doc to its local index at this stage.

The leader does not forward the request to itself, so there is no update.distrib=TOLEADER stage.

Case 3: The add request is sent to a leader which should not own this doc
The coordinator node forwards the add request to the leader of the shard that should store the doc.

DistributedUpdateProcessor.setupRequest(String, SolrInputDocument, String)
ClusterState cstate = zkController.getClusterState();
DocCollection coll = cstate.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
String shardId = slice.getName();

This decides which shard the doc belongs to and returns nodes containing the leader that should store the doc.

org.apache.solr.update.processor.DistributedUpdateProcessor.processAdd(AddUpdateCommand)
update.distrib=TOLEADER&distrib.from=this_node_url

cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);

Case 5: Send multiple docs in one command to a follower
XMLLoader.processUpdate(SolrQueryRequest, UpdateRequestProcessor, XMLStreamReader)

while (true) {
  if ("doc".equals(currTag)) {
    if(addCmd != null) {
      log.trace("adding doc...");
      addCmd.clear();
      addCmd.solrDoc = readDoc(parser);
      processor.processAdd(addCmd);
    } else {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected <doc> tag without an <add> tag surrounding it.");
    }
  }
}


It calls processAdd for each doc.
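A small sketch of the per-doc loop, assuming hypothetical AddCommand and Processor types that stand in for AddUpdateCommand and UpdateRequestProcessor. It mirrors the pattern in the snippet above: one command object is cleared and reused, and the processor is invoked once per doc, so each doc is routed on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a loader feeding docs to a processor one at a time.
class PerDocAddSketch {
    static class AddCommand {
        String doc;
        void clear() { doc = null; }
    }

    interface Processor {
        void processAdd(AddCommand cmd);
    }

    // Reuses a single command object and calls processAdd once per doc.
    static int load(List<String> docs, Processor processor) {
        AddCommand cmd = new AddCommand();
        int count = 0;
        for (String doc : docs) {
            cmd.clear();
            cmd.doc = doc;
            processor.processAdd(cmd);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        int n = load(Arrays.asList("doc1", "doc2", "doc3"), cmd -> seen.add(cmd.doc));
        System.out.println(n + " docs processed: " + seen);
    }
}
```

Because the command object is reused, a processor that needs the doc later has to copy it rather than keep a reference to the command.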

Related Code
UpdateRequestProcessorFactory.RunAlways

UpdateRequestProcessorChain.createProcessor(SolrQueryRequest, SolrQueryResponse)

UpdateRequestProcessorChain.init(PluginInfo)
If the chain includes the RunUpdateProcessorFactory, but does not include an implementation of the DistributingUpdateProcessorFactory interface, then an instance of DistributedUpdateProcessorFactory will be injected immediately prior to the RunUpdateProcessorFactory.
if (0 <= runIndex && 0 == numDistrib) {
  // by default, add distrib processor immediately before run
  DistributedUpdateProcessorFactory distrib
    = new DistributedUpdateProcessorFactory();
  distrib.init(new NamedList());
  list.add(runIndex, distrib);

}
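The injection rule can be tried out on a plain list. In this sketch the chain is a list of names and "run" and "distrib" are placeholder labels for the two factory kinds; the real code works on factory instances, so treat this only as an illustration of the index arithmetic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of injecting the default distrib factory before run.
class ChainInjectionSketch {
    static List<String> withDefaultDistrib(List<String> chain) {
        List<String> result = new ArrayList<>(chain);
        int runIndex = result.indexOf("run");
        boolean hasDistrib = result.contains("distrib");
        // Only inject when run is present and no distrib factory was configured.
        if (runIndex >= 0 && !hasDistrib) {
            result.add(runIndex, "distrib");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(withDefaultDistrib(Arrays.asList("log", "run")));
    }
}
```

A chain that already names a distributing factory, or that has no run factory, is left unchanged.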

DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));

boolean isOnCoordinateNode = (phase == null || phase == DistribPhase.NONE);
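For reference, a minimal stand-alone model of the phase parsing and the coordinator check. It assumes that a missing or blank parameter maps to NONE, which is my reading of parseParam; the real method reports bad values with a SolrException, for which IllegalArgumentException stands in here. The class and helper names are hypothetical.

```java
// Hypothetical re-creation of the phase parsing; not the Solr enum itself.
class DistribPhaseSketch {
    enum DistribPhase { NONE, TOLEADER, FROMLEADER }

    // Assumption: null or blank means the request has not been distributed yet.
    static DistribPhase parseParam(String param) {
        if (param == null || param.trim().isEmpty()) {
            return DistribPhase.NONE;
        }
        // Throws IllegalArgumentException for unknown values.
        return DistribPhase.valueOf(param);
    }

    // A request with no phase is being handled by the coordinator node.
    static boolean isOnCoordinatorNode(String param) {
        DistribPhase phase = parseParam(param);
        return phase == null || phase == DistribPhase.NONE;
    }

    public static void main(String[] args) {
        System.out.println(isOnCoordinatorNode(null));       // original client request
        System.out.println(isOnCoordinatorNode("TOLEADER")); // forwarded request
    }
}
```

A custom processor can use a check like this to run its logic only once, on the node that first received the request.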
