Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { IEntity } from 'src/shared/models/entity';
import { Column, Entity, Index, JoinTable, ManyToOne, OneToMany } from 'typeorm';
import { BuyCrypto } from '../../buy-crypto/process/entities/buy-crypto.entity';
import { RefReward } from '../../referral/reward/ref-reward.entity';
import { LiquidityManagementOrderStatus, LiquidityManagementPipelineStatus, LiquidityOptimizationType } from '../enums';
import { LiquidityState } from '../interfaces';
import { LiquidityManagementAction } from './liquidity-management-action.entity';
Expand All @@ -22,6 +23,9 @@ export class LiquidityManagementPipeline extends IEntity {
@OneToMany(() => BuyCrypto, (buyCrypto) => buyCrypto.liquidityPipeline)
buyCryptos: BuyCrypto[];

@OneToMany(() => RefReward, (refReward) => refReward.liquidityPipeline)
refRewards: RefReward[];

@OneToMany(() => LiquidityManagementOrder, (orders) => orders.pipeline)
orders: LiquidityManagementOrder[];

Expand Down
2 changes: 2 additions & 0 deletions src/subdomains/core/referral/referral.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { forwardRef, Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BlockchainModule } from 'src/integration/blockchain/blockchain.module';
import { SharedModule } from 'src/shared/shared.module';
import { LiquidityManagementModule } from 'src/subdomains/core/liquidity-management/liquidity-management.module';
import { UserModule } from 'src/subdomains/generic/user/user.module';
import { DexModule } from 'src/subdomains/supporting/dex/dex.module';
import { NotificationModule } from 'src/subdomains/supporting/notification/notification.module';
Expand Down Expand Up @@ -32,6 +33,7 @@ import { RefRewardService } from './reward/services/ref-reward.service';
NotificationModule,
PricingModule,
TransactionModule,
LiquidityManagementModule,
],
controllers: [RefController, RefRewardController],
providers: [
Expand Down
4 changes: 4 additions & 0 deletions src/subdomains/core/referral/reward/ref-reward.entity.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Blockchain } from 'src/integration/blockchain/shared/enums/blockchain.enum';
import { UpdateResult } from 'src/shared/models/entity';
import { LiquidityManagementPipeline } from 'src/subdomains/core/liquidity-management/entities/liquidity-management-pipeline.entity';
import { UserData } from 'src/subdomains/generic/user/models/user-data/user-data.entity';
import { User } from 'src/subdomains/generic/user/models/user/user.entity';
import { Transaction } from 'src/subdomains/supporting/payment/entities/transaction.entity';
Expand Down Expand Up @@ -40,6 +41,9 @@ export class RefReward extends Reward {
@JoinColumn()
sourceTransaction?: Transaction;

@ManyToOne(() => LiquidityManagementPipeline, { nullable: true })
liquidityPipeline?: LiquidityManagementPipeline;

//*** FACTORY METHODS ***//

readyToPayout(outputAmount: number): UpdateResult<RefReward> {
Expand Down
154 changes: 148 additions & 6 deletions src/subdomains/core/referral/reward/services/ref-reward-dex.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ import { Injectable } from '@nestjs/common';
import { Asset } from 'src/shared/models/asset/asset.entity';
import { DfxLogger } from 'src/shared/services/dfx-logger';
import { Util } from 'src/shared/utils/util';
import { LiquidityManagementPipelineStatus } from 'src/subdomains/core/liquidity-management/enums';
import { LiquidityManagementService } from 'src/subdomains/core/liquidity-management/services/liquidity-management.service';
import { LiquidityOrderContext } from 'src/subdomains/supporting/dex/entities/liquidity-order.entity';
import { PurchaseLiquidityRequest, ReserveLiquidityRequest } from 'src/subdomains/supporting/dex/interfaces';
import { CheckLiquidityRequest, PurchaseLiquidityRequest, ReserveLiquidityRequest } from 'src/subdomains/supporting/dex/interfaces';
import { DexService } from 'src/subdomains/supporting/dex/services/dex.service';
import {
PriceCurrency,
PriceValidity,
PricingService,
} from 'src/subdomains/supporting/pricing/services/pricing.service';
import { In } from 'typeorm';
import { RefReward, RewardStatus } from '../ref-reward.entity';
import { RefRewardRepository } from '../ref-reward.repository';

Expand All @@ -27,25 +30,67 @@ export class RefRewardDexService {
private readonly refRewardRepo: RefRewardRepository,
private readonly dexService: DexService,
private readonly priceService: PricingService,
private readonly liquidityService: LiquidityManagementService,
) {}

async secureLiquidity(): Promise<void> {
await this.processNewRewards();
await this.processPendingLiquidityRewards();
}

private async processNewRewards(): Promise<void> {
const newRefRewards = await this.refRewardRepo.find({
where: { status: RewardStatus.PREPARED },
});

// Get assets with pending liquidity pipelines to avoid processing them
const pendingLiquidityRewards = await this.refRewardRepo.find({
where: { status: RewardStatus.PENDING_LIQUIDITY },
relations: { liquidityPipeline: true },
});
const assetsWithPendingPipeline = new Set(
pendingLiquidityRewards
.filter(
(r) =>
r.outputAsset &&
[LiquidityManagementPipelineStatus.CREATED, LiquidityManagementPipelineStatus.IN_PROGRESS].includes(
r.liquidityPipeline?.status,
),
)
.map((r) => r.outputAsset.id),
);

const groupedRewards = Util.groupByAccessor<RefReward, number>(newRefRewards, (r) => r.outputAsset.id);

for (const rewards of groupedRewards.values()) {
try {
// payout asset price
const asset = rewards[0].outputAsset;

// Skip if a pipeline is already running for this asset
if (assetsWithPendingPipeline.has(asset.id)) {
this.logger.verbose(`Skipping ref rewards for ${asset.uniqueName}: pipeline already running`);
continue;
}

const assetPrice = await this.priceService.getPrice(PriceCurrency.EUR, asset, PriceValidity.VALID_ONLY);

// Calculate total output amount for all rewards of this asset
const totalOutputAmount = Util.round(rewards.reduce((sum, r) => sum + assetPrice.convert(r.amountInEur, 8), 0), 8);

// Check if liquidity is available
const liquidity = await this.dexService.checkLiquidity(this.createCheckLiquidityRequest(asset, totalOutputAmount));

if (liquidity.target.availableAmount < totalOutputAmount) {
// Not enough liquidity - start pipeline
await this.startLiquidityPipeline(rewards, asset, totalOutputAmount, liquidity.target.availableAmount);
continue;
}

// Enough liquidity - process rewards normally
for (const reward of rewards) {
const outputAmount = assetPrice.convert(reward.amountInEur, 8);

await this.checkLiquidity({
await this.reserveLiquidity({
amount: outputAmount,
asset,
rewardId: reward.id.toString(),
Expand All @@ -59,13 +104,110 @@ export class RefRewardDexService {
}
}

private async checkLiquidity(request: RefLiquidityRequest): Promise<number> {
const reserveRequest = this.createLiquidityRequest(request);
private async processPendingLiquidityRewards(): Promise<void> {
const pendingRewards = await this.refRewardRepo.find({
where: { status: RewardStatus.PENDING_LIQUIDITY },
relations: { liquidityPipeline: true },
});

// Reset rewards without pipeline to PREPARED (pipeline creation failed)
const rewardsWithoutPipeline = pendingRewards.filter((r) => !r.liquidityPipeline);
if (rewardsWithoutPipeline.length) {
this.logger.info(`Resetting ${rewardsWithoutPipeline.length} ref rewards without pipeline to PREPARED`);
await this.refRewardRepo.update(
{ id: In(rewardsWithoutPipeline.map((r) => r.id)) },
{ status: RewardStatus.PREPARED },
);
}

const rewardsWithPipeline = pendingRewards.filter((r) => r.liquidityPipeline);
const groupedRewards = Util.groupByAccessor<RefReward, number>(rewardsWithPipeline, (r) => r.liquidityPipeline.id);

for (const rewards of groupedRewards.values()) {
try {
const pipeline = rewards[0].liquidityPipeline;

// Pipeline failed/stopped: Reset to PREPARED for retry
if ([LiquidityManagementPipelineStatus.FAILED, LiquidityManagementPipelineStatus.STOPPED].includes(pipeline.status)) {
this.logger.info(`Pipeline ${pipeline.id} failed/stopped, resetting ref rewards to PREPARED`);
await this.refRewardRepo.update(
{ id: In(rewards.map((r) => r.id)) },
{ status: RewardStatus.PREPARED, liquidityPipeline: null },
);
continue;
}

// Pipeline not complete yet
if (pipeline.status !== LiquidityManagementPipelineStatus.COMPLETE) {
continue;
}

// Pipeline complete - process rewards with current price
const asset = rewards[0].outputAsset;
const assetPrice = await this.priceService.getPrice(PriceCurrency.EUR, asset, PriceValidity.VALID_ONLY);

for (const reward of rewards) {
const outputAmount = assetPrice.convert(reward.amountInEur, 8);

await this.reserveLiquidity({
amount: outputAmount,
asset,
rewardId: reward.id.toString(),
});

await this.refRewardRepo.update(...reward.readyToPayout(outputAmount));
}
} catch (e) {
this.logger.error(`Error in processing pending liquidity ref rewards:`, e);
}
}
}

private async startLiquidityPipeline(
rewards: RefReward[],
asset: Asset,
totalAmount: number,
availableAmount: number,
): Promise<void> {
const deficit = Util.round(totalAmount - availableAmount, 8);

// Set status BEFORE pipeline attempt (consistent with BuyCrypto)
await this.refRewardRepo.update(
{ id: In(rewards.map((r) => r.id)) },
{ status: RewardStatus.PENDING_LIQUIDITY },
);

try {
const pipeline = await this.liquidityService.buyLiquidity(asset.id, deficit, deficit, true);
this.logger.info(`Started liquidity pipeline ${pipeline.id} for ref rewards (deficit: ${deficit} ${asset.uniqueName})`);

await this.refRewardRepo.update(
{ id: In(rewards.map((r) => r.id)) },
{ liquidityPipeline: pipeline },
);
} catch (e) {
this.logger.error(`Failed to start liquidity pipeline for ref rewards (${asset.uniqueName}):`, e);
// Status remains PENDING_LIQUIDITY without pipeline - will be reset to PREPARED in processPendingLiquidityRewards()
}
}

private async reserveLiquidity(request: RefLiquidityRequest): Promise<number> {
const reserveRequest = this.createReserveLiquidityRequest(request);

return this.dexService.reserveLiquidity(reserveRequest);
}

private createLiquidityRequest(request: RefLiquidityRequest): PurchaseLiquidityRequest | ReserveLiquidityRequest {
private createCheckLiquidityRequest(asset: Asset, amount: number): CheckLiquidityRequest {
return {
context: LiquidityOrderContext.REF_PAYOUT,
correlationId: 'ref-liquidity-check',
referenceAsset: asset,
referenceAmount: amount,
targetAsset: asset,
};
}

private createReserveLiquidityRequest(request: RefLiquidityRequest): PurchaseLiquidityRequest | ReserveLiquidityRequest {
return {
context: LiquidityOrderContext.REF_PAYOUT,
correlationId: request.rewardId,
Expand Down