mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-10 11:07:06 +00:00
(improvement) (headless) Optimize the performance of the method BaseMatchStrategy.executeTasks() (#2363) (#2364)
This commit is contained in:
@@ -12,12 +12,17 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@@ -72,18 +77,39 @@ public abstract class BaseMatchStrategy<T extends MapResult> implements MatchStr
|
||||
}
|
||||
}
|
||||
|
||||
protected void executeTasks(List<Callable<Void>> tasks) {
|
||||
protected Set<T> executeTasks(List<Supplier<List<T>>> tasks) {
|
||||
|
||||
Function<Supplier<List<T>>, Supplier<List<T>>> decorator = taskDecorator();
|
||||
List<CompletableFuture<List<T>>> futures;
|
||||
if (decorator == null) {
|
||||
futures = tasks.stream().map(t -> CompletableFuture.supplyAsync(t, executor)).toList();
|
||||
} else {
|
||||
futures = tasks.stream()
|
||||
.map(t -> CompletableFuture.supplyAsync(decorator.apply(t), executor)).toList();
|
||||
}
|
||||
|
||||
CompletableFuture<List<T>> listCompletableFuture =
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
|
||||
.thenApply(v -> futures.stream()
|
||||
.flatMap(listFuture -> listFuture.join().stream())
|
||||
.collect(Collectors.toList()));
|
||||
try {
|
||||
executor.invokeAll(tasks);
|
||||
for (Callable<Void> future : tasks) {
|
||||
future.call();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
List<T> ts = listCompletableFuture.get();
|
||||
Set<T> results = new HashSet<>();
|
||||
selectResultInOneRound(results, ts);
|
||||
return results;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("Task execution interrupted", e);
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Function<Supplier<List<T>>, Supplier<List<T>>> taskDecorator() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public double getThreshold(Double threshold, Double minThreshold, MapModeEnum mapModeEnum) {
|
||||
if (MapModeEnum.STRICT.equals(mapModeEnum)) {
|
||||
return 1.0d;
|
||||
|
||||
@@ -17,6 +17,8 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@@ -76,6 +78,22 @@ public class DatabaseMatchStrategy extends SingleMatchStrategy<DatabaseMapResult
|
||||
return allElements;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<Supplier<List<DatabaseMapResult>>, Supplier<List<DatabaseMapResult>>> taskDecorator() {
|
||||
List<SchemaElement> schemaElements = allElements.get();
|
||||
if (CollectionUtils.isEmpty(schemaElements)) {
|
||||
return null;
|
||||
}
|
||||
return (t) -> (Supplier<List<DatabaseMapResult>>) () -> {
|
||||
try {
|
||||
allElements.set(schemaElements);
|
||||
return t.get();
|
||||
} finally {
|
||||
allElements.remove();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Double getThreshold(ChatQueryContext chatQueryContext) {
|
||||
Double threshold =
|
||||
Double.valueOf(mapperConfig.getParameterValue(MapperConfig.MAPPER_NAME_THRESHOLD));
|
||||
|
||||
@@ -24,8 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.tencent.supersonic.headless.chat.mapper.MapperConfig.*;
|
||||
@@ -141,7 +140,6 @@ public class EmbeddingMatchStrategy extends BatchMatchStrategy<EmbeddingResult>
|
||||
*/
|
||||
public List<EmbeddingResult> detectByBatch(ChatQueryContext chatQueryContext,
|
||||
Set<Long> detectDataSetIds, Set<String> detectSegments, boolean useLlm) {
|
||||
Set<EmbeddingResult> results = ConcurrentHashMap.newKeySet();
|
||||
int embeddingMapperBatch = Integer
|
||||
.valueOf(mapperConfig.getParameterValue(MapperConfig.EMBEDDING_MAPPER_BATCH));
|
||||
|
||||
@@ -154,12 +152,11 @@ public class EmbeddingMatchStrategy extends BatchMatchStrategy<EmbeddingResult>
|
||||
Lists.partition(queryTextsList, embeddingMapperBatch);
|
||||
|
||||
// Create and execute tasks for each batch
|
||||
List<Callable<Void>> tasks = new ArrayList<>();
|
||||
List<Supplier<List<EmbeddingResult>>> tasks = new ArrayList<>();
|
||||
for (List<String> queryTextsSub : queryTextsSubList) {
|
||||
tasks.add(
|
||||
createTask(chatQueryContext, detectDataSetIds, queryTextsSub, results, useLlm));
|
||||
tasks.add(createTask(chatQueryContext, detectDataSetIds, queryTextsSub, useLlm));
|
||||
}
|
||||
executeTasks(tasks);
|
||||
Set<EmbeddingResult> results = executeTasks(tasks);
|
||||
|
||||
// Apply LLM filtering if enabled
|
||||
if (useLlm) {
|
||||
@@ -196,20 +193,13 @@ public class EmbeddingMatchStrategy extends BatchMatchStrategy<EmbeddingResult>
|
||||
* @param chatQueryContext The context of the chat query
|
||||
* @param detectDataSetIds Target dataset IDs
|
||||
* @param queryTextsSub Sub-list of query texts to process
|
||||
* @param results Shared result set for collecting results
|
||||
* @param useLlm Whether to use LLM
|
||||
* @return Callable task
|
||||
* @return Supplier task
|
||||
*/
|
||||
private Callable<Void> createTask(ChatQueryContext chatQueryContext, Set<Long> detectDataSetIds,
|
||||
List<String> queryTextsSub, Set<EmbeddingResult> results, boolean useLlm) {
|
||||
return () -> {
|
||||
List<EmbeddingResult> oneRoundResults = detectByQueryTextsSub(detectDataSetIds,
|
||||
queryTextsSub, chatQueryContext, useLlm);
|
||||
synchronized (results) {
|
||||
selectResultInOneRound(results, oneRoundResults);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
private Supplier<List<EmbeddingResult>> createTask(ChatQueryContext chatQueryContext,
|
||||
Set<Long> detectDataSetIds, List<String> queryTextsSub, boolean useLlm) {
|
||||
return () -> detectByQueryTextsSub(detectDataSetIds, queryTextsSub, chatQueryContext,
|
||||
useLlm);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -11,8 +11,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@@ -26,8 +25,7 @@ public abstract class SingleMatchStrategy<T extends MapResult> extends BaseMatch
|
||||
Set<Long> detectDataSetIds) {
|
||||
Map<Integer, Integer> regOffsetToLength = mapperHelper.getRegOffsetToLength(terms);
|
||||
String text = chatQueryContext.getRequest().getQueryText();
|
||||
Set<T> results = ConcurrentHashMap.newKeySet();
|
||||
List<Callable<Void>> tasks = new ArrayList<>();
|
||||
List<Supplier<List<T>>> tasks = new ArrayList<>();
|
||||
|
||||
for (int startIndex = 0; startIndex <= text.length() - 1;) {
|
||||
for (int index = startIndex; index <= text.length();) {
|
||||
@@ -35,27 +33,20 @@ public abstract class SingleMatchStrategy<T extends MapResult> extends BaseMatch
|
||||
index = mapperHelper.getStepIndex(regOffsetToLength, index);
|
||||
if (index <= text.length()) {
|
||||
String detectSegment = text.substring(startIndex, index).trim();
|
||||
Callable<Void> task = createTask(chatQueryContext, detectDataSetIds,
|
||||
detectSegment, offset, results);
|
||||
Supplier<List<T>> task =
|
||||
createTask(chatQueryContext, detectDataSetIds, detectSegment, offset);
|
||||
tasks.add(task);
|
||||
}
|
||||
}
|
||||
startIndex = mapperHelper.getStepIndex(regOffsetToLength, startIndex);
|
||||
}
|
||||
executeTasks(tasks);
|
||||
Set<T> results = executeTasks(tasks);
|
||||
return new ArrayList<>(results);
|
||||
}
|
||||
|
||||
private Callable<Void> createTask(ChatQueryContext chatQueryContext, Set<Long> detectDataSetIds,
|
||||
String detectSegment, int offset, Set<T> results) {
|
||||
return () -> {
|
||||
List<T> oneRoundResults =
|
||||
detectByStep(chatQueryContext, detectDataSetIds, detectSegment, offset);
|
||||
synchronized (results) {
|
||||
selectResultInOneRound(results, oneRoundResults);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
private Supplier<List<T>> createTask(ChatQueryContext chatQueryContext,
|
||||
Set<Long> detectDataSetIds, String detectSegment, int offset) {
|
||||
return () -> detectByStep(chatQueryContext, detectDataSetIds, detectSegment, offset);
|
||||
}
|
||||
|
||||
public abstract List<T> detectByStep(ChatQueryContext chatQueryContext,
|
||||
|
||||
Reference in New Issue
Block a user