Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ed43693
add otel_grpc param
YaroslavLitvinov Dec 13, 2025
1b3f972
use registry for both logs and spans
YaroslavLitvinov Dec 13, 2025
e8dfc70
add collector artifacts into gitignore
YaroslavLitvinov Dec 13, 2025
580698b
shitdown tracing provider
YaroslavLitvinov Dec 15, 2025
0560d41
fixes for honeycomb
DanCodedThis Dec 15, 2025
23315da
fixes for honeycomb and otlp layers
DanCodedThis Dec 15, 2025
b7f5c3a
fixes for honeycomb and otlp layers
DanCodedThis Dec 15, 2025
9a8e24e
fmt fix
DanCodedThis Dec 15, 2025
c4efc7a
fix
DanCodedThis Dec 15, 2025
050869d
fix
DanCodedThis Dec 15, 2025
6e2df29
hardcoded
DanCodedThis Dec 15, 2025
238af18
not hardcoded
DanCodedThis Dec 15, 2025
92cb9cd
Merge remote-tracking branch 'origin' into yaro/lambda-tracing
YaroslavLitvinov Dec 15, 2025
65a9263
remove duplication
DanCodedThis Dec 15, 2025
a90b0fb
Merge branch 'yaro/lambda-tracing' into DanCodedThis/yaro-lambda-tracing
DanCodedThis Dec 15, 2025
413dd92
remove duplication
DanCodedThis Dec 15, 2025
ef963b8
Merge remote-tracking branch 'origin/DanCodedThis/yaro-lambda-tracing…
DanCodedThis Dec 15, 2025
0e7604e
WIP
DanCodedThis Dec 17, 2025
234f579
readme
DanCodedThis Dec 18, 2025
59efa7e
readme
DanCodedThis Dec 18, 2025
ddf986b
Merge branch 'main' into DanCodedThis/trace-lambda
DanCodedThis Dec 18, 2025
a558127
fixes
DanCodedThis Dec 18, 2025
b8d8463
fixes
DanCodedThis Dec 18, 2025
1ceebaa
fixes
DanCodedThis Dec 18, 2025
717be7e
fmt
DanCodedThis Dec 18, 2025
3865a11
fix
DanCodedThis Dec 18, 2025
4ab2b5d
fix
DanCodedThis Dec 18, 2025
1f62d9b
fix
DanCodedThis Dec 18, 2025
701e9d6
requested changes
DanCodedThis Dec 18, 2025
292d4bd
requested changes
DanCodedThis Dec 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/.env.exmaple
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
METASTORE_CONFIG=config/metastore.yaml
JWT_SECRET=secret
TRACING_LEVEL=info
1 change: 1 addition & 0 deletions crates/api-snowflake-rest/src/tests/create_test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub fn executor_default_cfg() -> UtilsConfig {
UtilsConfig::default().with_max_concurrency_level(2)
}

#[must_use]
pub fn metastore_default_settings_cfg() -> MetastoreSettingsConfig {
MetastoreSettingsConfig::default()
.with_object_store_connect_timeout(1)
Expand Down
9 changes: 7 additions & 2 deletions crates/embucket-lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ executor = { path = "../executor" }
build-info = { path = "../build-info" }
lambda_http = "0.17"
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
base64 = "0.22"
serde = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -26,6 +24,13 @@ flate2 = { version = "1", default-features = false, features = ["rust_backend"]
tower = { workspace = true }
tower-http = { workspace = true }
cfg-if = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "registry", "fmt", "json"] }
tracing-opentelemetry = { version = "0.31.0" }
tracing-allocations = { version = "0.1.0", optional = true }
opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic"] }
opentelemetry_sdk = { version = "0.30.0" }
opentelemetry = { version = "0.30.0" }

[lints]
workspace = true
Expand Down
25 changes: 21 additions & 4 deletions crates/embucket-lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,37 @@

# Function name (override with: make deploy FUNCTION_NAME=your-function)
FUNCTION_NAME ?= embucket-lambda
ENV_FILE ?= .env
ENV_FILE ?= config/.env
IAM_ROLE ?=
OTEL_COLLECTOR_LAYERS ?=
# supported features: "streaming"
FEATURES_PARAM := $(if $(FEATURES),--features $(FEATURES))

build:
cd ../.. && cargo lambda build --release --arm64 --manifest-path crates/embucket-lambda/Cargo.toml $(FEATURES_PARAM)
cd ../.. && cargo lambda build --release -p embucket-lambda --arm64 -o zip --manifest-path crates/embucket-lambda/Cargo.toml $(FEATURES_PARAM)

