Kotlin coroutines and Java: How do they interoperate? - Part 2
This post continues what we have left off last time when migrating Java blocking codebase into Kotlin coroutine and how they interoperate
with each other. To summarize, we have migrated ProductDescriptionService
from Java to Kotlin
and enabled it in AggregateService
via a useKotlin
switch. Moreover, we have created unit tests in Kotlin version, whereas the existing Java version
has some minor adjustments. The timing in integration tests is adjusted as we’re moving towards asynchronous (and concurrent) programming.
The figure below serves as reminder of our aggregation service architecture.
Once we understand the pattern for migration, we can continue migrating all service calls from Java to Kotlin and enabled it in AggregateService
via
the switch. The AggregationService
code below shows one possible state when we have successfully migrated them. The AggregatorService
code is available at
the Github feature branch.
@Service
public class AggregatorService {
private static final Logger logger = LoggerFactory.getLogger(AggregatorService.class);
//Java
private final ProductCatalogService productCatalogService;
private final ProductDescriptionService productDescriptionService;
private final ProductOfferService offerService;
private final SellerService sellerService;
private final ProductReviewService reviewService;
//Kotlin
private final ProductCatalogServiceKt productCatalogServiceKt;
private final ProductDescriptionServiceKt productDescriptionServiceKt;
private final ProductOfferServiceKt offerServiceKt;
private final SellerServiceKt sellerServiceKt;
private final ProductReviewServiceKt reviewServiceKt;
//Flag
private final boolean useKotlin;
@Autowired
public AggregatorService(ProductCatalogService productCatalogService,
ProductDescriptionService productDescriptionService,
ProductOfferService offerService,
SellerService sellerService,
ProductReviewService reviewService,
ProductCatalogServiceKt productCatalogServiceKt,
ProductDescriptionServiceKt productDescriptionServiceKt,
ProductOfferServiceKt offerServiceKt,
SellerServiceKt sellerServiceKt,
ProductReviewServiceKt reviewServiceKt,
@Value("${use.kotlin:false}") boolean useKotlin) {
this.productCatalogService = productCatalogService;
this.productDescriptionService = productDescriptionService;
this.offerService = offerService;
this.sellerService = sellerService;
this.reviewService = reviewService;
this.productCatalogServiceKt = productCatalogServiceKt;
this.productDescriptionServiceKt = productDescriptionServiceKt;
this.offerServiceKt = offerServiceKt;
this.sellerServiceKt = sellerServiceKt;
this.reviewServiceKt = reviewServiceKt;
this.useKotlin = useKotlin;
}
public ProductSummary getProductSummary(String productId) throws ProductNotFoundException {
return getProductCatalogReroute(productId)
.exceptionally(t -> {
logger.error("Error retrieving data for product catalog with productId {}: {}", productId, t.getCause().getLocalizedMessage());
return Optional.empty();
})
.thenApply(productCatalogOpt -> productCatalogOpt.map(
productCatalog -> {
final CompletableFuture<Optional<ProductDescription>> productDescriptionAsync = getProductDescriptionReroute(productId)
.exceptionally(t -> {
logger.error("Error retrieving data for product description with productId {}: {}", productId, t.getCause().getLocalizedMessage());
return Optional.empty();
});
final CompletableFuture<List<ProductOfferAndSeller>> productOfferAndSellersAsync = getProductOfferAndSellersReroute(productId)
.exceptionally(t -> {
logger.error("Error retrieving data for product offer with productId {}: {}", productId, t.getCause().getLocalizedMessage());
return Collections.emptyList();
});
final CompletableFuture<Pair<List<String>, Double>> productReviewsAsync = getProductReviewsReroute(productId)
.exceptionally(t -> {
logger.error("Error retrieving data for product review with productId {}: {}", productId, t.getCause().getLocalizedMessage());
return Pair.of(Collections.emptyList(), 0d);
});
final Optional<ProductDescription> productDescription = productDescriptionAsync.join();
final List<ProductOfferAndSeller> productOfferAndSellers = productOfferAndSellersAsync.join();
final Pair<List<String>, Double> productReviews = productReviewsAsync.join();
return new ProductSummary(productId,
productCatalog.getProductName(),
productDescription.map(ProductDescription::getShortDescription).orElse(null),
productDescription.map(ProductDescription::getWeightInKg).orElse(null),
productDescription.map(ProductDescription::getColor).orElse(null),
productOfferAndSellers,
productReviews.getLeft(),
productReviews.getRight());
})
)
.join() //immediately blocking in this case
.orElseThrow(() -> new ProductNotFoundException("Product can't be found!"));
}
private CompletableFuture<Optional<ProductCatalog>> getProductCatalogReroute(String productId) {
if (useKotlin) {
return productCatalogServiceKt.getProductCatalogJavaCall(productId);
} else {
return CompletableFuture.supplyAsync(() -> productCatalogService.getProductCatalog(productId));
}
}
private CompletableFuture<Optional<ProductDescription>> getProductDescriptionReroute(String productId) {
if (useKotlin) {
return productDescriptionServiceKt.getProductDescriptionJavaCall(productId);
} else {
return CompletableFuture.supplyAsync(() -> productDescriptionService.getProductDescription(productId));
}
}
private CompletableFuture<List<ProductOfferAndSeller>> getProductOfferAndSellersReroute(String productId) {
if (useKotlin) {
//Note: This shows the challenges to construct CompletableFuture pipeline, Spring Reactor has slightly better operator naming for pipelining
return offerServiceKt.getProductOffersForJavaCall(productId)
.thenCompose(productOffers -> {
final List<CompletableFuture<ProductOfferAndSeller>> productOfferAndSellerFutures = productOffers.stream()
.map(productOffer -> sellerServiceKt.getSellerForJavaCall(productOffer.getSellerId())
.exceptionally(t -> {
logger.error("Error retrieving data for seller with sellerId {}: {}", productOffer.getSellerId(), t.getCause().getLocalizedMessage());
return Optional.empty();
})
.thenApply(seller -> new ProductOfferAndSeller(productOffer.getPrice(), seller.map(Seller::getSellerName).orElse(null))))
.collect(Collectors.toList());
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(productOfferAndSellerFutures.toArray(new CompletableFuture[0]));
return allDoneFuture.thenApply(v -> productOfferAndSellerFutures.stream()
.map(CompletableFuture::join).
collect(Collectors.toList()));
});
} else {
return CompletableFuture.supplyAsync(() -> offerService.getProductOffers(productId).stream()
.map(productOffer -> {
final Optional<Seller> seller = sellerService.getSeller(productOffer.getSellerId());
return new ProductOfferAndSeller(productOffer.getPrice(), seller.map(Seller::getSellerName).orElse(null));
})
.collect(Collectors.toList()));
}
}
private CompletableFuture<Pair<List<String>, Double>> getProductReviewsReroute(String productId) {
if (useKotlin) {
return reviewServiceKt.getProductReviewsForJavaCall(productId)
.thenApply(this::getReviewsAndRating);
} else {
return CompletableFuture.supplyAsync(() -> getReviewsAndRating(reviewService.getReviews(productId)));
}
}
@NotNull
private Pair<List<String>, Double> getReviewsAndRating(List<ProductReview> reviews) {
final List<String> allReviews = new ArrayList<>();
double rating = 0.0;
for (ProductReview review : reviews) {
allReviews.add(review.getReviewNote());
rating += review.getStar();
}
if (reviews.size() != 0) rating = rating / reviews.size();
return Pair.of(allReviews, rating);
}
}
The code above shows the complexity working with CompletableFuture
. If you take a look at getProductOfferAndSellersReroute
method,
then you will see an example of composing CompletebleFuture
pipeline. Spring Reactor has slightly better operator naming. However,
there are tons of operators that we have to learn to use it properly.
Beside AggregatorService
, we need to adjust unit tests in AggregatorServiceTests
and AggregatorServiceKtTests
. The changes in unit tests
are simple, i.e., adding mocks for both AggregatorServiceTests
and AggregatorServiceKtTests
,
and changing service calls to Kotlin version in AggregatorServiceKtTests
(along with making the mocks to return CompletableFuture
).
This make sures that the functionality of aggregation logic remains the same for both Java and Kotlin.
We also need to adjust the expected execution time in the integration tests into a lower value as we’re using CompletableFuture
for both Java and Kotlin cases. In the previous post, we mentioned that the original Java version has around 1200 ms processing time.
Migrating ProductDescriptionService
decreased the processing time by 200 ms, i.e. around 1000 ms. After migrating the all service calls,
we managed to decrease the processing time further by 400 ms.
The next step is to migrate AggregationService
to Kotlin. This means we adopt the AggregationService
logic into AggregationServiceKt
and start using AggregationServiceKt
in the AggregatorController
controller via useKotlin
flag and in the AggregationServiceKtTests
unit tests.
Changes in both AggregatorController
and AggregationServiceKtTests
are straightforward.
@Service
class AggregatorServiceKt(
private val productCatalogServiceKt: ProductCatalogServiceKt,
private val productDescriptionServiceKt: ProductDescriptionServiceKt,
private val productOfferServiceKt: ProductOfferServiceKt,
private val sellerServiceKt: SellerServiceKt,
private val productReviewServiceKt: ProductReviewServiceKt
) {
@Throws(ProductNotFoundException::class)
fun getProductSummaryForJavaCall(productId: String): CompletableFuture<ProductSummary> = GlobalScope.future { getProductSummary(productId) }
@Throws(ProductNotFoundException::class)
suspend fun getProductSummary(productId: String): ProductSummary = coroutineScope {
val productCatalogDeferred = async { productCatalogServiceKt.getProductCatalog(productId) }
val productCatalog = try {
productCatalogDeferred.await()
} catch (e: Exception) {
logger.error("Error retrieving data for product catalog with productId {}: {}", productId, e.cause?.localizedMessage)
null
}
if (productCatalog == null) throw ProductNotFoundException("Product can't be found!") //simplicity, throw Product Not Found
else
supervisorScope {
val productDescriptionDeferred = async { productDescriptionServiceKt.getProductDescription(productId) }
val productOffersDeferred = async { productOfferServiceKt.getProductOffers(productId) }
val productReviewsDeferred = async { productReviewServiceKt.getProductReviews(productId) }
val productDescription = try{
productDescriptionDeferred.await()
} catch (e: Exception) {
logger.error("Error retrieving data for product description with productId {}: {}", productId, e.cause?.localizedMessage)
null
}
val productReviews = try {
getReviewsAndRating(productReviewsDeferred.await())
} catch (e: Exception) {
logger.error("Error retrieving data for product review with productId {}: {}", productId, e.cause?.localizedMessage)
Pair.of(emptyList(), 0.0)
}
val productOfferAndSellers = try {
productOffersDeferred.await()
.map {
// Note: this is a sequential map, hence it is kind of useless to have coroutine implementation in here
supervisorScope {
val sellerDeferred = async(Dispatchers.IO) { sellerServiceKt.getSeller(it.sellerId) }
try {
val seller = sellerDeferred.await()
ProductOfferAndSeller(it.price, seller?.sellerName)
} catch (e: Exception) {
logger.error("Error retrieving data for seller with sellerId {}: {}", it.sellerId, e.cause?.localizedMessage)
ProductOfferAndSeller(it.price, null)
}
}
}
.toList()
} catch (e: Exception) {
logger.error("Error retrieving data for product offer and seller with productId {}: {}", productId, e.cause?.localizedMessage)
emptyList()
}
ProductSummary(
productId,
productCatalog.productName,
productDescription?.shortDescription,
productDescription?.weightInKg,
productDescription?.color,
productOfferAndSellers,
productReviews.left,
productReviews.right)
}
}
private fun getReviewsAndRating(reviews: List<ProductReview>): Pair<List<String>, Double> {
val allReviews: MutableList<String> = ArrayList()
var rating = 0.0
for (review in reviews) {
allReviews.add(review.reviewNote)
rating += review.star.toDouble()
}
if (reviews.isNotEmpty()) rating /= reviews.size
return Pair.of(allReviews, rating)
}
companion object {
private val logger = LoggerFactory.getLogger(AggregatorServiceKt::class.java)
}
}
In general, concurrent programming is hard. The Kotlin coroutine version is not trivial. The CompletableFuture
version
has a better execution time than the Kotlin coroutine, i.e., 600-650 ms vs 800-900 ms. One reason is that in Kotlin coroutine,
it has a sequential map operation which calls SellerServiceKt.
Making the map operation concurrent (or parallel) is a bit involved. Some ideas:
- Transform the Kotlin list into Java parallel stream. However, this means we operate within the Java realm, and we lose the ability to use Kotlin coroutine.
- Transform the Kotlin list into Kotlin flow. The problem is
that a map operation in Kotlin flow is also sequential. We can use fusing operations such as
channelFlow
,buffer
, etc. - Perform a concurrent loop. It requires us to join two lists: product offers list and sellers list into a single product offer and sellers. It is not a very fluent one.
At this time of writing, we can try the suggestion in kotlinx open issue.
val productOfferAndSellers = try {
productOffersDeferred.await()
.asFlow()
.concurrentMap(this, 5) { //concurrent map
supervisorScope {
val sellerDeferred = async(Dispatchers.IO) { sellerServiceKt.getSeller(it.sellerId) }
try {
val seller = sellerDeferred.await()
ProductOfferAndSeller(it.price, seller?.sellerName)
} catch (e: Exception) {
logger.error("Error retrieving data for seller with sellerId {}: {}", it.sellerId, e.cause?.localizedMessage)
ProductOfferAndSeller(it.price, null)
}
}
}
.toList()
} catch (e: Exception) {
logger.error("Error retrieving data for product offer and seller with productId {}: {}", productId, e.cause?.localizedMessage)
emptyList()
}
The concurrentMap
method is available below.
private fun <T, R> Flow<T>.concurrentMap(scope: CoroutineScope, concurrencyLevel: Int, transform: suspend (T) -> R): Flow<R> = this
.map { scope.async { transform(it) } }
.buffer(concurrencyLevel)
.map { it.await() }
The approach adds one more async
coroutine level and uses a buffer to execute a flow map operation concurrently. It reduces
the execution time by 200 ms.
We still keep our objectives:
- Existing unit tests should remain valid with no/little adjustment.
- Additional unit tests are created to check concurrency.
- Integration tests should remain valid.
A conclusion is that composing Kotlin coroutines are challenging. Using Kotlin flow can help with its useful operator, albeit not as extensive as reactive streams implementation such as rxJava and Spring Reactor.
In the next part, we will discuss moving AggregateController
into Kotlin codebase, removing all Java codes,
and using the event-loop-based server by going completely reactive.
Have fun coding!
Comments