KurrentDB Projections Core Architecture
KurrentDB Projections Core Architecture
This document describes the internal architecture of the projections subsystem in KurrentDB, covering management endpoints, event processing flow, reader infrastructure, and event emission.
Table of Contents
- Projections Management Endpoints and Flow
- Event Processing Pipeline
- Readers and Event Retrieval
- How Projections Produce New Events
1. Projections Management Endpoints and Flow
1.1 HTTP Endpoints
Defined in src/KurrentDB.Projections.Core/Services/Http/ProjectionsController.cs. All routes are registered in SubscribeCore():
| Method | Route | Operation |
|---|---|---|
| GET | /projections | List all projections |
| POST | /projections/restart | Restart projection subsystem |
| GET | /projections/any | List all projections (any mode) |
| GET | /projections/all-non-transient | List non-transient projections |
| GET | /projections/transient | List transient projections |
| GET | /projections/onetime | List one-time projections |
| GET | /projections/continuous | List continuous projections |
| POST | /projections/transient | Create transient projection |
| POST | /projections/onetime | Create one-time projection |
| POST | /projections/continuous | Create continuous projection |
| GET | /projection/{name}/query | Get projection query/definition |
| PUT | /projection/{name}/query | Update projection query |
| GET | /projection/{name} | Get projection status |
| DELETE | /projection/{name} | Delete projection |
| GET | /projection/{name}/statistics | Get projection statistics |
| GET | /projection/{name}/state | Get projection state (partition optional) |
| GET | /projection/{name}/result | Get projection result (partition optional) |
| POST | /projection/{name}/command/disable | Disable projection |
| POST | /projection/{name}/command/enable | Enable projection |
| POST | /projection/{name}/command/reset | Reset projection |
| POST | /projection/{name}/command/abort | Abort projection |
| POST | /projection/{name}/config | Update projection config |
| GET | /projection/{name}/config | Get projection config |
1.2 gRPC Endpoints
Defined across partial classes in src/KurrentDB.Projections.Core/Services/Grpc/ProjectionManagement.*.cs:
Createβ Create a new projectionUpdateβ Update projection query/configDeleteβ Delete a projectionEnableβ Enable a projectionDisableβ Disable a projectionResetβ Reset a projectionRestartSubsystemβ Restart the projection subsystemResultβ Get projection resultStatisticsβ Get projection statistics
Proto definition: src/Protos/Grpc/projections.proto. The Statistics RPC is server-streaming (stream StatisticsResp), all others are unary. The Disable RPC supports a write_checkpoint option.
The gRPC service (ProjectionManagement) publishes ProjectionManagementMessage.Command.* messages onto the internal bus, the same messages used by the HTTP controller.
1.3 Management Message Types
All defined in src/KurrentDB.Projections.Core/Messages/ProjectionManagementMessage.cs:
Command messages (requests from API):
Command.Postβ Create a new projection (with mode, name, handler type, query, emit settings)Command.PostBatchβ Create multiple projections atomicallyCommand.UpdateQueryβ Update query text and emit settingsCommand.Deleteβ Delete with options for checkpoint/state/emitted stream cleanupCommand.Enable/Command.Disable/Command.Abort/Command.ResetCommand.GetStatistics/Command.GetState/Command.GetResult/Command.GetQueryCommand.GetConfig/Command.UpdateConfig
Response messages:
Updated,Statistics,ProjectionState,ProjectionResult,ProjectionQuery,ProjectionConfigOperationFailed,NotFound,NotAuthorized,Conflict
1.4 Management Architecture and Flow
βββββββββββββββββββββββ ββββββββββββββββββββββββ
β HTTP Controller β β gRPC Service β
β ProjectionsControllerβ β ProjectionManagement β
ββββββββββ¬βββββββββββββ ββββββββββ¬ββββββββββββββ
β β
β ProjectionManagementMessage.Command.*
βΌ βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β ProjectionManager β
β (Services/Management/ProjectionManager.cs) β
β β
β - Handles all Command.* messages β
β - Maintains Dictionary<string, ManagedProjection>β
β - Persists state to $projections-$master stream β
β - Routes to appropriate ManagedProjection β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β ManagedProjection β
β (Services/Management/ManagedProjection.cs) β
β β
β - State machine per projection β
β - Persists config to $projections-{name} stream β
β - Publishes CoreProjectionManagementMessage.* β
β to ProjectionCoreService via worker queues β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β ProjectionCoreService β
β (Services/Processing/ProjectionCoreService.cs) β
β β
β - Creates/manages CoreProjection instances β
β - Handles CreateAndPrepare, Start, Stop, Kill β
β - Maintains Dictionary<Guid, CoreProjection> β
βββββββββββββββββββββββββββββββββββββββββββββββββββ1.5 ManagedProjection State Machine
States defined in ManagedProjectionState enum:
Creating β Loading β Loaded β Preparing β Prepared β Starting β Running
β
Stopping β Stopped
Aborting β Aborted
β Completed
β Faulted
β Deleting
LoadingStoppedEach state has a corresponding handler class in Services/Management/ManagedProjectionStates/:
CreatingLoadingLoadedStateβ Handles initial loading from persisted statePreparingState/PreparedStateβ Projection definition compiled, waiting to startStartingStateβ Sending start command to coreRunningStateβ Actively processing eventsStoppingState/StoppedStateβ Graceful shutdownAbortingState/AbortedStateβ Forced shutdownCompletedStateβ One-time/query projection finishedFaultedStateβ Error occurredDeletingStateβ Being removed
1.6 Projection Modes
- Transient β Ad-hoc queries, ephemeral, no persistence. Can be created/controlled by normal users.
- OneTime β One-time batch projections with optional checkpoints. Stops at EOF.
- Continuous β Long-running, persistent projections with checkpoints. Requires Admin/Operations role.
1.7 Projection Configuration Parameters
Configurable via Command.UpdateConfig or at creation time:
| Parameter | Description | Default |
|---|---|---|
EmitEnabled | Allow event emission | false |
TrackEmittedStreams | Record emitted stream names for cleanup | false |
CheckpointAfterMs | Min time between checkpoints (ms) | from ProjectionConsts |
CheckpointHandledThreshold | Checkpoint after N events handled | from ProjectionConsts |
CheckpointUnhandledBytesThreshold | Checkpoint after N bytes unhandled | from ProjectionConsts |
PendingEventsThreshold | Max pending events before backpressure | from ProjectionConsts |
MaxWriteBatchLength | Max events per write batch | from ProjectionConsts |
MaxAllowedWritesInFlight | Max concurrent write operations | from ProjectionConsts |
ProjectionExecutionTimeout | JS execution timeout (ms) | from config |
1.8 Worker Queue Distribution
The ProjectionManager distributes projections across worker queues (_queues). Each worker queue runs its own ProjectionCoreService and EventReaderCoreService. The ProjectionManagerMessageDispatcher routes CoreProjectionStatusMessage.* responses back to the ProjectionManager from worker threads.
2. Event Processing Pipeline
2.1 High-Level Flow
ββββββββββββββββββββββ
β Event Store β
β (TF / Streams) β
ββββββββββ¬ββββββββββββ
β Read operations
βΌ
ββββββββββββββββββββββ
β EventReader β
β (Stream/TF/Multi/ β
β EventByType) β
ββββββββββ¬ββββββββββββ
β ReaderSubscriptionMessage
β .CommittedEventDistributed
βΌ
ββββββββββββββββββββββ
βEventReaderCoreServiceβ
β routes to β
β IReaderSubscription β
ββββββββββ¬ββββββββββββ
β
βΌ
ββββββββββββββββββββββ
β ReaderSubscription β
β (filtering, tagging,β
β checkpoint suggest)β
ββββββββββ¬ββββββββββββ
β EventReaderSubscriptionMessage
β .CommittedEventReceived
βΌ
ββββββββββββββββββββββββββββββ
β ReaderSubscriptionDispatcherβ
β (routes by subscriptionId) β
ββββββββββ¬ββββββββββββββββββββ
β
βΌ
ββββββββββββββββββββββββββββββββββββ
βEventSubscriptionBasedProjection β
βProcessingPhase β
β(EventProcessingProjectionPhase) β
β β
β β Creates CommittedEventWorkItem β
β β Enqueues to CoreProjectionQueue β
ββββββββββββββββ¬ββββββββββββββββββββ
β
βΌ
ββββββββββββββββββββββββββββββββββββ
β StagedProcessingQueue β
β (multi-stage work items) β
β β
β Stage 0: RecordEventOrder β
β Stage 1: GetStatePartition β
β Stage 2: Load (partition state) β
β Stage 3: ProcessEvent β
β Stage 4: WriteOutput β
ββββββββββββββββ¬ββββββββββββββββββββ
β
βΌ
ββββββββββββββββββββββββββββββββββββ
β IProjectionStateHandler β
β (JS runtime / native handler) β
β β
β β ProcessEvent() returns: β
β - newState β
β - emittedEvents[] β
β - projectionResult β
ββββββββββββββββββββββββββββββββββββ2.2 CoreProjection State Machine
CoreProjection (Services/Processing/CoreProjection.cs) manages the lifecycle of a single running projection instance:
Initial β LoadStateRequested β StateLoaded β Subscribed β Running
β
βββββββββββββββΌβββββββββββββββ
βΌ βΌ βΌ
Stopping FaultedStopping CompletingPhase
βΌ βΌ βΌ
Stopped Faulted PhaseCompleted
β
(next phase or Stop)Key transitions:
- Initial: Initialize caches, checkpoint reader, and emitted stream trackers
- LoadStateRequested:
CheckpointReader.BeginLoadState()reads from the checkpoint stream - StateLoaded: Checkpoint loaded;
BeginPhase()initializes the processing phase - Subscribed: Reader subscription established; transitions to Running
- Running:
ProcessingPhase.ProcessEvent()is called via tick mechanism - CompletingPhase: For multi-phase projections (query then write results)
- PhaseCompleted: Moves to next phase or stops
2.3 Processing Phases
Defined in Services/Processing/Phases/:
EventProcessingProjectionProcessingPhaseβ The main phase that processes events through the state handler. HandlesCommittedEventReceivedmessages and createsCommittedEventWorkIteminstances.WriteQueryEofProjectionProcessingPhaseβ Writes final results when a query reaches EOF.WriteQueryResultProjectionProcessingPhaseβ Writes intermediate/final results for queries.
Phase state enum (PhaseState): Unknown, Stopped, Starting, Running
Subscription state enum (PhaseSubscriptionState): Unknown, Unsubscribed, Subscribing, Subscribed, Failed
2.4 Processing Strategies
ProcessingStrategySelector (Services/Processing/Strategies/ProcessingStrategySelector.cs) creates the appropriate strategy:
QueryProcessingStrategyβ For one-time/transient projections (StopOnEof = true). Creates two phases: event processing + write EOF results.ContinuousProjectionProcessingStrategyβ For continuous projections. Creates a singleEventProcessingProjectionProcessingPhasethat runs indefinitely.
Both extend EventReaderBasedProjectionProcessingStrategy β DefaultProjectionProcessingStrategy β ProjectionProcessingStrategy.
2.5 CommittedEventWorkItem Stages
The CommittedEventWorkItem (Services/Processing/WorkItems/CommittedEventWorkItem.cs) processes through these stages in the StagedProcessingQueue:
- RecordEventOrder β Records the event ordering for consistency
- GetStatePartition β Calls
StatePartitionSelector.GetStatePartition()to determine which partition this event belongs to (e.g., stream name forforeachStream) - Load β Loads the partition state from cache or reads from the event store via
BeginGetPartitionStateAt() - ProcessEvent β Calls
IProjectionStateHandler.ProcessEvent()(JS runtime) which returns new state and emitted events - WriteOutput β Calls
FinalizeEventProcessing()which writes results, accounts partitions, and emits events through theResultWriter
2.6 The Tick Mechanism
CoreProjection.EnsureTickPending() publishes a ProjectionCoreServiceMessage.CoreTick that triggers CoreProjection.Tick(). This calls ProcessingPhase.ProcessEvent() which drains the StagedProcessingQueue. The tick mechanism ensures cooperative multitasking β projections yield control between work items, preventing any single projection from starving others.
2.7 Backpressure
The CoreProjectionQueue implements backpressure between readers and processing:
- When buffered events exceed
PendingEventsThreshold, aReaderSubscriptionManagement.Pausemessage is published - The
EventReaderCoreServicepauses the reader (or forks a new paused reader if currently on the heading reader) - When the queue drains below the threshold,
ReaderSubscriptionManagement.Resumerestarts reading - This prevents unbounded memory growth when processing is slower than reading
2.8 Partition State Management
The PartitionStateCache provides in-memory caching of partition states with locking:
- Locking: States are locked from the first event processed in a partition until the corresponding checkpoint completes. This prevents eviction of uncommitted state.
- Unlock:
CheckpointCompletedtriggersPartitionStateCache.Unlock(tag), allowing eviction of states older than the checkpoint position - Root partition: Projections with
RequiresRootPartition = truemaintain a root state (key"") that is serialized with each checkpoint and available to all partitions - Bi-state projections: Support both per-partition state and shared state via
IProjectionStateHandler.LoadShared()/InitializeShared()
2.9 Error Handling and Restart
Failure sources:
- Reader failures:
EventReaderSubscriptionMessage.Failedβ propagated to phase βCoreProjection.SetFaulted() - Handler exceptions: Caught in
SafeProcessEventByHandler()βSetFaulting()with detailed error message - Checkpoint write conflicts:
WrongExpectedVersionβCoreProjectionProcessingMessage.RestartRequested
Restart sequence (on RestartRequested):
EnsureUnsubscribed()β Tears down reader subscriptionGoToState(Initial)β Reinitializes caches and trackersStart()β Reloads checkpoint and resumes from last saved position
Faulted vs FaultedStopping: SetFaulting() transitions to FaultedStopping (waits for checkpoint) then Faulted. SetFaulted() goes directly to Faulted.
3. Readers and Event Retrieval
3.1 EventReaderCoreService
EventReaderCoreService (Services/Processing/EventReaderCoreService.cs) manages the lifecycle of all event readers for a worker thread:
- Maintains
Dictionary<Guid, IReaderSubscription>(subscriptions) - Maintains
Dictionary<Guid, IEventReader>(readers) - Maps subscriptions β readers via
_subscriptionEventReaders/_eventReaderSubscriptions - Optionally runs a
HeadingEventReaderfor efficient live-tail reading
Key message handlers:
ReaderSubscriptionManagement.Subscribeβ Creates a subscription + reader pairReaderSubscriptionManagement.Unsubscribeβ Disposes reader, removes subscriptionReaderSubscriptionManagement.Pause/Resumeβ Pauses/resumes readingReaderSubscriptionMessage.CommittedEventDistributedβ Routes events from readers to subscriptionsReaderSubscriptionMessage.EventReaderEofβ Handles end-of-streamReaderSubscriptionMessage.EventReaderNotAuthorizedβ Handles auth failures
3.2 Reader Types
All readers implement IEventReader and are created by ReaderStrategy.CreatePausedEventReader():
TransactionFileEventReader
Source: Services/Processing/TransactionFile/TransactionFileEventReader.cs
- Reads from the transaction file (all events,
$allstream) - Used when
fromAll()is specified with no event type filter - Reads by TF position (commit/prepare position)
- Creates
ClientMessage.ReadAllEventsForwardinternal messages - Batch size: 250 events per read
StreamEventReader
Source: Services/Processing/SingleStream/StreamEventReader.cs
- Reads from a single named stream
- Used for
fromStream('name')or single-categoryfromCategory('name')(reads$ce-{name}) - Reads by stream sequence number
- Creates
ClientMessage.ReadStreamEventsForwardinternal messages - Batch size: 111 events per read
- Handles stream deletion, not-found, and
$maxAge/$maxCounttrimming - Produces
EventReaderPartitionDeletednotifications when configured
MultiStreamEventReader
Source: Services/Processing/MultiStream/MultiStreamEventReader.cs
- Reads from multiple named streams simultaneously
- Used for
fromStreams(['a', 'b', 'c']) - Maintains per-stream buffers for event queuing
- Reads each stream independently (111 events per batch per stream)
- Ordering logic: processes events by lowest position across all streams
- Waits for EOFs on all streams before proceeding with delay to avoid tight loops
EventByTypeIndexEventReader
Source: Services/Processing/EventByType/EventByTypeIndexEventReader.cs
- Reads from event type index streams (
$et-{eventType}) - Used for
fromAll().when({EventType: ...})with specific event type filters - Has two modes: IndexBased (reads from
$et-streams) and TfBased (falls back to TF when index is behind) - Resolves link events to get original event data
- Batch size: 50 events per read
- Handles
$deletedevent type for stream deletion notifications
HeadingEventReader
Source: Services/Processing/TransactionFile/HeadingEventReader.cs
- Caches recent events in memory (configurable
eventCacheSize) - Shared across subscriptions for efficient live-tail
- Subscriptions can be "joined" to the heading reader when they catch up
- Distributes cached events to new subscriptions for fast catch-up
- Validates strict event ordering
3.3 ReaderStrategy β Reader Selection Logic
ReaderStrategy.Create() in Services/Processing/Strategies/ReaderStrategy.cs selects the reader type based on the projection source definition:
if (allStreams && specific eventTypes) β EventByTypeIndexEventReader
if (allStreams) β TransactionFileEventReader
if (single stream) β StreamEventReader
if (single category) β StreamEventReader (on $ce-{category})
if (multiple streams) β MultiStreamEventReader3.4 Event Filters and Position Taggers
Each reader strategy creates matching filter and tagger pairs:
| Source | EventFilter | PositionTagger |
|---|---|---|
fromAll() + event types | EventByTypeIndexEventFilter | EventByTypeIndexPositionTagger |
fromAll() | TransactionFileEventFilter | TransactionFilePositionTagger |
fromStream(s) | StreamEventFilter | StreamPositionTagger |
fromCategory(c) | CategoryEventFilter | StreamPositionTagger (on $ce-c) |
fromStreams([...]) | MultiStreamEventFilter | MultiStreamPositionTagger |
EventFilter decides which events pass through to the projection (by event type, stream, link resolution).
PositionTagger creates CheckpointTag values that track the reader's position for checkpointing and resumption.
CheckpointTag modes (determined by tagger type):
Positionβ TF commit/prepare position (for$allreading)Streamβ Single stream sequence numberMultiStreamβ Per-stream sequence number dictionaryEventTypeIndexβ TF position + per-event-type stream sequence numbersPreparePositionβ Prepare position only (for event reordering with lag)Phaseβ Multi-phase projection phase number
3.5 ReaderSubscription
ReaderSubscription (Services/Processing/Subscriptions/ReaderSubscription.cs) sits between the reader and the projection phase:
- Receives
CommittedEventDistributedfrom the reader - Applies event filter (
PassesSource,Passes) - Validates content type (optional JSON validation)
- Updates position tracker with checkpoint tags
- Converts to
EventReaderSubscriptionMessage.CommittedEventReceived - Suggests checkpoints based on thresholds (bytes, event count, time)
The EventReorderingReaderSubscription variant adds event reordering with a configurable processing lag for multi-stream sources.
3.6 Subscription Flow
EventReader EventReaderCoreService ReaderSubscription
β β β
βββCommittedEventDistributedβββββββΊβ β
β βββCommittedEventDistributedβββΊβ
β β βββfilter + tagβββ
β β ββββββββββββββββββ
β β β
β β CommittedEventReceived β
β ββββββββββββββββββββββββββββββββ
β β β
β (via ReaderSubscriptionDispatcher) β
β β β
β βΌ β
β EventProcessingProjection β
β ProcessingPhase β4. How Projections Produce New Events
4.1 Emission Entry Points
Projections produce events through two mechanisms:
emit()/linkTo()/linkStreamTo()/copyTo()β Called from JavaScript projection handlers, producingEmittedEventEnvelope[]returned fromIProjectionStateHandler.ProcessEvent()- Result updates β When projection state changes, the
ResultWriteremitsResultorResultRemovedevents to result streams
4.1.1 JavaScript Runtime (Jint)
The JintProjectionStateHandler (Services/Interpreted/JintProjectionStateHandler.cs) executes user-defined JavaScript projections using the Jint engine. It registers global functions available to projection code:
// Available in projection JavaScript:
emit(streamId, eventType, eventBody, metadata?) // β EmittedDataEvent
linkTo(streamId, event, metadata?) // β EmittedDataEvent with $> type
linkStreamTo(streamId, linkedStreamId, metadata?) // β EmittedDataEvent linking streams
copyTo(streamId, event, metadata?) // β EmittedDataEvent copying eventemit()creates anEmittedDataEventwith the provided data and adds it to the_emittedlistlinkTo()creates anEmittedDataEventwith type$>and data"{sequenceNumber}@{streamId}"(the standard link format). Uses a two-phase callback: the data event'sOnCommittedsets the link's target event number- All emitted events are collected during
ProcessEvent()and returned asEmittedEventEnvelope[]
The handler also supports ProcessPartitionCreated() which can emit events when a new partition is first seen.
4.2 Emitted Event Types
Defined in Services/Processing/Emitting/EmittedEvents/:
EmittedDataEventβ A regular data event written to a target stream (fromemit())EmittedLinkToβ A$>link event pointing to another event (fromlinkTo())EmittedLinkToWithRecategorizationβ A link with category rewriting (used by$by_category)EmittedEventEnvelopeβ Wraps anEmittedEventwith stream metadata
Each emitted event carries:
- Target
StreamId CausedByTag(CheckpointTag of the source event)- Event type, data, metadata
- Optional
SetTargetEventNumbercallback for cross-referencing
4.3 Emission Pipeline
IProjectionStateHandler.ProcessEvent()
β
β returns EmittedEventEnvelope[]
βΌ
CommittedEventWorkItem.WriteOutput()
β
β calls FinalizeEventProcessing()
βΌ
ββββββββββββββββββββββββββββββββββββββββ
β ResultWriter β
β (Services/Processing/Strategies/ β
β ResultWriter.cs) β
β β
β WriteRunningResult() βββ if state β
β β changed, emits Result events β
β β β
β EventsEmitted() βββ user emit() β
β β calls, forwarded to checkpoint β
β β β
β AccountPartition() βββ registers β
β new partitions in catalog stream β
βββββββββββββββββ¬βββββββββββββββββββββββ
β
β calls IEmittedEventWriter.EventsEmitted()
βΌ
ββββββββββββββββββββββββββββββββββββββββ
β ICoreProjectionCheckpointManager β
β (ProjectionCheckpoint as β
β IEmittedEventWriter) β
β β
β ValidateOrderAndEmitEvents() β
β β β
β Groups events by target stream β
β Creates EmittedStream per target β
βββββββββββββββββ¬βββββββββββββββββββββββ
β
βΌ
ββββββββββββββββββββββββββββββββββββββββ
β EmittedStream β
β (per target stream instance) β
β β
β - Validates event ordering β
β - Batches writes (MaxWriteBatchLength)β
β - Manages expected version β
β - Handles write retries β
β - Tracks emitted stream names β
βββββββββββββββββ¬βββββββββββββββββββββββ
β
β IODispatcher.WriteEvents()
βΌ
ββββββββββββββββββββββββββββββββββββββββ
β EmittedStreamsWriter β
β (IEmittedStreamsWriter) β
β β
β WriteEvents(streamId, expectedVersionβ
β events[], writeAs, callback) β
β β
β β Publishes ClientMessage.WriteEventsβ
β to the internal bus β
ββββββββββββββββββββββββββββββββββββββββ4.4 Write Concurrency Control
The ProjectionCheckpoint manages write concurrency:
_maximumAllowedWritesInFlightβ Configurable limit on concurrent writes (default fromProjectionConsts.MaxAllowedWritesInFlight)- Write queue IDs β Each
EmittedStreamis assigned to a write queue via round-robin (_emittedStreams.Count % _maximumAllowedWritesInFlight) QueuedEmittedStreamsWriterβ WrapsEmittedStreamsWriterwith a queue that limits in-flight writes per queue IDAllowedWritesInFlight.Unboundedβ Special value that disables queuing
4.5 ResultEventEmitter
ResultEventEmitter (Services/Processing/Emitting/ResultEventEmitter.cs) creates events for projection results:
- Root partition (no partition key): Emits a single
ResultorResultRemovedevent to$projections-{name}-result - Named partition: Emits a
Result/ResultRemovedevent to$projections-{name}-{partition}-resultAND a$>link from$projections-{name}-resultto the partition result stream
4.6 Checkpoint and Emit Coordination
The checkpoint mechanism ensures exactly-once emission semantics:
- Events are emitted to
EmittedStreaminstances within aProjectionCheckpoint - Checkpoint is requested when thresholds are met (bytes, events, time)
ProjectionCheckpoint.Prepare(position)tells allEmittedStreaminstances to flush- Each
EmittedStreamcallsCheckpoint()β flushes pending writes β signals completion - Once all streams complete,
OnCheckpointCompleted()notifies theCheckpointManager CoreProjectionCheckpointWriterwrites the checkpoint tag + state to$projections-{name}-checkpoint- On restart, the projection resumes from the last checkpoint, and
EmittedStreamuses expected versions to deduplicate any re-emitted events
Checkpoint loading (CoreProjectionCheckpointReader): Reads the last 10 events backward from the checkpoint stream, finds the first $ProjectionCheckpoint event, extracts the CheckpointTag from metadata and state data from the event body, validates epoch/version compatibility, then publishes CheckpointLoaded.
Checkpoint writing (CoreProjectionCheckpointWriter): Writes a $ProjectionCheckpoint event to the checkpoint stream with the serialized CheckpointTag as metadata and root partition state as data. Retries with exponential backoff (up to 12 attempts). Warns if checkpoint exceeds 8 MB.
4.6.1 EmittedStream Recovery Mode
When a projection restarts, EmittedStream enters recovery mode to ensure exactly-once semantics:
- Reads existing events backward from the target stream
- Compares already-committed events with pending events using
CausedByTagand event type - For matching events: fires
OnCommittedcallbacks and dequeues (event already written) - For link events pointing to non-existent sources: skips silently (idempotent)
- For mismatches: throws
InvalidEmittedEventSequenceException - Only truly new events are written after recovery completes
Each emitted event carries metadata:
{
"$causedBy": "<source-event-guid>",
"$correlationId": "<correlation-id>",
"checkpoint_tag": { ... },
"projection_version": { ... }
}Write retries use exponential backoff (up to 12 attempts, max 256 seconds).
4.7 EmittedStreamsTracker
EmittedStreamsTracker (Services/Processing/Emitting/EmittedStreamsTracker.cs) maintains a record of all streams a projection has written to. This is stored in a tracking stream ($projections-{name}-emittedstreams) and is used during projection deletion to clean up emitted streams when deleteEmittedStreams = true.
4.8 System Projections β Native Emitters
The built-in system projections use native handlers (not JavaScript) that produce events via the same pipeline:
$by_categoryβ Reads$streamsand emits$>links to$ce-{category}streams$by_event_typeβ Reads$alland emits$>links to$et-{eventType}streams$stream_by_categoryβ Emits links grouping streams by category$streamsβ Emits a record for each new stream
Appendix: Key File Locations
| Component | Path |
|---|---|
| HTTP endpoints | Services/Http/ProjectionsController.cs |
| gRPC endpoints | Services/Grpc/ProjectionManagement.*.cs |
| Management messages | Messages/ProjectionManagementMessage.cs |
| ProjectionManager | Services/Management/ProjectionManager.cs |
| ManagedProjection | Services/Management/ManagedProjection.cs |
| ManagedProjection states | Services/Management/ManagedProjectionStates/ |
| ProjectionCoreService | Services/Processing/ProjectionCoreService.cs |
| CoreProjection | Services/Processing/CoreProjection.cs |
| EventReaderCoreService | Services/Processing/EventReaderCoreService.cs |
| ReaderStrategy | Services/Processing/Strategies/ReaderStrategy.cs |
| ProcessingStrategySelector | Services/Processing/Strategies/ProcessingStrategySelector.cs |
| Processing phases | Services/Processing/Phases/ |
| Reader subscriptions | Services/Processing/Subscriptions/ |
| Event readers | Services/Processing/SingleStream/, MultiStream/, EventByType/, TransactionFile/ |
| Emitting pipeline | Services/Processing/Emitting/ |
| Emitted event types | Services/Processing/Emitting/EmittedEvents/ |
| Checkpoint management | Services/Processing/Checkpointing/ |
| Work items | Services/Processing/WorkItems/ |
| ResultWriter | Services/Processing/Strategies/ResultWriter.cs |
| StagedProcessingQueue | Services/Processing/StagedProcessingQueue.cs |
| ProjectionManagerNode (wiring) | ProjectionManagerNode.cs |
All paths relative to src/KurrentDB.Projections.Core/.