# Deploy to AWS (must run from workspace root for include paths to work)
deploy: build deploy-only
deploy: build deploy-only public-url
@echo "Deployed $(FUNCTION_NAME) to AWS"

# Quick deploy without rebuild
deploy-only:
cd ../.. && cargo lambda deploy --binary-name bootstrap $(FUNCTION_NAME) --env-file $(ENV_FILE)
cd ../.. && cargo lambda deploy $(OTEL_COLLECTOR_LAYERS) --iam-role $(IAM_ROLE) --env-file $(ENV_FILE) --binary-name bootstrap $(FUNCTION_NAME)
aws logs create-log-group --log-group-name "/aws/lambda/$(FUNCTION_NAME)" >/dev/null 2>&1 || true

public-url:
@set -e; \
echo "Ensuring Function URL config exists for $(FUNCTION_NAME) (auth: NONE)..." ; \
aws lambda create-function-url-config --function-name "$(FUNCTION_NAME)" --auth-type NONE >/dev/null 2>&1 || \
aws lambda update-function-url-config --function-name "$(FUNCTION_NAME)" --auth-type NONE >/dev/null ; \
echo "Ensuring public invoke permission exists..." ; \
aws lambda add-permission --function-name "$(FUNCTION_NAME)" \
--statement-id AllowPublicURLInvoke \
--action lambda:InvokeFunctionUrl \
--principal "*" \
--function-url-auth-type NONE >/dev/null 2>&1 || true ; \
URL="$$(aws lambda get-function-url-config --function-name "$(FUNCTION_NAME)" --query 'FunctionUrl' --output text)"; \
echo "$$URL"

# Watch locally for development
watch:
Expand Down
60 changes: 60 additions & 0 deletions crates/embucket-lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,66 @@ cargo lambda deploy --binary-name bootstrap
```
- It will deploy environment variables from `.env` if `ENV_FILE` is not specified

### Observability

#### AWS traces
We send events and spans to the stdout log in JSON format; if AWS X-Ray is enabled, it enhances the traces.
- `RUST_LOG` - Controls log verbosity. Defaults to "INFO"; possible values: "OFF", "ERROR", "WARN", "INFO", "DEBUG", "TRACE".

#### OpenTelemetry configuration

To work with OpenTelemetry, you need an OpenTelemetry Collector running in your environment with an OpenTelemetry config.
The easiest way is to add two layers to your Lambda deployment, one of which contains your config file with the remote exporter.

1. Create a folder called collector-config and add a file called `config.yml` with the OpenTelemetry Collector [**configuration**](https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/).
2. Then zip the folder with this command: `zip -r <filename>.zip collector-config`
3. Then publish it to AWS (change the file name and layer name if you want): `aws lambda publish-layer-version
--layer-name <layername>
--zip-file fileb://<filename>.zip
--compatible-runtimes provided.al2 provided.al2023
--compatible-architectures arm64`
4. After which provide this as an external env variable (the first layer is the collector itself): `OTEL_COLLECTOR_LAYERS ?= \
--layer-arn arn:aws:lambda:us-east-2:184161586896:layer:opentelemetry-collector-arm64-0_19_0:1\
--layer-arn arn:aws:lambda:<region>:<account_id>:layer:<layername>:<version>`
5. Now you can deploy the function with the new layer.

If you later update the configuration and publish the layer again, remember to bump the layer `<version>` number; after the first publish it is `1`.

#### Exporting telemetry spans to [**honeycomb.io**](https://docs.honeycomb.io/send-data/opentelemetry/collector/)

OpenTelemetry Collector config example for Honeycomb:
```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: localhost:4317
      http:
        endpoint: localhost:4318

processors:
  batch:

exporters:
  otlp:
    # You can name these envs anything you want as long as they match the names in the .env file
    endpoint: "${env:HONEYCOMB_ENDPOINT_URL}"
    headers:
      x-honeycomb-team: "${env:HONEYCOMB_API_KEY}"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
```

- Environment variables configuration:
* `HONEYCOMB_API_KEY` - this is the full ingestion key (not the key id or management key)
* `HONEYCOMB_ENDPOINT_URL` - check your region; it can be `api.honeycomb.io` or `api.eu1.honeycomb.io`
* `OTEL_SERVICE_NAME` - used as the x-honeycomb-dataset name

### Test locally

```bash
Expand Down
5 changes: 5 additions & 0 deletions crates/embucket-lambda/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct EnvConfig {
pub iceberg_catalog_timeout_secs: u64,
pub object_store_timeout_secs: u64,
pub object_store_connect_timeout_secs: u64,
pub otel_exporter_otlp_protocol: String,
pub tracing_level: String,
}

