Package org.keycloak.events.outbox
Class OutboxStore
java.lang.Object
org.keycloak.events.outbox.OutboxStore
DAO over
OutboxEntryEntity. Every method takes the row's
entryKind so the same store instance can serve multiple
subsystems sharing the underlying OUTBOX_ENTRY table — the
compound indexes on (ENTRY_KIND, ...) keep cross-kind
traffic from interfering with each other's hot paths.
Read patterns (drainer, admin stats, retention purges) and write patterns (enqueue, transition, bulk delete) are split here so the runtime drainer / cleanup tasks compose primitives rather than inlining queries.
-
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final intHard cap on thelast_errorcolumn width — matches the VARCHAR(2048) defined in the changelog.protected final KeycloakSession -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionackPendingForOwner(String entryKind, String ownerId, Collection<String> correlationIds) Receiver-driven ACK for the supplied correlation ids.longcountForOwnerByStatus(String entryKind, String ownerId, OutboxEntryStatus status) Counts an owner's rows in a given status.countStatusesForOwner(String entryKind, String ownerId) countStatusesForRealm(String entryKind, String realmId) intdeadLetterQueuedForOwner(String entryKind, String ownerId, String reason) Dead-letters every queued (PENDING + HELD) row for the owner with the supplied reason.intdeadLetterQueuedForOwnerNotMatchingTypes(String entryKind, String ownerId, Collection<String> allowedTypes, String reason) Dead-letters queued rows for the owner whoseentryTypeis not inallowedTypes.intdeleteByOwner(String entryKind, String ownerId) intdeleteByOwnerAndStatus(String entryKind, String ownerId, OutboxEntryStatus status) intdeleteByOwnerAndStatusOlderThan(String entryKind, String ownerId, OutboxEntryStatus status, Instant cutoff) intdeleteByRealm(String entryKind, String realmId) intdeleteByRealmAndStatus(String entryKind, String realmId, OutboxEntryStatus status) intdeleteByRealmAndStatusOlderThan(String entryKind, String realmId, OutboxEntryStatus status, Instant cutoff) intdeleteQueuedByOwner(String entryKind, String ownerId) intdeleteQueuedByRealm(String entryKind, String realmId) Bulk-deletes everyqueuedrow in the realm.enqueueHeld(String entryKind, String realmId, String ownerId, String containerId, String correlationId, String entryType, String payload, String metadata) Inserts a fresh HELD row — used when the upstream channel is in a paused state at enqueue time (e.g.protected StringenqueueInStatus(OutboxEntryStatus status, String entryKind, String realmId, String ownerId, String containerId, String correlationId, String entryType, String payload, String metadata) enqueuePending(String entryKind, String realmId, String ownerId, String containerId, String correlationId, String entryType, String payload, String metadata) Inserts a fresh PENDING row, deduplicating on(entryKind, ownerId, correlationId).findByOwnerAndCorrelationId(String entryKind, String ownerId, String correlationId) protected Stringprotected jakarta.persistence.EntityManagerintholdPendingForOwner(String entryKind, String ownerId) Bulk-transitions every PENDING row for the owner to HELD — "park" the queue when the upstream channel pauses (e.g.lockDueForDrain(String entryKind, int limit) Locks up tolimitdue PENDING rows for delivery in the current transaction.lockPendingForOwner(String entryKind, String ownerId, int limit) Locks up tolimitPENDING rows for a receiver-driven read (e.g.voidmarkDeadLetter(OutboxEntryEntity entity, String lastError) voidmarkDelivered(OutboxEntryEntity entity) intmigrateEntryKindForOwner(String currentKind, String newKind, String ownerId) Migrates queued rows for the owner from one entryKind to another (e.g.Receiver-driven NACK.oldestCreatedAtPerStatusForOwner(String entryKind, String ownerId) oldestCreatedAtPerStatusForRealm(String entryKind, String realmId) intpromoteStaleQueuedToDeadLetter(String entryKind, Instant cutoff, String reason) Bulk-promotes everyqueuedrow in the given kind whosecreatedAtis older than the supplied cutoff toDEAD_LETTER.intpurgeDeadLetterOlderThan(String entryKind, Instant cutoff) intpurgeDeliveredOlderThan(String entryKind, Instant cutoff) voidrecordFailure(OutboxEntryEntity entity, Instant nextAttemptAt, String lastError) intreleaseHeldForOwner(String entryKind, String ownerId) protected StringtruncateError(String error) Truncates an error message to the column width with an ellipsis marker.
-
Field Details
-
MAX_LAST_ERROR_LENGTH
public static final int MAX_LAST_ERROR_LENGTHHard cap on thelast_errorcolumn width — matches the VARCHAR(2048) defined in the changelog. Truncation is applied here so callers can pass arbitrarily long exception messages without worrying about persistence-layer rejection.- See Also:
-
session
-
-
Constructor Details
-
OutboxStore
-
-
Method Details
-
getEntityManager
protected jakarta.persistence.EntityManager getEntityManager() -
enqueuePending
public String enqueuePending(String entryKind, String realmId, String ownerId, String containerId, String correlationId, String entryType, String payload, String metadata) Inserts a fresh PENDING row, deduplicating on(entryKind, ownerId, correlationId). Returns the id of the persisted (or pre-existing) row so the caller can correlate across at-least-once enqueue paths. -
enqueueHeld
public String enqueueHeld(String entryKind, String realmId, String ownerId, String containerId, String correlationId, String entryType, String payload, String metadata) Inserts a fresh HELD row — used when the upstream channel is in a paused state at enqueue time (e.g. SSF stream paused) and the row should not be drained untilreleaseHeldForOwner(java.lang.String, java.lang.String)is called. Same dedup contract asenqueuePending(java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String). -
enqueueInStatus
-
generateEntryId
-
findById
-
findByOwnerAndCorrelationId
public OutboxEntryEntity findByOwnerAndCorrelationId(String entryKind, String ownerId, String correlationId) -
lockDueForDrain
Locks up tolimitdue PENDING rows for delivery in the current transaction. UsesFOR UPDATE SKIP LOCKEDso cluster-aware drainers don't fight for the same rows. -
markDelivered
-
recordFailure
-
markDeadLetter
-
promoteStaleQueuedToDeadLetter
Bulk-promotes everyqueuedrow in the given kind whosecreatedAtis older than the supplied cutoff toDEAD_LETTER. Used by the drainer as a backstop so rows that get stuck in PENDING/HELD eventually graduate to a terminal state and are caught by the dead-letter retention purge.Does not bump
attempts: these rows didn't actually retry, they aged out. Thelast_errorcaptures the reason. -
countStatusesForRealm
-
countStatusesForOwner
-
oldestCreatedAtPerStatusForRealm
public Map<OutboxEntryStatus,Instant> oldestCreatedAtPerStatusForRealm(String entryKind, String realmId) -
oldestCreatedAtPerStatusForOwner
public Map<OutboxEntryStatus,Instant> oldestCreatedAtPerStatusForOwner(String entryKind, String ownerId) -
lockPendingForOwner
Locks up tolimitPENDING rows for a receiver-driven read (e.g. SSF POLL). UsesFOR UPDATE SKIP LOCKEDso a concurrent receiver request to the same owner doesn't block. UnlikelockDueForDrain(String, int)this does not gate onnext_attempt_at— receiver-pulled rows are served on demand regardless of any backoff schedule. -
countForOwnerByStatus
Counts an owner's rows in a given status. Used by receiver-driven read paths to decide whether to advertise more available items after returning a short batch. -
ackPendingForOwner
public Set<String> ackPendingForOwner(String entryKind, String ownerId, Collection<String> correlationIds) Receiver-driven ACK for the supplied correlation ids. Matching PENDING rows owned by the given owner transition to DELIVERED. Idempotent and silently scoped: ids the receiver doesn't own (different owner) and ids already terminal don't appear in the lookup result, so no error and no leakage of row existence.- Returns:
- the set of correlation ids that were transitioned to DELIVERED.
-
nackPendingForOwner
public Set<String> nackPendingForOwner(String entryKind, String ownerId, Map<String, String> reasonByCorrelationId) Receiver-driven NACK. Matching PENDING rows owned by the given owner transition to DEAD_LETTER carrying the receiver-supplied reason. For receiver-pulled flows, DEAD_LETTER is reached only via this explicit NACK path (no transmitter-side retry-exhaustion counter to bump). Idempotent and silently scoped, likeackPendingForOwner(java.lang.String, java.lang.String, java.util.Collection<java.lang.String>).- Returns:
- the set of correlation ids that were transitioned to DEAD_LETTER.
-
releaseHeldForOwner
Bulk-transitions everyHELDrow for the owner back toPENDINGwithnext_attempt_at = nowso the drainer picks them up on its next tick. Symmetric toholdPendingForOwner(java.lang.String, java.lang.String).- Returns:
- the number of rows that transitioned out of HELD.
-
holdPendingForOwner
Bulk-transitions every PENDING row for the owner to HELD — "park" the queue when the upstream channel pauses (e.g. SSF stream paused / disabled).- Returns:
- the number of rows that transitioned PENDING → HELD.
-
deadLetterQueuedForOwner
Dead-letters every queued (PENDING + HELD) row for the owner with the supplied reason. Used when the upstream forbids holding (e.g. SSF stream disabled) and the rows must be discarded rather than parked. -
deadLetterQueuedForOwnerNotMatchingTypes
public int deadLetterQueuedForOwnerNotMatchingTypes(String entryKind, String ownerId, Collection<String> allowedTypes, String reason) Dead-letters queued rows for the owner whoseentryTypeis not inallowedTypes. Used when the upstream narrows its accepted-type set (e.g. SSF receiver narrowingevents_requested) so already-signed rows of dropped types stop being delivered without losing the audit trail.If
allowedTypesis empty, this method falls back todeadLetterQueuedForOwner(java.lang.String, java.lang.String, java.lang.String)since SQLNOT IN ()is implementation-defined. -
migrateEntryKindForOwner
Migrates queued rows for the owner from one entryKind to another (e.g. SSF receiver flipping push ↔ poll). Terminal rows (DELIVERED, DEAD_LETTER) are left under the previous kind — they are audit / dedup artifacts of the old channel.- Returns:
- the number of rows whose entryKind was migrated.
-
deleteByRealm
-
deleteByOwner
-
deleteByRealmAndStatus
-
deleteByRealmAndStatusOlderThan
public int deleteByRealmAndStatusOlderThan(String entryKind, String realmId, OutboxEntryStatus status, Instant cutoff) -
deleteByOwnerAndStatus
-
deleteByOwnerAndStatusOlderThan
public int deleteByOwnerAndStatusOlderThan(String entryKind, String ownerId, OutboxEntryStatus status, Instant cutoff) -
deleteQueuedByRealm
Bulk-deletes everyqueuedrow in the realm. Single-DML counterpart used by the realm-scoped "purge queued" admin endpoint. -
deleteQueuedByOwner
-
purgeDeliveredOlderThan
-
purgeDeadLetterOlderThan
-
truncateError
Truncates an error message to the column width with an ellipsis marker. Returnsnullunchanged so an explicit "no error" value (used bymarkDelivered) survives.
-