Make an optimization for BatchingClient

xdark 2023-04-01 03:38:07 +03:00
parent ec83d01dca
commit 5373e80e7c
2 changed files with 15 additions and 17 deletions
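
The change itself: pending requests are now frozen into a List and handed to the batch client, and the response handler resolves promises in a single pass over that list instead of first walking the response map and then computing a Sets.difference for the keys that came back empty. The sketch below shows that single-pass shape in isolation; it uses java.util.concurrent.CompletableFuture purely as a stand-in for com.twitter.util.Promise, and the class and method names are illustrative, not part of this repository.

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the promise-resolution pattern this commit introduces.
final class SinglePassResolver<RQ, RP> {
  private final ConcurrentHashMap<RQ, CompletableFuture<RP>> promises = new ConcurrentHashMap<>();

  CompletableFuture<RP> register(RQ request) {
    return promises.computeIfAbsent(request, r -> new CompletableFuture<>());
  }

  // One pass over the issued batch: each request either takes its response from the
  // map or fails immediately, so no second pass over a set difference is needed.
  void resolve(List<RQ> requests, Map<RQ, RP> responseMap) {
    for (RQ request : requests) {
      CompletableFuture<RP> promise = promises.remove(request);
      if (promise == null) {
        continue;
      }
      RP response = responseMap.get(request);
      if (response != null) {
        promise.complete(response);
      } else {
        promise.completeExceptionally(new IllegalStateException("no response for " + request));
      }
    }
  }
}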


@@ -134,7 +134,7 @@ public class RetrieveCardBatchedStage extends TwitterBaseStage
     }));
   }

-  private Future<Map<Long, Card2>> batchRetrieveURLs(Set<Long> keys) {
+  private Future<Map<Long, Card2>> batchRetrieveURLs(List<Long> keys) {
     retrieveCardsTimer.start();
     totalTweets.increment(keys.size());
@@ -145,7 +145,7 @@ public class RetrieveCardBatchedStage extends TwitterBaseStage
     GetTweetsRequest request = new GetTweetsRequest()
         .setOptions(options)
-        .setTweet_ids(new ArrayList<>(keys));
+        .setTweet_ids(keys);
     return tweetyPieService.get_tweets(request)
         .onFailure(throwable -> {
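
With keys arriving as a List<Long>, the hunk above can drop the per-batch new ArrayList<>(keys) copy and hand the list straight to setTweet_ids. A minimal sketch of that difference, with IdsRequest standing in for the Thrift-generated GetTweetsRequest (only the List-storing setter is modelled; everything else is illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Stand-in for a Thrift-style request object that stores a List of ids.
final class IdsRequest {
  private List<Long> ids;

  IdsRequest setIds(List<Long> ids) {
    this.ids = ids;
    return this;
  }
}

final class CopyVsPassThrough {
  // Old shape: Set-typed keys forced an ArrayList allocation on every batch.
  static IdsRequest fromSet(Set<Long> keys) {
    return new IdsRequest().setIds(new ArrayList<>(keys));
  }

  // New shape: List-typed keys are stored as-is, no copy.
  static IdsRequest fromList(List<Long> keys) {
    return new IdsRequest().setIds(keys);
  }
}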


@@ -2,7 +2,7 @@ package com.twitter.search.ingester.pipeline.util;

 import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;

 import com.google.common.collect.Sets;
@@ -20,7 +20,7 @@ public class BatchingClient<RQ, RP> {
     /**
      * Issue a request to the underlying store which supports batches of requests.
      */
-    Future<Map<RQ, RP>> batchGet(Set<RQ> requests);
+    Future<Map<RQ, RP>> batchGet(List<RQ> requests);
   }

   /**
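
batchGet is the only method shown on the nested interface, so the new List-based contract is a single-method shape. The sketch below declares its own equivalent interface (the real interface's name is not visible in this excerpt) and a trivial in-memory implementation; Future.value(...) is the same com.twitter.util call used elsewhere in this file, the rest is illustrative.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.twitter.util.Future;

// Local equivalent of the List-based contract above, declared only for the example.
interface ListBatchGet<RQ, RP> {
  Future<Map<RQ, RP>> batchGet(List<RQ> requests);
}

// A trivial implementation: every id maps to its decimal string.
final class EchoBatchGet implements ListBatchGet<Long, String> {
  @Override
  public Future<Map<Long, String>> batchGet(List<Long> requests) {
    Map<Long, String> responses = new HashMap<>();
    for (Long id : requests) {
      responses.put(id, Long.toString(id));
    }
    return Future.value(responses);
  }
}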
@@ -54,7 +54,7 @@ public class BatchingClient<RQ, RP> {
   }

   private void maybeBatchCall(RQ request) {
-    Set<RQ> frozenRequests;
+    List<RQ> frozenRequests;
     synchronized (unsentRequests) {
       unsentRequests.add(request);
       if (unsentRequests.size() < batchSize) {
@@ -63,27 +63,25 @@ public class BatchingClient<RQ, RP> {
       // Make a copy of requests so we can modify it inside executeBatchCall without additional
       // synchronization.
-      frozenRequests = new HashSet<>(unsentRequests);
+      frozenRequests = Arrays.asList((RQ[]) unsentRequests.toArray());
       unsentRequests.clear();
     }

     executeBatchCall(frozenRequests);
   }

-  private void executeBatchCall(Set<RQ> requests) {
+  private void executeBatchCall(List<RQ> requests) {
     batchClient.batchGet(requests)
         .onSuccess(responseMap -> {
-          for (Map.Entry<RQ, RP> entry : responseMap.entrySet()) {
-            Promise<RP> promise = promises.remove(entry.getKey());
-            if (promise != null) {
-              promise.become(Future.value(entry.getValue()));
-            }
-          }
-          Set<RQ> outstandingRequests = Sets.difference(requests, responseMap.keySet());
-          for (RQ request : outstandingRequests) {
+          for (RQ request : requests) {
             Promise<RP> promise = promises.remove(request);
-            if (promise != null) {
+            if (promise == null) {
+              continue;
+            }
+            RP response = responseMap.get(request);
+            if (response != null) {
+              promise.become(Future.value(response));
+            } else {
               promise.become(Future.exception(new ResponseNotReturnedException(request)));
             }
           }
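
The other half of the optimization is the freeze step near the top of this hunk: instead of copying the pending set into a new HashSet, toArray() takes a snapshot and Arrays.asList wraps it, so clearing unsentRequests afterwards cannot touch the batch that was handed off; the unchecked (RQ[]) cast is erased at runtime (RQ erases to Object) and the resulting list is only read, never exposed as a typed array. A small standalone illustration of that behaviour, with names chosen for the example:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class FreezeDemo {

  // Mirrors the commit's snapshot: wrap the array returned by toArray(), then clear
  // the original set. The cast compiles with an unchecked warning and is a no-op at
  // runtime since RQ erases to Object.
  @SuppressWarnings("unchecked")
  static <RQ> List<RQ> freeze(Set<RQ> unsentRequests) {
    List<RQ> frozen = Arrays.asList((RQ[]) unsentRequests.toArray());
    unsentRequests.clear();
    return frozen;
  }

  public static void main(String[] args) {
    Set<String> unsent = new HashSet<>(Arrays.asList("a", "b", "c"));
    List<String> batch = freeze(unsent);
    System.out.println(unsent); // []
    System.out.println(batch);  // the three elements, unaffected by clear()
  }
}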