diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 0e5c12504b15..5c80b8003f28 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -16,6 +16,9 @@
 const Allocator = std.mem.Allocator;
 const Alignment = std.mem.Alignment;
 const assert = std.debug.assert;
 const posix = std.posix;
+const log = std.log;
+const Span = std.log.Span;
+const Executor = std.log.Executor;
 
 /// Thread-safe.
 allocator: Allocator,
@@ -94,6 +97,7 @@ const Closure = struct {
     start: Start,
     node: std.SinglyLinkedList.Node = .{},
     cancel_tid: CancelId,
+    span: Span = .empty,
 
     const Start = *const fn (*Closure) void;
@@ -203,6 +207,9 @@ fn join(t: *Threaded) void {
 fn worker(t: *Threaded) void {
     defer t.wait_group.finish();
 
+    const executor: Executor = .create();
+    defer executor.exit();
+
     t.mutex.lock();
     defer t.mutex.unlock();
@@ -210,7 +217,9 @@
     while (t.run_queue.popFirst()) |closure_node| {
         t.mutex.unlock();
         const closure: *Closure = @fieldParentPtr("node", closure_node);
+        executor.link(&closure.span);
         closure.start(closure);
+        executor.unlink(&closure.span);
         t.mutex.lock();
         t.busy_count -= 1;
     }
@@ -478,6 +487,7 @@ const AsyncClosure = struct {
         .closure = .{
             .cancel_tid = .none,
             .start = start,
+            .span = log.thread_span,
         },
         .func = func,
         .context_alignment = context_alignment,
@@ -623,6 +633,7 @@ const GroupClosure = struct {
             // Even though we already know the task is canceled, we must still
             // run the closure in case there are side effects.
         }
+        current_closure = closure;
         gc.func(group, gc.contextPointer());
         current_closure = null;
@@ -664,6 +675,7 @@ const GroupClosure = struct {
         .closure = .{
             .cancel_tid = .none,
             .start = start,
+            .span = log.thread_span,
         },
         .t = t,
         .group = group,
diff --git a/lib/std/log.zig b/lib/std/log.zig
index 9568f9ba5211..aad633116715 100644
--- a/lib/std/log.zig
+++ b/lib/std/log.zig
@@ -25,7 +25,10 @@ //!
 //! ```
 const std = @import("std.zig");
+const assert = std.debug.assert;
 const builtin = @import("builtin");
+const SourceLocation = std.builtin.SourceLocation;
+const SpanUserdata = std.options.SpanUserdata;
 
 pub const Level = enum {
     /// Error: something has gone wrong. This might be recoverable or might
@@ -113,6 +116,243 @@
     stderr.print(format ++ "\n", args) catch return;
 }
 
+// NOTE: span parameters are named `span_` in this file because it declares a
+// top-level `pub const span` (and a private `fn log`), and Zig forbids shadowing.
+fn trace(
+    comptime level: Level,
+    comptime scope: @EnumLiteral(),
+    comptime src: SourceLocation,
+    comptime event: SpanEvent,
+    executor: Executor,
+    span_: *Span,
+) void {
+    if (comptime !logEnabled(level, scope)) return;
+    std.options.traceFn(level, scope, src, event, executor, span_);
+}
+
+pub fn defaultTrace(
+    comptime level: Level,
+    comptime scope: @EnumLiteral(),
+    comptime src: SourceLocation,
+    comptime event: SpanEvent,
+    executor: Executor,
+    span_: *Span,
+) void {
+    _ = level;
+    _ = scope;
+    _ = src;
+    _ = event;
+    _ = executor;
+    _ = span_;
+}
+
+/// This thread's currently running executor.
+pub threadlocal var thread_executor: Executor = .none;
+
+/// This thread's currently executing span.
+pub threadlocal var thread_span: Span = .empty;
+
+/// An executor can be a thread, fiber, or whatever the std.Io implementation
+/// decides it is. Internally it is represented by a monotonically increasing
+/// integer, but that is an implementation detail, and should not be relied
+/// upon.
+pub const Executor = enum(u64) {
+    none = std.math.maxInt(u64),
+    _,
+
+    /// A globally unique, monotonically increasing executor identifier.
+    var next_id: std.atomic.Value(u64) = .init(0);
+
+    pub fn create() Executor {
+        return @enumFromInt(next_id.fetchAdd(1, .monotonic));
+    }
+
+    /// Marks this executor as finished. Currently a no-op hook reserved for
+    /// future executor lifetime tracking.
+    pub fn exit(self: Executor) void {
+        _ = self;
+    }
+
+    /// Links `span_` to this executor on the current thread. No-op for an empty span.
+    pub fn link(self: Executor, span_: *Span) void {
+        if (span_.id == .none) return;
+        span_.vtable.linkFn(span_, self);
+    }
+
+    /// Undoes a previous `link`. No-op for an empty span.
+    pub fn unlink(self: Executor, span_: *Span) void {
+        if (span_.id == .none) return;
+        span_.vtable.unlinkFn(span_, self);
+    }
+};
+
+/// An execution span.
+pub const Span = struct {
+    id: SpanId,
+    vtable: *const VTable,
+    userdata: SpanUserdata,
+
+    pub const empty: Span = .{
+        .id = .none,
+        .vtable = undefined,
+        .userdata = undefined,
+    };
+
+    pub const VTable = struct {
+        suspendFn: *const fn (self: *Span) Span,
+        resumeFn: *const fn (self: *Span) void,
+        linkFn: *const fn (self: *Span, executor: Executor) void,
+        unlinkFn: *const fn (self: *Span, executor: Executor) void,
+    };
+
+    /// Detaches this span from the current thread, returning the suspended
+    /// span to be handed to `resume` later.
+    pub fn @"suspend"(self: *Span) Span {
+        if (self.id == .none) return empty;
+        return self.vtable.suspendFn(self);
+    }
+
+    /// Re-attaches a previously suspended span to the current thread.
+    pub fn @"resume"(self: *Span) void {
+        if (self.id == .none) return;
+        self.vtable.resumeFn(self);
+    }
+};
+
+/// Internally this is represented by a monotonically increasing integer, but
+/// that is an implementation detail, and should not be relied upon.
+pub const SpanId = enum(u64) {
+    none = std.math.maxInt(u64),
+    _,
+
+    /// A globally unique, monotonically increasing `Span` identifier.
+    var next_id: std.atomic.Value(u64) = .init(0);
+
+    /// Acquires a new `SpanId`.
+    pub fn createNext() SpanId {
+        return @enumFromInt(next_id.fetchAdd(1, .monotonic));
+    }
+};
+
+/// A tracing span that is generic over the log level and scope, which can be
+/// configured via `std.Options`. When the scope or level has been disabled, this
+/// type becomes zero sized and all methods become no-ops, allowing tracing to be
+/// compiled out.
+///
+/// ```zig
+/// var span = log.span(.info, @src());
+/// defer span.end();
+/// ```
+///
+/// An initialized span can be moved where it needs to be, but once it has begun,
+/// moving or copying a span is illegal behavior.
+///
+/// When dealing with concurrent execution, to properly track spans in the task
+/// that is running concurrently, the original span must be linked and unlinked to
+/// the new executor.
+///
+/// ```zig
+/// const thread = try std.Thread.spawn(.{}, struct {
+///     fn run(parent: *Span) void {
+///         const executor: Executor = .create();
+///         executor.link(parent);
+///         defer executor.unlink(parent);
+///
+///         // new spans on this thread are linked to the original
+///     }
+/// }.run, .{&parent_span});
+/// ```
+pub fn ScopedSpan(comptime level: Level, comptime scope: @EnumLiteral(), comptime src: std.builtin.SourceLocation) type {
+    return if (!logEnabled(level, scope)) struct {
+        const Self = @This();
+
+        pub fn begin() Self {
+            return .{};
+        }
+        pub fn end(_: *Self) void {}
+    } else struct {
+        const Self = @This();
+
+        id: SpanId,
+        prev: Span,
+
+        pub fn begin() Self {
+            const id: SpanId = .createNext();
+            const prev = thread_span;
+            thread_span = .{
+                .id = id,
+                .vtable = &.{
+                    .suspendFn = @"suspend",
+                    .resumeFn = @"resume",
+                    .linkFn = link,
+                    .unlinkFn = unlink,
+                },
+                .userdata = undefined,
+            };
+            trace(level, scope, src, .begin, thread_executor, &thread_span);
+            return .{ .id = id, .prev = prev };
+        }
+
+        pub fn end(self: *Self) void {
+            // Restore the previous span that `begin` saved.
+            assert(thread_span.id != .none);
+            assert(thread_span.id == self.id);
+            trace(level, scope, src, .end, thread_executor, &thread_span);
+            thread_span = self.prev;
+            self.* = undefined;
+        }
+
+        fn @"suspend"(span_: *Span) Span {
+            assert(span_.id != .none);
+            assert(thread_span.id != .none);
+            assert(thread_span.id == span_.id);
+            trace(level, scope, src, .@"suspend", thread_executor, &thread_span);
+            const suspended = span_.*;
+            thread_span = .empty;
+            return suspended;
+        }
+
+        fn @"resume"(span_: *Span) void {
+            assert(span_.id != .none);
+            assert(thread_span.id == .none);
+            thread_span = span_.*;
+            trace(level, scope, src, .@"resume", thread_executor, &thread_span);
+        }
+
+        fn link(span_: *Span, executor: Executor) void {
+            assert(executor != .none);
+            assert(span_.id != .none);
+            assert(thread_executor == .none);
+            assert(thread_span.id == .none);
+            thread_executor = executor;
+            trace(level, scope, src, .link, thread_executor, span_);
+        }
+
+        fn unlink(span_: *Span, executor: Executor) void {
+            assert(thread_executor != .none);
+            assert(thread_executor == executor);
+            assert(thread_span.id == .none);
+            trace(level, scope, src, .unlink, thread_executor, span_);
+            thread_executor = .none;
+        }
+    };
+}
+
+pub const SpanEvent = enum {
+    /// An executor has begun work on this span.
+    begin,
+    /// An executor has completed work on this span.
+    end,
+    /// An executor has suspended work on this span.
+    @"suspend",
+    /// An executor has resumed work on this span.
+    @"resume",
+    /// An executor has started work requested within the span.
+    link,
+    /// An executor has stopped work requested within the span.
+    unlink,
+};
+
 /// Returns a scoped logging namespace that logs all messages using the scope
 /// provided here.
 pub fn scoped(comptime scope: @EnumLiteral()) type {
@@ -155,6 +395,15 @@ pub fn scoped(comptime scope: @EnumLiteral()) type {
             args: anytype,
         ) void {
             log(.debug, scope, format, args);
         }
+
+        /// Initializes and begins a new tracing span for this scope. The
+        /// returned span must be ended with `end`.
+        pub fn span(
+            comptime level: Level,
+            comptime src: SourceLocation,
+        ) ScopedSpan(level, scope, src) {
+            return .begin();
+        }
     };
}
@@ -180,3 +429,7 @@
 /// Log a debug message using the default scope. This log level is intended to
 /// be used for messages which are only useful for debugging.
 pub const debug = default.debug;
+
+/// Initializes and begins a new tracing span using the default scope. The
+/// returned span must be ended with `end`.
+pub const span = default.span;
diff --git a/lib/std/std.zig b/lib/std/std.zig
index 5c500d3f55d3..15f61814a61d 100644
--- a/lib/std/std.zig
+++ b/lib/std/std.zig
@@ -129,6 +129,18 @@ pub const Options = struct {
         args: anytype,
     ) void = log.defaultLog,
 
+    /// Per-span userdata. Copied on every span/executor context change; keep it small.
+    SpanUserdata: type = void,
+
+    traceFn: fn (
+        comptime level: log.Level,
+        comptime scope: @EnumLiteral(),
+        comptime src: std.builtin.SourceLocation,
+        comptime event: log.SpanEvent,
+        executor: log.Executor,
+        span: *log.Span,
+    ) void = log.defaultTrace,
+
     /// Overrides `std.heap.page_size_min`.
     page_size_min: ?usize = null,
     /// Overrides `std.heap.page_size_max`.