From 4c75e9571dc14d6d330fe05f35357122ff359c27 Mon Sep 17 00:00:00 2001 From: TaprootFreak <142087526+TaprootFreak@users.noreply.github.com> Date: Fri, 19 Dec 2025 10:52:45 +0100 Subject: [PATCH 1/2] feat(liquidity): Add event-based pipeline with WebSocket order monitoring - Add ExchangeWebSocketService for real-time order updates via CCXT Pro - Supports Kraken, Binance, MEXC, XT exchanges - Automatic connection management and cleanup - Add OrderCompletionService for centralized order completion handling - WebSocket primary with 5-second polling fallback - 5-minute cron as safety net - RxJS event stream for immediate pipeline continuation - Update LiquidityManagementPipelineService - Subscribe to order completion events - Immediate pipeline continuation on order completion - Atomic pipeline transitions with pessimistic locks - Add atomic repository methods - tryClaimForExecution: Prevents duplicate order execution - tryContinuePipeline: Race-condition-safe pipeline continuation - saveExecutionResult: Persists all execution fields for WebSocket Performance improvement: - Before: Up to 60s delay between order completion and next order - After: Near-instant continuation (~0ms with WebSocket, ~5s with polling) --- src/integration/exchange/exchange.module.ts | 3 + .../services/exchange-websocket.service.ts | 229 +++++++++++++ .../liquidity-management.module.ts | 2 + .../liquidity-management-order.repository.ts | 50 +++ ...iquidity-management-pipeline.repository.ts | 37 ++ .../liquidity-management-pipeline.service.ts | 268 ++++++++++----- .../services/order-completion.service.ts | 321 ++++++++++++++++++ 7 files changed, 828 insertions(+), 82 deletions(-) create mode 100644 src/integration/exchange/services/exchange-websocket.service.ts create mode 100644 src/subdomains/core/liquidity-management/services/order-completion.service.ts diff --git a/src/integration/exchange/exchange.module.ts b/src/integration/exchange/exchange.module.ts index 7d33e8e338..75860383c1 100644 --- a/src/integration/exchange/exchange.module.ts +++ b/src/integration/exchange/exchange.module.ts @@ -9,6 +9,7 @@ import { BinanceService } from './services/binance.service'; import { BitstampService } from './services/bitstamp.service'; import { ExchangeRegistryService } from './services/exchange-registry.service'; import { ExchangeTxService } from './services/exchange-tx.service'; +import { ExchangeWebSocketService } from './services/exchange-websocket.service'; import { KrakenService } from './services/kraken.service'; import { KucoinService } from './services/kucoin.service'; import { MexcService } from './services/mexc.service'; @@ -30,6 +31,7 @@ import { XtService } from './services/xt.service'; XtService, MexcService, ScryptService, + ExchangeWebSocketService, // P2BService, ], exports: [ @@ -42,6 +44,7 @@ import { XtService } from './services/xt.service'; XtService, MexcService, ScryptService, + ExchangeWebSocketService, ], }) export class ExchangeModule {} diff --git a/src/integration/exchange/services/exchange-websocket.service.ts b/src/integration/exchange/services/exchange-websocket.service.ts new file mode 100644 index 0000000000..235f2e5715 --- /dev/null +++ b/src/integration/exchange/services/exchange-websocket.service.ts @@ -0,0 +1,229 @@ +import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; +import { Exchange, Order, pro } from 'ccxt'; +import { GetConfig } from 'src/config/config'; +import { DfxLogger } from 'src/shared/services/dfx-logger'; + +export interface OrderUpdateEvent { + exchangeName: string; + orderId: string; + symbol: string; + status: 'open' | 'closed' | 'canceled'; + filled: number; + remaining: number; + cost: number; + amount: number; + price: number; +} + +type OrderCallback = (event: OrderUpdateEvent) => void; + +interface WatchedOrder { + orderId: string; + symbol: string; + callback: OrderCallback; +} + +// Pro exchange type with WebSocket methods +type ProExchange = Exchange & { + watchOrders(symbol?: string, since?: number, limit?: number, params?: Record): Promise; + close(): Promise; +}; + +interface ExchangeConnection { + exchange: ProExchange; + watchedOrders: Map; + isWatching: boolean; + watchPromise?: Promise; +} + +@Injectable() +export class ExchangeWebSocketService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new DfxLogger(ExchangeWebSocketService); + + private readonly connections = new Map(); + private isShuttingDown = false; + + onModuleInit(): void { + this.initializeExchanges(); + } + + async onModuleDestroy(): Promise { + this.isShuttingDown = true; + await this.closeAllConnections(); + } + + /** + * Watch an order for completion. Returns unsubscribe function. + */ + watchOrder(exchangeName: string, orderId: string, symbol: string, callback: OrderCallback): () => void { + const connection = this.connections.get(exchangeName); + if (!connection) { + this.logger.warn(`Exchange ${exchangeName} not supported for WebSocket watching`); + return () => {}; + } + + const watchKey = `${orderId}:${symbol}`; + + // Add to watched orders + connection.watchedOrders.set(watchKey, { orderId, symbol, callback }); + this.logger.verbose(`Started watching order ${orderId} on ${exchangeName}`); + + // Start watching if not already + if (!connection.isWatching) { + this.startWatching(exchangeName, connection); + } + + // Return unsubscribe function + return () => { + connection.watchedOrders.delete(watchKey); + this.logger.verbose(`Stopped watching order ${orderId} on ${exchangeName}`); + + // Stop watching if no more orders + if (connection.watchedOrders.size === 0) { + connection.isWatching = false; + } + }; + } + + /** + * Check if an exchange supports WebSocket order watching. + */ + supportsWebSocket(exchangeName: string): boolean { + return this.connections.has(exchangeName); + } + + // --- Private Methods --- // + + private initializeExchanges(): void { + const config = GetConfig(); + + // Initialize Kraken + if (config.kraken?.apiKey) { + this.initializeExchange('Kraken', pro.kraken as unknown as new (c: Record) => ProExchange, config.kraken); + } + + // Initialize Binance + if (config.binance?.apiKey) { + this.initializeExchange('Binance', pro.binance as unknown as new (c: Record) => ProExchange, config.binance); + } + + // Initialize MEXC + if (config.mexc?.apiKey) { + this.initializeExchange('MEXC', pro.mexc as unknown as new (c: Record) => ProExchange, config.mexc); + } + + // Initialize XT + if (config.xt?.apiKey) { + this.initializeExchange('XT', pro.xt as unknown as new (c: Record) => ProExchange, config.xt); + } + + this.logger.info(`Initialized WebSocket connections for: ${Array.from(this.connections.keys()).join(', ')}`); + } + + private initializeExchange( + name: string, + ExchangeClass: new (config: Record) => ProExchange, + config: Record, + ): void { + try { + const exchange = new ExchangeClass({ + ...config, + enableRateLimit: true, + options: { + ordersLimit: 100, // Cache limit for orders + }, + }); + + this.connections.set(name, { + exchange, + watchedOrders: new Map(), + isWatching: false, + }); + } catch (e) { + this.logger.error(`Failed to initialize ${name} WebSocket:`, e); + } + } + + private startWatching(exchangeName: string, connection: ExchangeConnection): void { + if (connection.isWatching) return; + + connection.isWatching = true; + connection.watchPromise = this.watchOrdersLoop(exchangeName, connection); + } + + private async watchOrdersLoop(exchangeName: string, connection: ExchangeConnection): Promise { + const { exchange } = connection; + + while (connection.isWatching && !this.isShuttingDown) { + try { + if (connection.watchedOrders.size === 0) { + // No orders to watch, wait and check again + await this.sleep(1000); + continue; + } + + // Watch all orders (no symbol filter) - CCXT Pro handles multiplexing + // This is more efficient than iterating symbols sequentially + const orders = await exchange.watchOrders(); + this.processOrderUpdates(exchangeName, connection, orders); + } catch (e) { + if (!this.isShuttingDown) { + this.logger.error(`Error in watchOrdersLoop for ${exchangeName}:`, e); + await this.sleep(5000); // Wait before retry + } + } + } + } + + private processOrderUpdates(exchangeName: string, connection: ExchangeConnection, orders: Order[]): void { + for (const order of orders) { + const watchKey = `${order.id}:${order.symbol}`; + const watched = connection.watchedOrders.get(watchKey); + + if (watched) { + const event: OrderUpdateEvent = { + exchangeName, + orderId: order.id, + symbol: order.symbol, + status: order.status as 'open' | 'closed' | 'canceled', + filled: order.filled, + remaining: order.remaining, + cost: order.cost, + amount: order.amount, + price: order.price ?? order.average, + }; + + this.logger.verbose(`Order update for ${order.id} on ${exchangeName}: ${order.status}`); + + try { + watched.callback(event); + } catch (e) { + this.logger.error(`Error in order callback for ${order.id}:`, e); + } + + // Remove from watched if completed + if (order.status === 'closed' || order.status === 'canceled') { + connection.watchedOrders.delete(watchKey); + this.logger.verbose(`Order ${order.id} completed, removed from watch list`); + } + } + } + } + + private async closeAllConnections(): Promise { + for (const [name, connection] of this.connections) { + try { + connection.isWatching = false; + await connection.exchange.close(); + this.logger.verbose(`Closed WebSocket connection for ${name}`); + } catch (e) { + this.logger.error(`Error closing ${name} WebSocket:`, e); + } + } + this.connections.clear(); + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} diff --git a/src/subdomains/core/liquidity-management/liquidity-management.module.ts b/src/subdomains/core/liquidity-management/liquidity-management.module.ts index be7b3bb801..8fede00718 100644 --- a/src/subdomains/core/liquidity-management/liquidity-management.module.ts +++ b/src/subdomains/core/liquidity-management/liquidity-management.module.ts @@ -47,6 +47,7 @@ import { LiquidityManagementBalanceService } from './services/liquidity-manageme import { LiquidityManagementPipelineService } from './services/liquidity-management-pipeline.service'; import { LiquidityManagementRuleService } from './services/liquidity-management-rule.service'; import { LiquidityManagementService } from './services/liquidity-management.service'; +import { OrderCompletionService } from './services/order-completion.service'; @Module({ imports: [ @@ -84,6 +85,7 @@ import { LiquidityManagementService } from './services/liquidity-management.serv LiquidityManagementRuleService, LiquidityManagementPipelineService, LiquidityManagementBalanceService, + OrderCompletionService, LiquidityActionIntegrationFactory, LiquidityBalanceIntegrationFactory, BlockchainAdapter, diff --git a/src/subdomains/core/liquidity-management/repositories/liquidity-management-order.repository.ts b/src/subdomains/core/liquidity-management/repositories/liquidity-management-order.repository.ts index 20dbcfbb8a..9c5220bf8b 100644 --- a/src/subdomains/core/liquidity-management/repositories/liquidity-management-order.repository.ts +++ b/src/subdomains/core/liquidity-management/repositories/liquidity-management-order.repository.ts @@ -2,10 +2,60 @@ import { Injectable } from '@nestjs/common'; import { BaseRepository } from 'src/shared/repositories/base.repository'; import { EntityManager } from 'typeorm'; import { LiquidityManagementOrder } from '../entities/liquidity-management-order.entity'; +import { LiquidityManagementOrderStatus } from '../enums'; @Injectable() export class LiquidityManagementOrderRepository extends BaseRepository { constructor(manager: EntityManager) { super(LiquidityManagementOrder, manager); } + + /** + * Atomically tries to claim an order for execution by transitioning from CREATED to IN_PROGRESS. + * Uses UPDATE with WHERE clause to prevent race conditions - only one caller can succeed. + * @returns true if this call successfully acquired the lock, false if already taken + */ + async tryClaimForExecution(orderId: number): Promise { + const result = await this.createQueryBuilder() + .update(LiquidityManagementOrder) + .set({ + status: LiquidityManagementOrderStatus.IN_PROGRESS, + }) + .where('id = :id AND status = :status', { + id: orderId, + status: LiquidityManagementOrderStatus.CREATED, + }) + .execute(); + + return (result.affected ?? 0) > 0; + } + + /** + * Updates all execution result fields after successful exchange execution. + * This includes correlationId, inputAsset, outputAsset, and inputAmount. + */ + async saveExecutionResult( + orderId: number, + correlationId: string, + inputAsset: string, + outputAsset: string, + inputAmount: number, + ): Promise { + await this.update(orderId, { correlationId, inputAsset, outputAsset, inputAmount }); + } + + /** + * @deprecated Use saveExecutionResult instead to save all execution fields + */ + async setCorrelationId(orderId: number, correlationId: string): Promise { + await this.update(orderId, { correlationId }); + } + + /** + * Reverts an order back to CREATED if execution failed unexpectedly. + * This allows the order to be retried by the next cron run. + */ + async revertToCREATED(orderId: number): Promise { + await this.update(orderId, { status: LiquidityManagementOrderStatus.CREATED }); + } } diff --git a/src/subdomains/core/liquidity-management/repositories/liquidity-management-pipeline.repository.ts b/src/subdomains/core/liquidity-management/repositories/liquidity-management-pipeline.repository.ts index 37deff4833..b4f3ecac14 100644 --- a/src/subdomains/core/liquidity-management/repositories/liquidity-management-pipeline.repository.ts +++ b/src/subdomains/core/liquidity-management/repositories/liquidity-management-pipeline.repository.ts @@ -2,10 +2,47 @@ import { Injectable } from '@nestjs/common'; import { BaseRepository } from 'src/shared/repositories/base.repository'; import { EntityManager } from 'typeorm'; import { LiquidityManagementPipeline } from '../entities/liquidity-management-pipeline.entity'; +import { LiquidityManagementOrderStatus, LiquidityManagementPipelineStatus } from '../enums'; @Injectable() export class LiquidityManagementPipelineRepository extends BaseRepository { constructor(manager: EntityManager) { super(LiquidityManagementPipeline, manager); } + + /** + * Atomically continues a pipeline within a transaction with pessimistic lock. + * This prevents race conditions between event handlers and cron jobs. + * @returns the updated pipeline, or null if pipeline not found or not in progress + */ + async tryContinuePipeline( + pipelineId: number, + lastOrderStatus: LiquidityManagementOrderStatus, + ): Promise { + return this.manager.transaction(async (transactionalManager) => { + // Lock the pipeline row for this transaction + const pipeline = await transactionalManager + .createQueryBuilder(LiquidityManagementPipeline, 'p') + .setLock('pessimistic_write') + .leftJoinAndSelect('p.currentAction', 'currentAction') + .leftJoinAndSelect('currentAction.onSuccess', 'onSuccess') + .leftJoinAndSelect('currentAction.onFail', 'onFail') + .leftJoinAndSelect('p.rule', 'rule') + .where('p.id = :id AND p.status = :status', { + id: pipelineId, + status: LiquidityManagementPipelineStatus.IN_PROGRESS, + }) + .getOne(); + + if (!pipeline) { + return null; // Pipeline not found or already completed/failed + } + + // Continue the pipeline (state machine logic) + pipeline.continue(lastOrderStatus); + + // Save within the same transaction + return transactionalManager.save(pipeline); + }); + } } diff --git a/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts b/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts index dbee9721d9..13729aa864 100644 --- a/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts +++ b/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts @@ -1,5 +1,6 @@ -import { Injectable, NotFoundException } from '@nestjs/common'; +import { Injectable, NotFoundException, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; import { CronExpression } from '@nestjs/schedule'; +import { Subscription } from 'rxjs'; import { DfxLogger } from 'src/shared/services/dfx-logger'; import { Process } from 'src/shared/services/process.service'; import { DfxCron } from 'src/shared/utils/cron'; @@ -17,24 +18,44 @@ import { LiquidityActionIntegrationFactory } from '../factories/liquidity-action import { LiquidityManagementOrderRepository } from '../repositories/liquidity-management-order.repository'; import { LiquidityManagementPipelineRepository } from '../repositories/liquidity-management-pipeline.repository'; import { LiquidityManagementRuleRepository } from '../repositories/liquidity-management-rule.repository'; +import { OrderCompletionEvent, OrderCompletionService } from './order-completion.service'; @Injectable() -export class LiquidityManagementPipelineService { +export class LiquidityManagementPipelineService implements OnModuleInit, OnModuleDestroy { private readonly logger = new DfxLogger(LiquidityManagementPipelineService); + private orderCompletionSubscription: Subscription; + constructor( private readonly ruleRepo: LiquidityManagementRuleRepository, private readonly orderRepo: LiquidityManagementOrderRepository, private readonly pipelineRepo: LiquidityManagementPipelineRepository, private readonly actionIntegrationFactory: LiquidityActionIntegrationFactory, private readonly notificationService: NotificationService, + private readonly orderCompletionService: OrderCompletionService, ) {} + // Subscribe to order completion events for immediate pipeline continuation + onModuleInit(): void { + this.orderCompletionSubscription = this.orderCompletionService.orderCompleted$.subscribe((event) => + this.onOrderCompleted(event), + ); + } + + // Cleanup subscription to prevent memory leaks + onModuleDestroy(): void { + this.orderCompletionSubscription?.unsubscribe(); + } + //*** JOBS ***// + /** + * Main cron job - now serves as fallback since order completion is event-driven. + * The primary flow is: OrderCompletionService detects completion → emits event → onOrderCompleted() continues pipeline + */ @DfxCron(CronExpression.EVERY_MINUTE, { process: Process.LIQUIDITY_MANAGEMENT, timeout: 1800 }) - async processPipelines() { - await this.checkRunningOrders(); + async processPipelines(): Promise { + // Note: checkRunningOrders is now handled by OrderCompletionService await this.startNewPipelines(); let hasWaitingOrders = true; @@ -45,6 +66,85 @@ export class LiquidityManagementPipelineService { } } + /** + * Event handler for order completion - immediately continues the pipeline. + * Uses atomic transaction with pessimistic lock to prevent race conditions with cron. + */ + private async onOrderCompleted(event: OrderCompletionEvent): Promise { + try { + this.logger.verbose(`Order ${event.orderId} completed, continuing pipeline ${event.pipelineId}`); + + // Atomically continue the pipeline (uses transaction + pessimistic lock) + const pipeline = await this.pipelineRepo.tryContinuePipeline(event.pipelineId, event.status); + + if (!pipeline) { + // Pipeline was already handled by cron or another event + this.logger.verbose(`Pipeline ${event.pipelineId} not found or already processed`); + return; + } + + // Get the last order for completion/failure handling + const lastOrder = await this.orderRepo.findOne({ + where: { pipeline: { id: pipeline.id } }, + order: { id: 'DESC' }, + }); + + if (pipeline.status === LiquidityManagementPipelineStatus.COMPLETE) { + await this.handlePipelineCompletion(pipeline); + return; + } + + if ( + [LiquidityManagementPipelineStatus.FAILED, LiquidityManagementPipelineStatus.STOPPED].includes(pipeline.status) + ) { + if (!lastOrder) { + this.logger.error(`Pipeline ${pipeline.id} failed but no order found - data inconsistency`); + } + await this.handlePipelineFail(pipeline, lastOrder ?? null); + return; + } + + // Place and execute next order immediately + this.logger.verbose(`Immediately continuing pipeline ${pipeline.id} with action ${pipeline.currentAction?.id}`); + await this.placeLiquidityOrder(pipeline, lastOrder); + + // Find and execute the newly created order + const newOrder = await this.orderRepo.findOne({ + where: { pipeline: { id: pipeline.id }, status: LiquidityManagementOrderStatus.CREATED }, + order: { id: 'DESC' }, + }); + + if (newOrder) { + try { + await this.executeOrder(newOrder); + } catch (e) { + // Handle execution errors same as in startNewOrders + // Important: Emit completion event to continue pipeline immediately + if (e instanceof OrderNotNecessaryException) { + newOrder.complete(); + await this.orderRepo.save(newOrder); + this.orderCompletionService.emitOrderCompletion(newOrder, LiquidityManagementOrderStatus.COMPLETE); + } else if (e instanceof OrderNotProcessableException) { + newOrder.notProcessable(e); + await this.orderRepo.save(newOrder); + this.orderCompletionService.emitOrderCompletion(newOrder, LiquidityManagementOrderStatus.NOT_PROCESSABLE); + } else if (e instanceof OrderFailedException) { + newOrder.fail(e); + await this.orderRepo.save(newOrder); + this.orderCompletionService.emitOrderCompletion(newOrder, LiquidityManagementOrderStatus.FAILED); + } else { + // Unexpected error - log and don't emit event (will be retried by cron) + this.logger.error(`Unexpected error executing order ${newOrder.id}:`, e); + return; + } + this.logger.info(`Order ${newOrder.id} completed immediately with: ${e.message}`); + } + } + } catch (e) { + this.logger.error(`Error handling order completion for pipeline ${event.pipelineId}:`, e); + } + } + //*** PUBLIC API ***// async getProcessingPipelines(): Promise { @@ -99,55 +199,64 @@ export class LiquidityManagementPipelineService { } private async checkRunningPipelines(): Promise { - const runningPipelines = await this.pipelineRepo.find({ - where: { status: LiquidityManagementPipelineStatus.IN_PROGRESS }, - relations: { currentAction: { onSuccess: true, onFail: true } }, + // Get list of running pipelines (without lock - just for iteration) + const runningPipelines = await this.pipelineRepo.findBy({ + status: LiquidityManagementPipelineStatus.IN_PROGRESS, }); - for (const pipeline of runningPipelines) { + for (const pipelineRef of runningPipelines) { try { const lastOrder = await this.orderRepo.findOne({ - where: { pipeline: { id: pipeline.id } }, + where: { pipeline: { id: pipelineRef.id } }, order: { id: 'DESC' }, }); - // check running order - if (lastOrder) { - if ( - lastOrder.status === LiquidityManagementOrderStatus.COMPLETE || - lastOrder.status === LiquidityManagementOrderStatus.FAILED || - lastOrder.status === LiquidityManagementOrderStatus.NOT_PROCESSABLE - ) { - pipeline.continue(lastOrder.status); - await this.pipelineRepo.save(pipeline); - - if (pipeline.status === LiquidityManagementPipelineStatus.COMPLETE) { - await this.handlePipelineCompletion(pipeline); - continue; - } - - if ( - [LiquidityManagementPipelineStatus.FAILED, LiquidityManagementPipelineStatus.STOPPED].includes( - pipeline.status, - ) - ) { - await this.handlePipelineFail(pipeline, lastOrder); - continue; - } - } else { - // order still running - continue; - } + // Check if order is in terminal state + if (!lastOrder) { + continue; + } + + if ( + ![ + LiquidityManagementOrderStatus.COMPLETE, + LiquidityManagementOrderStatus.FAILED, + LiquidityManagementOrderStatus.NOT_PROCESSABLE, + ].includes(lastOrder.status) + ) { + // Order still running + continue; + } + + // Use atomic method to continue pipeline (prevents race with event handler) + const pipeline = await this.pipelineRepo.tryContinuePipeline(pipelineRef.id, lastOrder.status); + + if (!pipeline) { + // Pipeline was already handled by event handler + continue; } - // start new order + if (pipeline.status === LiquidityManagementPipelineStatus.COMPLETE) { + await this.handlePipelineCompletion(pipeline); + continue; + } + + if ( + [LiquidityManagementPipelineStatus.FAILED, LiquidityManagementPipelineStatus.STOPPED].includes( + pipeline.status, + ) + ) { + await this.handlePipelineFail(pipeline, lastOrder); + continue; + } + + // Start new order this.logger.verbose( - `Continue with next liquidity management pipeline action. Action ID: ${pipeline.currentAction.id}`, + `Continue with next liquidity management pipeline action. Action ID: ${pipeline.currentAction?.id}`, ); await this.placeLiquidityOrder(pipeline, lastOrder); } catch (e) { - this.logger.error(`Error in checking running liquidity pipeline ${pipeline.id}:`, e); + this.logger.error(`Error in checking running liquidity pipeline ${pipelineRef.id}:`, e); continue; } } @@ -175,12 +284,10 @@ export class LiquidityManagementPipelineService { if (e instanceof OrderNotNecessaryException) { order.complete(); await this.orderRepo.save(order); - } - if (e instanceof OrderNotProcessableException) { + } else if (e instanceof OrderNotProcessableException) { order.notProcessable(e); await this.orderRepo.save(order); - } - if (e instanceof OrderFailedException) { + } else if (e instanceof OrderFailedException) { order.fail(e); await this.orderRepo.save(order); } @@ -195,48 +302,45 @@ export class LiquidityManagementPipelineService { } private async executeOrder(order: LiquidityManagementOrder): Promise { - const actionIntegration = this.actionIntegrationFactory.getIntegration(order.action); - - const correlationId = await actionIntegration.executeOrder(order); - order.inProgress(correlationId); - - await this.orderRepo.save(order); - } - - private async checkRunningOrders(): Promise { - const runningOrders = await this.orderRepo.findBy({ status: LiquidityManagementOrderStatus.IN_PROGRESS }); + // Atomically claim this order - only one caller can succeed + const gotLock = await this.orderRepo.tryClaimForExecution(order.id); + if (!gotLock) { + this.logger.verbose(`Order ${order.id} already being executed by another process`); + return; + } - for (const order of runningOrders) { - try { - await this.checkOrder(order); - } catch (e) { - if (e instanceof OrderNotProcessableException) { - order.notProcessable(e); - await this.orderRepo.save(order); - continue; - } - if (e instanceof OrderFailedException) { - order.fail(e); - await this.orderRepo.save(order); - continue; - } + // We have the lock, proceed with execution + try { + const actionIntegration = this.actionIntegrationFactory.getIntegration(order.action); + const correlationId = await actionIntegration.executeOrder(order); + + // Save all execution result fields (correlationId, inputAsset, outputAsset, inputAmount) + // These are set on the order entity during executeOrder() and needed for WebSocket symbol construction + await this.orderRepo.saveExecutionResult( + order.id, + correlationId, + order.inputAsset, + order.outputAsset, + order.inputAmount, + ); - this.logger.error(`Error in checking running liquidity order ${order.id}:`, e); + // Start active polling to quickly detect order completion + this.orderCompletionService.startActivePolling(order.id); + } catch (e) { + // If execution fails unexpectedly, revert to CREATED so it can be retried + if ( + !(e instanceof OrderNotNecessaryException) && + !(e instanceof OrderNotProcessableException) && + !(e instanceof OrderFailedException) + ) { + this.logger.error(`Unexpected error executing order ${order.id}, reverting to CREATED:`, e); + await this.orderRepo.revertToCREATED(order.id); } + throw e; } } - private async checkOrder(order: LiquidityManagementOrder): Promise { - const actionIntegration = this.actionIntegrationFactory.getIntegration(order.action); - const isComplete = await actionIntegration.checkCompletion(order); - - if (isComplete) { - order.complete(); - await this.orderRepo.save(order); - - this.logger.verbose(`Liquidity management order ${order.id} complete`); - } - } + // Note: checkRunningOrders and checkOrder have been moved to OrderCompletionService private async handlePipelineCompletion(pipeline: LiquidityManagementPipeline): Promise { const rule = pipeline.rule.reactivate(); @@ -252,7 +356,7 @@ export class LiquidityManagementPipelineService { private async handlePipelineFail( pipeline: LiquidityManagementPipeline, - order: LiquidityManagementOrder, + order: LiquidityManagementOrder | null, ): Promise { const rule = pipeline.rule.pause(); @@ -283,7 +387,7 @@ export class LiquidityManagementPipelineService { private generateFailMessage( pipeline: LiquidityManagementPipeline, - order: LiquidityManagementOrder, + order: LiquidityManagementOrder | null, ): [string, MailRequest] { const { id, type, maxAmount, rule } = pipeline; const errorMessage = `${type} pipeline for max. ${maxAmount} ${rule.targetName} (rule ${ @@ -298,7 +402,7 @@ export class LiquidityManagementPipelineService { errors: [ errorMessage, pipeline.status === LiquidityManagementPipelineStatus.FAILED - ? `Error: ${order.errorMessage}` + ? `Error: ${order?.errorMessage ?? 'Unknown error (order not found)'}` : 'Maximum order count reached', ], }, diff --git a/src/subdomains/core/liquidity-management/services/order-completion.service.ts b/src/subdomains/core/liquidity-management/services/order-completion.service.ts new file mode 100644 index 0000000000..829b9eda65 --- /dev/null +++ b/src/subdomains/core/liquidity-management/services/order-completion.service.ts @@ -0,0 +1,321 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { CronExpression } from '@nestjs/schedule'; +import { Observable, Subject } from 'rxjs'; +import { + ExchangeWebSocketService, + OrderUpdateEvent, +} from 'src/integration/exchange/services/exchange-websocket.service'; +import { DfxLogger } from 'src/shared/services/dfx-logger'; +import { Process } from 'src/shared/services/process.service'; +import { DfxCron } from 'src/shared/utils/cron'; +import { LiquidityManagementOrder } from '../entities/liquidity-management-order.entity'; +import { LiquidityManagementOrderStatus, LiquidityManagementSystem } from '../enums'; +import { OrderFailedException } from '../exceptions/order-failed.exception'; +import { OrderNotProcessableException } from '../exceptions/order-not-processable.exception'; +import { LiquidityActionIntegrationFactory } from '../factories/liquidity-action-integration.factory'; +import { LiquidityManagementOrderRepository } from '../repositories/liquidity-management-order.repository'; + +export interface OrderCompletionEvent { + orderId: number; + pipelineId: number; + status: LiquidityManagementOrderStatus; +} + +// Systems that support WebSocket for trade order updates +const WEBSOCKET_SUPPORTED_SYSTEMS = [ + LiquidityManagementSystem.KRAKEN, + LiquidityManagementSystem.BINANCE, + LiquidityManagementSystem.MEXC, + LiquidityManagementSystem.XT, +]; + +// Commands that are trade orders (not withdrawals/transfers) +const TRADE_COMMANDS = ['buy', 'sell']; + +@Injectable() +export class OrderCompletionService implements OnModuleDestroy { + private readonly logger = new DfxLogger(OrderCompletionService); + + // Central event stream for order completions + private readonly orderCompletedSubject = new Subject(); + + // Active polling intervals for orders + private readonly activePollingIntervals = new Map(); + + // WebSocket unsubscribe functions for orders + private readonly activeWebSocketSubscriptions = new Map void>(); + + // Polling interval in milliseconds (increased since WebSocket is primary for trades) + private readonly POLLING_INTERVAL_MS = 5000; + + constructor( + private readonly orderRepo: LiquidityManagementOrderRepository, + private readonly actionIntegrationFactory: LiquidityActionIntegrationFactory, + private readonly exchangeWebSocketService: ExchangeWebSocketService, + ) {} + + // Observable for subscribers to react to order completions + get orderCompleted$(): Observable { + return this.orderCompletedSubject.asObservable(); + } + + // Cleanup on module destruction + onModuleDestroy(): void { + this.stopAllPolling(); + this.stopAllWebSocketSubscriptions(); + this.orderCompletedSubject.complete(); + } + + /** + * Fallback cron - reduced frequency since active polling is primary. + * Checks all IN_PROGRESS orders every 5 minutes as a safety net. + */ + @DfxCron(CronExpression.EVERY_5_MINUTES, { process: Process.LIQUIDITY_MANAGEMENT, timeout: 300 }) + async checkRunningOrdersFallback(): Promise { + const runningOrders = await this.orderRepo.findBy({ + status: LiquidityManagementOrderStatus.IN_PROGRESS, + }); + + this.logger.verbose(`Fallback check: ${runningOrders.length} orders in progress`); + + for (const order of runningOrders) { + await this.checkAndEmitOrderCompletion(order); + } + } + + /** + * Starts active monitoring for a specific order. + * Uses WebSocket for supported exchanges (Kraken, Binance, MEXC, XT) with polling as fallback. + * For non-WebSocket orders (withdrawals, transfers), uses polling only. + */ + async startActivePolling(orderId: number): Promise { + if (this.activePollingIntervals.has(orderId)) { + return; // Already monitoring + } + + // Load order to check if WebSocket is applicable + const order = await this.orderRepo.findOneBy({ id: orderId }); + if (!order) { + this.logger.warn(`Order ${orderId} not found, skipping active monitoring`); + return; + } + + // Try to start WebSocket subscription for trade orders on supported exchanges + const useWebSocket = this.tryStartWebSocketSubscription(order); + + if (useWebSocket) { + this.logger.verbose(`Started WebSocket monitoring for order ${orderId} (with polling fallback)`); + } else { + this.logger.verbose(`Started polling-only monitoring for order ${orderId}`); + } + + // Always start polling as fallback (or primary for non-WebSocket orders) + const interval = setInterval(async () => { + try { + const currentOrder = await this.orderRepo.findOneBy({ id: orderId }); + + if (!currentOrder || currentOrder.status !== LiquidityManagementOrderStatus.IN_PROGRESS) { + // Order not found or already in terminal state + this.stopActiveMonitoring(orderId); + return; + } + + const completed = await this.checkAndEmitOrderCompletion(currentOrder); + if (completed) { + this.stopActiveMonitoring(orderId); + } + } catch (e) { + this.logger.error(`Error in active polling for order ${orderId}:`, e); + } + }, this.POLLING_INTERVAL_MS); + + this.activePollingIntervals.set(orderId, interval); + } + + /** + * Tries to start a WebSocket subscription for trade orders. + * @returns true if WebSocket subscription was started + */ + private tryStartWebSocketSubscription(order: LiquidityManagementOrder): boolean { + const { action, correlationId, inputAsset, outputAsset } = order; + + // Only trade orders (buy/sell) can use WebSocket + if (!TRADE_COMMANDS.includes(action.command)) { + return false; + } + + // Check if exchange supports WebSocket + if (!WEBSOCKET_SUPPORTED_SYSTEMS.includes(action.system as LiquidityManagementSystem)) { + return false; + } + + // Need correlation ID (exchange order ID) and trading pair + if (!correlationId || !inputAsset || !outputAsset) { + this.logger.verbose(`Order ${order.id} missing data for WebSocket: correlationId=${correlationId}, pair=${inputAsset}/${outputAsset}`); + return false; + } + + // Build trading pair symbol (e.g., "BTC/USDT") + // For BUY: inputAsset=quote (USDT), outputAsset=base (BTC) → BTC/USDT + // For SELL: inputAsset=base (BTC), outputAsset=quote (USDT) → BTC/USDT + const symbol = action.command === 'buy' + ? `${outputAsset}/${inputAsset}` + : `${inputAsset}/${outputAsset}`; + + try { + const unsubscribe = this.exchangeWebSocketService.watchOrder( + action.system, + correlationId, + symbol, + (event) => this.handleWebSocketOrderUpdate(order.id, event), + ); + + this.activeWebSocketSubscriptions.set(order.id, unsubscribe); + return true; + } catch (e) { + this.logger.error(`Failed to start WebSocket for order ${order.id}:`, e); + return false; + } + } + + /** + * Handles WebSocket order update events. + */ + private async handleWebSocketOrderUpdate(orderId: number, event: OrderUpdateEvent): Promise { + try { + if (event.status !== 'closed' && event.status !== 'canceled') { + return; // Order still open + } + + this.logger.verbose(`WebSocket: Order ${orderId} completed with status ${event.status}`); + + // Load fresh order from database + const order = await this.orderRepo.findOneBy({ id: orderId }); + if (!order || order.status !== LiquidityManagementOrderStatus.IN_PROGRESS) { + // Already processed (by polling or another event) + this.stopActiveMonitoring(orderId); + return; + } + + // Update order with trade data from WebSocket + // For BUY: inputAmount = cost (quote spent), outputAmount = filled (base received) + // For SELL: inputAmount = filled (base sold), outputAmount = cost (quote received) + const isBuyOrder = order.action.command === 'buy'; + order.inputAmount = isBuyOrder ? event.cost : event.filled; + order.outputAmount = isBuyOrder ? event.filled : event.cost; + + if (event.status === 'closed') { + order.complete(); + await this.orderRepo.save(order); + this.emitOrderCompletion(order, LiquidityManagementOrderStatus.COMPLETE); + } else { + // canceled - need to check if it's a failure or partial fill + order.fail(new OrderFailedException(`Order was canceled on exchange`)); + await this.orderRepo.save(order); + this.emitOrderCompletion(order, LiquidityManagementOrderStatus.FAILED); + } + + this.stopActiveMonitoring(orderId); + } catch (e) { + this.logger.error(`Error handling WebSocket update for order ${orderId}:`, e); + } + } + + /** + * Stops all active monitoring (polling + WebSocket) for a specific order. + */ + stopActiveMonitoring(orderId: number): void { + // Stop polling + const interval = this.activePollingIntervals.get(orderId); + if (interval) { + clearInterval(interval); + this.activePollingIntervals.delete(orderId); + } + + // Stop WebSocket subscription + const unsubscribe = this.activeWebSocketSubscriptions.get(orderId); + if (unsubscribe) { + unsubscribe(); + this.activeWebSocketSubscriptions.delete(orderId); + } + + this.logger.verbose(`Stopped active monitoring for order ${orderId}`); + } + + /** + * @deprecated Use stopActiveMonitoring instead + */ + stopActivePolling(orderId: number): void { + this.stopActiveMonitoring(orderId); + } + + /** + * Stops all active polling (used on module destroy). + */ + private stopAllPolling(): void { + for (const [orderId, interval] of this.activePollingIntervals) { + clearInterval(interval); + this.logger.verbose(`Stopped active polling for order ${orderId} (cleanup)`); + } + this.activePollingIntervals.clear(); + } + + /** + * Stops all WebSocket subscriptions (used on module destroy). + */ + private stopAllWebSocketSubscriptions(): void { + for (const [orderId, unsubscribe] of this.activeWebSocketSubscriptions) { + unsubscribe(); + this.logger.verbose(`Stopped WebSocket subscription for order ${orderId} (cleanup)`); + } + this.activeWebSocketSubscriptions.clear(); + } + + /** + * Checks if an order is complete and emits an event if so. + * @returns true if order is complete, false otherwise + */ + private async checkAndEmitOrderCompletion(order: LiquidityManagementOrder): Promise { + try { + const actionIntegration = this.actionIntegrationFactory.getIntegration(order.action); + const isComplete = await actionIntegration.checkCompletion(order); + + if (isComplete) { + order.complete(); + await this.orderRepo.save(order); + + this.logger.verbose(`Order ${order.id} completed, emitting event`); + this.emitOrderCompletion(order, LiquidityManagementOrderStatus.COMPLETE); + return true; + } + } catch (e) { + if (e instanceof OrderNotProcessableException) { + order.notProcessable(e); + await this.orderRepo.save(order); + this.emitOrderCompletion(order, LiquidityManagementOrderStatus.NOT_PROCESSABLE); + return true; + } else if (e instanceof OrderFailedException) { + order.fail(e); + await this.orderRepo.save(order); + this.emitOrderCompletion(order, LiquidityManagementOrderStatus.FAILED); + return true; + } else { + this.logger.error(`Error checking order ${order.id}:`, e); + } + } + + return false; + } + + /** + * Emits an order completion event to all subscribers. + * Public method for use by PipelineService when orders complete immediately during execution. + */ + emitOrderCompletion(order: LiquidityManagementOrder, status: LiquidityManagementOrderStatus): void { + this.orderCompletedSubject.next({ + orderId: order.id, + pipelineId: order.pipeline.id, + status, + }); + } +} From 43d862a0e8e90adbb800542b4bf6b02c47958f72 Mon Sep 17 00:00:00 2001 From: TaprootFreak <142087526+TaprootFreak@users.noreply.github.com> Date: Fri, 19 Dec 2025 10:59:40 +0100 Subject: [PATCH 2/2] fix: Resolve lint errors (empty function, floating promise) --- .../exchange/services/exchange-websocket.service.ts | 2 +- .../services/liquidity-management-pipeline.service.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/integration/exchange/services/exchange-websocket.service.ts b/src/integration/exchange/services/exchange-websocket.service.ts index 235f2e5715..df306e6d97 100644 --- a/src/integration/exchange/services/exchange-websocket.service.ts +++ b/src/integration/exchange/services/exchange-websocket.service.ts @@ -59,7 +59,7 @@ export class ExchangeWebSocketService implements OnModuleInit, OnModuleDestroy { const connection = this.connections.get(exchangeName); if (!connection) { this.logger.warn(`Exchange ${exchangeName} not supported for WebSocket watching`); - return () => {}; + return () => undefined; } const watchKey = `${orderId}:${symbol}`; diff --git a/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts b/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts index 13729aa864..711871c590 100644 --- a/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts +++ b/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts @@ -324,8 +324,8 @@ export class LiquidityManagementPipelineService implements OnModuleInit, OnModul order.inputAmount, ); - // Start active polling to quickly detect order completion - this.orderCompletionService.startActivePolling(order.id); + // Start active polling to quickly detect order completion (fire-and-forget) + void this.orderCompletionService.startActivePolling(order.id); } catch (e) { // If execution fails unexpectedly, revert to CREATED so it can be retried if (