Error Handling

NatsError is a sealed ADT - every error zio-nats can produce is one of the variants below. All operations return IO[NatsError, A], so the compiler knows exactly which errors are possible at each call site.

Error variants

| Variant | Fields | When it occurs |
| --- | --- | --- |
| ConnectionFailed | message: String, cause: IOException | Initial TCP connection could not be established |
| ConnectionClosed | message: String | Operation attempted on a closed or lost connection |
| AuthenticationFailed | message: String, cause: IOException | Server rejected the credentials |
| Timeout | message: String | Request-reply or flush exceeded its timeout |
| PublishFailed | message: String, cause: Throwable | Core NATS publish rejected by the server |
| RequestFailed | message: String, cause: Throwable | Request-reply call failed |
| SubscriptionFailed | message: String, cause: Throwable | Subscribe call rejected by the server |
| SerializationError | message: String, cause: Throwable | NatsCodec[A].encode failed |
| DecodingError | message: String, cause: Throwable | NatsCodec[A].decode failed on an incoming message |
| JetStreamApiError | message: String, errorCode: Int, apiErrorCode: Int, cause: JetStreamApiException | Server returned a JetStream API error |
| JetStreamPublishFailed | message: String, cause: Throwable | Server rejected a JetStream publish |
| JetStreamConsumeFailed | message: String, cause: Throwable | Consumer stream terminated unexpectedly |
| KeyValueOperationFailed | message: String, cause: Throwable | KV operation failed (server error or connection issue) |
| KeyNotFound | key: String | A KV operation required a key that does not exist |
| ObjectStoreOperationFailed | message: String, cause: Throwable | Object Store operation failed |
| ObjectNotFound | name: String | Object Store get for a name that does not exist |
| ObjectAlreadyExists | name: String | Object Store put for a name that is already sealed |
| ServiceOperationFailed | message: String, cause: Throwable | Service framework runtime operation failed |
| ServiceStartFailed | message: String, cause: Throwable | Service framework failed to start |
| ServiceCallFailed | message: String, code: Int | Remote service handler responded with an error |
| GeneralError | message: String, cause: Throwable | Catch-all for unexpected jnats exceptions |

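Most variants carry both a message: String and a cause: Throwable (narrowed to IOException or JetStreamApiException where the table says so). ConnectionClosed and Timeout carry only a message. KeyNotFound carries a key, while ObjectNotFound and ObjectAlreadyExists carry a name. ServiceCallFailed carries message and code: Int (the numeric error code from the remote service), and JetStreamApiError additionally carries errorCode and apiErrorCode. Every variant is a case class, so errors can be pattern-matched exhaustively.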
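
To illustrate what exhaustive matching buys you, here is a minimal sketch in Scala 3. It uses a local stand-in with only five variants so that it compiles without zio-nats. The field shapes follow the table above, but the definitions themselves are an assumption, not the library source, and describe is a hypothetical helper.

```scala
// Local stand-in for a handful of NatsError variants (assumption: field shapes
// follow the table above; the real ADT in zio-nats has more cases).
sealed trait NatsError
object NatsError:
  final case class ConnectionClosed(message: String)               extends NatsError
  final case class Timeout(message: String)                        extends NatsError
  final case class KeyNotFound(key: String)                        extends NatsError
  final case class ServiceCallFailed(message: String, code: Int)   extends NatsError
  final case class GeneralError(message: String, cause: Throwable) extends NatsError

import NatsError.*

// Hypothetical helper. There is deliberately no wildcard case: if a variant is
// added to the sealed trait, the compiler reports this match as non-exhaustive.
def describe(e: NatsError): String = e match
  case ConnectionClosed(msg)        => s"connection closed: $msg"
  case Timeout(msg)                 => s"timed out: $msg"
  case KeyNotFound(key)             => s"missing key: $key"
  case ServiceCallFailed(msg, code) => s"service error $code: $msg"
  case GeneralError(msg, cause)     => s"unexpected: $msg (${cause.getClass.getSimpleName})"
```

Against the real type the match would need one case per variant in the table (or a case per sub-sealed trait), which is usually only worth it at the outer edge of an application.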

Sub-sealed traits

Sub-sealed traits group errors by domain, so a single pattern can match a whole feature area. Use them when you want to handle an entire feature area uniformly rather than listing each variant:

import zio.*
import zio.nats.NatsError

// Catch any JetStream error
someEffect.catchSome {
  case e: NatsError.JetStreamError => ZIO.logError(s"JetStream: ${e.message}")
}

// Catch any KV error
kvEffect.catchSome {
  case e: NatsError.KeyValueError => ZIO.logError(s"KV: ${e.message}")
}

| Sub-sealed trait | Members |
| --- | --- |
| NatsError.JetStreamError | JetStreamApiError, JetStreamPublishFailed, JetStreamConsumeFailed |
| NatsError.KeyValueError | KeyValueOperationFailed, KeyNotFound |
| NatsError.ObjectStoreError | ObjectStoreOperationFailed, ObjectNotFound, ObjectAlreadyExists |
| NatsError.ServiceError | ServiceOperationFailed, ServiceStartFailed, ServiceCallFailed |
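
The grouping can be pictured as an intermediate layer of sealed traits between NatsError and the case classes. The sketch below is a hypothetical reduced model with two feature areas, written to show why one type pattern covers every member of an area. It is not the library source, and the message wording for KeyNotFound and the area helper are invented for illustration.

```scala
// Hypothetical reduced model (not the zio-nats source): two feature areas only.
sealed trait NatsError:
  def message: String

object NatsError:
  sealed trait KeyValueError  extends NatsError
  sealed trait JetStreamError extends NatsError

  final case class KeyValueOperationFailed(message: String, cause: Throwable) extends KeyValueError
  final case class KeyNotFound(key: String) extends KeyValueError:
    def message: String = s"Key not found: $key" // assumed wording
  final case class JetStreamPublishFailed(message: String, cause: Throwable) extends JetStreamError
  final case class Timeout(message: String) extends NatsError

// One type pattern per feature area matches every member of that area.
def area(e: NatsError): String = e match
  case _: NatsError.KeyValueError  => "kv"
  case _: NatsError.JetStreamError => "jetstream"
  case _                           => "core"
```

In this model, variants without a message field supply message as a method, which is one way a member such as KeyNotFound could still expose message through the common supertype.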

Common handling patterns

Log and continue - use message, which is defined on all variants:

import zio.*
import zio.nats.*

effect.catchAll(e => ZIO.logError(s"NATS error: ${e.message}"))

Retry on connection loss:

import zio.*
import zio.nats.*

// Retry only on ConnectionClosed, at most 3 times, with exponential backoff from 500 ms
def withRetry[A](effect: IO[NatsError, A]): IO[NatsError, A] =
  effect.retry(
    Schedule.recurWhile[NatsError] { case _: NatsError.ConnectionClosed => true; case _ => false }
      && Schedule.recurs(3)
      && Schedule.exponential(500.millis)
  )
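
If more than one variant should trigger a retry, the inline partial function can be pulled out into a named predicate. The following is a sketch under assumptions: it uses a local stand-in for four variants so it compiles on its own, isRetryable is a hypothetical helper, and the choice of which errors count as transient is a policy decision rather than something the library prescribes.

```scala
// Local stand-in so the predicate can be compiled and tested without zio-nats.
sealed trait NatsError
object NatsError:
  final case class ConnectionFailed(message: String, cause: java.io.IOException) extends NatsError
  final case class ConnectionClosed(message: String)                             extends NatsError
  final case class Timeout(message: String)                                      extends NatsError
  final case class DecodingError(message: String, cause: Throwable)              extends NatsError

// Assumption: connection problems and timeouts are transient; everything else
// (for example a decoding failure) will fail again and is not worth retrying.
def isRetryable(e: NatsError): Boolean = e match
  case _: NatsError.ConnectionFailed | _: NatsError.ConnectionClosed | _: NatsError.Timeout => true
  case _                                                                                    => false
```

With the real error type, the predicate would slot into the schedule as Schedule.recurWhile[NatsError](isRetryable). Retrying after a Timeout may repeat an operation the server already processed, so it is safest for idempotent calls.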

Map to a simpler error type:

import zio.*
import zio.nats.*

def publish(nats: Nats, subject: Subject, msg: String): IO[String, Unit] =
  nats.publish(subject, msg).mapError(_.message)
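
Mapping to a String discards the distinction between variants. When callers need to branch on the failure, a small application-level error type may serve better. The sketch below assumes a hypothetical AppError enum and a toAppError function, and uses a local stand-in for three variants so that it runs without the library.

```scala
// Local stand-in for three NatsError variants (field shapes as in the table above).
sealed trait NatsError
object NatsError:
  final case class ConnectionClosed(message: String)                     extends NatsError
  final case class SerializationError(message: String, cause: Throwable) extends NatsError
  final case class GeneralError(message: String, cause: Throwable)       extends NatsError

// Hypothetical application-level error type.
enum AppError:
  case Unavailable(detail: String)
  case BadPayload(detail: String)
  case Unexpected(detail: String)

// Translate each transport-level variant into the application's vocabulary.
def toAppError(e: NatsError): AppError = e match
  case NatsError.ConnectionClosed(msg)      => AppError.Unavailable(msg)
  case NatsError.SerializationError(msg, _) => AppError.BadPayload(msg)
  case NatsError.GeneralError(msg, _)       => AppError.Unexpected(msg)
```

Against the real type this would be used as nats.publish(subject, msg).mapError(toAppError), with a final wildcard case (or one case per sub-sealed trait) covering the variants not listed here.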

Handle missing keys - KeyValue#get returns Option[KvEnvelope[A]] where None means the key was never written or was purged. KeyNotFound is raised by operations that require the key to already exist, such as KeyValue#update with an expected revision:

import zio.*
import zio.nats.*

// Returns None for a missing key - no KeyNotFound to handle
val value: IO[NatsError, Option[KvEnvelope[String]]] =
  kv.get[String]("my-key")

// Raises KeyNotFound if the key does not exist
val updated: IO[NatsError, Long] =
  kv.update("my-key", "new-value", expectedRevision = 5L)

Handle service call errors (typed) - use Nats#requestService with a shared ServiceEndpoint descriptor. Domain errors (Err) and transport failures (NatsError) both go into the ZIO error channel as IO[NatsError | Err, Out]. NatsCodec[Err] must be in scope:

import zio.*
import zio.nats.*

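// Endpoint shared between server and client
val ep = ServiceEndpoint("do-thing")
  .in[String]
  .out[String]
  .failsWith[String]

// requestService returns IO[NatsError | String, String]. catchAll handles the
// String domain error and re-raises NatsError, which narrows the error type
// (catchSome would leave it as NatsError | String).
val result: IO[NatsError, Option[String]] =
  nats.requestService(ep, "input", 5.seconds)
    .map(Some(_))
    .catchAll {
      case _: String    => ZIO.succeed(None)
      case e: NatsError => ZIO.fail(e)
    }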

Handle service call errors (untyped) - when the endpoint descriptor is not available, Nats#request still detects the Nats-Service-Error header and fails with ServiceCallFailed. This covers infrastructure errors from Nats#requestService too:

import zio.*
import zio.nats.*

val result: IO[NatsError, Option[String]] =
  nats.request[String, String](subject"my-service.do-thing", "input", 5.seconds)
    .map(env => Some(env.value))
    .catchSome {
      case NatsError.ServiceCallFailed(msg, code) =>
        ZIO.logWarning(s"Service error $code: $msg").as(None)
    }