Solr: Sort Groups Ascending (asc_max) by the Max Value in Each Group

Use Case
In Solr grouping, when sort=time asc_max, we want to sort the groups ascending by the max (not min) value of the time field in each group; conversely, when sort=time desc_min, we want to sort the groups descending by the min (not max) value of the time field in each group.

Background
Using Solr grouping, we can group documents that share a common field value, and use sort to specify how the groups are sorted.
For example: sort=time asc&group.field=subject
Solr will group documents by the subject field and sort the groups by the time field, ascending.

See the Result Grouping documentation for the default behavior: when sort=time asc, the groups are sorted by the minimum value of the time field in each group; when sort=time desc, they are sorted by the maximum value of the time field in each group.

But in some cases this is not what we want:
when sort=time asc, we may want to sort the groups by the max (not min) value of the time field in each group; conversely, when sort=time desc, we may want to sort the groups by the min (not max) value.
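To make the difference concrete, here is a small self-contained sketch (plain Java, no Solr dependency; the group names and time values are made up) that orders the same two groups by min value, which is Solr's default for asc, and by max value, which is the asc_max behavior we want:

```java
import java.util.*;
import java.util.stream.*;

public class GroupSortDemo {

  // Order group keys ascending by the MIN value in each group (Solr's default for asc)
  static List<String> byMin(Map<String, List<Long>> groups) {
    return groups.keySet().stream()
        .sorted(Comparator.comparing(g -> Collections.min(groups.get(g))))
        .collect(Collectors.toList());
  }

  // Order group keys ascending by the MAX value in each group (the desired asc_max)
  static List<String> byMax(Map<String, List<Long>> groups) {
    return groups.keySet().stream()
        .sorted(Comparator.comparing(g -> Collections.max(groups.get(g))))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, List<Long>> groups = new LinkedHashMap<>();
    groups.put("A", Arrays.asList(1L, 9L)); // min=1, max=9
    groups.put("B", Arrays.asList(5L, 6L)); // min=5, max=6
    System.out.println(byMin(groups)); // [A, B] : sorted by min, 1 < 5
    System.out.println(byMax(groups)); // [B, A] : sorted by max, 6 < 9
  }
}
```

The same documents yield a different group order depending on whether the group's representative value is its min or its max, which is exactly the distinction asc vs asc_max has to make.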

Solr doesn't support this out of the box, so we have to figure out how to implement it on our own.

The following is my first version of the implementation: it works, but there is still a lot to improve.

How Solr Group Works
Basically there are two phases. First, the collect method of AbstractFirstPassGroupingCollector (in our case, sort=time asc, this is TermFirstPassGroupingCollector) goes through all docs that match the query and filters; its orderedGroups maintains the top X groups based on the min (sort=time asc) or max (sort=time desc) value in each group. Then Solr gets the top groups via AbstractFirstPassGroupingCollector.getTopGroups(int, boolean), and calls AbstractSecondPassGroupingCollector (in this case TermSecondPassGroupingCollector) to fetch the docs in each group.
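The first-pass bookkeeping can be sketched in plain Java (a simplified, Lucene-free illustration; the doc stream and group keys are made up): keep one "best" sort value per group as docs stream by, then take the top N groups by that value:

```java
import java.util.*;
import java.util.stream.*;

public class FirstPassSketch {

  // Simplified first pass: remember one "best" (min) sort value per group,
  // then return the top N groups ascending by that value.
  static List<String> topGroups(List<String[]> docs, int n) {
    Map<String, Long> best = new HashMap<>();
    for (String[] d : docs) {
      // d[0] = group key, d[1] = time value; docs are visited in doc-id order
      best.merge(d[0], Long.parseLong(d[1]), Math::min);
    }
    return best.entrySet().stream()
        .sorted(Map.Entry.comparingByValue())
        .limit(n)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String[]> docs = Arrays.asList(
        new String[]{"A", "3"}, new String[]{"B", "1"},
        new String[]{"A", "7"}, new String[]{"C", "5"});
    System.out.println(topGroups(docs, 2)); // [B, A] : B min=1, A min=3
  }
}
```

Swapping Math::min for Math::max in the merge is, conceptually, all that asc_max changes; the custom comparator below does this bookkeeping inside Lucene's comparator framework.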

FieldComparator & LongAbnormalComparator
FieldComparator plays an important role here: it compares hits to determine their sort order. We will create a custom LongAbnormalComparator, which uses a Map, minMaxMap, to store the max value of each group in asc (asc_max) mode and the min value of each group in desc (desc_min) mode. When comparing groups, it uses the values in minMaxMap.

One caveat: the size of minMaxMap and the values array has to be the total group count in the index, so that they can hold all groups.
// Marker interface for comparators with custom (abnormal) group sorting
public static interface AbnormalComparator { }
public static final class LongAbnormalComparator extends LongComparator implements  AbnormalComparator{
  private boolean reverse;
  protected final BytesRef[] slotGroupValues;
  protected final Map<BytesRef,Long> minMaxMap = new HashMap<BytesRef,Long>();
  BinaryDocValues groupTerms;
  private SortedDocValues index;
  private String groupField;

  LongAbnormalComparator(int numHits, String field, Parser parser,
      Long missingValue, boolean reverse, String groupField) {
    super(numHits, field, parser, missingValue);
    this.groupField = groupField;
    slotGroupValues = new BytesRef[numHits];
    this.reverse = reverse;
  }
  
  @Override
  public FieldComparator<Long> setNextReader(AtomicReaderContext context)
      throws IOException {
    index = FieldCache.DEFAULT.getTermsIndex(context.reader(), groupField);
    return super.setNextReader(context);
  }
  
  // Not part of FieldComparator; decides whether this doc can improve
  // its group's stored min/max value.
  public boolean isCompetitive(int slot, int doc) {
    boolean isCompetitive = false;
    long v2 = currentReaderValues.get(doc);
    if (v2 == 0) {
      v2 = getMissingValue();
    }
    
    BytesRef groupBytes = getGroupFieldValue(doc);
    Long oldValue = minMaxMap.get(groupBytes);
    if (reverse) {
      if (oldValue == null) {
        isCompetitive = true;
      } else {
        if (v2 < oldValue) {
          isCompetitive = true;
        }
      }
    } else {
      if (oldValue == null) {
        isCompetitive = true;
      } else {
        if (v2 > oldValue) {
          isCompetitive = true;
        }
      }
    }
    return isCompetitive;
  }

  private BytesRef getGroupFieldValue(int doc) {
    BytesRef groupBytes = new BytesRef();
    index.get(doc, groupBytes);
    return groupBytes;
  }

  @Override
  public int compare(int slot1, int slot2) {
    // In abnormal mode the TreeSet is only created once (buildSortedSet);
    // compare groups by their stored min/max value.
    BytesRef group1 = slotGroupValues[slot1];
    BytesRef group2 = slotGroupValues[slot2];
    final long v1 = minMaxMap.get(group1);
    final long v2 = minMaxMap.get(group2);
    if (v1 > v2) {
      return 1;
    } else if (v1 < v2) {
      return -1;
    } else {
      return 0;
    }
  }
  
  private long getMissingValue() {
    if (reverse) {
      // sort=time desc_min
      return Long.MAX_VALUE;
    } else {
      // sort=time asc_max
      return Long.MIN_VALUE;
    }
  }
  @Override
  public void copy(int slot, int doc) {
    long v2 = currentReaderValues.get(doc);
    if (v2 == 0) {
      v2 = getMissingValue();
    }
    
    BytesRef groupBytes = getGroupFieldValue(doc);
    Long oldValue = minMaxMap.get(groupBytes);
    
    // update minMaxMap if needed
    if (reverse) {
      if (oldValue == null) {
        update(slot, doc, v2, groupBytes);
      } else {
        // desc_min mode, only update if curr is smaller
        if (v2 < oldValue) {
          update(slot, doc, v2, groupBytes);
        }
      }
    } else {
      // asc_max mode, only update if curr is larger
      if (oldValue == null) {
        update(slot, doc, v2, groupBytes);
      } else {
        if (v2 > oldValue) {
          update(slot, doc, v2, groupBytes);
        }
      }
    }
    values[slot] = v2;
  }

  private void update(int slot, int doc, long v2, BytesRef groupBytes) {
    slotGroupValues[slot] = groupBytes;
    minMaxMap.put(groupBytes, v2);
  }
  
  public long[] getValues() {
    return values;
  }
}
Change in TermFirstPassGroupingCollector
public class TermFirstPassGroupingCollector extends AbstractFirstPassGroupingCollector<BytesRef> {
  private String groupField;
  private boolean hasAbnormal;

  public TermFirstPassGroupingCollector(String groupField, Sort groupSort,
      int topNGroups) throws IOException {
    super();
    if (topNGroups < 1) {
      throw new IllegalArgumentException("topNGroups must be >= 1 (got "
          + topNGroups + ")");
    }
    final SortField[] sortFields = groupSort.getSort();
    for (int i = 0; i < sortFields.length; i++) {
      final SortField sortField = sortFields[i];
      if (sortField.isAbnormal()) {
        hasAbnormal = true;
        break;
      }
    }
    if (!hasAbnormal) {
      super.init(groupSort, topNGroups);
      return;
    }
    Integer groupCount = groupCountTL.get();
    subInit(groupSort, groupCount, sortFields, groupField);
  }
  
  private static final ThreadLocal<Integer> groupCountTL= new ThreadLocal<Integer>();
  public static void setGroupCountTLValue(int groupCount)
  {
    groupCountTL.set(groupCount);
  }
  public static void removeGroupCountTL()
  {
    groupCountTL.remove();
  }
  private void subInit(Sort groupSort, int topNGroups,
      final SortField[] sortFields, String groupField) throws IOException {
    this.groupField = groupField;
    this.groupSort = groupSort;
    this.topNGroups = topNGroups;
    
    comparators = new FieldComparator[sortFields.length];
    compIDXEnd = comparators.length - 1;
    reversed = new int[sortFields.length];
    for (int i = 0; i < sortFields.length; i++) {
      final SortField sortField = sortFields[i];
      comparators[i] = sortField.getComparatorWithAbnormal(topNGroups, i,
          groupField);
      reversed[i] = sortField.getReverse() ? -1 : 1;
    }
    
    spareSlot = topNGroups;
    groupMap = new HashMap<BytesRef,CollectedSearchGroup<BytesRef>>(topNGroups);
  }
  
  @Override
  public void collect(int doc) throws IOException {
    if(!hasAbnormal)
    {
      super.collect(doc);
      return;
    }
    final BytesRef groupValue = getDocGroupValue(doc);
    final CollectedSearchGroup<BytesRef> group = groupMap.get(groupValue);

    if (group == null) {
        // Add a new CollectedSearchGroup:
        CollectedSearchGroup<BytesRef> sg = new CollectedSearchGroup<BytesRef>(comparators);
        sg.groupValue = copyDocGroupValue(groupValue, null);
        sg.comparatorSlot = groupMap.size();
        sg.topDoc = docBase + doc;
        for (FieldComparator<?> fc : comparators) {
          fc.copy(sg.comparatorSlot, doc);
        }
        groupMap.put(sg.groupValue, sg);
        return;
    }
    // Update existing group:
    for (int compIDX = 0;; compIDX++) {
      final FieldComparator<?> fc = comparators[compIDX];
      
      if (fc instanceof LongAbnormalComparator) {
        LongAbnormalComparator my = (LongAbnormalComparator) fc;
        if (!my.isCompetitive(group.comparatorSlot, doc)) {
          return;
        }
        else
        {
          fc.copy(spareSlot, doc);
          // Definitely competitive; set remaining comparators:
          for (int compIDX2 = compIDX + 1; compIDX2 < comparators.length; compIDX2++) {
            comparators[compIDX2].copy(spareSlot, doc);
          }
          break;
        }
      } else {
        fc.copy(spareSlot, doc);
        int c = reversed[compIDX] * fc.compare(group.comparatorSlot, spareSlot);
        if (c < 0) {
          // Definitely not competitive.
          return;
        } else if (c > 0) {
          // Definitely competitive; set remaining comparators:
          for (int compIDX2 = compIDX + 1; compIDX2 < comparators.length; compIDX2++) {
            comparators[compIDX2].copy(spareSlot, doc);
          }
          break;
        } else if (compIDX == compIDXEnd) {
          // Here c=0. If we're at the last comparator, this doc is not
          // competitive, since docs are visited in doc Id order, which means
          // this doc cannot compete with any other document in the queue.
          return;
        }
      }
    }

    // Remove before updating the group since lookup is done via comparators
    // TODO: optimize this
    final CollectedSearchGroup<BytesRef> prevLast;
    if (orderedGroups != null) {
      prevLast = orderedGroups.last();
      orderedGroups.remove(group);
//      assert orderedGroups.size() == topNGroups-1;
    } else {
      prevLast = null;
    }

    group.topDoc = docBase + doc;

    // Swap slots
    final int tmp = spareSlot;
    spareSlot = group.comparatorSlot;
    group.comparatorSlot = tmp;

    // Re-add the changed group
    if (orderedGroups != null) {
      orderedGroups.add(group);
//      assert orderedGroups.size() == topNGroups;
      final CollectedSearchGroup<?> newLast = orderedGroups.last();
      // If we changed the value of the last group, or changed which group was last, then update bottom:
      if (group == newLast || prevLast != newLast) {
        for (FieldComparator<?> fc : comparators) {
          fc.setBottom(newLast.comparatorSlot);
        }
      }
    }
  }
}
What is Missing
Update solr.search.QueryParsing.StrParser.getSortDirection() to parse the sort string: when the direction is asc_max or desc_min, set the SortField's abnormal flag to true.
We also need a wrapper request handler: when the sort is like time asc_max or time desc_min, it will first use TermAllGroupsCollector to get the total group count.
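A minimal sketch of the direction parsing (a hypothetical standalone helper, not the actual QueryParsing code; the SortSpec holder class is made up for illustration):

```java
// Hypothetical parser for the extended sort directions; the real change
// would live in solr.search.QueryParsing.StrParser.getSortDirection().
public class SortDirectionParser {

  static final class SortSpec {
    final String field;
    final boolean reverse;   // true for desc / desc_min
    final boolean abnormal;  // true for asc_max / desc_min
    SortSpec(String field, boolean reverse, boolean abnormal) {
      this.field = field;
      this.reverse = reverse;
      this.abnormal = abnormal;
    }
  }

  static SortSpec parse(String sort) {
    String[] parts = sort.trim().split("\\s+");
    String field = parts[0];
    String dir = parts.length > 1 ? parts[1] : "asc";
    switch (dir) {
      case "asc":      return new SortSpec(field, false, false);
      case "desc":     return new SortSpec(field, true, false);
      case "asc_max":  return new SortSpec(field, false, true);
      case "desc_min": return new SortSpec(field, true, true);
      default: throw new IllegalArgumentException("Unknown direction: " + dir);
    }
  }

  public static void main(String[] args) {
    SortSpec s = parse("time asc_max");
    System.out.println(s.field + " reverse=" + s.reverse + " abnormal=" + s.abnormal);
    // prints: time reverse=false abnormal=true
  }
}
```

The abnormal flag is what the modified TermFirstPassGroupingCollector checks (via sortField.isAbnormal()) to decide whether to use LongAbnormalComparator.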


Resources
Solr Join: Return Parent and Child Documents
Use Solr map function query(group.sort=map(type,1,1,-1) ) in group flat mode
Solr: Update other Document in DocTransformer by Writing custom SolrWriter
Solr: Use DocTransformer to dynamically Generate groupCount and time value for group doc