1- import {
2- ChainId ,
3- hexToUint8Array ,
4- uint8ArrayToHex ,
5- } from "@certusone/wormhole-sdk" ;
1+ import { ChainId , uint8ArrayToHex } from "@certusone/wormhole-sdk" ;
62
73import {
84 createSpyRPCServiceClient ,
@@ -11,6 +7,8 @@ import {
117
128import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm" ;
139
10+ import { createHash } from "crypto" ;
11+
1412import {
1513 getBatchSummary ,
1614 parseBatchPriceAttestation ,
@@ -25,10 +23,12 @@ import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js";
2523import { sleep , TimestampInSec } from "./helpers" ;
2624import { logger } from "./logging" ;
2725import { PromClient } from "./promClient" ;
26+ import LRUCache from "lru-cache" ;
2827
2928export type PriceInfo = {
30- vaaBytes : string ;
29+ vaa : Buffer ;
3130 seqNum : number ;
31+ publishTime : TimestampInSec ;
3232 attestationTime : TimestampInSec ;
3333 priceFeed : PriceFeed ;
3434 emitterChainId : number ;
@@ -52,6 +52,8 @@ type ListenerConfig = {
5252 readiness : ListenerReadinessConfig ;
5353} ;
5454
55+ type VaaHash = string ;
56+
5557export class Listener implements PriceStore {
5658 // Mapping of Price Feed Id to Vaa
5759 private priceFeedVaaMap = new Map < string , PriceInfo > ( ) ;
@@ -61,13 +63,18 @@ export class Listener implements PriceStore {
6163 private spyConnectionTime : TimestampInSec | undefined ;
6264 private readinessConfig : ListenerReadinessConfig ;
6365 private updateCallbacks : ( ( priceInfo : PriceInfo ) => any ) [ ] ;
66+ private observedVaas : LRUCache < VaaHash , boolean > ;
6467
6568 constructor ( config : ListenerConfig , promClient ?: PromClient ) {
6669 this . promClient = promClient ;
6770 this . spyServiceHost = config . spyServiceHost ;
6871 this . loadFilters ( config . filtersRaw ) ;
6972 this . readinessConfig = config . readiness ;
7073 this . updateCallbacks = [ ] ;
74+ this . observedVaas = new LRUCache ( {
75+ max : 10000 , // At most 10000 items
76+ ttl : 60 * 1000 , // 60 seconds
77+ } ) ;
7178 }
7279
7380 private loadFilters ( filtersRaw ?: string ) {
@@ -114,7 +121,7 @@ export class Listener implements PriceStore {
114121 ) ;
115122 stream = await subscribeSignedVAA ( client , { filters : this . filters } ) ;
116123
117- stream ! . on ( "data" , ( { vaaBytes } : { vaaBytes : string } ) => {
124+ stream ! . on ( "data" , ( { vaaBytes } : { vaaBytes : Buffer } ) => {
118125 this . processVaa ( vaaBytes ) ;
119126 } ) ;
120127
@@ -150,19 +157,29 @@ export class Listener implements PriceStore {
150157 }
151158 }
152159
153- async processVaa ( vaaBytes : string ) {
160+ async processVaa ( vaa : Buffer ) {
154161 const { parse_vaa } = await importCoreWasm ( ) ;
155- const parsedVAA = parse_vaa ( hexToUint8Array ( vaaBytes ) ) ;
162+
163+ const vaaHash : VaaHash = createHash ( "md5" ) . update ( vaa ) . digest ( "base64" ) ;
164+
165+ if ( this . observedVaas . has ( vaaHash ) ) {
166+ return ;
167+ }
168+
169+ this . observedVaas . set ( vaaHash , true ) ;
170+ this . promClient ?. incReceivedVaa ( ) ;
171+
172+ const parsedVaa = parse_vaa ( vaa ) ;
156173
157174 let batchAttestation ;
158175
159176 try {
160177 batchAttestation = await parseBatchPriceAttestation (
161- Buffer . from ( parsedVAA . payload )
178+ Buffer . from ( parsedVaa . payload )
162179 ) ;
163180 } catch ( e : any ) {
164181 logger . error ( e , e . stack ) ;
165- logger . error ( "Parsing failed. Dropping vaa: %o" , parsedVAA ) ;
182+ logger . error ( "Parsing failed. Dropping vaa: %o" , parsedVaa ) ;
166183 return ;
167184 }
168185
@@ -194,15 +211,30 @@ export class Listener implements PriceStore {
194211 ) {
195212 const priceFeed = priceAttestationToPriceFeed ( priceAttestation ) ;
196213 const priceInfo = {
197- seqNum : parsedVAA . sequence ,
198- vaaBytes,
214+ seqNum : parsedVaa . sequence ,
215+ vaa,
216+ publishTime : priceAttestation . publishTime ,
199217 attestationTime : priceAttestation . attestationTime ,
200218 priceFeed,
201- emitterChainId : parsedVAA . emitter_chain ,
219+ emitterChainId : parsedVaa . emitter_chain ,
202220 priceServiceReceiveTime : Math . floor ( new Date ( ) . getTime ( ) / 1000 ) ,
203221 } ;
204222 this . priceFeedVaaMap . set ( key , priceInfo ) ;
205223
224+ if ( lastAttestationTime !== undefined ) {
225+ this . promClient ?. addPriceUpdatesAttestationTimeGap (
226+ priceAttestation . attestationTime - lastAttestationTime
227+ ) ;
228+ }
229+
230+ const lastPublishTime = this . priceFeedVaaMap . get ( key ) ?. publishTime ;
231+
232+ if ( lastPublishTime !== undefined ) {
233+ this . promClient ?. addPriceUpdatesPublishTimeGap (
234+ priceAttestation . publishTime - lastPublishTime
235+ ) ;
236+ }
237+
206238 for ( const callback of this . updateCallbacks ) {
207239 callback ( priceInfo ) ;
208240 }
@@ -211,16 +243,14 @@ export class Listener implements PriceStore {
211243
212244 logger . info (
213245 "Parsed a new Batch Price Attestation: [" +
214- parsedVAA . emitter_chain +
246+ parsedVaa . emitter_chain +
215247 ":" +
216- uint8ArrayToHex ( parsedVAA . emitter_address ) +
248+ uint8ArrayToHex ( parsedVaa . emitter_address ) +
217249 "], seqNum: " +
218- parsedVAA . sequence +
250+ parsedVaa . sequence +
219251 ", Batch Summary: " +
220252 getBatchSummary ( batchAttestation )
221253 ) ;
222-
223- this . promClient ?. incReceivedVaa ( ) ;
224254 }
225255
226256 getLatestPriceInfo ( priceFeedId : string ) : PriceInfo | undefined {
0 commit comments