Producing values

Market Analyzer columns

Producing values

The value source is the working half of a column: one instance per (row, column) that wires up to a data origin, pushes cell values into the grid through a sink, and cleans up on disposal. This chapter covers the source lifecycle, the services it draws on, and the threading rules that keep live data callbacks fast and correct.

IColumnValueSource: the lifecycle

A source is owned by the row's actor. Its lifecycle hooks always run on that single actor thread — the engine never invokes two hooks on the same source concurrently. This is the same single-threaded discipline the Pipeline's indicator contract enforces, so you never need locks for your own state.

IColumnValueSource.csnamespace TradeStrike.Pipeline.MarketAnalyzer;

public interface IColumnValueSource : IAsyncDisposable
{
    // Wire the source up against the engine + row context. Called
    // exactly once, before any value is observed. Long-running work
    // (subscribing to live feeds, requesting backfill) happens here or
    // is dispatched via the actor's scheduler; it MUST NOT block.
    Task StartAsync(IColumnValueSink sink, CancellationToken cancellationToken);
}
  • StartAsync is called exactly once, before any value is observed. Subscribe to your feed or kick off a backfill here. Do not block the caller — return promptly; if work is long-running, dispatch it. Honour the CancellationToken for any awaited work.
  • DisposeAsync (from IAsyncDisposable) is your teardown. Unhook every subscription, timer and event you registered. The source is disposed on row removal or panel dispose.
Everything you subscribe to must be unhooked in DisposeAsync. A live tick subscription that outlives its row leaks the row's memory and keeps publishing into a dead sink. Dispose is the single cleanup point — treat it as mandatory.

Publishing: IColumnValueSink & CellValue

The sink is the engine-owned callback through which a source publishes new cell payloads. You call Publish with an immutable CellValue; the engine queues it and returns immediately.

IColumnValueSink.csnamespace TradeStrike.Pipeline.MarketAnalyzer;

public interface IColumnValueSink
{
    // Publish a new value for this column on this row. value may be
    // null to express "Initializing..." / "no data yet".
    void Publish(CellValue value);
}

public readonly record struct CellValue(CellState State, object? RawValue, string? ErrorMessage = null)
{
    public static CellValue Ready(object? value)     => new(CellState.Ready, value);
    public static CellValue Initializing             { get; } = new(CellState.Initializing, null);
    public static CellValue Error(string message)    => new(CellState.Error, null, message);
}

public enum CellState : byte
{
    Initializing = 0,   // source has not produced its first value yet (warming up)
    Ready        = 1,   // source has produced a valid value
    Error        = 2,   // source surfaced an error; cell shows an error indicator
}

The three factory members map to the three phases every cell goes through, and the grid renders each one distinctly — placeholder, value, or red badge — without any per-column branching from you:

Factory State When to use it
CellValue.Initializing Initializing Warming up — subscribed but no value yet. The grid shows a placeholder.
CellValue.Ready(value) Ready A valid value. Pass null for a deliberately blank cell (e.g. a venue that lacks this fact).
CellValue.Error(message) Error A configuration or data fault the user should see. The grid shows an error indicator carrying your message.
Blank vs. error. A missing host service or a venue that simply doesn't carry a fact is a blank cell — publish CellValue.Ready(null), not an error. Reserve Error for genuine misconfiguration the user must act on (no data provider wired, an unresolvable symbol).

Threading rules

The contract is deliberately generous about where Publish may be called from, and strict about what it must never do:

  • Safe from the source's actor thread — the normal path, e.g. from inside StartAsync.
  • Safe from arbitrary background threads — a live tick callback, a timer tick. The engine marshals the payload onto the row actor before any downstream effect fires, so you don't.
  • Never blocking — the engine queues the payload and returns. This keeps live data callbacks (Rithmic, crypto) latency-free. Don't do heavy work or take locks inside Publish; compute the value, then publish.
You never marshal threads yourself. Because the engine marshals, a tick handler can call sink.Publish(...) directly. Your own per-source mutable state (running sums, last bid/ask) is touched only from those callbacks, and because lifecycle hooks are serialized per source you can keep it as plain fields — the one caveat is that a feed callback and DisposeAsync can race, so guard against publishing after dispose if your feed can still fire mid-teardown.

IColumnContext: the service surface

