diff --git a/Cargo.lock b/Cargo.lock index 9c56b98a..9e06c379 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3314,6 +3314,9 @@ dependencies = [ "http 1.4.0", "http-body-util", "lambda_http", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "serde", "serde_json", "serde_urlencoded", @@ -3321,6 +3324,8 @@ dependencies = [ "tower 0.5.2", "tower-http", "tracing", + "tracing-allocations", + "tracing-opentelemetry", "tracing-subscriber", "uuid", ] diff --git a/config/.env.exmaple b/config/.env.exmaple new file mode 100644 index 00000000..9b66cd05 --- /dev/null +++ b/config/.env.exmaple @@ -0,0 +1,3 @@ +METASTORE_CONFIG=config/metastore.yaml +JWT_SECRET=secret +TRACING_LEVEL=info \ No newline at end of file diff --git a/crates/api-snowflake-rest/src/tests/create_test_server.rs b/crates/api-snowflake-rest/src/tests/create_test_server.rs index ebd0f535..96b3a1be 100644 --- a/crates/api-snowflake-rest/src/tests/create_test_server.rs +++ b/crates/api-snowflake-rest/src/tests/create_test_server.rs @@ -30,6 +30,7 @@ pub fn executor_default_cfg() -> UtilsConfig { UtilsConfig::default().with_max_concurrency_level(2) } +#[must_use] pub fn metastore_default_settings_cfg() -> MetastoreSettingsConfig { MetastoreSettingsConfig::default() .with_object_store_connect_timeout(1) diff --git a/crates/embucket-lambda/Cargo.toml b/crates/embucket-lambda/Cargo.toml index e03e8192..516023d7 100644 --- a/crates/embucket-lambda/Cargo.toml +++ b/crates/embucket-lambda/Cargo.toml @@ -12,8 +12,6 @@ executor = { path = "../executor" } build-info = { path = "../build-info" } lambda_http = "0.17" tokio = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } base64 = "0.22" serde = { workspace = true } serde_json = { workspace = true } @@ -26,6 +24,13 @@ flate2 = { version = "1", default-features = false, features = ["rust_backend"] tower = { workspace = true } tower-http = { workspace = true } cfg-if = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3.20", features = ["env-filter", "registry", "fmt", "json"] } +tracing-opentelemetry = { version = "0.31.0" } +tracing-allocations = { version = "0.1.0", optional = true } +opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic"] } +opentelemetry_sdk = { version = "0.30.0" } +opentelemetry = { version = "0.30.0" } [lints] workspace = true diff --git a/crates/embucket-lambda/Makefile b/crates/embucket-lambda/Makefile index d60d679b..c164bdeb 100644 --- a/crates/embucket-lambda/Makefile +++ b/crates/embucket-lambda/Makefile @@ -2,20 +2,37 @@ # Function name (override with: make deploy FUNCTION_NAME=your-function) FUNCTION_NAME ?= embucket-lambda -ENV_FILE ?= .env +ENV_FILE ?= config/.env +IAM_ROLE ?= +OTEL_COLLECTOR_LAYERS ?= # supported features: "streaming" FEATURES_PARAM := $(if $(FEATURES),--features $(FEATURES)) build: - cd ../.. && cargo lambda build --release --arm64 --manifest-path crates/embucket-lambda/Cargo.toml $(FEATURES_PARAM) + cd ../.. && cargo lambda build --release -p embucket-lambda --arm64 -o zip --manifest-path crates/embucket-lambda/Cargo.toml $(FEATURES_PARAM) # Deploy to AWS (must run from workspace root for include paths to work) -deploy: build deploy-only +deploy: build deploy-only public-url @echo "Deployed $(FUNCTION_NAME) to AWS" # Quick deploy without rebuild deploy-only: - cd ../.. && cargo lambda deploy --binary-name bootstrap $(FUNCTION_NAME) --env-file $(ENV_FILE) + cd ../.. 
&& cargo lambda deploy $(OTEL_COLLECTOR_LAYERS) --iam-role $(IAM_ROLE) --env-file $(ENV_FILE) --binary-name bootstrap $(FUNCTION_NAME)
+	aws logs create-log-group --log-group-name "/aws/lambda/$(FUNCTION_NAME)" >/dev/null 2>&1 || true
+
+public-url:
+	@set -e; \
+	echo "Ensuring Function URL config exists for $(FUNCTION_NAME) (auth: NONE)..." ; \
+	aws lambda create-function-url-config --function-name "$(FUNCTION_NAME)" --auth-type NONE >/dev/null 2>&1 || \
+	aws lambda update-function-url-config --function-name "$(FUNCTION_NAME)" --auth-type NONE >/dev/null ; \
+	echo "Ensuring public invoke permission exists..." ; \
+	aws lambda add-permission --function-name "$(FUNCTION_NAME)" \
+		--statement-id AllowPublicURLInvoke \
+		--action lambda:InvokeFunctionUrl \
+		--principal "*" \
+		--function-url-auth-type NONE >/dev/null 2>&1 || true ; \
+	URL="$$(aws lambda get-function-url-config --function-name "$(FUNCTION_NAME)" --query 'FunctionUrl' --output text)"; \
+	echo "$$URL"
 
 # Watch locally for development
 watch:
diff --git a/crates/embucket-lambda/README.md b/crates/embucket-lambda/README.md
index 9cb44dbb..063eda07 100644
--- a/crates/embucket-lambda/README.md
+++ b/crates/embucket-lambda/README.md
@@ -77,6 +77,66 @@ cargo lambda deploy --binary-name bootstrap
 ```
 - It will deploy envs from `.env` if `ENV_FILE` not specified
+### Observability
+
+#### AWS traces
+We send events and spans to the stdout log in JSON format, and if AWS X-Ray is enabled it enhances the traces.
+- `RUST_LOG` - Controls the log verbosity level. Defaults to `INFO`; possible values: `OFF`, `ERROR`, `WARN`, `INFO`, `DEBUG`, `TRACE`.
+
+#### OpenTelemetry configuration
+
+To work with OpenTelemetry, you need an OpenTelemetry Collector running in your environment with a collector configuration.
+The easiest way is to add two layers to your Lambda deployment: the collector itself, and a layer containing your config file with the remote exporter.
+
+1. Create a folder called `collector-config` and add a file called `config.yml` with the OpenTelemetry Collector [**configuration**](https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/).
+2. Zip the folder with this command: `zip -r <file-name>.zip collector-config`
+3. Publish it to AWS (change the file name and layer name if you want): `aws lambda publish-layer-version
+  --layer-name <layer-name>
+  --zip-file fileb://<file-name>.zip
+  --compatible-runtimes provided.al2 provided.al2023
+  --compatible-architectures arm64`
+4. Provide the layer ARNs via the `OTEL_COLLECTOR_LAYERS` variable (the first layer is the collector itself): `OTEL_COLLECTOR_LAYERS ?= \
+  --layer-arn arn:aws:lambda:us-east-2:184161586896:layer:opentelemetry-collector-arm64-0_19_0:1 \
+  --layer-arn arn:aws:lambda:<region>:<account-id>:layer:<layer-name>:<version>`
+5. Now you can deploy the function with the new layers.
+
+If you later update the configuration and publish the layer again, remember to bump the layer `<version>` number; after the first publish it is `1`.
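+
+For reference, the full flow might look like the sketch below. The zip/layer name `my-otel-config` and the `<...>` values are placeholders you should replace; the collector layer ARN is the public arm64 build from step 4:
+
+```bash
+# Steps 2-3: zip the folder that contains collector-config/config.yml and
+# publish it as a Lambda layer (note the LayerVersionArn in the output)
+zip -r my-otel-config.zip collector-config
+aws lambda publish-layer-version \
+  --layer-name my-otel-config \
+  --zip-file fileb://my-otel-config.zip \
+  --compatible-runtimes provided.al2 provided.al2023 \
+  --compatible-architectures arm64
+
+# Steps 4-5: deploy with both layers attached (collector first, then your config layer)
+make deploy IAM_ROLE=<role-arn> \
+  OTEL_COLLECTOR_LAYERS="--layer-arn arn:aws:lambda:us-east-2:184161586896:layer:opentelemetry-collector-arm64-0_19_0:1 --layer-arn <config-layer-version-arn>"
+```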
+
+#### Exporting telemetry spans to [**honeycomb.io**](https://docs.honeycomb.io/send-data/opentelemetry/collector/)
+
+OpenTelemetry Collector config example for Honeycomb:
+```yaml
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: localhost:4317
+      http:
+        endpoint: localhost:4318
+
+processors:
+  batch:
+
+exporters:
+  otlp:
+    # You can name these env variables anything you want, as long as they match the names in your .env file
+    endpoint: "${env:HONEYCOMB_ENDPOINT_URL}"
+    headers:
+      x-honeycomb-team: "${env:HONEYCOMB_API_KEY}"
+
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [otlp]
+```
+
+- Environment variables configuration:
+  * `HONEYCOMB_API_KEY` - the full ingestion key (not the key ID or a management key)
+  * `HONEYCOMB_ENDPOINT_URL` - depends on your region; it can be `api.honeycomb.io` or `api.eu1.honeycomb.io`
+  * `OTEL_SERVICE_NAME` - used as the `x-honeycomb-dataset` name
+
 ### Test locally
 
 ```bash
diff --git a/crates/embucket-lambda/src/config.rs b/crates/embucket-lambda/src/config.rs
index 800eb54b..bfc058ad 100644
--- a/crates/embucket-lambda/src/config.rs
+++ b/crates/embucket-lambda/src/config.rs
@@ -25,6 +25,8 @@ pub struct EnvConfig {
     pub iceberg_catalog_timeout_secs: u64,
     pub object_store_timeout_secs: u64,
     pub object_store_connect_timeout_secs: u64,
+    pub otel_exporter_otlp_protocol: String,
+    pub tracing_level: String,
 }
 
 impl EnvConfig {
@@ -58,6 +60,9 @@ impl EnvConfig {
             object_store_timeout_secs: parse_env("OBJECT_STORE_TIMEOUT_SECS").unwrap_or(30),
             object_store_connect_timeout_secs: parse_env("OBJECT_STORE_CONNECT_TIMEOUT_SECS")
                 .unwrap_or(3),
+            otel_exporter_otlp_protocol: parse_env("OTEL_EXPORTER_OTLP_PROTOCOL")
+                .unwrap_or("grpc".to_string()),
+            tracing_level: env_or_default("TRACING_LEVEL", "INFO"),
         }
     }
diff --git a/crates/embucket-lambda/src/main.rs b/crates/embucket-lambda/src/main.rs
index 8fe894d2..5e1d0799 100644
--- a/crates/embucket-lambda/src/main.rs
+++ b/crates/embucket-lambda/src/main.rs
@@ -15,11 +15,18 @@ use catalog_metastore::metastore_settings_config::MetastoreSettingsConfig;
 use http::HeaderMap;
 use http_body_util::BodyExt;
 use lambda_http::{Body as LambdaBody, Error as LambdaError, Request, Response, service_fn};
-use std::io::IsTerminal;
+use opentelemetry::trace::TracerProvider;
+use opentelemetry_sdk::Resource;
+use opentelemetry_sdk::trace::BatchSpanProcessor;
+use opentelemetry_sdk::trace::SdkTracerProvider;
 use std::net::{IpAddr, SocketAddr};
 use std::sync::Arc;
 use tower::ServiceExt;
 use tracing::{error, info};
+use tracing_subscriber::filter::{FilterExt, LevelFilter, Targets, filter_fn};
+use tracing_subscriber::fmt::format::FmtSpan;
+use tracing_subscriber::{Layer, layer::SubscriberExt, util::SubscriberInitExt};
+
 cfg_if::cfg_if! {
     if #[cfg(feature = "streaming")] {
         use lambda_http::run_with_streaming_response as run;
@@ -30,9 +37,13 @@ cfg_if::cfg_if!
{ type InitResult = Result>; +const DISABLED_TARGETS: [&str; 2] = ["h2", "aws_smithy_runtime"]; + #[tokio::main] async fn main() -> Result<(), LambdaError> { - init_tracing(); + let env_config = EnvConfig::from_env(); + + let tracing_provider = init_tracing_and_logs(&env_config); // Log version and build information on startup info!( @@ -43,7 +54,6 @@ async fn main() -> Result<(), LambdaError> { "embucket-lambda started" ); - let env_config = EnvConfig::from_env(); info!( data_format = %env_config.data_format, max_concurrency = env_config.max_concurrency_level, @@ -65,11 +75,18 @@ async fn main() -> Result<(), LambdaError> { err })?); - run(service_fn(move |event: Request| { + let err = run(service_fn(move |event: Request| { let app = Arc::clone(&app); async move { app.handle_event(event).await } })) - .await + .await; + + tracing_provider.shutdown().map_err(|err| { + error!(error = %err, "Failed to shutdown TracerProvider"); + err + })?; + + err } struct LambdaApp { @@ -154,10 +171,6 @@ impl LambdaApp { ); } - // if let Err(err) = ensure_session_header(&mut parts.headers, &self.state).await { - // return Ok(snowflake_error_response(&err)); - // } - let mut axum_request = to_axum_request(parts, body_bytes); if let Some(addr) = extract_socket_addr(axum_request.headers()) { axum_request.extensions_mut().insert(ConnectInfo(addr)); @@ -230,31 +243,79 @@ fn extract_socket_addr(headers: &HeaderMap) -> Option { .map(|ip| SocketAddr::new(ip, 0)) } -fn init_tracing() { - let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()); - let emit_ansi = std::io::stdout().is_terminal(); - - // Use json format if requested via env var, otherwise use pretty format with span events - let format = std::env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string()); - - if format == "json" { - let _ = tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(true) - .with_ansi(false) - .json() - .with_current_span(true) - .with_span_list(true) - .try_init(); - } else { - let _ = tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(true) - .with_ansi(emit_ansi) - .with_span_events( - tracing_subscriber::fmt::format::FmtSpan::ENTER - | tracing_subscriber::fmt::format::FmtSpan::CLOSE, - ) - .try_init(); - } +#[allow(clippy::expect_used, clippy::redundant_closure_for_method_calls)] +fn init_tracing_and_logs(config: &EnvConfig) -> SdkTracerProvider { + let exporter = match config.otel_exporter_otlp_protocol.to_lowercase().as_str() { + "grpc" => { + // Initialize OTLP exporter using gRPC (Tonic) + opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .build() + .expect("Failed to create OTLP gRPC exporter") + } + "http/json" => { + // Initialize OTLP exporter using HTTP + opentelemetry_otlp::SpanExporter::builder() + .with_http() + .build() + .expect("Failed to create OTLP HTTP exporter") + } + protocol => panic!("Unsupported OTLP exporter protocol: {protocol}"), + }; + + let resource = Resource::builder().build(); + + let tracing_provider = SdkTracerProvider::builder() + .with_span_processor(BatchSpanProcessor::builder(exporter).build()) + .with_resource(resource) + .build(); + + let targets_with_level = + |targets: &[&'static str], level: LevelFilter| -> Vec<(&str, LevelFilter)> { + // let default_log_targets: Vec<(String, LevelFilter)> = + targets.iter().map(|t| ((*t), level)).collect() + }; + + let registry = tracing_subscriber::registry() + // Telemetry filtering + .with( + tracing_opentelemetry::OpenTelemetryLayer::new(tracing_provider.tracer("embucket")) + 
                .with_level(true)
+                .with_filter(
+                    Targets::default()
+                        .with_targets(targets_with_level(&DISABLED_TARGETS, LevelFilter::OFF))
+                        .with_default(config.tracing_level.parse().unwrap_or(tracing::Level::INFO)),
+                ),
+        )
+        .with({
+            let fmt_filter = match std::env::var("RUST_LOG") {
+                Ok(val) => match val.parse::<Targets>() {
+                    Ok(log_targets_from_env) => log_targets_from_env,
+                    Err(err) => {
+                        eprintln!("Failed to parse RUST_LOG: {err:?}");
+                        Targets::default()
+                            .with_targets(targets_with_level(&DISABLED_TARGETS, LevelFilter::OFF))
+                            .with_default(LevelFilter::DEBUG)
+                    }
+                },
+                _ => Targets::default()
+                    .with_targets(targets_with_level(&DISABLED_TARGETS, LevelFilter::OFF))
+                    .with_default(LevelFilter::INFO),
+            };
+            // Always pass spans through; events are emitted only if they are not
+            // allocation events ("alloc" / "tracing_allocations" targets) and match the RUST_LOG filter
+            let spans_always = filter_fn(|meta| meta.is_span());
+            let not_alloc_event = filter_fn(|meta| {
+                meta.target() != "alloc" && meta.target() != "tracing_allocations"
+            });
+
+            tracing_subscriber::fmt::layer()
+                .with_target(true)
+                .with_level(true)
+                .with_span_events(FmtSpan::NONE)
+                .json()
+                .with_filter(spans_always.or(not_alloc_event.and(fmt_filter)))
+        });
+
+    registry.init();
+    tracing_provider
 }
diff --git a/crates/embucketd/src/cli.rs b/crates/embucketd/src/cli.rs
index f6db5875..e1b76e84 100644
--- a/crates/embucketd/src/cli.rs
+++ b/crates/embucketd/src/cli.rs
@@ -118,6 +118,14 @@ pub struct CliOpts {
     )]
     pub auth_demo_password: Option<String>,
 
+    #[arg(
+        long,
+        env = "OTEL_EXPORTER_OTLP_PROTOCOL",
+        default_value = "grpc",
+        help = "OpenTelemetry Exporter Protocol"
+    )]
+    pub otel_exporter_otlp_protocol: String,
+
     #[arg(
         long,
         value_enum,
diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs
index 7f315aea..e113e53a 100644
--- a/crates/embucketd/src/main.rs
+++ b/crates/embucketd/src/main.rs
@@ -207,11 +207,23 @@ async fn async_main(
 
 #[allow(clippy::expect_used, clippy::redundant_closure_for_method_calls)]
 fn setup_tracing(opts: &cli::CliOpts) -> SdkTracerProvider {
-    // Initialize OTLP exporter using gRPC (Tonic)
-    let exporter = opentelemetry_otlp::SpanExporter::builder()
-        .with_tonic()
-        .build()
-        .expect("Failed to create OTLP exporter");
+    let exporter = match opts.otel_exporter_otlp_protocol.to_lowercase().as_str() {
+        "grpc" => {
+            // Initialize OTLP exporter using gRPC (Tonic)
+            opentelemetry_otlp::SpanExporter::builder()
+                .with_tonic()
+                .build()
+                .expect("Failed to create OTLP gRPC exporter")
+        }
+        "http/json" => {
+            // Initialize OTLP exporter using HTTP
+            opentelemetry_otlp::SpanExporter::builder()
+                .with_http()
+                .build()
+                .expect("Failed to create OTLP HTTP exporter")
+        }
+        protocol => panic!("Unsupported OTLP protocol: {}", protocol),
+    };
 
     let resource = Resource::builder().with_service_name("Em").build();
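The new settings introduced by this change are all environment-driven: `OTEL_EXPORTER_OTLP_PROTOCOL` selects the exporter transport for both `embucketd` and the Lambda, while `TRACING_LEVEL` and `RUST_LOG` control the OpenTelemetry and stdout filters in the Lambda. A hypothetical `config/.env` combining them with the Honeycomb variables from the README above; the variable names come from the example file and docs in this diff, and the values are illustrative only:

```bash
# Passed to the function by `cargo lambda deploy --env-file` (see the Makefile)
METASTORE_CONFIG=config/metastore.yaml
JWT_SECRET=secret

# Tracing / OpenTelemetry
TRACING_LEVEL=info
RUST_LOG=info
# "grpc" (default) or "http/json"
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
# Used by Honeycomb as the dataset name
OTEL_SERVICE_NAME=embucket-lambda

# Read by the collector layer's config.yml, not by the binary itself
HONEYCOMB_API_KEY=<your-ingest-key>
HONEYCOMB_ENDPOINT_URL=api.honeycomb.io
```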