impl EnvConfig {
Expand Down Expand Up @@ -58,6 +60,9 @@ impl EnvConfig {
object_store_timeout_secs: parse_env("OBJECT_STORE_TIMEOUT_SECS").unwrap_or(30),
object_store_connect_timeout_secs: parse_env("OBJECT_STORE_CONNECT_TIMEOUT_SECS")
.unwrap_or(3),
otel_exporter_otlp_protocol: parse_env("OTEL_EXPORTER_OTLP_PROTOCOL")
.unwrap_or("grpc".to_string()),
tracing_level: env_or_default("TRACING_LEVEL", "INFO"),
}
}

Expand Down
133 changes: 97 additions & 36 deletions crates/embucket-lambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@ use catalog_metastore::metastore_settings_config::MetastoreSettingsConfig;
use http::HeaderMap;
use http_body_util::BodyExt;
use lambda_http::{Body as LambdaBody, Error as LambdaError, Request, Response, service_fn};
use std::io::IsTerminal;
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::trace::BatchSpanProcessor;
use opentelemetry_sdk::trace::SdkTracerProvider;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use tower::ServiceExt;
use tracing::{error, info};
use tracing_subscriber::filter::{FilterExt, LevelFilter, Targets, filter_fn};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::{Layer, layer::SubscriberExt, util::SubscriberInitExt};

