Solr: Extend StatsComponent to Support stats.query, stats.facet and facet.topn


Statistics are useful and important to enterprise applications, but the functionality of Solr's StatsComponent is still quite limited.
We can easily extend it to support stats.query and stats.ranges, just as the Solr facet component supports facet.query and facet.range.
stats.query and stats.ranges
The idea is to build a query array from the parameters. If the parameters are stats=true&stats.field=size&f.size.stats.ranges=0, 1024, 10240, the query array would be size:[0 TO 1024}, size:[1024 TO 10240}.

If the parameters are stats=true&stats.field=size&f.size.stats.query=size:[0 TO 1024}&f.size.stats.query=size:[1024 TO 10240}, the query array would again be size:[0 TO 1024}, size:[1024 TO 10240}.
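To make the stats.ranges conversion concrete, here is a minimal standalone sketch. The class and method names (StatsRangeQueries, toRangeQueries) are made up for illustration and are not part of Solr; the sketch only assumes that each adjacent pair of boundaries becomes one half-open range query.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Solr: shows how a stats.ranges value
// can be expanded into one range query per adjacent pair of boundaries.
final class StatsRangeQueries {

    static List<String> toRangeQueries(String field, String boundaries) {
        String[] parts = boundaries.split(",");
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < parts.length - 1; i++) {
            // '[' includes the lower bound, '}' excludes the upper bound,
            // so a value equal to a boundary lands in exactly one range
            queries.add(field + ":[" + parts[i].trim() + " TO " + parts[i + 1].trim() + "}");
        }
        return queries;
    }

    public static void main(String[] args) {
        // prints size:[0 TO 1024} and size:[1024 TO 10240}
        for (String q : toRangeQueries("size", "0, 1024, 10240")) {
            System.out.println(q);
        }
    }
}
```

A single boundary produces no query at all, since there is no pair to form a range from.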

Then iterate over the queries: for each one, get the docSet of the query, intersect it with the base docSet (docs), and run getStats on the intersection.
QParser qparser = QParser.getParser(tmpQuery, "lucene",req);
SolrQueryParser parser = new SolrQueryParser(qparser,req.getSchema().getDefaultSearchFieldName());
Query query = parser.parse(tmpQuery);

DocSet docSet = searcher.getDocSet(query);
DocSet interSection = docs.intersection(docSet);
NamedList<Object> stv = (NamedList<Object>) uif.getStats(searcher, interSection, facets).getStatsValues();
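The intersect-then-aggregate step can be tried without a running Solr instance. The sketch below is only an analogy: it uses java.util.BitSet as a stand-in for Solr's DocSet and a plain long[] as a stand-in for the field values, and the class name IntersectionStats is hypothetical.

```java
import java.util.BitSet;

// Stand-in for the DocSet logic: BitSet plays the role of DocSet and
// values[doc] plays the role of the stats field. Not Solr code.
final class IntersectionStats {

    /** Returns {sum, count} of values[doc] for docs present in both sets. */
    static long[] sumAndCount(BitSet baseDocs, BitSet queryDocs, long[] values) {
        BitSet both = (BitSet) baseDocs.clone(); // leave the inputs untouched
        both.and(queryDocs);                     // set intersection
        long sum = 0;
        long count = 0;
        for (int doc = both.nextSetBit(0); doc >= 0; doc = both.nextSetBit(doc + 1)) {
            sum += values[doc];
            count++;
        }
        return new long[] {sum, count};
    }

    public static void main(String[] args) {
        long[] sizes = {100, 2000, 300, 5000, 700};
        BitSet base = new BitSet();
        base.set(0, 4);               // docs 0..3 match the main query
        BitSet small = new BitSet();  // docs matched by one stats.query
        small.set(0);
        small.set(2);
        small.set(4);
        long[] r = sumAndCount(base, small, sizes);
        System.out.println("sum=" + r[0] + " count=" + r[1]); // sum=400 count=2
    }
}
```

Doc 4 matches the stats query but not the main query, so it does not contribute to the statistics.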
Label stats.query
We can also specify a label for a query, for example: f.size.stats.query={!label="Very Old"}mtm:[1980-01-01T12:00:00Z TO 2000-01-01T12:00:00Z]
The response would look like this:
<lst name="stats_fields">
            <lst name="size">
                        <lst name="Very Old"/>…
                        <lst name="Old"/>…
                        <lst name="Recent"/>…
            </lst>
</lst>
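Splitting the label off the query string is plain string handling. The following is a simplified sketch of that idea, not the code used in the component: the class LabeledQuery is hypothetical, it only understands a single label=... local parameter, and it assumes the label itself contains no '}' character.

```java
// Hypothetical value class: separates an optional {!label=...} prefix
// from the query that follows it.
final class LabeledQuery {
    final String label; // null when no label was given
    final String query;

    LabeledQuery(String label, String query) {
        this.label = label;
        this.query = query;
    }

    static LabeledQuery parse(String raw) {
        String s = raw.trim();
        if (!s.startsWith("{!")) {
            return new LabeledQuery(null, s);
        }
        int end = s.indexOf('}');
        if (end < 0) {
            throw new IllegalArgumentException("Unclosed local parameter: " + raw);
        }
        String local = s.substring(2, end).trim();
        String label = null;
        if (local.startsWith("label=")) {
            label = local.substring("label=".length()).trim();
            // strip one pair of matching surrounding quotes, if present
            if (label.length() >= 2) {
                char first = label.charAt(0);
                char last = label.charAt(label.length() - 1);
                if ((first == '"' || first == '\'') && first == last) {
                    label = label.substring(1, label.length() - 1);
                }
            }
        }
        return new LabeledQuery(label, s.substring(end + 1).trim());
    }

    public static void main(String[] args) {
        LabeledQuery q = parse("{!label=\"Recent\"}mtm:[2010-01-01T12:00:00Z TO 2013-01-01T12:00:00Z]");
        System.out.println(q.label + " -> " + q.query);
    }
}
```

A query without the {! prefix comes back unchanged with a null label, in which case the query string itself can serve as the key in the response.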
stats.facet.facet_field.topn and stats.facet.facet_field.sortfield
Another improvement: when we specify stats.facet=file_type, there may be many file types, but we may only want the stats for the top 2 file types that take the most disk space.

To do this, we can support the parameter stats.facet.facet_field.topn=N, along with stats.facet.facet_field.sortfield=sortfield, where sortfield can be sum, count, max, etc. If topn is given without sortfield, the code sorts by sum.
The implementation is simple: just take the top N elements from the returned result.
See getStatsTopn in the Implementation section for how it is done.
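As a rough illustration of the top-N selection, the sketch below sorts facet buckets by one statistic and keeps the first N. It is a simplification and not the component's code: plain maps stand in for Solr's NamedList, the names FacetTopN, topN and stats are made up, and ties are broken alphabetically here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: facet value -> (statistic name -> value).
final class FacetTopN {

    /**
     * Returns the names of the n buckets with the largest sortField value.
     * Assumes every bucket carries that statistic.
     */
    static List<String> topN(Map<String, Map<String, Double>> buckets, String sortField, int n) {
        List<String> names = new ArrayList<>(buckets.keySet());
        names.sort((a, b) -> {
            // descending by the chosen statistic
            int byValue = Double.compare(buckets.get(b).get(sortField), buckets.get(a).get(sortField));
            return byValue != 0 ? byValue : a.compareTo(b); // ties: alphabetical
        });
        int keep = Math.max(0, Math.min(n, names.size()));
        return new ArrayList<>(names.subList(0, keep));
    }

    /** Convenience builder for one bucket's statistics. */
    static Map<String, Double> stats(double sum, double count) {
        Map<String, Double> m = new HashMap<>();
        m.put("sum", sum);
        m.put("count", count);
        return m;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Double>> byFileType = new LinkedHashMap<>();
        byFileType.put("pdf", stats(5000, 10));
        byFileType.put("ppt", stats(9000, 3));
        byFileType.put("txt", stats(100, 50));
        System.out.println(topN(byFileType, "sum", 2)); // [ppt, pdf]
    }
}
```

Sorting everything is fine for a handful of facet values; a bounded structure that evicts the current minimum avoids holding more than N candidates when there are many.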

An example Solr stats request:
http://localhost:8080/solr/select?q=file_type:pdf OR file_type:ppt OR file_type:txt OR file_type:html&rows=0&stats=true&stats.field=size&f.size.stats.query={!label="Very Old"}mtm:[1980-01-01T12:00:00Z TO 2000-01-01T12:00:00Z]&f.size.stats.query={!label="Old"}mtm:[2000-01-01T12:00:00Z TO 2010-01-01T12:00:00Z]&f.size.stats.query={!label="Recent"}mtm:[2010-01-01T12:00:00Z TO 2013-01-01T12:00:00Z]&stats.facet=file_type&f.size.stats.facet.file_type.topn=2&f.size.stats.facet.file_type.sortfield=sum&stats.facet=user_name
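The request above is written unencoded for readability; spaces, quotes, braces and brackets have to be URL-encoded when it is sent from code. A small sketch using the JDK's URLEncoder follows. The builder class StatsRequestUrl is a made-up helper, not a Solr or SolrJ API.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: collects name=value pairs and encodes each part.
final class StatsRequestUrl {
    private final List<String> pairs = new ArrayList<>();

    StatsRequestUrl add(String name, String value) {
        try {
            pairs.add(URLEncoder.encode(name, "UTF-8") + "=" + URLEncoder.encode(value, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
        return this;
    }

    String build(String base) {
        return base + "?" + String.join("&", pairs);
    }

    public static void main(String[] args) {
        String url = new StatsRequestUrl()
            .add("q", "file_type:pdf OR file_type:ppt")
            .add("rows", "0")
            .add("stats", "true")
            .add("stats.field", "size")
            .add("f.size.stats.query", "{!label=\"Old\"}mtm:[2000-01-01T12:00:00Z TO 2010-01-01T12:00:00Z]")
            .add("stats.facet", "file_type")
            .add("f.size.stats.facet.file_type.topn", "2")
            .add("f.size.stats.facet.file_type.sortfield", "sum")
            .build("http://localhost:8080/solr/select");
        System.out.println(url);
    }
}
```

Repeated parameters such as f.size.stats.query are simply added once per value.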
Implementation
The code is shown below; the complete code is available on GitHub.

class SimpleStats {
 public NamedList<Object> getStatsFields() throws IOException {
    NamedList<Object> res = new SimpleOrderedMap<Object>();
    String[] statsFs = params.getParams(StatsParams.STATS_FIELD);
    boolean isShard = params.getBool(ShardParams.IS_SHARD, false);
    if (null != statsFs) {
    for (String fieldName : statsFs) {
     String[] facets = params.getFieldParams(fieldName,
       StatsParams.STATS_FACET);
     if (facets == null) {
      facets = new String[0]; // make sure it is something...
     }

     SchemaField sf = searcher.getSchema().getField(fieldName);
     FieldType ft = sf.getType();

     List<String> queries = new ArrayList<String>(), labeList = new ArrayList<String>();
     handleQueryParams(fieldName, queries, labeList);

     NamedList<Object> facetResult = new NamedList<Object>();
     String[] facetSortFields = new String[facets.length];
     Integer[] facetTopns = new Integer[facets.length];
     parseSortFieldsAndTopn(fieldName, facets, facetSortFields,
       facetTopns);

     // Currently, only UnInvertedField can deal with multi-part trie
     // fields
     String prefix = TrieField.getMainValuePrefix(ft);
     if (sf.multiValued() || ft.multiValuedFieldCache()
       || prefix != null || !queries.isEmpty()) {
      UnInvertedField uif = UnInvertedField.getUnInvertedField(
        fieldName, searcher);
      if (queries.isEmpty()) {
       facetResult = doGetStatsField(uif, docs, null, isShard,
         fieldName, facets, facetSortFields, facetTopns);

       res.add(facetResult.getName(0), facetResult.getVal(0));
      } else {
       handleMultipleQueries(uif, fieldName, queries,
         labeList, facets, facetSortFields, facetTopns,
         isShard, facetResult);
       res.add(fieldName, facetResult);
      }
     } else {
      NamedList<Object> stv = (NamedList<Object>) getFieldCacheStats(
        fieldName, facets);
      handleStatsTopns(facets, facetSortFields, facetTopns, stv);

      if (isShard || (Long) stv.get("count") > 0) {
       res.add(fieldName, stv);
      } else {
       res.add(fieldName, null);
      }
     }
    }
   }
    return res;
  }
  
  // why does this use a top-level field cache?
  public NamedList<?> getFieldCacheStats(String fieldName, String[] facet ) {
    SchemaField sf = searcher.getSchema().getField(fieldName);
    
    FieldCache.DocTermsIndex si;
    try {
      si = FieldCache.DEFAULT.getTermsIndex(searcher.getAtomicReader(), fieldName);
    } 
    catch (IOException e) {
      throw new RuntimeException( "failed to open field cache for: "+fieldName, e );
    }
    StatsValues allstats = StatsValuesFactory.createStatsValues(sf);
    final int nTerms = si.numOrd();
    if ( nTerms <= 0 || docs.size() <= 0 ) return allstats.getStatsValues();

    // don't worry about faceting if no documents match...
    List<FieldFacetStats> facetStats = new ArrayList<FieldFacetStats>();
    FieldCache.DocTermsIndex facetTermsIndex;
    for( String facetField : facet ) {
      SchemaField fsf = searcher.getSchema().getField(facetField);

      if ( fsf.multiValued()) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "Stats can only facet on single-valued fields, not: " + facetField );
      }

      try {
        facetTermsIndex = FieldCache.DEFAULT.getTermsIndex(searcher.getAtomicReader(), facetField);
      }
      catch (IOException e) {
        throw new RuntimeException( "failed to open field cache for: "
          + facetField, e );
      }
      facetStats.add(new FieldFacetStats(facetField, facetTermsIndex, sf, fsf, nTerms));
    }
    
    final BytesRef tempBR = new BytesRef();
    DocIterator iter = docs.iterator();
    while (iter.hasNext()) {
      int docID = iter.nextDoc();
      BytesRef raw = si.lookup(si.getOrd(docID), tempBR);
      if( raw.length > 0 ) {
        allstats.accumulate(raw);
      } else {
        allstats.missing();
      }

      // now update the facets
      for (FieldFacetStats f : facetStats) {
        f.facet(docID, raw);
      }
    }

    for (FieldFacetStats f : facetStats) {
      allstats.addFacet(f.name, f.facetStatsValues);
    }
    return allstats.getStatsValues();
  }

 private void parseSortFieldsAndTopn(String fieldName, String[] facets,
   String[] facetSortFields, Integer[] facetTopns) {
  for (int i = 0; i < facets.length; i++) {
   String facet = facets[i];
   String facetSortField = params.getFieldParam(fieldName,
     "stats.facet." + facet + ".sortfield");
   if (facetSortField != null && !facetSortField.trim().equals("")) {
    facetSortFields[i] = facetSortField;
   }
   String facetTopn = params.getFieldParam(fieldName, "stats.facet."
     + facet + ".topn");
   if (facetTopn != null) {
    facetTopns[i] = Integer.valueOf(facetTopn);
    if (facetSortField == null || facetSortField.trim().equals("")) {
     facetSortFields[i] = "sum";
    }
   }
  }
 }

 private void handleMultipleQueries(final UnInvertedField uif,
   final String filedName, final List<String> queries,
   List<String> labeList, final String[] facets,
   final String[] facetSortFields, final Integer[] facetTopns,
   final boolean isShard, NamedList<Object> facetResult) {
  final List<NamedList<Object>> tmpResult = new ArrayList<NamedList<Object>>();
  for (int i = 0; i < queries.size(); i++) {
   tmpResult.add(null);
  }
  List<Thread> threads = new ArrayList<Thread>();
  for (int i = 0; i < queries.size(); i++) {
   final int index = i;
   Thread thread = new Thread() {
    @Override
    public void run() {
     NamedList<Object> tmpFacetResult = null;
     String tmpQuery = queries.get(index);
     try {
      QParser qparser = QParser.getParser(tmpQuery, "lucene",
        req);
      SolrQueryParser parser = new SolrQueryParser(qparser,
        req.getSchema().getDefaultSearchFieldName());
      Query query = parser.parse(tmpQuery);

      DocSet docSet = searcher.getDocSet(query);
      DocSet interSection = docs.intersection(docSet);
      tmpFacetResult = doGetStatsField(uif, interSection,
        tmpQuery, isShard, filedName, facets,
        facetSortFields, facetTopns);
     } catch (Exception e) {
      tmpFacetResult = new NamedList<Object>();

      NamedList<Object> error = new NamedList<Object>();
      NamedList<Object> value = new NamedList<Object>();
      value.add("msg", e.getMessage());
      StringWriter stacks = new StringWriter();
      e.printStackTrace(new PrintWriter(stacks));
      value.add("trace", stacks.toString());

      error.add("error", value);
      tmpFacetResult.add(tmpQuery, error);
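      // The error is reported through tmpFacetResult (set in the finally
      // block); rethrowing here would only kill this worker thread.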
     } finally {
      tmpResult.set(index, tmpFacetResult);
     }
    }
   };
   thread.start();
   threads.add(thread);

  }

  for (Thread t : threads) {
   try {
    t.join();
   } catch (InterruptedException e) {
    throw new RuntimeException(e);
   }
  }

  for (int i = 0; i < tmpResult.size(); i++) {
   NamedList<Object> namedList = tmpResult.get(i);
   if (namedList != null && namedList.size() > 0) {
    // namedList should only have one result, maybe empty
    if (labeList.get(i) != null) {
     facetResult.add(labeList.get(i), namedList.getVal(0));
    } else {
     facetResult.add(namedList.getName(0), namedList.getVal(0));
    }
   }
  }
 }

 /**
  * Postcondition: queries and labesList have the same size.
  */
 private void handleQueryParams(final String filedName,
   List<String> queries, List<String> labesList) {
  // convert ranges to queries
  List<String> rangeQuries = handleRangeParmas(filedName);
  for (String query : rangeQuries) {
   queries.add(query);
   labesList.add(null);
  }
  // handle stats.query
  handleStatsQueryParams(filedName, queries, labesList);
 }

 private void handleStatsQueryParams(final String filedName,
   List<String> queries, List<String> labesList) {
  String[] queriesStr = params.getFieldParams(filedName, "stats.query");
  if (queriesStr != null) {
   // handle query label: example:stats.query={!label="Label Name"}qstr
   for (int i = 0; i < queriesStr.length; i++) {
    String oldQuery = queriesStr[i].trim();
    if (oldQuery.startsWith("{!")) {
     int endIndex = oldQuery.indexOf('}');
     if (endIndex < 0) {
      throw new RuntimeException(
        "Local parameter is not correct: " + oldQuery);
     }
     String label = queriesStr[i].substring(2, endIndex).trim();
     if (label.startsWith("label=")) {
      label = label.substring(6).trim();
      if (label.startsWith("\"") || label.startsWith("'")) {
       label = label.substring(1);
      }
      if (label.endsWith("\"") || label.endsWith("'")) {
       label = label.substring(0, label.length() - 1);
      }
      labesList.add(label);
     } else {
      labesList.add(null);
     }
     queries.add(oldQuery.substring(endIndex + 1).trim());
    } else {
     queries.add(oldQuery);
     labesList.add(null);
    }
   }
  }
 }

 private List<String> getRanges(String field, double start, double end,
   double[] gap) {
  List<Double> ranges = new ArrayList<Double>();
  if (gap == null || gap.length == 0) {
   ranges.add(start);
   ranges.add(end);
  } else {
   double boundEnd = start;
   int i = 0;
   int lastIndex = gap.length - 1;
   while (boundEnd <= end) {
    ranges.add(boundEnd);
    if (lastIndex < i) {
     boundEnd += gap[lastIndex];
    } else {
     boundEnd += gap[i];
    }
    i++;
   }
   if (ranges.get(ranges.size() - 1) < end) {
    ranges.add(end);
   }
  }
  List<String> result = new ArrayList<String>();

  FieldType fieldType = searcher.getSchema().getField(field).getType();
  // integral field types get integer bounds; everything else keeps doubles
  String className = fieldType.getClass().getName();
  if (className.contains("LongField") || className.contains("IntField")
    || className.contains("ShortField")) {
   for (int i = 0; i < ranges.size(); i++) {
    result.add(String.valueOf(ranges.get(i).longValue()));
   }
  } else {
   for (int i = 0; i < ranges.size(); i++) {
    result.add(ranges.get(i).toString());
   }
  }
  return result;
 }

 /**
  * @return a list of query strings
  */
 private List<String> handleRangeParmas(final String statsField) {
  String startStr = params.getFieldParam(statsField, "stats.range.start");
  String endStr = params.getFieldParam(statsField, "stats.range.end");
  String gapStr = params.getFieldParam(statsField, "stats.range.gap");
  String rangesStr = params.getFieldParam(statsField, "stats.ranges");

  Double start = null, end = null;
  if (startStr != null) {
   start = Double.parseDouble(startStr);
  }
  if (endStr != null) {
   end = Double.parseDouble(endStr);
  }

  final List<String> queries = new ArrayList<String>();
  List<String> ranges = new ArrayList<String>();
  String fieldName = statsField;
  if (rangesStr != null) {
   int index = rangesStr.indexOf(":");
   if (index > 0) {
    fieldName = rangesStr.substring(0, index);
   }
   ranges = StrUtils.splitSmart(rangesStr.substring(index + 1), ',');
  } else if (gapStr != null) {
   List<String> strs = StrUtils.splitSmart(gapStr, ',');
   double[] gap = new double[strs.size()];
   for (int i = 0; i < strs.size(); i++) {
    gap[i] = Double.parseDouble(strs.get(i));
   }
   if (start != null && end != null && gap != null) {
    ranges = getRanges(statsField, start, end, gap);
   }
  }

  for (int i = 0; i < ranges.size() - 1; i++) {
   // trim both bounds: "0, 1024" would otherwise leave a stray space
   queries.add(fieldName + ":[" + ranges.get(i).trim() + " TO "
     + ranges.get(i + 1).trim() + "}");
  }
  return queries;
 }

 /**
  * Call UnInvertedField to get stats and handle sortfields and topn.
  */
 @SuppressWarnings("unchecked")
 private NamedList<Object> doGetStatsField(UnInvertedField uif, DocSet docs,
   String label, boolean isShard, String fieldName, String[] facets,
   String[] facetSortFields, Integer[] facetTopns) {
  try {
   NamedList<Object> res = new NamedList<Object>();
   NamedList<Object> stv = (NamedList<Object>) uif.getStats(searcher,
     docs, facets).getStatsValues();

   handleStatsTopns(facets, facetSortFields, facetTopns, stv);
   // key the result by the label when one is given, else by the field name
   String key = label == null ? fieldName : label;
   if (isShard || (Long) stv.get("count") > 0) {
    res.add(key, stv);
   } else {
    res.add(key, null);
   }
   return res;
  } catch (Exception e) {
   throw new RuntimeException(e);
  }
 }

  private void handleStatsTopns(String[] facets, String[] facetSortFields,
      Integer[] facetTopns, NamedList<Object> stv) {
    NamedList<NamedList<NamedList<Object>>> fieldFacet = (NamedList<NamedList<NamedList<Object>>>) (stv
        .get("facets"));
    NamedList<NamedList<NamedList<Object>>> topnFacets = new NamedList<NamedList<NamedList<Object>>>();

    boolean updated = false;

    for (int j = 0; j < facets.length; j++) {
      String sortBy = facetSortFields[j];
      Integer topn = facetTopns[j];
      NamedList<NamedList<Object>> fieldFacetList = (fieldFacet)
          .get(facets[j]);
      if (fieldFacetList == null) {
        continue;
      }
      if (topn == null) {
        if (sortBy == null) {
          topnFacets.add(facets[j], fieldFacetList);
          continue;
        }
        topn = fieldFacetList.size();
      }
      updated = true;

      NamedList<NamedList<Object>> namedList =  getStatsTopn(fieldFacetList, topn, sortBy);
      topnFacets.add(facets[j], namedList);
    }
    if (updated) {
      stv.remove("facets");
      stv.add("facets", topnFacets);
    }
  }
  
  private NamedList<NamedList<Object>> getStatsTopn(
      NamedList<NamedList<Object>> fieldFacetList, int topn, String sortBy) {
    Iterator<Map.Entry<String, NamedList<Object>>> iterator = fieldFacetList
        .iterator();
    TreeSetTopnAscendingByValue<String, Double> topnSet = new TreeSetTopnAscendingByValue<String, Double>(topn);
    while (iterator.hasNext()) {
      Map.Entry<String, NamedList<Object>> entry = iterator.next();
      String newLabel = entry.getKey();
      NamedList<?> namedList = entry.getValue();
      double newValue = Double.parseDouble(namedList.get(sortBy).toString());
      topnSet.add(newLabel, newValue);
    }

    NamedList<NamedList<Object>> namedList = new NamedList<NamedList<Object>>();
    Iterator<Entry<String, Double>>  keyIterator = topnSet.descendingIterator();
    while (keyIterator.hasNext()) {
      Entry<String, Double> entry = keyIterator.next();
      String label = entry.getKey();
      namedList.add(label, fieldFacetList.get(label));
    }
    return namedList;
  }

  static class TreeSetTopnAscendingByValue<K extends Comparable<? super K>, V extends Comparable<? super V>>
      extends TreeSet<Map.Entry<K, V>> {
    private static final long serialVersionUID = 1L;
    private int maxSize = Integer.MAX_VALUE;
    private Comparator<Map.Entry<K, V>> comparator;

    public TreeSetTopnAscendingByValue(Comparator<Map.Entry<K, V>> comparator,
        int maxSize) {
      super(comparator);
      this.comparator = comparator;
      this.maxSize = maxSize;
    }

    public TreeSetTopnAscendingByValue(int maxSize) {
      this(new Comparator<Map.Entry<K, V>>() {
        public int compare(Map.Entry<K, V> e1, Map.Entry<K, V> e2) {
          int res = e1.getValue().compareTo(e2.getValue());
          if (res == 0) {
            res = e1.getKey().compareTo(e2.getKey());
          }
          return res;
        }
      }, maxSize);
    }

    public boolean add(K newKey, V newValue) {
      return add(new SimpleEntry<K, V>(newKey, newValue));
    }

    @Override
    public boolean add(Entry<K, V> newValue) {
      boolean added = false;
      if (this.size() > maxSize)
        throw new RuntimeException("Should never happen.");
      if (this.size() == maxSize) {
        Iterator<Entry<K, V>> iterator = this.iterator();
        Entry<K, V> currentMin = iterator.next();
        // only remove currentMin if newValue > currentMin. if equals, do nothing
        // for better performance.
        if (comparator.compare(newValue, currentMin) > 0) {
          added = super.add(newValue);
          // the added element may already exist, if the map doesn't have this
          // element(added is true), remove currentMin
          if (added) {
            iterator = this.iterator();
            iterator.next();
            iterator.remove();
          }
        } 
      } else {
        added = super.add(newValue);
      }
      return added;
    }
  }
}
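The gap handling in getRanges above is easiest to see with concrete numbers: gaps are applied in order and the last gap repeats until the end is reached. The sketch below mirrors that logic in standalone form. The class name GapRanges is hypothetical, and the checks for start > end and non-positive gaps are additions of this sketch, not part of the listing above.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the stats.range.start/end/gap expansion.
final class GapRanges {

    static List<Double> boundaries(double start, double end, double... gaps) {
        if (start > end) {
            throw new IllegalArgumentException("start must not exceed end");
        }
        List<Double> bounds = new ArrayList<>();
        if (gaps.length == 0) {
            // no gap given: a single range from start to end
            bounds.add(start);
            bounds.add(end);
            return bounds;
        }
        for (double gap : gaps) {
            if (gap <= 0) {
                throw new IllegalArgumentException("gap must be positive: " + gap);
            }
        }
        double next = start;
        for (int i = 0; next <= end; i++) {
            bounds.add(next);
            // once the gap list is exhausted, keep reusing its last entry
            next += gaps[Math.min(i, gaps.length - 1)];
        }
        // close the final range at end if the gaps did not land on it
        if (bounds.get(bounds.size() - 1) < end) {
            bounds.add(end);
        }
        return bounds;
    }

    public static void main(String[] args) {
        // gaps 10, 20, then 50 repeating
        System.out.println(boundaries(0, 100, 10, 20, 50)); // [0.0, 10.0, 30.0, 80.0, 100.0]
    }
}
```

With start=0, end=100 and gaps 10,20,50 the resulting ranges are [0 TO 10}, [10 TO 30}, [30 TO 80} and [80 TO 100}.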
Resources:
http://wiki.apache.org/solr/StatsComponent
Solr StatsComponent
https://issues.apache.org/jira/browse/SOLR-4239