The context is the engine service surface handed to every CreateSource call. Sources resolve only the services they need; an indicator column never touches the trading service, a market-data column never touches the indicator factory. The core services are first-class members; richer host services come through GetService so the SDK never references the engine or trading layers.

IColumnContext.csnamespace TradeStrike.Pipeline.MarketAnalyzer;

public interface IColumnContext
{
    // Engine clock. Tests inject a fake; production uses UTC system time.
    IMarketAnalyzerClock Clock { get; }

    // Resolve the data provider that owns a row, or null when none is
    // wired (publish a CellState.Error cell so the user sees it).
    IDataProvider? ResolveDataProvider(InstrumentRef instrument);

    // User-visible time-zone for daily aggregates. Defaults to UTC;
    // the host overrides per workspace.
    TimeZoneInfo SessionTimeZone { get; }

    // Pluggable session-day boundary resolver.
    ISessionResolver SessionResolver { get; }

    // Resolve an OPTIONAL host service by type, or null when the host
    // doesn't offer it. Prefer the typed GetService() extension.
    object? GetService(Type serviceType) => null;

    // Translate the user-typed symbol to the provider-native form for
    // backfill / live subscription. Default: identity. Idempotent.
    string ResolveBackfillSymbol(InstrumentRef instrument) => instrument.SymbolId;
}

// Typed convenience over GetService.
public static class ColumnContextExtensions
{
    public static T? GetService(this IColumnContext context) where T : class;
}

// Abstraction over DateTime.UtcNow; tests inject a controllable fake.
public interface IMarketAnalyzerClock
{
    DateTime UtcNow { get; }
}
Member Use it for
Clock.UtcNow Any "now" you need — never call DateTime.UtcNow directly, so tests can drive a fake clock.
ResolveDataProvider(instrument) Get the IDataProvider for the row's live ticks, backfill and feeds. Returns null when nothing is wired — publish an Error cell.
SessionTimeZone The user-visible time-zone for rendering / bucketing daily aggregates.
SessionResolver Find the start of the current trading session for a session-anchored column.
GetService(Type) / GetService<T>() Optional richer services (descriptions, calendars, your own feed). May be null.
ResolveBackfillSymbol(instrument) Translate a user-typed root ("ES") into the provider-native contract ("ESM6") before any backfill or subscribe.

InstrumentRef & symbol translation

The row a source serves is identified by an InstrumentRef — a value-equality key of a provider-native symbol plus an optional provider key.

InstrumentRef.csnamespace TradeStrike.Pipeline.MarketAnalyzer;

public readonly record struct InstrumentRef(string SymbolId, string? ProviderKey = null)
{
    public override string ToString();   // "rithmic:MNQM6" or just "MNQM6" when ProviderKey is null

    // Throwing validator; null/whitespace symbols never reach the actor model.
    public static InstrumentRef Validated(string symbolId, string? providerKey = null);
}
  • SymbolId is the provider-native identifier exactly as the provider's tick/quote streams emit it — "MNQM6", "BTC-USD". Treat it as opaque.
  • ProviderKey is the stable IDataProvider.Key ("rithmic", "coinbase"); null means "resolve through the registry's default provider".
Always pass ResolveBackfillSymbol(instrument) to feeds, not instrument.SymbolId. Rows can be added by paths (toolbar picker, bulk paste, workspace restore) that hold a user-typed root rather than a front-month contract. Routing through the context's translation keeps every source honest with one call — and it is idempotent, so passing an already-native symbol is safe.

Sessions: ISessionResolver & the clock

