diff --git a/src/subdomains/core/liquidity-management/entities/liquidity-management-pipeline.entity.ts b/src/subdomains/core/liquidity-management/entities/liquidity-management-pipeline.entity.ts index 6af2e63368..d5d377a297 100644 --- a/src/subdomains/core/liquidity-management/entities/liquidity-management-pipeline.entity.ts +++ b/src/subdomains/core/liquidity-management/entities/liquidity-management-pipeline.entity.ts @@ -1,6 +1,7 @@ import { IEntity } from 'src/shared/models/entity'; import { Column, Entity, Index, JoinTable, ManyToOne, OneToMany } from 'typeorm'; import { BuyCrypto } from '../../buy-crypto/process/entities/buy-crypto.entity'; +import { RefReward } from '../../referral/reward/ref-reward.entity'; import { LiquidityManagementOrderStatus, LiquidityManagementPipelineStatus, LiquidityOptimizationType } from '../enums'; import { LiquidityState } from '../interfaces'; import { LiquidityManagementAction } from './liquidity-management-action.entity'; @@ -22,6 +23,9 @@ export class LiquidityManagementPipeline extends IEntity { @OneToMany(() => BuyCrypto, (buyCrypto) => buyCrypto.liquidityPipeline) buyCryptos: BuyCrypto[]; + @OneToMany(() => RefReward, (refReward) => refReward.liquidityPipeline) + refRewards: RefReward[]; + @OneToMany(() => LiquidityManagementOrder, (orders) => orders.pipeline) orders: LiquidityManagementOrder[]; diff --git a/src/subdomains/core/referral/referral.module.ts b/src/subdomains/core/referral/referral.module.ts index 8a02891e92..21dbc95b97 100644 --- a/src/subdomains/core/referral/referral.module.ts +++ b/src/subdomains/core/referral/referral.module.ts @@ -2,6 +2,7 @@ import { forwardRef, Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { BlockchainModule } from 'src/integration/blockchain/blockchain.module'; import { SharedModule } from 'src/shared/shared.module'; +import { LiquidityManagementModule } from 'src/subdomains/core/liquidity-management/liquidity-management.module'; import { UserModule } from 'src/subdomains/generic/user/user.module'; import { DexModule } from 'src/subdomains/supporting/dex/dex.module'; import { NotificationModule } from 'src/subdomains/supporting/notification/notification.module'; @@ -32,6 +33,7 @@ import { RefRewardService } from './reward/services/ref-reward.service'; NotificationModule, PricingModule, TransactionModule, + LiquidityManagementModule, ], controllers: [RefController, RefRewardController], providers: [ diff --git a/src/subdomains/core/referral/reward/ref-reward.entity.ts b/src/subdomains/core/referral/reward/ref-reward.entity.ts index 6cf21c4926..b82dccd82d 100644 --- a/src/subdomains/core/referral/reward/ref-reward.entity.ts +++ b/src/subdomains/core/referral/reward/ref-reward.entity.ts @@ -1,5 +1,6 @@ import { Blockchain } from 'src/integration/blockchain/shared/enums/blockchain.enum'; import { UpdateResult } from 'src/shared/models/entity'; +import { LiquidityManagementPipeline } from 'src/subdomains/core/liquidity-management/entities/liquidity-management-pipeline.entity'; import { UserData } from 'src/subdomains/generic/user/models/user-data/user-data.entity'; import { User } from 'src/subdomains/generic/user/models/user/user.entity'; import { Transaction } from 'src/subdomains/supporting/payment/entities/transaction.entity'; @@ -40,6 +41,9 @@ export class RefReward extends Reward { @JoinColumn() sourceTransaction?: Transaction; + @ManyToOne(() => LiquidityManagementPipeline, { nullable: true }) + liquidityPipeline?: LiquidityManagementPipeline; + //*** FACTORY METHODS ***// readyToPayout(outputAmount: number): UpdateResult { diff --git a/src/subdomains/core/referral/reward/services/ref-reward-dex.service.ts b/src/subdomains/core/referral/reward/services/ref-reward-dex.service.ts index 110882db41..2c1c7e8b28 100644 --- a/src/subdomains/core/referral/reward/services/ref-reward-dex.service.ts +++ b/src/subdomains/core/referral/reward/services/ref-reward-dex.service.ts @@ -2,14 +2,17 @@ import { Injectable } from '@nestjs/common'; import { Asset } from 'src/shared/models/asset/asset.entity'; import { DfxLogger } from 'src/shared/services/dfx-logger'; import { Util } from 'src/shared/utils/util'; +import { LiquidityManagementPipelineStatus } from 'src/subdomains/core/liquidity-management/enums'; +import { LiquidityManagementService } from 'src/subdomains/core/liquidity-management/services/liquidity-management.service'; import { LiquidityOrderContext } from 'src/subdomains/supporting/dex/entities/liquidity-order.entity'; -import { PurchaseLiquidityRequest, ReserveLiquidityRequest } from 'src/subdomains/supporting/dex/interfaces'; +import { CheckLiquidityRequest, PurchaseLiquidityRequest, ReserveLiquidityRequest } from 'src/subdomains/supporting/dex/interfaces'; import { DexService } from 'src/subdomains/supporting/dex/services/dex.service'; import { PriceCurrency, PriceValidity, PricingService, } from 'src/subdomains/supporting/pricing/services/pricing.service'; +import { In } from 'typeorm'; import { RefReward, RewardStatus } from '../ref-reward.entity'; import { RefRewardRepository } from '../ref-reward.repository'; @@ -27,25 +30,67 @@ export class RefRewardDexService { private readonly refRewardRepo: RefRewardRepository, private readonly dexService: DexService, private readonly priceService: PricingService, + private readonly liquidityService: LiquidityManagementService, ) {} async secureLiquidity(): Promise { + await this.processNewRewards(); + await this.processPendingLiquidityRewards(); + } + + private async processNewRewards(): Promise { const newRefRewards = await this.refRewardRepo.find({ where: { status: RewardStatus.PREPARED }, }); + // Get assets with pending liquidity pipelines to avoid processing them + const pendingLiquidityRewards = await this.refRewardRepo.find({ + where: { status: RewardStatus.PENDING_LIQUIDITY }, + relations: { liquidityPipeline: true }, + }); + const assetsWithPendingPipeline = new Set( + pendingLiquidityRewards + .filter( + (r) => + r.outputAsset && + [LiquidityManagementPipelineStatus.CREATED, LiquidityManagementPipelineStatus.IN_PROGRESS].includes( + r.liquidityPipeline?.status, + ), + ) + .map((r) => r.outputAsset.id), + ); + const groupedRewards = Util.groupByAccessor(newRefRewards, (r) => r.outputAsset.id); for (const rewards of groupedRewards.values()) { try { - // payout asset price const asset = rewards[0].outputAsset; + + // Skip if a pipeline is already running for this asset + if (assetsWithPendingPipeline.has(asset.id)) { + this.logger.verbose(`Skipping ref rewards for ${asset.uniqueName}: pipeline already running`); + continue; + } + const assetPrice = await this.priceService.getPrice(PriceCurrency.EUR, asset, PriceValidity.VALID_ONLY); + // Calculate total output amount for all rewards of this asset + const totalOutputAmount = Util.round(rewards.reduce((sum, r) => sum + assetPrice.convert(r.amountInEur, 8), 0), 8); + + // Check if liquidity is available + const liquidity = await this.dexService.checkLiquidity(this.createCheckLiquidityRequest(asset, totalOutputAmount)); + + if (liquidity.target.availableAmount < totalOutputAmount) { + // Not enough liquidity - start pipeline + await this.startLiquidityPipeline(rewards, asset, totalOutputAmount, liquidity.target.availableAmount); + continue; + } + + // Enough liquidity - process rewards normally for (const reward of rewards) { const outputAmount = assetPrice.convert(reward.amountInEur, 8); - await this.checkLiquidity({ + await this.reserveLiquidity({ amount: outputAmount, asset, rewardId: reward.id.toString(), @@ -59,13 +104,110 @@ export class RefRewardDexService { } } - private async checkLiquidity(request: RefLiquidityRequest): Promise { - const reserveRequest = this.createLiquidityRequest(request); + private async processPendingLiquidityRewards(): Promise { + const pendingRewards = await this.refRewardRepo.find({ + where: { status: RewardStatus.PENDING_LIQUIDITY }, + relations: { liquidityPipeline: true }, + }); + + // Reset rewards without pipeline to PREPARED (pipeline creation failed) + const rewardsWithoutPipeline = pendingRewards.filter((r) => !r.liquidityPipeline); + if (rewardsWithoutPipeline.length) { + this.logger.info(`Resetting ${rewardsWithoutPipeline.length} ref rewards without pipeline to PREPARED`); + await this.refRewardRepo.update( + { id: In(rewardsWithoutPipeline.map((r) => r.id)) }, + { status: RewardStatus.PREPARED }, + ); + } + + const rewardsWithPipeline = pendingRewards.filter((r) => r.liquidityPipeline); + const groupedRewards = Util.groupByAccessor(rewardsWithPipeline, (r) => r.liquidityPipeline.id); + + for (const rewards of groupedRewards.values()) { + try { + const pipeline = rewards[0].liquidityPipeline; + + // Pipeline failed/stopped: Reset to PREPARED for retry + if ([LiquidityManagementPipelineStatus.FAILED, LiquidityManagementPipelineStatus.STOPPED].includes(pipeline.status)) { + this.logger.info(`Pipeline ${pipeline.id} failed/stopped, resetting ref rewards to PREPARED`); + await this.refRewardRepo.update( + { id: In(rewards.map((r) => r.id)) }, + { status: RewardStatus.PREPARED, liquidityPipeline: null }, + ); + continue; + } + + // Pipeline not complete yet + if (pipeline.status !== LiquidityManagementPipelineStatus.COMPLETE) { + continue; + } + + // Pipeline complete - process rewards with current price + const asset = rewards[0].outputAsset; + const assetPrice = await this.priceService.getPrice(PriceCurrency.EUR, asset, PriceValidity.VALID_ONLY); + + for (const reward of rewards) { + const outputAmount = assetPrice.convert(reward.amountInEur, 8); + + await this.reserveLiquidity({ + amount: outputAmount, + asset, + rewardId: reward.id.toString(), + }); + + await this.refRewardRepo.update(...reward.readyToPayout(outputAmount)); + } + } catch (e) { + this.logger.error(`Error in processing pending liquidity ref rewards:`, e); + } + } + } + + private async startLiquidityPipeline( + rewards: RefReward[], + asset: Asset, + totalAmount: number, + availableAmount: number, + ): Promise { + const deficit = Util.round(totalAmount - availableAmount, 8); + + // Set status BEFORE pipeline attempt (consistent with BuyCrypto) + await this.refRewardRepo.update( + { id: In(rewards.map((r) => r.id)) }, + { status: RewardStatus.PENDING_LIQUIDITY }, + ); + + try { + const pipeline = await this.liquidityService.buyLiquidity(asset.id, deficit, deficit, true); + this.logger.info(`Started liquidity pipeline ${pipeline.id} for ref rewards (deficit: ${deficit} ${asset.uniqueName})`); + + await this.refRewardRepo.update( + { id: In(rewards.map((r) => r.id)) }, + { liquidityPipeline: pipeline }, + ); + } catch (e) { + this.logger.error(`Failed to start liquidity pipeline for ref rewards (${asset.uniqueName}):`, e); + // Status remains PENDING_LIQUIDITY without pipeline - will be reset to PREPARED in processPendingLiquidityRewards() + } + } + + private async reserveLiquidity(request: RefLiquidityRequest): Promise { + const reserveRequest = this.createReserveLiquidityRequest(request); return this.dexService.reserveLiquidity(reserveRequest); } - private createLiquidityRequest(request: RefLiquidityRequest): PurchaseLiquidityRequest | ReserveLiquidityRequest { + private createCheckLiquidityRequest(asset: Asset, amount: number): CheckLiquidityRequest { + return { + context: LiquidityOrderContext.REF_PAYOUT, + correlationId: 'ref-liquidity-check', + referenceAsset: asset, + referenceAmount: amount, + targetAsset: asset, + }; + } + + private createReserveLiquidityRequest(request: RefLiquidityRequest): PurchaseLiquidityRequest | ReserveLiquidityRequest { return { context: LiquidityOrderContext.REF_PAYOUT, correlationId: request.rewardId,