cfg_if::cfg_if! {
if #[cfg(feature = "streaming")] {
use lambda_http::run_with_streaming_response as run;
Expand All @@ -30,9 +37,13 @@ cfg_if::cfg_if! {

type InitResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

const DISABLED_TARGETS: [&str; 2] = ["h2", "aws_smithy_runtime"];

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
init_tracing();
let env_config = EnvConfig::from_env();

let tracing_provider = init_tracing_and_logs(&env_config);

// Log version and build information on startup
info!(
Expand All @@ -43,7 +54,6 @@ async fn main() -> Result<(), LambdaError> {
"embucket-lambda started"
);

let env_config = EnvConfig::from_env();
info!(
data_format = %env_config.data_format,
max_concurrency = env_config.max_concurrency_level,
Expand All @@ -65,11 +75,18 @@ async fn main() -> Result<(), LambdaError> {
err
})?);

run(service_fn(move |event: Request| {
let err = run(service_fn(move |event: Request| {
let app = Arc::clone(&app);
async move { app.handle_event(event).await }
}))
.await
.await;

tracing_provider.shutdown().map_err(|err| {
error!(error = %err, "Failed to shutdown TracerProvider");
err
})?;

err
}

struct LambdaApp {
Expand Down Expand Up @@ -154,10 +171,6 @@ impl LambdaApp {
);
}

// if let Err(err) = ensure_session_header(&mut parts.headers, &self.state).await {
// return Ok(snowflake_error_response(&err));
// }

let mut axum_request = to_axum_request(parts, body_bytes);
if let Some(addr) = extract_socket_addr(axum_request.headers()) {
axum_request.extensions_mut().insert(ConnectInfo(addr));
Expand Down Expand Up @@ -230,31 +243,79 @@ fn extract_socket_addr(headers: &HeaderMap) -> Option<SocketAddr> {
.map(|ip| SocketAddr::new(ip, 0))
}

/// Installs a global `tracing` subscriber that writes to stdout.
///
/// Behavior is driven by two environment variables:
/// - `RUST_LOG`: env-filter directive string; defaults to "info" when unset.
/// - `LOG_FORMAT`: `"json"` selects the JSON formatter (no ANSI, includes the
///   current span and the full span list); any other value (default
///   `"pretty"`) selects the human-readable formatter with span ENTER/CLOSE
///   events.
///
/// `try_init` errors (e.g. a subscriber is already installed) are
/// deliberately ignored via `let _ =`.
fn init_tracing() {
    // RUST_LOG drives the env-filter; fall back to "info" when unset.
    let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string());
    // Only colorize when stdout is a TTY (CloudWatch/Lambda logs are not).
    let emit_ansi = std::io::stdout().is_terminal();

    // Use json format if requested via env var, otherwise use pretty format with span events
    let format = std::env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string());

    if format == "json" {
        // Machine-readable output: never ANSI-colored, annotates each event
        // with its current span and the enclosing span list.
        let _ = tracing_subscriber::fmt()
            .with_env_filter(filter)
            .with_target(true)
            .with_ansi(false)
            .json()
            .with_current_span(true)
            .with_span_list(true)
            .try_init();
    } else {
        // Human-readable output: also logs span lifecycle (ENTER and CLOSE).
        let _ = tracing_subscriber::fmt()
            .with_env_filter(filter)
            .with_target(true)
            .with_ansi(emit_ansi)
            .with_span_events(
                tracing_subscriber::fmt::format::FmtSpan::ENTER
                    | tracing_subscriber::fmt::format::FmtSpan::CLOSE,
            )
            .try_init();
    }
}
/// Initializes global telemetry: an OpenTelemetry OTLP span exporter plus a
/// JSON `fmt` layer writing to stdout, both installed on a single `tracing`
/// registry.
///
/// Configuration:
/// - `config.otel_exporter_otlp_protocol` selects the OTLP transport:
///   `"grpc"`, `"http/protobuf"`, or `"http/json"` (case-insensitive).
/// - `config.tracing_level` is the default level for exported spans/events;
///   falls back to INFO when unparsable.
/// - `RUST_LOG`, when set, overrides the stdout log filter; an unparsable
///   value is reported on stderr and falls back to a DEBUG default.
///
/// Returns the [`SdkTracerProvider`] so the caller can shut it down (flushing
/// pending spans) before the process exits.
///
/// # Panics
/// Panics if the exporter cannot be built or the protocol value is
/// unsupported — acceptable here because this runs once at process startup.
#[allow(clippy::expect_used, clippy::redundant_closure_for_method_calls)]
fn init_tracing_and_logs(config: &EnvConfig) -> SdkTracerProvider {
    let exporter = match config.otel_exporter_otlp_protocol.to_lowercase().as_str() {
        // Initialize OTLP exporter using gRPC (Tonic)
        "grpc" => opentelemetry_otlp::SpanExporter::builder()
            .with_tonic()
            .build()
            .expect("Failed to create OTLP gRPC exporter"),
        // Both HTTP encodings defined by the OTLP spec go through the HTTP
        // builder. NOTE(review): `.with_http()` uses the builder's default
        // encoding; confirm it matches the requested one if the exact wire
        // format matters for your collector.
        "http/protobuf" | "http/json" => opentelemetry_otlp::SpanExporter::builder()
            .with_http()
            .build()
            .expect("Failed to create OTLP HTTP exporter"),
        protocol => panic!("Unsupported OTLP exporter protocol: {protocol}"),
    };

    // Default resource detectors; service.name is taken from the environment
    // (OTEL_SERVICE_NAME) when present — TODO confirm against deployment env.
    let resource = Resource::builder().build();

    let tracing_provider = SdkTracerProvider::builder()
        .with_span_processor(BatchSpanProcessor::builder(exporter).build())
        .with_resource(resource)
        .build();

    // Builds a Targets filter that silences the noisy crates in
    // DISABLED_TARGETS and applies `level` to everything else.
    let default_targets = |level: LevelFilter| -> Targets {
        Targets::default()
            .with_targets(DISABLED_TARGETS.iter().map(|t| (*t, LevelFilter::OFF)))
            .with_default(level)
    };

    let registry = tracing_subscriber::registry()
        // Telemetry filtering: export spans/events at the configured level,
        // never from the disabled targets.
        .with(
            tracing_opentelemetry::OpenTelemetryLayer::new(tracing_provider.tracer("embucket"))
                .with_level(true)
                .with_filter(default_targets(
                    config.tracing_level.parse().unwrap_or(LevelFilter::INFO),
                )),
        )
        .with({
            // Stdout log filter: RUST_LOG wins when set and parseable;
            // otherwise default to INFO (or DEBUG after a parse failure).
            let fmt_filter = match std::env::var("RUST_LOG") {
                Ok(val) => val.parse::<Targets>().unwrap_or_else(|err| {
                    eprintln!("Failed to parse RUST_LOG: {err:?}");
                    default_targets(LevelFilter::DEBUG)
                }),
                _ => default_targets(LevelFilter::INFO),
            };
            // Always emit span records, but skip memory-allocation events.
            let spans_always = filter_fn(|meta| meta.is_span());
            let not_alloc_event = filter_fn(|meta| {
                meta.target() != "alloc" && meta.target() != "tracing_allocations"
            });

            tracing_subscriber::fmt::layer()
                .with_target(true)
                .with_level(true)
                .with_span_events(FmtSpan::NONE)
                .json()
                .with_filter(spans_always.or(not_alloc_event.and(fmt_filter)))
        });

    registry.init();
    tracing_provider
}
8 changes: 8 additions & 0 deletions crates/embucketd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ pub struct CliOpts {
)]
pub auth_demo_password: Option<String>,

#[arg(
long,
env = "OTEL_EXPORTER_OTLP_PROTOCOL",
default_value = "grpc",
help = "OpenTelemetry Exporter Protocol"
)]
pub otel_exporter_otlp_protocol: String,

#[arg(
long,
value_enum,
Expand Down
Loading