Session-aware columns (today's open/high/low/volume, net change vs. previous close, session VWAP) must anchor to the trading-session boundary — never compute it themselves. The resolver maps an instrument plus a moment to the UTC instant the current session began.

ISessionResolver.csnamespace TradeStrike.Pipeline.MarketAnalyzer;

public interface ISessionResolver
{
    // The UTC start of the trading session that momentUtc falls inside,
    // for instrument. Pure function: same inputs => same output, no
    // hidden state. Two timestamps in the same session return the same
    // start; the first timestamp past the next session's start returns
    // that later start.
    DateTime GetSessionStartUtc(InstrumentRef instrument, DateTime momentUtc);
}

The host ships a generic resolver covering CME ETH, NYSE RTH, FX 24-hour weeks and crypto 24/7, plugged in by configuration. To detect a session roll, compare the session start for the current tick's timestamp against the one you cached; when it advances, reset your running session state (sums, highs, lows) and start over.

Optional services via GetService

GetService<T>() exposes optional host seams without coupling your plugin to the host assembly. It returns null when the host doesn't offer the service — a missing service is a host-configuration fact, never an exception. One such SDK-visible seam is the instrument description source:

IInstrumentInfoSources.csnamespace TradeStrike.Pipeline.MarketAnalyzer;

// Optional host service: human-readable instrument descriptions
// (e.g. "Micro E-mini Nasdaq-100").
public interface IInstrumentDescriptionSource
{
    // Description for a NATIVE symbol (post root->contract translation),
    // or null when unknown.
    string? GetDescription(string nativeSymbol);
}
DescriptionSource.cspublic Task StartAsync(IColumnValueSink sink, CancellationToken cancellationToken)
{
    var descriptions = _context.GetService<IInstrumentDescriptionSource>();
    if (descriptions is null)
    {
        sink.Publish(CellValue.Ready(null));   // host offers no descriptions: blank cell, not an error
        return Task.CompletedTask;
    }

    string native = _context.ResolveBackfillSymbol(_instrument);
    sink.Publish(CellValue.Ready(descriptions.GetDescription(native)));   // null -> blank
    return Task.CompletedTask;
}
Degrade gracefully. Absence of an optional service is the norm on headless or test hosts. Publish a blank Ready(null) cell, not an Error — the user sees an empty column, the same way NinjaTrader shows blanks for fundamentals on futures.

A complete session-VWAP source

This source ties the whole chapter together: it resolves the data provider, translates the symbol, subscribes to live ticks on a background-thread feed, accumulates a session-anchored VWAP, resets cleanly on each session roll via the ISessionResolver, and unhooks in DisposeAsync. Pair it with a definition whose DataType is ColumnDataType.Price (as in the previous chapter).

SessionVwapSource.csusing System;
using System.Threading;
using System.Threading.Tasks;
using TradeStrike.Pipeline.MarketAnalyzer;
using TradeStrike.Pipeline.Providers;
using TradeStrike.Pipeline.Ticks;

namespace MyPlugin.Columns;

internal sealed class SessionVwapSource : IColumnValueSource
{
    private readonly InstrumentRef _instrument;
    private readonly IColumnContext _context;

    private IDisposable? _subscription;
    private DateTime _sessionStartUtc = DateTime.MinValue;
    private double _priceVolume;   // running sum of price * size
    private long   _volume;        // running sum of size
    private volatile bool _disposed;

    public SessionVwapSource(InstrumentRef instrument, IColumnContext context)
    {
        _instrument = instrument;
        _context = context;
    }

    public Task StartAsync(IColumnValueSink sink, CancellationToken cancellationToken)
    {
        IDataProvider? provider = _context.ResolveDataProvider(_instrument);
        if (provider?.LiveTicks is null)
        {
            sink.Publish(CellValue.Error("No live tick feed for this instrument."));
            return Task.CompletedTask;
        }

        sink.Publish(CellValue.Initializing);                 // warming up
        string symbol = _context.ResolveBackfillSymbol(_instrument);

        // Live feeds may call back on any thread; sink.Publish marshals for us.
        _subscription = provider.LiveTicks.Subscribe(symbol, tick =>
        {
            if (_disposed) return;
            if ((tick.Flags & TickFlags.Trade) == 0) return;  // VWAP counts trades only

            // Reset the accumulator when the trading session rolls.
            DateTime sessionStart =
                _context.SessionResolver.GetSessionStartUtc(_instrument, tick.ExchangeTimestampUtc);
            if (sessionStart != _sessionStartUtc)
            {
                _sessionStartUtc = sessionStart;
                _priceVolume = 0;
                _volume = 0;
            }

            _priceVolume += tick.Price * tick.Size;
            _volume      += tick.Size;
            if (_volume > 0)
                sink.Publish(CellValue.Ready(_priceVolume / _volume));
        });

        return Task.CompletedTask;
    }

    public ValueTask DisposeAsync()
    {
        _disposed = true;
        _subscription?.Dispose();
        return ValueTask.CompletedTask;
    }
}

Notice what the source does not do: no locks (its fields are touched only from the serialized feed callback), no manual thread marshalling (the sink handles it), no hand-rolled session math (the resolver owns it), and no raw DateTime.UtcNow (it works off the tick's exchange timestamp). That is the shape of a well-behaved value source.