The problem
We store data in solr cloud. As application evolves from single-suppose message application to multiple-tenant message applications, there is much more traffic to our application.
The traffic is high, but the data is small. - As we don't have many active messages at specific time.
To boost search performance, we decide to use in memory embeddedSolr.
The Solution
Admin application still use CloudSolrRepositery to write data into solr cloud.
Client-facing application periodically deletes expired data from embeddedSolr and copies (only) updated/new data from solr cloud to embeddedSolr.
We change server code to use the EmbeddedSolrRepositery. - So there is only little change to existing code.
DataSyncService.copyMessagesFromSolrCloudToEmbeddedSolr deletes expired data from embedded Solr and then copies (only) updated/new data from solr cloud to embeddedSolr. - it ignores data that already exists(with same id and _version_ values).
It's already be called in the ContextLoaderListener so it copies all data from solrcloud to embedded Solr before application startup finishes.
It's also a scheduled task - it will be called periodically.
Here we use SchedulingConfigurer - not @Scheduled because we want to make the interval configurable and changeable. - @Scheduled only supports read value from property file, but doesn't support to call bean method.
Also admin application can change the configuration to enable/disable embedded solr and change the frequency of sync.
Talk is cheap. Show me the code.
@Service(MessageCloudRepository.NAME)
public class MessageCloudRepository extends AbstractMessageRepository {
public static final String NAME = "MessageCloudRepository";
@Autowired
@Qualifier(RestCommonsAppConfig.BEAN_SOLR_CLOUD)
private SolrClient cloudSolrServer;
@Override
public SolrClient getSolrServer() {
return cloudSolrServer;
}
}
@Service(MessageEmbeddedRepository.NAME)
public class MessageEmbeddedRepository extends AbstractMessageRepository {
public static final String NAME = "MessageEmbeddedRepository";
@Autowired
@Qualifier(RestCommonsAppConfig.BEAN_EMBEDDED_MESSAGE_)
private SolrClient embeddedSolrServer;
@Override
public SolrClient getSolrServer() {
return embeddedSolrServer;
}
}
@Service
public class DataSyncService {
@Autowired
@Qualifier(MessageEmbeddedRepository.NAME)
private IMessageRepository embeddedRepository;
@Autowired
private @Qualifier(MessageCloudRepository.NAME) IMessageRepository cloudRepository;
@Autowired
private IConfigService configService;
public void copyMessagesFromSolrCloudToEmbeddedSolr() {
if (!configService.isEmbeddedMessageSolrEnabled()) {
return;
}
deleteExpiredDataFromEmbeddedSolr();
final String query = ""; // the query to get new active data
final SolrQuery solrQuery = new SolrQuery(query);
// but add filter to ignore data already in embeddedSolr with same id and _version_
ignoreExistingData(solrQuery);
final List<Future<List<Message>>> messagesFutures = cloudRepository.findAllAsync(solrQuery);
if (CollectionUtils.isNotEmpty(messagesFutures)) {
for (final Future<List<Message>> messagesFuture : messagesFutures) {
try {
final List<Message> messages = messagesFuture.get().stream().map(message -> {
message.setVersionFromSolrCloud(message.getVersion());
return message;
}).collect(Collectors.toList());
embeddedRepository.saveWithoutCommit(messages);
} catch (InterruptedException | ExecutionException e) {
logger.error("Failed to copy data from solr cloud to embedded solr.", e);
}
}
embeddedRepository.hardCommit();
}
}
/**
* ignore data that is already in embedded solr and with same id and _version_.
*/
protected void ignoreExistingData(final SolrQuery solrQuery) {
final SolrQuery existingDataQuery = new SolrQuery("*:*").setFields(Abstract.FIELD_ID,
Message.FIELD_VERSION_FROM_SOLR_CLOUD);
final Iterable<Message> existingMessages = embeddedRepository.findAllSync(existingDataQuery);
// NOT ((id:id1 AND _version_:v1) OR (id:id2 AND _version_:v2))
final Iterator<Message> it = existingMessages.iterator();
if (it.hasNext()) {
final StringBuilder sb = new StringBuilder();
while (it.hasNext()) {
final Message message = it.next();
sb.append(MessageFormat.format("({0}:{1} AND {2}:{3,number,#})", Abstract.FIELD_ID,
message.getId(), AbstractSolrDocument.FIELD_VERSION_,
message.getVersionFromSolrCloud()));
if (it.hasNext()) {
sb.append(SolrUtil.SEPERATOR_OR);
}
}
solrQuery.addFilterQuery(MessageFormat.format("{0}({1})", SolrUtil.NOT, sb.toString()));
}
}
}
@Configuration
@EnableScheduling
public class ScheduledTaskConfig implements SchedulingConfigurer {
@Autowired
private DataSyncService dataSyncService;
@Autowired
private IConfigService configService;
@Bean(destroyMethod = "shutdown")
public Executor taskExecutor() {
return Executors.newScheduledThreadPool(10);
}
@Override
public void configureTasks(final ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskExecutor());
taskRegistrar.addTriggerTask(new Runnable() {
@Override
public void run() {
dataSyncService.copyMessagesFromSolrCloudToEmbeddedSolr();
}
}, new Trigger() {
@Override
public Date nextExecutionTime(final TriggerContext triggerContext) {
final Calendar nextExecutionTime = new GregorianCalendar();
final Date lastActualExecutionTime = triggerContext.lastActualExecutionTime();
nextExecutionTime.setTime(lastActualExecutionTime != null ? lastActualExecutionTime : new Date());
nextExecutionTime.add(Calendar.MILLISECOND,
configService.getSimpleConfig().extractSyncMessageToEmbeddedSolrIntervalInMill());
return nextExecutionTime.getTime();
}
});
}
}