Class OutboxStore

java.lang.Object
org.keycloak.events.outbox.OutboxStore

public class OutboxStore extends Object
DAO over OutboxEntryEntity. Every method takes the row's entryKind, so a single store instance can serve multiple subsystems that share the underlying OUTBOX_ENTRY table. The compound indexes on (ENTRY_KIND, ...) keep traffic for one kind from interfering with another kind's hot paths.

Read patterns (drainer, admin stats, retention purges) and write patterns (enqueue, transition, bulk delete) are exposed here as separate primitives, so the runtime drainer and cleanup tasks compose them rather than inlining queries.

  • Field Details

    • MAX_LAST_ERROR_LENGTH

      public static final int MAX_LAST_ERROR_LENGTH
      Hard cap on the last_error column width — matches the VARCHAR(2048) defined in the changelog. Truncation is applied here so callers can pass arbitrarily long exception messages without worrying about persistence-layer rejection.
    • session

      protected final KeycloakSession session
  • Constructor Details

  • Method Details

    • getEntityManager

      protected jakarta.persistence.EntityManager getEntityManager()
    • enqueuePending

      public String enqueuePending(String entryKind, String realmId, String ownerId, String containerId, String correlationId, String entryType, String payload, String metadata)
      Inserts a fresh PENDING row, deduplicating on (entryKind, ownerId, correlationId). Returns the id of the persisted (or pre-existing) row so the caller can correlate across at-least-once enqueue paths.
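      The dedup contract described above can be sketched with an in-memory map. This is only an illustration, not the store's implementation: the class name EnqueueDedupSketch, the rowCount() helper, the reduced parameter list, and the use of random UUIDs as ids are all assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// In-memory stand-in for the dedup contract; hypothetical, not the real OutboxStore.
final class EnqueueDedupSketch {
    // Dedup key (entryKind, ownerId, correlationId) -> id of the row stored for it.
    private final Map<List<String>, String> idByKey = new HashMap<>();

    // Returns the id of the newly stored row, or of the row that already holds this key.
    String enqueuePending(String entryKind, String ownerId, String correlationId) {
        List<String> key = List.of(entryKind, ownerId, correlationId);
        // Assumption: ids are random UUID strings; the real id format is not documented here.
        return idByKey.computeIfAbsent(key, k -> UUID.randomUUID().toString());
    }

    // Number of distinct rows held by the sketch.
    int rowCount() {
        return idByKey.size();
    }
}
```

      Calling enqueuePending twice with the same (entryKind, ownerId, correlationId) returns the same id both times, which is what lets a caller retry an enqueue without creating a duplicate row.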
    • enqueueHeld

      public String enqueueHeld(String entryKind, String realmId, String ownerId, String containerId, String correlationId, String entryType, String payload, String metadata)
      Inserts a fresh HELD row. Used when the upstream channel is paused at enqueue time (e.g. SSF stream paused) and the row should not be drained until releaseHeldForOwner(String, String) is called. Same dedup contract as enqueuePending(String, String, String, String, String, String, String, String).
    • enqueueInStatus

      protected String enqueueInStatus(OutboxEntryStatus status, String entryKind, String realmId, String ownerId, String containerId, String correlationId, String entryType, String payload, String metadata)
    • generateEntryId

      protected String generateEntryId()
    • findById

      public OutboxEntryEntity findById(String id)
    • findByOwnerAndCorrelationId

      public OutboxEntryEntity findByOwnerAndCorrelationId(String entryKind, String ownerId, String correlationId)
    • lockDueForDrain

      public List<OutboxEntryEntity> lockDueForDrain(String entryKind, int limit)
      Locks up to limit due PENDING rows for delivery in the current transaction. Uses FOR UPDATE SKIP LOCKED so cluster-aware drainers don't fight for the same rows.
    • markDelivered

      public void markDelivered(OutboxEntryEntity entity)
    • recordFailure

      public void recordFailure(OutboxEntryEntity entity, Instant nextAttemptAt, String lastError)
    • markDeadLetter

      public void markDeadLetter(OutboxEntryEntity entity, String lastError)
    • promoteStaleQueuedToDeadLetter

      public int promoteStaleQueuedToDeadLetter(String entryKind, Instant cutoff, String reason)
      Bulk-promotes every queued (PENDING or HELD) row of the given kind whose createdAt is older than the supplied cutoff to DEAD_LETTER. Used by the drainer as a backstop so that rows stuck in PENDING/HELD eventually reach a terminal state and are caught by the dead-letter retention purge.

      Does not bump attempts: these rows did not actually retry; they aged out. The last_error column records the reason.

    • countStatusesForRealm

      public Map<OutboxEntryStatus,Long> countStatusesForRealm(String entryKind, String realmId)
    • countStatusesForOwner

      public Map<OutboxEntryStatus,Long> countStatusesForOwner(String entryKind, String ownerId)
    • oldestCreatedAtPerStatusForRealm

      public Map<OutboxEntryStatus,Instant> oldestCreatedAtPerStatusForRealm(String entryKind, String realmId)
    • oldestCreatedAtPerStatusForOwner

      public Map<OutboxEntryStatus,Instant> oldestCreatedAtPerStatusForOwner(String entryKind, String ownerId)
    • lockPendingForOwner

      public List<OutboxEntryEntity> lockPendingForOwner(String entryKind, String ownerId, int limit)
      Locks up to limit PENDING rows for a receiver-driven read (e.g. SSF POLL). Uses FOR UPDATE SKIP LOCKED so a concurrent receiver request for the same owner doesn't block. Unlike lockDueForDrain(String, int), this does not gate on next_attempt_at: receiver-pulled rows are served on demand regardless of any backoff schedule.
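      The difference in row selection between lockDueForDrain and lockPendingForOwner can be sketched over an in-memory list. This is a hypothetical model: the Row and Status types, the "next_attempt_at not after now" comparison, and the list-order traversal are assumptions, and the FOR UPDATE SKIP LOCKED locking is not modelled.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the two selection rules; row locking is not modelled.
final class DrainSelectionSketch {
    enum Status { PENDING, HELD, DELIVERED, DEAD_LETTER }

    static final class Row {
        final String ownerId;
        final Status status;
        final Instant nextAttemptAt;

        Row(String ownerId, Status status, Instant nextAttemptAt) {
            this.ownerId = ownerId;
            this.status = status;
            this.nextAttemptAt = nextAttemptAt;
        }
    }

    // Drainer path: PENDING rows whose backoff has elapsed (assumed: nextAttemptAt <= now).
    static List<Row> dueForDrain(List<Row> rows, Instant now, int limit) {
        List<Row> out = new ArrayList<>();
        for (Row r : rows) {
            if (out.size() == limit) break;
            if (r.status == Status.PENDING && !r.nextAttemptAt.isAfter(now)) out.add(r);
        }
        return out;
    }

    // Receiver path: PENDING rows for the owner, ignoring nextAttemptAt entirely.
    static List<Row> pendingForOwner(List<Row> rows, String ownerId, int limit) {
        List<Row> out = new ArrayList<>();
        for (Row r : rows) {
            if (out.size() == limit) break;
            if (r.status == Status.PENDING && r.ownerId.equals(ownerId)) out.add(r);
        }
        return out;
    }
}
```

      A PENDING row whose next_attempt_at lies in the future is skipped by dueForDrain but still returned by pendingForOwner.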
    • countForOwnerByStatus

      public long countForOwnerByStatus(String entryKind, String ownerId, OutboxEntryStatus status)
      Counts an owner's rows in a given status. Used by receiver-driven read paths to decide whether to advertise more available items after returning a short batch.
    • ackPendingForOwner

      public Set<String> ackPendingForOwner(String entryKind, String ownerId, Collection<String> correlationIds)
      Receiver-driven ACK for the supplied correlation ids. Matching PENDING rows owned by the given owner transition to DELIVERED. Idempotent and silently scoped: ids that belong to a different owner and ids whose rows are already terminal do not appear in the lookup result, so they raise no error and do not reveal whether such rows exist.
      Returns:
      the set of correlation ids that were transitioned to DELIVERED.
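      The "idempotent and silently scoped" behaviour can be sketched as follows. This is an in-memory illustration under assumptions: the Row and Status types are hypothetical, and the entryKind parameter is omitted for brevity.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of a receiver-driven ACK; entryKind is omitted for brevity.
final class AckSketch {
    enum Status { PENDING, HELD, DELIVERED, DEAD_LETTER }

    static final class Row {
        final String ownerId;
        final String correlationId;
        Status status;

        Row(String ownerId, String correlationId, Status status) {
            this.ownerId = ownerId;
            this.correlationId = correlationId;
            this.status = status;
        }
    }

    // Transitions the owner's matching PENDING rows to DELIVERED and reports which ids changed.
    static Set<String> ackPendingForOwner(List<Row> rows, String ownerId, Collection<String> correlationIds) {
        Set<String> wanted = new HashSet<>(correlationIds);
        Set<String> acked = new LinkedHashSet<>();
        for (Row r : rows) {
            // Rows of other owners and rows that are not PENDING simply never match.
            if (r.status == Status.PENDING && r.ownerId.equals(ownerId) && wanted.contains(r.correlationId)) {
                r.status = Status.DELIVERED;
                acked.add(r.correlationId);
            }
        }
        return acked;
    }
}
```

      Repeating the same ACK returns an empty set, and an id owned by another owner is dropped from the result without any signal that the row exists.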
    • nackPendingForOwner

      public Set<String> nackPendingForOwner(String entryKind, String ownerId, Map<String,String> reasonByCorrelationId)
      Receiver-driven NACK. Matching PENDING rows owned by the given owner transition to DEAD_LETTER carrying the receiver-supplied reason. For receiver-pulled flows, DEAD_LETTER is reached only via this explicit NACK path (there is no transmitter-side retry-exhaustion counter to bump). Idempotent and silently scoped, like ackPendingForOwner(String, String, Collection<String>).
      Returns:
      the set of correlation ids that were transitioned to DEAD_LETTER.
    • releaseHeldForOwner

      public int releaseHeldForOwner(String entryKind, String ownerId)
      Bulk-transitions every HELD row for the owner back to PENDING with next_attempt_at = now so the drainer picks them up on its next tick. Symmetric to holdPendingForOwner(String, String).
      Returns:
      the number of rows that transitioned out of HELD.
    • holdPendingForOwner

      public int holdPendingForOwner(String entryKind, String ownerId)
      Bulk-transitions every PENDING row for the owner to HELD — "park" the queue when the upstream channel pauses (e.g. SSF stream paused / disabled).
      Returns:
      the number of rows that transitioned PENDING → HELD.
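      The hold / release pair can be sketched as two symmetric bulk transitions. This is an in-memory illustration only: the Row and Status types are hypothetical, entryKind is omitted, and "now" is passed in explicitly so the example is deterministic.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical in-memory model of the PENDING <-> HELD transitions; entryKind is omitted.
final class HoldReleaseSketch {
    enum Status { PENDING, HELD, DELIVERED, DEAD_LETTER }

    static final class Row {
        final String ownerId;
        Status status;
        Instant nextAttemptAt;

        Row(String ownerId, Status status, Instant nextAttemptAt) {
            this.ownerId = ownerId;
            this.status = status;
            this.nextAttemptAt = nextAttemptAt;
        }
    }

    // Parks every PENDING row of the owner; returns how many rows moved to HELD.
    static int holdPendingForOwner(List<Row> rows, String ownerId) {
        int changed = 0;
        for (Row r : rows) {
            if (r.status == Status.PENDING && r.ownerId.equals(ownerId)) {
                r.status = Status.HELD;
                changed++;
            }
        }
        return changed;
    }

    // Un-parks every HELD row of the owner and makes it immediately due.
    static int releaseHeldForOwner(List<Row> rows, String ownerId, Instant now) {
        int changed = 0;
        for (Row r : rows) {
            if (r.status == Status.HELD && r.ownerId.equals(ownerId)) {
                r.status = Status.PENDING;
                r.nextAttemptAt = now; // next_attempt_at = now, so the next drainer tick sees the row
                changed++;
            }
        }
        return changed;
    }
}
```

      Terminal rows and rows of other owners are untouched by both calls, and releasing an owner with no HELD rows returns 0.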
    • deadLetterQueuedForOwner

      public int deadLetterQueuedForOwner(String entryKind, String ownerId, String reason)
      Dead-letters every queued (PENDING + HELD) row for the owner with the supplied reason. Used when the upstream forbids holding (e.g. SSF stream disabled) and the rows must be discarded rather than parked.
    • deadLetterQueuedForOwnerNotMatchingTypes

      public int deadLetterQueuedForOwnerNotMatchingTypes(String entryKind, String ownerId, Collection<String> allowedTypes, String reason)
      Dead-letters queued rows for the owner whose entryType is not in allowedTypes. Used when the upstream narrows its accepted-type set (e.g. SSF receiver narrowing events_requested) so already-signed rows of dropped types stop being delivered without losing the audit trail.

      If allowedTypes is empty, this method falls back to deadLetterQueuedForOwner(String, String, String), since an empty SQL NOT IN () list is not portable across databases.
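      The empty-list fallback can be sketched as a guard in front of the type filter. This is an in-memory illustration under assumptions: the Row and Status types are hypothetical and entryKind is omitted. An in-memory filter would handle an empty collection anyway; the explicit guard mirrors the constraint a SQL NOT IN clause imposes.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical in-memory model of the type-narrowing dead-letter path; entryKind is omitted.
final class TypeFilterSketch {
    enum Status { PENDING, HELD, DELIVERED, DEAD_LETTER }

    static final class Row {
        final String ownerId;
        final String entryType;
        Status status;
        String lastError;

        Row(String ownerId, String entryType, Status status) {
            this.ownerId = ownerId;
            this.entryType = entryType;
            this.status = status;
        }
    }

    static boolean isQueued(Row r) {
        return r.status == Status.PENDING || r.status == Status.HELD;
    }

    // Dead-letters every queued row of the owner.
    static int deadLetterQueuedForOwner(List<Row> rows, String ownerId, String reason) {
        int changed = 0;
        for (Row r : rows) {
            if (isQueued(r) && r.ownerId.equals(ownerId)) {
                r.status = Status.DEAD_LETTER;
                r.lastError = reason;
                changed++;
            }
        }
        return changed;
    }

    // Dead-letters queued rows of the owner whose type is no longer allowed.
    static int deadLetterQueuedForOwnerNotMatchingTypes(List<Row> rows, String ownerId,
                                                        Collection<String> allowedTypes, String reason) {
        if (allowedTypes.isEmpty()) {
            // Nothing is allowed, so skip the NOT IN filter and discard every queued row.
            return deadLetterQueuedForOwner(rows, ownerId, reason);
        }
        int changed = 0;
        for (Row r : rows) {
            if (isQueued(r) && r.ownerId.equals(ownerId) && !allowedTypes.contains(r.entryType)) {
                r.status = Status.DEAD_LETTER;
                r.lastError = reason;
                changed++;
            }
        }
        return changed;
    }
}
```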

    • migrateEntryKindForOwner

      public int migrateEntryKindForOwner(String currentKind, String newKind, String ownerId)
      Migrates queued rows for the owner from one entryKind to another (e.g. SSF receiver flipping push ↔ poll). Terminal rows (DELIVERED, DEAD_LETTER) are left under the previous kind — they are audit / dedup artifacts of the old channel.
      Returns:
      the number of rows whose entryKind was migrated.
    • deleteByRealm

      public int deleteByRealm(String entryKind, String realmId)
    • deleteByOwner

      public int deleteByOwner(String entryKind, String ownerId)
    • deleteByRealmAndStatus

      public int deleteByRealmAndStatus(String entryKind, String realmId, OutboxEntryStatus status)
    • deleteByRealmAndStatusOlderThan

      public int deleteByRealmAndStatusOlderThan(String entryKind, String realmId, OutboxEntryStatus status, Instant cutoff)
    • deleteByOwnerAndStatus

      public int deleteByOwnerAndStatus(String entryKind, String ownerId, OutboxEntryStatus status)
    • deleteByOwnerAndStatusOlderThan

      public int deleteByOwnerAndStatusOlderThan(String entryKind, String ownerId, OutboxEntryStatus status, Instant cutoff)
    • deleteQueuedByRealm

      public int deleteQueuedByRealm(String entryKind, String realmId)
      Bulk-deletes every queued row in the realm. Single-DML counterpart used by the realm-scoped "purge queued" admin endpoint.
    • deleteQueuedByOwner

      public int deleteQueuedByOwner(String entryKind, String ownerId)
    • purgeDeliveredOlderThan

      public int purgeDeliveredOlderThan(String entryKind, Instant cutoff)
    • purgeDeadLetterOlderThan

      public int purgeDeadLetterOlderThan(String entryKind, Instant cutoff)
    • truncateError

      protected String truncateError(String error)
      Truncates an error message to the column width with an ellipsis marker. Returns null unchanged so an explicit "no error" value (used by markDelivered) survives.
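      A minimal sketch of this truncation rule, assuming the cap is 2048 characters (from the VARCHAR(2048) note on MAX_LAST_ERROR_LENGTH) and a three-character "..." marker. The class name and the exact marker string are assumptions; the documentation does not show the marker the store actually uses.

```java
// Hypothetical stand-alone version of the truncation rule; not the real OutboxStore code.
final class TruncateErrorSketch {
    // Assumed to equal MAX_LAST_ERROR_LENGTH, per the VARCHAR(2048) column width.
    static final int MAX_LAST_ERROR_LENGTH = 2048;
    // Assumed marker; the actual ellipsis string is not documented.
    static final String ELLIPSIS = "...";

    static String truncateError(String error) {
        if (error == null) {
            return null; // an explicit "no error" value passes through unchanged
        }
        if (error.length() <= MAX_LAST_ERROR_LENGTH) {
            return error;
        }
        // Reserve room for the marker so the result never exceeds the column width.
        // Note: substring counts UTF-16 units and may split a surrogate pair.
        return error.substring(0, MAX_LAST_ERROR_LENGTH - ELLIPSIS.length()) + ELLIPSIS;
    }
}
```

      With these assumptions, any input longer than 2048 characters comes back exactly 2048 characters long and ends with the marker, so the value always fits the column.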