Events API

dcc_mcp_core.EventBus

EventBus

Thread-safe publish/subscribe event bus backed by DashMap.

Constructor

```python
from dcc_mcp_core import EventBus

bus = EventBus()
```

Methods

| Method | Returns | Description |
| --- | --- | --- |
| `subscribe(event_name, callback)` | `int` | Subscribe a callable. Returns the subscriber ID |
| `before(event_name, callback)` | `int` | Register a blocking veto hook for a supported lifecycle event |
| `unsubscribe(event_name, subscriber_id)` | `bool` | Unsubscribe by ID. Returns `True` if found |
| `unsubscribe_before(event_name, subscriber_id)` | `bool` | Remove a before hook by ID |
| `publish(event_name, **kwargs)` | `None` | Call all matching subscribers with kwargs |
| `emit(event_name, source=None, correlation=None, attributes=None)` | `dict` | Emit a structured event envelope and pass it to all matching subscribers |
| `veto(reason, code="vetoed")` | `dict` | Build a veto payload for a before hook |
| `vetoable_events()` | `list[str]` | Return lifecycle events that accept before hooks |

Dunder Methods

| Method | Description |
| --- | --- |
| `__repr__` | Returns `EventBus(subscriptions=N)` |

Behavior

  • Subscribers receive keyword arguments from publish(event_name, **kwargs)
  • Subscribers receive one event-envelope dict from emit(...)
  • Event names support exact matches, prefix.* wildcards, and a catch-all *
  • Exceptions in subscribers are logged via tracing but do not propagate
  • Before hooks support only skill.loading, tool.dispatched, resource.subscribed, and client.initialize
  • Before hooks return None/False to allow, or a string/dict/veto(...) payload to reject
  • Callbacks are collected before invocation to avoid DashMap deadlocks
  • Multiple subscribers per event are supported
  • Subscriber IDs are monotonically increasing (starting at 1)
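The rules above can be illustrated with a small pure-Python model. This is a hypothetical sketch, not the library's implementation: `ToyBus` and `matches` are made-up names, and the exact wildcard semantics (here, `prefix.*` selects names that start with `prefix.`) are an assumption.

```python
import logging
from itertools import count

logger = logging.getLogger("toy_event_bus")


def matches(pattern: str, event_name: str) -> bool:
    """Return True if a subscription pattern selects event_name."""
    if pattern == "*":                       # catch-all
        return True
    if pattern.endswith(".*"):               # prefix wildcard, e.g. "tool.*"
        # Assumption: "tool.*" selects names starting with "tool."
        return event_name.startswith(pattern[:-1])
    return pattern == event_name             # exact match


class ToyBus:
    """Hypothetical model of the documented behavior (not the real EventBus)."""

    def __init__(self):
        self._ids = count(1)                 # subscriber IDs start at 1
        self._subs = {}                      # pattern -> {subscriber_id: callback}

    def subscribe(self, pattern, callback):
        sid = next(self._ids)
        self._subs.setdefault(pattern, {})[sid] = callback
        return sid

    def unsubscribe(self, pattern, sid):
        return self._subs.get(pattern, {}).pop(sid, None) is not None

    def publish(self, event_name, **kwargs):
        # Snapshot the matching callbacks before calling any of them, so a
        # callback may subscribe or unsubscribe without touching the mapping
        # that is being iterated.
        callbacks = [
            cb
            for pattern, subs in self._subs.items()
            if matches(pattern, event_name)
            for cb in subs.values()
        ]
        for cb in callbacks:
            try:
                cb(**kwargs)
            except Exception:
                # Log and continue: one failing subscriber does not stop others.
                logger.exception("subscriber failed for %s", event_name)
```

In this model a subscriber that raises is logged and skipped, and the remaining subscribers still run.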

Example

```python
from dcc_mcp_core import EventBus

bus = EventBus()

def on_action(action_name=None, **kwargs):
    print(f"Action: {action_name}")

sid = bus.subscribe("action.executed", on_action)
# publish() passes its keyword arguments straight to the subscriber.
bus.publish("action.executed", action_name="create_sphere")
bus.unsubscribe("action.executed", sid)
```

Structured Event Envelope

```python
events = []
# Prefix wildcard: this subscriber receives the "tool.completed" event below.
bus.subscribe("tool.*", lambda event: events.append(event))

# emit() returns the envelope it delivered to subscribers.
event = bus.emit(
    "tool.completed",
    source={"dcc_type": "maya"},
    correlation={"request_id": "req-123"},
    attributes={"tool_slug": "maya_scene__open", "result_success": True},
)

assert event["schema_version"] == 1
assert events == [event]
```

Before Hook Veto

```python
def policy(event):
    # Reject one specific tool; returning None allows everything else.
    if event["attributes"]["tool_slug"] == "delete_scene":
        return EventBus.veto("destructive tools are disabled", "policy_denied")
    return None

sid = bus.before("tool.dispatched", policy)
# ... dispatch tools while the policy is active ...
bus.unsubscribe_before("tool.dispatched", sid)
```

A vetoed tool call surfaces as an EVENT_VETOED dispatch error and as a tool.failed event carrying error_kind="event_vetoed", veto_code, and veto_reason.
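The allow/reject convention for before hooks can be sketched as a small normalizer. This is an illustration only: `interpret_hook_result` is a hypothetical helper, the `reason` and `code` keys are assumed names for the veto payload, and raising on any other return type is a choice made for this sketch rather than documented behavior.

```python
def interpret_hook_result(result):
    """Map a before hook's return value to (allowed, veto_payload)."""
    if result is None or result is False:
        return True, None                      # allow the event
    if isinstance(result, str):
        # A bare string is treated as the veto reason with the default code.
        return False, {"reason": result, "code": "vetoed"}
    if isinstance(result, dict):
        return False, result                   # already a payload
    # Other return types are not covered by the documented rules.
    raise TypeError(f"unsupported before-hook result: {type(result).__name__}")
```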

Envelope fields:

| Field | Type | Description |
| --- | --- | --- |
| `schema_version` | `int` | Event envelope schema version, currently 1 |
| `name` | `str` | Dotted event name |
| `id` | `str` | Opaque event id prefixed with `ev_` |
| `timestamp_ns` | `int` | Unix timestamp in nanoseconds |
| `source` | `dict` | Emitter identity such as `dcc_type` |
| `correlation` | `dict` | Request/session/trace correlation fields when available |
| `attributes` | `dict` | Event-specific payload |
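A subscriber that wants to guard against malformed input could check an envelope against the table above. The helper below is a hypothetical sketch (`envelope_problems` is not part of the library) and assumes every listed field is always present in an envelope.

```python
# Field names and types taken from the envelope table.
ENVELOPE_FIELDS = {
    "schema_version": int,
    "name": str,
    "id": str,
    "timestamp_ns": int,
    "source": dict,
    "correlation": dict,
    "attributes": dict,
}


def envelope_problems(event: dict) -> list:
    """Return human-readable problems; an empty list means the shape is fine."""
    problems = []
    for field, expected in ENVELOPE_FIELDS.items():
        if field not in event:
            problems.append(f"missing field: {field}")
        elif not isinstance(event[field], expected):
            problems.append(f"{field} should be {expected.__name__}")
    event_id = event.get("id")
    if isinstance(event_id, str) and not event_id.startswith("ev_"):
        problems.append("id does not start with 'ev_'")
    return problems
```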

For tool.completed, attributes.result_success follows the handler output's success boolean when present. If the output has no success field, a handler that returned normally is treated as successful.
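The rule for `result_success` can be written out as a short function. This is a sketch under assumptions: `derive_result_success` is a hypothetical name, the handler output is assumed to be a dict with an optional `success` key, and a non-boolean `success` value is treated here as if the field were absent.

```python
def derive_result_success(output) -> bool:
    """Model result_success for a handler that returned normally."""
    if isinstance(output, dict) and isinstance(output.get("success"), bool):
        return output["success"]   # follow the handler's own success flag
    return True                    # no success field: a normal return counts as success
```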

Standalone Server Webhooks

dcc-mcp-server can forward structured EventBus envelopes to HTTP webhooks. Set DCC_MCP_WEBHOOKS_CONFIG to the path of a YAML file containing webhooks entries. Each entry has a name, url, and events, plus optional headers, optional delivery retry settings, optional dotted-path filters, and an optional payload_template. Delivery is asynchronous and bounded; when all attempts fail, the server emits webhook.delivery_failed on the same EventBus.
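This page does not define how dotted-path filters are evaluated. One plausible reading is that a path such as `attributes.tool_slug` walks nested dicts in the envelope. The helper below sketches that lookup; `resolve_path` is a hypothetical name and the server's actual filter semantics may differ.

```python
def resolve_path(envelope: dict, dotted_path: str, default=None):
    """Follow a dotted path through nested dicts, returning default if absent."""
    current = envelope
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


# Example envelope fragment using field names from this page.
envelope = {
    "name": "tool.completed",
    "source": {"dcc_type": "maya"},
    "attributes": {"tool_slug": "maya_scene__open", "result_success": True},
}
dcc_type = resolve_path(envelope, "source.dcc_type")
missing = resolve_path(envelope, "correlation.request_id")
```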


ToolRecorder

Records per-tool execution time and success/failure counters. Use this to collect performance telemetry for any tools your code executes.

Constructor

```python
from dcc_mcp_core import ToolRecorder

recorder = ToolRecorder("my-service")
```

| Parameter | Type | Description |
| --- | --- | --- |
| `scope` | `str` | Logical name for this recorder instance (e.g. service or module name) |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| `start(action_name, dcc_name)` | `RecordingGuard` | Start timing a tool; returns an RAII guard |
| `metrics(action_name)` | `ToolMetrics \| None` | Aggregated metrics for a specific tool; `None` if no data |
| `all_metrics()` | `list[ToolMetrics]` | Aggregated metrics for all recorded actions |
| `reset()` | `None` | Clear all in-memory statistics |

Example

```python
from dcc_mcp_core import ToolRecorder

recorder = ToolRecorder("maya-skill-server")

# Manual guard usage: finish() is called exactly once on either path.
guard = recorder.start("create_sphere", "maya")
try:
    pass  # ... do work ...
except Exception:
    guard.finish(success=False)
    raise
else:
    guard.finish(success=True)

# Context manager usage (success=True if no exception)
with recorder.start("delete_mesh", "maya"):
    pass  # work here

# Query metrics
m = recorder.metrics("create_sphere")
if m:
    print(f"calls={m.invocation_count}, success_rate={m.success_rate():.2%}")
    print(f"avg={m.avg_duration_ms:.1f}ms  p95={m.p95_duration_ms:.1f}ms")
```

RecordingGuard

RAII guard returned by ToolRecorder.start(). Automatically records the duration and outcome.

Methods

| Method | Returns | Description |
| --- | --- | --- |
| `finish(success)` | `None` | Commit the recording with the given success flag |
| `__enter__` | `RecordingGuard` | Context manager entry |
| `__exit__` | `None` | Context manager exit (`success=True` when no exception was raised) |
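The guard pattern can be modeled in plain Python to show how duration and outcome are captured. `ToyGuard` is a hypothetical stand-in, not the library's class; making `finish` idempotent is a design choice of this sketch, since the behavior of a repeated `finish` call is not documented here.

```python
import time


class ToyGuard:
    """Hypothetical model of a recording guard that appends to a list."""

    def __init__(self, sink, action_name):
        self._sink = sink
        self._action_name = action_name
        self._start = time.perf_counter()
        self._done = False

    def finish(self, success):
        if self._done:                      # ignore repeated calls (sketch choice)
            return
        self._done = True
        elapsed_ms = (time.perf_counter() - self._start) * 1000.0
        self._sink.append((self._action_name, elapsed_ms, success))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # success=True only when the with-block raised nothing.
        self.finish(success=exc_type is None)
        return False                        # never swallow the exception


records = []
with ToyGuard(records, "ok"):
    pass
try:
    with ToyGuard(records, "bad"):
        raise ValueError("simulated failure")
except ValueError:
    pass
```

After this runs, `records` holds one successful entry for `ok` and one failed entry for `bad`, each with its elapsed time in milliseconds.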

ToolMetrics

Read-only snapshot of per-tool performance metrics. Obtained from ToolRecorder.metrics() or ToolRecorder.all_metrics().

Properties

| Property | Type | Description |
| --- | --- | --- |
| `action_name` | `str` | Tool this metric belongs to |
| `invocation_count` | `int` | Total number of calls recorded |
| `success_count` | `int` | Number of successful calls |
| `failure_count` | `int` | Number of failed calls |
| `avg_duration_ms` | `float` | Mean execution time in milliseconds |
| `p95_duration_ms` | `float` | 95th-percentile execution time in milliseconds |
| `p99_duration_ms` | `float` | 99th-percentile execution time in milliseconds |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| `success_rate()` | `float` | Success ratio in [0.0, 1.0] |
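To make the aggregated fields concrete, the sketch below derives them from raw samples. It is an illustration under assumptions: `summarize` is a hypothetical function, and the nearest-rank percentile used here may not be the method the library uses.

```python
import math


def summarize(samples):
    """samples: list of (duration_ms, success). Returns a dict, or None if empty."""
    if not samples:
        return None                          # mirrors metrics() returning None
    durations = sorted(d for d, _ in samples)
    n = len(durations)
    successes = sum(1 for _, ok in samples if ok)

    def percentile(p):
        # Nearest-rank method (assumption): smallest value with at least
        # p percent of the samples at or below it.
        rank = max(1, math.ceil(p / 100 * n))
        return durations[rank - 1]

    return {
        "invocation_count": n,
        "success_count": successes,
        "failure_count": n - successes,
        "avg_duration_ms": sum(durations) / n,
        "p95_duration_ms": percentile(95),
        "p99_duration_ms": percentile(99),
        "success_rate": successes / n,       # always within [0.0, 1.0]
    }


# Ten calls taking 1..10 ms; only the slowest one fails.
stats = summarize([(float(i), i != 10) for i in range(1, 11)])
```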

Example

```python
from dcc_mcp_core import ToolRecorder

recorder = ToolRecorder("server")

# Record ten successful calls of the same tool.
for _ in range(10):
    with recorder.start("ping", "maya"):
        pass

all_m = recorder.all_metrics()
for m in all_m:
    print(
        f"{m.action_name}: "
        f"{m.invocation_count} calls, "
        f"{m.success_rate():.0%} success, "
        f"avg {m.avg_duration_ms:.1f}ms"
    )
```

Released under the MIT License.