Producing values
The value source is the working half of a column: one instance per (row, column) that wires up to a data origin, pushes cell values into the grid through a sink, and cleans up on disposal. This chapter covers the source lifecycle, the services it draws on, and the threading rules that keep live data callbacks fast and correct.
IColumnValueSource: the lifecycle
A source is owned by the row's actor. Its lifecycle hooks always run on that single actor thread — the engine never invokes two hooks on the same source concurrently. This is the same single-threaded discipline the Pipeline's indicator contract enforces, so you never need locks for your own state.
IColumnValueSource.cs
namespace TradeStrike.Pipeline.MarketAnalyzer;

public interface IColumnValueSource : IAsyncDisposable
{
    // Wire the source up against the engine + row context. Called
    // exactly once, before any value is observed. Long-running work
    // (subscribing to live feeds, requesting backfill) happens here or
    // is dispatched via the actor's scheduler; it MUST NOT block.
    Task StartAsync(IColumnValueSink sink, CancellationToken cancellationToken);
}
- StartAsync is called exactly once, before any value is observed. Subscribe to your feed or kick off a backfill here. Do not block the caller; return promptly, and if work is long-running, dispatch it. Honour the CancellationToken for any awaited work.
- DisposeAsync (from IAsyncDisposable) is your teardown. Unhook every subscription, timer and event you registered. The source is disposed on row removal or panel dispose.
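The two hooks can be pictured with a small timer-driven source. This is a minimal sketch, not SDK code: HeartbeatSource and CountingSink are hypothetical, and the CellValue, IColumnValueSink and IColumnValueSource declarations are cut-down stand-ins so the listing compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the SDK types (simplified; assumptions for this sketch only).
public readonly record struct CellValue(object? RawValue)
{
    public static CellValue Ready(object? value) => new(value);
}

public interface IColumnValueSink { void Publish(CellValue value); }

public interface IColumnValueSource : IAsyncDisposable
{
    Task StartAsync(IColumnValueSink sink, CancellationToken cancellationToken);
}

// Hypothetical sink that only counts what it receives.
public sealed class CountingSink : IColumnValueSink
{
    private int _count;
    public int Count => Volatile.Read(ref _count);
    public void Publish(CellValue value) => Interlocked.Increment(ref _count);
}

// Hypothetical source: publishes an incrementing counter on a timer.
public sealed class HeartbeatSource : IColumnValueSource
{
    private readonly TimeSpan _period;
    private Timer? _timer;
    private int _beats;

    public HeartbeatSource(TimeSpan period) => _period = period;

    public Task StartAsync(IColumnValueSink sink, CancellationToken cancellationToken)
    {
        // Registering a timer is cheap, so StartAsync returns without blocking.
        _timer = new Timer(
            _ => sink.Publish(CellValue.Ready(Interlocked.Increment(ref _beats))),
            state: null, dueTime: _period, period: _period);
        return Task.CompletedTask;
    }

    public async ValueTask DisposeAsync()
    {
        // Timer.DisposeAsync completes once in-flight callbacks have finished,
        // so nothing publishes after teardown returns.
        if (_timer is not null) await _timer.DisposeAsync();
        _timer = null;
    }
}
```

Awaiting the timer's own DisposeAsync is a design choice for this sketch: it closes the window in which a late callback could publish into a sink whose row is already gone.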
Always unhook in DisposeAsync. A live tick subscription that outlives its row leaks the row's memory and keeps publishing into a dead sink. Dispose is the single cleanup point; treat it as mandatory.
Publishing: IColumnValueSink & CellValue
The sink is the engine-owned callback through which a source publishes new cell payloads. You call
Publish with an immutable CellValue; the engine queues it and returns immediately.
IColumnValueSink.cs
namespace TradeStrike.Pipeline.MarketAnalyzer;

public interface IColumnValueSink
{
    // Publish a new value for this column on this row. CellValue is a
    // struct and cannot be null; publish CellValue.Initializing to
    // express "Initializing..." / "no data yet".
    void Publish(CellValue value);
}

public readonly record struct CellValue(CellState State, object? RawValue, string? ErrorMessage = null)
{
    public static CellValue Ready(object? value) => new(CellState.Ready, value);
    public static CellValue Initializing { get; } = new(CellState.Initializing, null);
    public static CellValue Error(string message) => new(CellState.Error, null, message);
}

public enum CellState : byte
{
    Initializing = 0, // source has not produced its first value yet (warming up)
    Ready = 1,        // source has produced a valid value
    Error = 2,        // source surfaced an error; cell shows an error indicator
}
The three factory members map to the three states a cell can be in, and the grid renders each one distinctly (placeholder, value, or red badge) without any per-column branching from you:
| Factory | State | When to use it |
|---|---|---|
| CellValue.Initializing | Initializing | Warming up: subscribed but no value yet. The grid shows a placeholder. |
| CellValue.Ready(value) | Ready | A valid value. Pass null for a deliberately blank cell (e.g. a venue that lacks this fact). |
| CellValue.Error(message) | Error | A configuration or data fault the user should see. The grid shows an error indicator carrying your message. |
A legitimately missing value is CellValue.Ready(null), not an error. Reserve Error for genuine misconfiguration the user must act on (no data provider wired, an unresolvable symbol).
Threading rules
The contract is deliberately generous about where Publish may be called from, and strict about
what it must never do:
- Safe from the source's actor thread: the normal path, e.g. from inside StartAsync.
- Safe from arbitrary background threads: a live tick callback, a timer tick. The engine marshals the payload onto the row actor before any downstream effect fires, so you don't have to.
- Never blocking: the engine queues the payload and returns. This keeps live data callbacks (Rithmic, crypto) low-latency. Don't do heavy work or hold locks in the callback that publishes; compute the value, then publish.
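One way to picture the contract above is a sink that enqueues from any thread and hands values to a single consumer. The sketch below is a mental model only, not the engine's implementation: QueueingSink is a hypothetical type built on System.Threading.Channels, and its drain loop stands in for the row actor.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical model of a non-blocking sink (not the engine's code).
public sealed class QueueingSink<T>
{
    private readonly Channel<T> _queue = Channel.CreateUnbounded<T>(
        new UnboundedChannelOptions { SingleReader = true });

    // Callable from any thread. TryWrite on an unbounded channel does not
    // wait for the consumer, so the publishing thread returns immediately.
    public void Publish(T value) => _queue.Writer.TryWrite(value);

    // Signals that no more values will arrive so the drain loop can finish.
    public void Complete() => _queue.Writer.Complete();

    // The single consumer, standing in for the row actor: values are
    // handled one at a time, whatever thread they were published from.
    public async Task<List<T>> DrainAsync()
    {
        var seen = new List<T>();
        await foreach (T value in _queue.Reader.ReadAllAsync())
            seen.Add(value);
        return seen;
    }
}
```

Under this model the cost of Publish is a queue write, which is why any expensive computation belongs before the call rather than around it.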
From a feed or timer callback, call sink.Publish(...) directly. Your own per-source mutable state (running sums, last bid/ask) is touched only from those callbacks, and because lifecycle hooks are serialized per source you can keep it as plain fields, provided the feed serializes its callbacks. The one caveat is that a feed callback and DisposeAsync can race, so guard against publishing after dispose if your feed can still fire mid-teardown.
IColumnContext: the service surface
The context is the engine service surface handed to every CreateSource call. Sources resolve only
the services they need; an indicator column never touches the trading service, a market-data column never
touches the indicator factory. The core services are first-class members; richer host services come through
GetService so the SDK never references the engine or trading layers.
IColumnContext.cs
namespace TradeStrike.Pipeline.MarketAnalyzer;

public interface IColumnContext
{
    // Engine clock. Tests inject a fake; production uses UTC system time.
    IMarketAnalyzerClock Clock { get; }

    // Resolve the data provider that owns a row, or null when none is
    // wired (publish a CellState.Error cell so the user sees it).
    IDataProvider? ResolveDataProvider(InstrumentRef instrument);

    // User-visible time-zone for daily aggregates. Defaults to UTC;
    // the host overrides per workspace.
    TimeZoneInfo SessionTimeZone { get; }

    // Pluggable session-day boundary resolver.
    ISessionResolver SessionResolver { get; }

    // Resolve an OPTIONAL host service by type, or null when the host
    // doesn't offer it. Prefer the typed GetService<T>() extension.
    object? GetService(Type serviceType) => null;

    // Translate the user-typed symbol to the provider-native form for
    // backfill / live subscription. Default: identity. Idempotent.
    string ResolveBackfillSymbol(InstrumentRef instrument) => instrument.SymbolId;
}

// Typed convenience over GetService.
public static class ColumnContextExtensions
{
    public static T? GetService<T>(this IColumnContext context) where T : class;
}

// Abstraction over DateTime.UtcNow; tests inject a controllable fake.
public interface IMarketAnalyzerClock
{
    DateTime UtcNow { get; }
}
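The listing gives only the signature of the typed GetService<T>() helper. A plausible body, assuming it simply delegates to GetService(Type) and casts, is sketched here against a cut-down IColumnContext. ColumnContextExtensionsSketch, BareContext and DescribingContext are hypothetical names, and the description string is sample data.

```csharp
using System;

// Cut-down stand-in for the SDK interface (only the member this sketch needs).
public interface IColumnContext
{
    // Default implementation: the host offers no optional services.
    object? GetService(Type serviceType) => null;
}

public interface IInstrumentDescriptionSource
{
    string? GetDescription(string nativeSymbol);
}

public static class ColumnContextExtensionsSketch
{
    // Assumed shape of the typed helper: delegate to GetService(Type) and cast.
    public static T? GetService<T>(this IColumnContext context) where T : class
        => context.GetService(typeof(T)) as T;
}

// A host that overrides nothing, so every lookup yields null.
public sealed class BareContext : IColumnContext { }

// A host that offers exactly one optional service.
public sealed class DescribingContext : IColumnContext, IInstrumentDescriptionSource
{
    public string? GetDescription(string nativeSymbol)
        => nativeSymbol == "MNQM6" ? "Micro E-mini Nasdaq-100" : null;

    public object? GetService(Type serviceType)
        => serviceType == typeof(IInstrumentDescriptionSource) ? this : null;
}
```

With this shape a source never has to catch anything: a host that does not implement the service falls through to the interface default and the caller gets null.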
| Member | Use it for |
|---|---|
| Clock.UtcNow | Any "now" you need. Never call DateTime.UtcNow directly, so tests can drive a fake clock. |
| ResolveDataProvider(instrument) | Get the IDataProvider for the row's live ticks, backfill and feeds. Returns null when nothing is wired; publish an Error cell. |
| SessionTimeZone | The user-visible time-zone for rendering / bucketing daily aggregates. |
| SessionResolver | Find the start of the current trading session for a session-anchored column. |
| GetService(Type) / GetService<T>() | Optional richer services (descriptions, calendars, your own feed). May be null. |
| ResolveBackfillSymbol(instrument) | Translate a user-typed root ("ES") into the provider-native contract ("ESM6") before any backfill or subscribe. |
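The Clock row is easiest to appreciate from the test side. The following sketch assumes nothing beyond the IMarketAnalyzerClock interface shown earlier; ManualClock and StalenessTracker are hypothetical helpers invented for illustration.

```csharp
using System;

// Same shape as the SDK interface, redeclared so the sketch stands alone.
public interface IMarketAnalyzerClock
{
    DateTime UtcNow { get; }
}

// Hypothetical test double: time moves only when the test says so.
public sealed class ManualClock : IMarketAnalyzerClock
{
    public ManualClock(DateTime startUtc)
        => UtcNow = DateTime.SpecifyKind(startUtc, DateTimeKind.Utc);

    public DateTime UtcNow { get; private set; }

    public void Advance(TimeSpan by) => UtcNow += by;
}

// Hypothetical helper a source might own: reports whether the last update
// is older than a threshold, using only the injected clock.
public sealed class StalenessTracker
{
    private readonly IMarketAnalyzerClock _clock;
    private readonly TimeSpan _maxAge;
    private DateTime _lastUpdateUtc;

    public StalenessTracker(IMarketAnalyzerClock clock, TimeSpan maxAge)
    {
        _clock = clock;
        _maxAge = maxAge;
        _lastUpdateUtc = clock.UtcNow;
    }

    public void Touch() => _lastUpdateUtc = _clock.UtcNow;

    public bool IsStale => _clock.UtcNow - _lastUpdateUtc > _maxAge;
}
```

Because the tracker never reads the system time, a test can step across the threshold deterministically instead of sleeping.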
InstrumentRef & symbol translation
The row a source serves is identified by an InstrumentRef — a value-equality key of a
provider-native symbol plus an optional provider key.
InstrumentRef.cs
namespace TradeStrike.Pipeline.MarketAnalyzer;

public readonly record struct InstrumentRef(string SymbolId, string? ProviderKey = null)
{
    public override string ToString(); // "rithmic:MNQM6" or just "MNQM6" when ProviderKey is null

    // Throwing validator; null/whitespace symbols never reach the actor model.
    public static InstrumentRef Validated(string symbolId, string? providerKey = null);
}
- SymbolId is the provider-native identifier exactly as the provider's tick/quote streams emit it: "MNQM6", "BTC-USD". Treat it as opaque.
- ProviderKey is the stable IDataProvider.Key ("rithmic", "coinbase"); null means "resolve through the registry's default provider".
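Symbol translation can be pictured as a lookup that leaves already-native symbols untouched, which is what makes it idempotent. This is a sketch under assumptions: RootSymbolTranslator and its root-to-contract map are hypothetical, and a real host may resolve front months quite differently.

```csharp
using System;
using System.Collections.Generic;

// Same shape as the SDK record, redeclared so the sketch stands alone.
public readonly record struct InstrumentRef(string SymbolId, string? ProviderKey = null);

// Hypothetical translator: maps user-typed roots to provider-native contracts.
public sealed class RootSymbolTranslator
{
    private readonly IReadOnlyDictionary<string, string> _nativeByRoot;

    public RootSymbolTranslator(IReadOnlyDictionary<string, string> nativeByRoot)
        => _nativeByRoot = nativeByRoot;

    // Known roots are translated; anything else is assumed to be native
    // already and is returned unchanged, so a second pass is a no-op.
    public string ResolveBackfillSymbol(InstrumentRef instrument)
        => _nativeByRoot.TryGetValue(instrument.SymbolId, out var native)
            ? native
            : instrument.SymbolId;
}
```

The idempotence holds as long as no native symbol is also registered as a root, which is an assumption of this sketch rather than a guarantee stated by the SDK.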
Always pass ResolveBackfillSymbol(instrument) to feeds, not instrument.SymbolId. Rows can be added by paths (toolbar picker, bulk paste, workspace restore) that hold a user-typed root rather than a front-month contract. Routing through the context's translation keeps every source correct with one call, and because it is idempotent, passing an already-native symbol is safe.
Sessions: ISessionResolver & the clock
Session-aware columns (today's open/high/low/volume, net change vs. previous close, session VWAP) must anchor to the trading-session boundary — never compute it themselves. The resolver maps an instrument plus a moment to the UTC instant the current session began.
ISessionResolver.cs
namespace TradeStrike.Pipeline.MarketAnalyzer;

public interface ISessionResolver
{
    // The UTC start of the trading session that momentUtc falls inside,
    // for instrument. Pure function: same inputs => same output, no
    // hidden state. Two timestamps in the same session return the same
    // start; the first timestamp past the next session's start returns
    // that later start.
    DateTime GetSessionStartUtc(InstrumentRef instrument, DateTime momentUtc);
}
The host ships a generic resolver covering CME ETH, NYSE RTH, FX 24-hour weeks and crypto 24/7, plugged in by configuration. To detect a session roll, compare the session start for the current tick's timestamp against the one you cached; when it advances, reset your running session state (sums, highs, lows) and start over.
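The roll check described above can be sketched with a session-high tracker. Everything except the ISessionResolver shape is an assumption: FixedHourResolver is a hypothetical stand-in that starts one session per day at a fixed UTC hour, which is far simpler than the host's real resolver, and SessionHighTracker is an illustrative name.

```csharp
using System;

// Same shapes as the SDK types, redeclared so the sketch stands alone.
public readonly record struct InstrumentRef(string SymbolId, string? ProviderKey = null);

public interface ISessionResolver
{
    DateTime GetSessionStartUtc(InstrumentRef instrument, DateTime momentUtc);
}

// Hypothetical resolver: one session per day, starting at a fixed UTC hour.
public sealed class FixedHourResolver : ISessionResolver
{
    private readonly int _startHourUtc;

    public FixedHourResolver(int startHourUtc) => _startHourUtc = startHourUtc;

    public DateTime GetSessionStartUtc(InstrumentRef instrument, DateTime momentUtc)
    {
        DateTime todayStart = momentUtc.Date.AddHours(_startHourUtc);
        return momentUtc >= todayStart ? todayStart : todayStart.AddDays(-1);
    }
}

// Tracks the session high and starts over when the session start advances.
public sealed class SessionHighTracker
{
    private readonly ISessionResolver _resolver;
    private readonly InstrumentRef _instrument;
    private DateTime _sessionStartUtc = DateTime.MinValue;
    private double _high = double.NegativeInfinity;

    public SessionHighTracker(ISessionResolver resolver, InstrumentRef instrument)
    {
        _resolver = resolver;
        _instrument = instrument;
    }

    public double OnTrade(DateTime timestampUtc, double price)
    {
        DateTime start = _resolver.GetSessionStartUtc(_instrument, timestampUtc);
        if (start > _sessionStartUtc)
        {
            // Session rolled: drop the previous session's running state.
            _sessionStartUtc = start;
            _high = double.NegativeInfinity;
        }
        _high = Math.Max(_high, price);
        return _high;
    }
}
```

Comparing with greater-than rather than inequality means a late tick stamped in the previous session does not trigger a reset; whether such a tick should instead be dropped is a policy choice this sketch leaves open.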
Optional services via GetService
GetService<T>() exposes optional host seams without coupling your plugin to the host
assembly. It returns null when the host doesn't offer the service — a missing service is a
host-configuration fact, never an exception. One such SDK-visible seam is the instrument description source:
IInstrumentInfoSources.cs
namespace TradeStrike.Pipeline.MarketAnalyzer;

// Optional host service: human-readable instrument descriptions
// (e.g. "Micro E-mini Nasdaq-100").
public interface IInstrumentDescriptionSource
{
    // Description for a NATIVE symbol (post root->contract translation),
    // or null when unknown.
    string? GetDescription(string nativeSymbol);
}
DescriptionSource.cs
public Task StartAsync(IColumnValueSink sink, CancellationToken cancellationToken)
{
    var descriptions = _context.GetService<IInstrumentDescriptionSource>();
    if (descriptions is null)
    {
        sink.Publish(CellValue.Ready(null)); // host offers no descriptions: blank cell, not an error
        return Task.CompletedTask;
    }

    string native = _context.ResolveBackfillSymbol(_instrument);
    sink.Publish(CellValue.Ready(descriptions.GetDescription(native))); // null -> blank
    return Task.CompletedTask;
}
A missing optional service yields a Ready(null) cell, not an Error: the user sees an empty column, the same way NinjaTrader shows blanks for fundamentals on futures.
A complete session-VWAP source
This source ties the whole chapter together: it resolves the data provider, translates the symbol, subscribes
to live ticks on a background-thread feed, accumulates a session-anchored VWAP, resets cleanly on each session
roll via the ISessionResolver, and unhooks in DisposeAsync. Pair it with a definition
whose DataType is ColumnDataType.Price (as in
the previous chapter).
SessionVwapSource.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using TradeStrike.Pipeline.MarketAnalyzer;
using TradeStrike.Pipeline.Providers;
using TradeStrike.Pipeline.Ticks;

namespace MyPlugin.Columns;

internal sealed class SessionVwapSource : IColumnValueSource
{
    private readonly InstrumentRef _instrument;
    private readonly IColumnContext _context;
    private IDisposable? _subscription;
    private DateTime _sessionStartUtc = DateTime.MinValue;
    private double _priceVolume; // running sum of price * size
    private long _volume;        // running sum of size
    private volatile bool _disposed;

    public SessionVwapSource(InstrumentRef instrument, IColumnContext context)
    {
        _instrument = instrument;
        _context = context;
    }

    public Task StartAsync(IColumnValueSink sink, CancellationToken cancellationToken)
    {
        IDataProvider? provider = _context.ResolveDataProvider(_instrument);
        if (provider?.LiveTicks is null)
        {
            sink.Publish(CellValue.Error("No live tick feed for this instrument."));
            return Task.CompletedTask;
        }

        sink.Publish(CellValue.Initializing); // warming up
        string symbol = _context.ResolveBackfillSymbol(_instrument);

        // Live feeds may call back on any thread; sink.Publish marshals for us.
        _subscription = provider.LiveTicks.Subscribe(symbol, tick =>
        {
            if (_disposed) return;
            if ((tick.Flags & TickFlags.Trade) == 0) return; // VWAP counts trades only

            // Reset the accumulator when the trading session rolls.
            DateTime sessionStart =
                _context.SessionResolver.GetSessionStartUtc(_instrument, tick.ExchangeTimestampUtc);
            if (sessionStart != _sessionStartUtc)
            {
                _sessionStartUtc = sessionStart;
                _priceVolume = 0;
                _volume = 0;
            }

            _priceVolume += tick.Price * tick.Size;
            _volume += tick.Size;
            if (_volume > 0)
                sink.Publish(CellValue.Ready(_priceVolume / _volume));
        });
        return Task.CompletedTask;
    }

    public ValueTask DisposeAsync()
    {
        _disposed = true;
        _subscription?.Dispose();
        return ValueTask.CompletedTask;
    }
}
Notice what the source does not do: no locks (its fields are touched only from the serialized feed
callback), no manual thread marshalling (the sink handles it), no hand-rolled session math (the resolver owns
it), and no raw DateTime.UtcNow (it works off the tick's exchange timestamp). That is the shape of
a well-behaved value source.