Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lib/std/Io/Threaded.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
const assert = std.debug.assert;
const posix = std.posix;
const log = std.log;
const Span = std.log.Span;
const Executor = std.log.Executor;

/// Thread-safe.
allocator: Allocator,
Expand Down Expand Up @@ -94,6 +97,7 @@ const Closure = struct {
start: Start,
node: std.SinglyLinkedList.Node = .{},
cancel_tid: CancelId,
span: Span = .empty,

const Start = *const fn (*Closure) void;

Expand Down Expand Up @@ -203,14 +207,19 @@ fn join(t: *Threaded) void {
fn worker(t: *Threaded) void {
defer t.wait_group.finish();

const executor: Executor = .create();
defer executor.exit();

t.mutex.lock();
defer t.mutex.unlock();

while (true) {
while (t.run_queue.popFirst()) |closure_node| {
t.mutex.unlock();
const closure: *Closure = @fieldParentPtr("node", closure_node);
executor.link(&closure.span);
closure.start(closure);
executor.unlink(&closure.span);
t.mutex.lock();
t.busy_count -= 1;
}
Expand Down Expand Up @@ -478,6 +487,7 @@ const AsyncClosure = struct {
.closure = .{
.cancel_tid = .none,
.start = start,
.span = log.thread_span,
},
.func = func,
.context_alignment = context_alignment,
Expand Down Expand Up @@ -623,6 +633,7 @@ const GroupClosure = struct {
// Even though we already know the task is canceled, we must still
// run the closure in case there are side effects.
}

current_closure = closure;
gc.func(group, gc.contextPointer());
current_closure = null;
Expand Down Expand Up @@ -664,6 +675,7 @@ const GroupClosure = struct {
.closure = .{
.cancel_tid = .none,
.start = start,
.span = log.thread_span,
},
.t = t,
.group = group,
Expand Down
239 changes: 239 additions & 0 deletions lib/std/log.zig
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
//! ```

const std = @import("std.zig");
const assert = std.debug.assert;
const builtin = @import("builtin");
const SourceLocation = std.builtin.SourceLocation;
const SpanUserdata = std.options.SpanUserdata;

pub const Level = enum {
/// Error: something has gone wrong. This might be recoverable or might
Expand Down Expand Up @@ -113,6 +116,229 @@ pub fn defaultLog(
stderr.print(format ++ "\n", args) catch return;
}

fn trace(
comptime level: log.Level,
comptime scope: @EnumLiteral(),
comptime src: SourceLocation,
comptime event: log.SpanEvent,
executor: log.Executor,
span: *log.Span,
) void {
if (comptime !logEnabled(level, scope)) return;
std.options.traceFn(level, scope, event, executor, span);
}

pub fn defaultTrace(
comptime level: log.Level,
comptime scope: @EnumLiteral(),
comptime src: SourceLocation,
comptime event: log.SpanEvent,
executor: log.Executor,
span: *log.Span,
) void {
_ = level;
_ = scope;
_ = event;
_ = executor;
_ = span;
}

/// This thread's currently running executor.
pub threadlocal var thread_executor: Executor = .none;

/// This thread's currently excuting span.
pub threadlocal var thread_span: Span = .empty;

/// An executor can be a thread, fiber, or whatever the std.Io implementation
/// decides it is. Internally it is represented by a monotonically increasing
/// integer, but that is an implementation detail, and should not be relied
/// upon.
pub const Executor = enum(u64) {
none = std.math.maxInt(u64),
_,

/// A globally unique, monotonically increasing executor identifier.
var next_id: std.atomic.Value(u64) = .init(0);

pub fn create() Executor {
return @enumFromInt(next_id.fetchAdd(1, .monotonic));
}

pub fn link(self: Executor, span_: *Span) void {
if (span_.id == .none) return;
span_.vtable.linkFn(span_, self);
}

pub fn unlink(self: Executor, span_: *Span) void {
if (span_.id == .none) return;
span_.vtable.unlinkFn(span_, self);
}
};

/// An execution span.
pub const Span = struct {
id: SpanId,
vtable: *const VTable,
userdata: SpanUserdata,

pub const empty: Span = .{
.id = .none,
.vtable = undefined,
.userdata = undefined,
};

pub const VTable = struct {
suspendFn: *const fn (self: *Span) void,
resumeFn: *const fn (self: *Span) void,
linkFn: *const fn (self: *Span, executor: Executor) void,
unlinkFn: *const fn (self: *Span, executor: Executor) void,
};

pub fn @"suspend"(self: *Span) Span {
if (self.id == .none) return empty;
return self.vtable.suspendFn(self);
}

pub fn @"resume"(self: *Span) void {
if (self.id == .none) return;
self.vtable.resumeFn(self);
}
};

/// Internally this is represented by a monotonically increasing integer, but
/// that is an implementation detail, and should not be relied upon.
pub const SpanId = enum(u64) {
none = std.math.maxInt(u64),
_,

/// A globally unique, monotonically increasing `Span` identifier.
var next_id: std.atomic.Value(u64) = .init(0);

/// Acquires a new `SpanId`.
pub fn createNext() SpanId {
return @enumFromInt(next_id.fetchAdd(1, .monotonic));
}
};

/// A tracing span that is generic over the log level and scope, which can be
/// configured via `std.Options`. When the scope or level has been disabled, this
/// type becomes zero sized and all methods become no-ops, allowing tracing to be
/// compiled out.
///
/// ```zig
/// const span = log.span(.info, "begin request", .{});
/// span.begin();
/// defer span.end();
/// ```
///
/// An initialized span can be moved where it needs to be, but once it has begun,
/// moving or copying a span is illegal behavior.
///
/// When dealing with concurrent execution, to properly track spans in the task
/// that is running concurrently, the original span must be linked and unlinked to
/// the new executor.
///
/// ```zig
/// std.thread.Spawn(.{}, struct {
/// fn myFn(span: *AnySpan) void {
/// span.link();
/// defer span.unlink();
///
/// // new spans on this thread are linked to the original
/// }
/// }.myFn, .{ &span.any });
/// ```
pub fn ScopedSpan(comptime level: Level, comptime scope: @EnumLiteral(), comptime src: std.builtin.SourceLocation) type {
return if (!logEnabled(level, scope)) struct {
const Self = @This();

pub fn begin() Self {
return .{};
}
pub fn end(_: *Self) void {}
} else struct {
const Self = @This();

id: SpanId,
prev: Span,

pub fn begin() Self {
const id: SpanId = .createNext();
const prev = thread_span;
thread_span = .{
.id = id,
.vtable = &.{
.suspendFn = @"suspend",
.resumeFn = @"resume",
.linkFn = link,
.unlinkFn = unlink,
},
.userdata = undefined,
};
trace(level, scope, src, .begin, thread_executor, &thread_span);
return .{ .id = id, .prev = prev };
}

pub fn end(self: *Self) void {
// Swap the previous span, that we stored from begin,
assert(thread_span.id != .none);
assert(thread_span.id == self.id);
trace(level, scope, src, .end, thread_executor, &thread_span);
thread_span = self.span;
self.* = undefined;
}

fn @"suspend"(span_: *Span) Span {
assert(span_.id != .none);
assert(thread_span.id != .none);
assert(thread_span.id == span_.id);
trace(level, scope, src, .@"suspend", thread_executor, &thread_span);
const suspended = span_.*;
thread_span = .empty;
return suspended;
}

fn @"resume"(span_: *Span) void {
assert(span_.id != .none);
assert(thread_span.id == .none);
thread_span = span_.*;
trace(level, scope, src, .@"resume", thread_executor, &thread_span);
}

fn link(span_: *Span, executor: Executor) void {
assert(executor != .none);
assert(span_.id != .none);
assert(thread_executor == .none);
assert(thread_span.id == .none);
thread_executor = executor;
trace(level, scope, src, .link, thread_executor, span_);
}

fn unlink(span_: *Span, executor: Executor) void {
assert(thread_executor != .none);
assert(thread_executor == executor);
assert(thread_span.id == .none);
trace(level, scope, src, .unlink, thread_executor, span_);
thread_executor = null;
}
};
}

pub const SpanEvent = enum {
/// An executor has begun work on this span.
begin,
/// An executor has completed work on this span.
end,
/// An executor has suspended work on this span.
@"suspend",
/// An executor has resumed work on this span.
@"resume",
/// An executor has started work requested within the span.
link,
/// An executor has stopped work requested within the span.
unlink,
};

/// Returns a scoped logging namespace that logs all messages using the scope
/// provided here.
pub fn scoped(comptime scope: @EnumLiteral()) type {
Expand Down Expand Up @@ -155,6 +381,15 @@ pub fn scoped(comptime scope: @EnumLiteral()) type {
) void {
log(.debug, scope, format, args);
}

/// Initialize a new tracing span. The span must be explicitly begun
/// and ended after initialization.
pub fn span(
comptime level: Level,
comptime src: SourceLocation,
) ScopedSpan(level, scope, src) {
return .begin();
}
};
}

Expand All @@ -180,3 +415,7 @@ pub const info = default.info;
/// Log a debug message using the default scope. This log level is intended to
/// be used for messages which are only useful for debugging.
pub const debug = default.debug;

/// Initialize a new tracing span using the default scope. The span must be
/// explicitly begun and ended after initialization.
pub const span = default.span;
12 changes: 12 additions & 0 deletions lib/std/std.zig
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ pub const Options = struct {
args: anytype,
) void = log.defaultLog,

/// Per-span userdata. Copied on every span/executor context change; keep it small.
SpanUserdata: type = void,

traceFn: fn (
comptime level: log.Level,
comptime scope: @EnumLiteral(),
comptime src: SourceLocation,
comptime event: log.SpanEvent,
executor: log.Executor,
span: *log.Span,
) void = log.defaultTrace,

/// Overrides `std.heap.page_size_min`.
page_size_min: ?usize = null,
/// Overrides `std.heap.page_size_max`.
Expand Down