Skip to main content

Service Framework

The NATS Service Framework (also called the Micro protocol) turns any set of request-reply handlers into a discoverable microservice with zero extra infrastructure. Services announce themselves over standard NATS subjects, respond to ping, info, and stats queries from any client on the cluster, and load-balance requests across multiple instances automatically via queue groups. There are no sidecars, no service registries, and no additional network hops - discovery and routing are built into NATS itself.

Defining endpoints

An endpoint is built via a short builder chain. Start with ServiceEndpoint(name), fix the request type with .in[In], fix the reply type with .out[Out], and optionally declare domain error types with one or more .failsWith[Err] calls. Each step changes the Scala type, so the compiler enforces the full contract before you ever write handler logic:

ServiceEndpoint("name")          // NamedEndpoint  - no types yet
.in[StockRequest] // EndpointIn - In fixed
.out[StockReply] // ServiceEndpoint[StockRequest, Nothing, StockReply]
.failsWith[StockError] // ServiceEndpoint[StockRequest, StockError, StockReply]
.failsWith[ValidationError] // ServiceEndpoint[StockRequest, StockError | ValidationError, StockReply]

The resulting ServiceEndpoint[In, Err, Out] is an inert descriptor - it carries the name, codecs, and error type, but no handler logic. This makes it easy to share the same descriptor between the server (which binds a handler) and the client (which uses it with Nats#requestService).

A concrete infallible endpoint looks like this:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val stockEndpoint = ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]

Implementing a handler

ServiceEndpoint#handle binds a handler function In => IO[Err, Out] to the descriptor and returns a BoundEndpoint ready for registration. The handler receives only the decoded payload - the framework takes care of encoding, decoding, and error responses:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val bound = ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]
.handle { req =>
ZIO.succeed(StockReply(available = 42, reserved = 3))
}

When you need the raw request envelope - to read a header, inspect the originating subject, or propagate a trace ID - use ServiceEndpoint#handleWith instead:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val boundWithMeta = ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]
.handleWith { req =>
val traceId = req.headers.get("X-Trace-Id").headOption.getOrElse("none")
ZIO.debug(s"Handling ${req.value.itemId} trace=$traceId") *>
ZIO.succeed(StockReply(available = 42, reserved = 3))
}

Handling errors

When your handler can fail, add ServiceEndpoint#failsWith[E] before ServiceEndpoint#handle. The error type is now part of the descriptor - the same codec that encodes domain errors on the server decodes them on the client. A universal ServiceErrorMapper[E] (e.toString, code 500) is always in scope, so no extra setup is required for most types. Built-in mappers for String and NatsError are also provided:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val fallibleEndpoint = ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]
.failsWith[String]
.handle { req =>
if (req.itemId.isEmpty) ZIO.fail("itemId must not be empty")
else ZIO.succeed(StockReply(available = 42, reserved = 3))
}

Custom ServiceErrorMapper

To control the HTTP-style status code or error message format, provide a given ServiceErrorMapper[E] in scope before building the endpoint. The full form uses with:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

enum StockError:
case NotFound(id: String)
case InvalidRequest(msg: String)

object StockError { given Schema[StockError] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

given ServiceErrorMapper[StockError] with {
def toErrorResponse(e: StockError): (String, Int) = e match {
case StockError.NotFound(id) => (s"Item $id not found", 404)
case StockError.InvalidRequest(msg) => (msg, 400)
}
}

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val ep = ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]
.failsWith[StockError]

Because ServiceErrorMapper[E] has a single abstract method, Scala 3 also accepts a lambda (SAM syntax), which is more concise:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

enum StockError:
case NotFound(id: String)
case InvalidRequest(msg: String)

object StockError { given Schema[StockError] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

given ServiceErrorMapper[StockError] = { e =>
e match {
case StockError.NotFound(id) => (s"Item $id not found", 404)
case StockError.InvalidRequest(msg) => (msg, 400)
}
}

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val ep = ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]
.failsWith[StockError]

Both forms produce identical behaviour. The (message, code) pair is sent to callers via the Nats-Service-Error and Nats-Service-Error-Code headers, following the NATS Micro protocol convention.

Union error types

When a handler can fail with more than one distinct error type, chain multiple failsWith calls — one per error member. The framework encodes the fully-qualified class name of the concrete error as a Nats-Service-Error-Type header on each reply, and the client dispatches decoding to the matching member codec using that tag:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class OrderRequest(itemId: String, qty: Int)
case class OrderReply(orderId: String)
case class ValidationError(field: String, reason: String)
case class PaymentError(code: Int)

object OrderRequest { given Schema[OrderRequest] = Schema.derived }
object OrderReply { given Schema[OrderReply] = Schema.derived }
object ValidationError { given Schema[ValidationError] = Schema.derived }
object PaymentError { given Schema[PaymentError] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val orderEndpoint = ServiceEndpoint("order")
.in[OrderRequest]
.out[OrderReply]
.failsWith[ValidationError]
.failsWith[PaymentError]

The resulting descriptor has type ServiceEndpoint[OrderRequest, ValidationError | PaymentError, OrderReply]. On the client side, Nats#requestService returns IO[NatsError | ValidationError | PaymentError, OrderReply] — each member is decoded with its own codec and surfaces directly in the ZIO error channel:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class OrderRequest(itemId: String, qty: Int)
case class OrderReply(orderId: String)
case class ValidationError(field: String, reason: String)
case class PaymentError(code: Int)

object OrderRequest { given Schema[OrderRequest] = Schema.derived }
object OrderReply { given Schema[OrderReply] = Schema.derived }
object ValidationError { given Schema[ValidationError] = Schema.derived }
object PaymentError { given Schema[PaymentError] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val orderEndpoint = ServiceEndpoint("order")
.in[OrderRequest]
.out[OrderReply]
.failsWith[ValidationError]
.failsWith[PaymentError]

val placeOrder: ZIO[Nats, NatsError | ValidationError | PaymentError, OrderReply] =
ZIO.serviceWithZIO[Nats] { nats =>
nats.requestService(orderEndpoint, OrderRequest("item-1", 2), 5.seconds)
}

Chain failsWith calls to accumulate as many error members as you need:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class OrderRequest(itemId: String, qty: Int)
case class OrderReply(orderId: String)
case class ValidationError(field: String, reason: String)
case class PaymentError(code: Int)
case class InventoryError(shortage: Int)
case class AuthError(reason: String)
case class RateLimitError(retryAfterSeconds: Int)
case class ConflictError(id: String)

object OrderRequest { given Schema[OrderRequest] = Schema.derived }
object OrderReply { given Schema[OrderReply] = Schema.derived }
object ValidationError { given Schema[ValidationError] = Schema.derived }
object PaymentError { given Schema[PaymentError] = Schema.derived }
object InventoryError { given Schema[InventoryError] = Schema.derived }
object AuthError { given Schema[AuthError] = Schema.derived }
object RateLimitError { given Schema[RateLimitError] = Schema.derived }
object ConflictError { given Schema[ConflictError] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

// Three members
val ep3 = ServiceEndpoint("order")
.in[OrderRequest].out[OrderReply]
.failsWith[ValidationError]
.failsWith[PaymentError]
.failsWith[InventoryError]

// Any number of members via chaining
val ep6 = ServiceEndpoint("order-full")
.in[OrderRequest].out[OrderReply]
.failsWith[ValidationError]
.failsWith[PaymentError]
.failsWith[InventoryError]
.failsWith[AuthError]
.failsWith[RateLimitError]
.failsWith[ConflictError]

ServiceEndpoint#failsWith accumulates each chained error type into the endpoint error channel, so you can keep adding members as needed. Repeating the same error type is also allowed.

If you prefer a single domain error type shared across your service boundary, you can still model the errors as a sealed enum and use the single-type failsWith[E] overload:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

enum OrderError:
case ValidationError(field: String, reason: String)
case PaymentError(code: Int)
case InventoryError(shortage: Int)
case AuthError(reason: String)
case RateLimitError(retryAfterSeconds: Int)
case ConflictError(id: String)

object OrderError { given Schema[OrderError] = Schema.derived }

given ServiceErrorMapper[OrderError] = {
case OrderError.ValidationError(_, _) => ("Validation failed", 400)
case OrderError.PaymentError(_) => ("Payment declined", 402)
case OrderError.InventoryError(_) => ("Insufficient stock", 409)
case OrderError.AuthError(_) => ("Unauthorized", 401)
case OrderError.RateLimitError(_) => ("Too many requests", 429)
case OrderError.ConflictError(_) => ("Conflict", 409)
}

case class OrderRequest(itemId: String, qty: Int)
case class OrderReply(orderId: String)

object OrderRequest { given Schema[OrderRequest] = Schema.derived }
object OrderReply { given Schema[OrderReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val orderEndpoint = ServiceEndpoint("order")
.in[OrderRequest]
.out[OrderReply]
.failsWith[OrderError]

A single NatsCodec[OrderError] covers all cases and a single ServiceErrorMapper pattern-matches to emit fine-grained status codes.

Each member's ServiceErrorMapper is resolved independently for union overloads, so you can provide a custom status code for one type and rely on the universal fallback for others. Scala 3 union types also compose silently across multiple Nats#requestService calls — if you call two union-typed endpoints in the same comprehension, the error channels widen into a single union type with no manual merging.

Starting a service

ServiceConfig names the service and declares its version. Pass one or more bound endpoints to Nats#service to register them and start the service. The returned NatsService handle gives access to live stats and a reset operation:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val startService: ZIO[Nats & Scope, NatsError, NatsService] =
ZIO.serviceWithZIO[Nats] { nats =>
nats.service(
ServiceConfig(
name = "inventory",
version = "1.0.0",
description = Some("Stock level queries")
),
ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]
.handle { req => ZIO.succeed(StockReply(available = 42, reserved = 3)) }
)
}

The Scope in the return type ties the service lifetime to the enclosing scope - when the scope closes, the service shuts down and stops accepting requests. Requests arrive on the subject <service-name>.<endpoint-name> by default - in this case inventory.stock-check. Multiple instances started with the same config share a queue group automatically, so NATS distributes requests across them with no additional configuration.

Grouping endpoints. Call NamedEndpoint#inGroup to prepend a subject prefix. All endpoints in the same group share a common namespace:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)
case class PriceRequest(itemId: String)
case class PriceReply(cents: Long)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }
object PriceRequest { given Schema[PriceRequest] = Schema.derived }
object PriceReply { given Schema[PriceReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val startGrouped: ZIO[Nats & Scope, NatsError, NatsService] =
ZIO.serviceWithZIO[Nats] { nats =>
nats.service(
ServiceConfig(name = "catalog-service", version = "1.0.0"),
ServiceEndpoint("stock").inGroup("catalog")
.in[StockRequest].out[StockReply]
.handle(_ => ZIO.succeed(StockReply(42, 3))),
ServiceEndpoint("price").inGroup("catalog")
.in[PriceRequest].out[PriceReply]
.handle(_ => ZIO.succeed(PriceReply(1299)))
)
}

Both endpoints are now reachable under the catalog prefix: catalog.stock and catalog.price. We now have a running service with two typed, discoverable endpoints.

Calling a service

We have three ways to call a service endpoint: a typed call with Nats#requestService that surfaces both domain and infrastructure errors in the ZIO error channel, a raw call with Nats#request using ServiceEndpoint#effectiveSubject for infallible endpoints, and an untyped Nats#request for when the endpoint descriptor is not available.

Typed calls with requestService

Nats#requestService is the preferred way to call a fallible service endpoint. It uses the ServiceEndpoint descriptor as the complete contract - the subject, input and output codecs, and error codec are all captured there. Both domain errors (Err) and transport failures (NatsError) surface directly in the ZIO error channel as IO[NatsError | Err, Out]:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)
case class StockError(reason: String)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }
object StockError { given Schema[StockError] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

// Shared endpoint descriptor - the complete typed contract
val stockEndpoint = ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]
.failsWith[StockError]

val checkStock: ZIO[Nats, NatsError | StockError, Int] =
ZIO.serviceWithZIO[Nats] { nats =>
nats.requestService(stockEndpoint, StockRequest("item-456"), 5.seconds)
.map(_.available)
}

Nats#requestService produces:

  • out: Out when the handler succeeds
  • Fails with err: Err when the handler returns a domain error
  • Fails with NatsError.ServiceCallFailed for infrastructure errors (codec crash, connection issues)
  • Fails with NatsError.Timeout if no reply arrives in time

Because the error type is NatsError | Err, Scala 3 union types compose naturally across multiple calls - no manual error merging needed:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)
case class PriceRequest(itemId: String)
case class PriceReply(cents: Long)
case class StockError(reason: String)
case class PriceError(reason: String)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }
object PriceRequest { given Schema[PriceRequest] = Schema.derived }
object PriceReply { given Schema[PriceReply] = Schema.derived }
object StockError { given Schema[StockError] = Schema.derived }
object PriceError { given Schema[PriceError] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val stockEp = ServiceEndpoint("stock").in[StockRequest].out[StockReply].failsWith[StockError]
val priceEp = ServiceEndpoint("price").in[PriceRequest].out[PriceReply].failsWith[PriceError]

// Error type widens to NatsError | StockError | PriceError automatically
val checkBoth: ZIO[Nats, NatsError | StockError | PriceError, (StockReply, PriceReply)] =
ZIO.serviceWithZIO[Nats] { nats =>
nats.requestService(stockEp, StockRequest("item-1"), 5.seconds)
.zipPar(nats.requestService(priceEp, PriceRequest("item-1"), 5.seconds))
}

ServiceEndpoint#failsWith only requires a NatsCodec[E] in scope — the same codec encodes the error on the server and decodes it on the client. NatsCodec[E] is derived automatically from the Schema[E] when using NatsCodec.fromFormat. A ServiceErrorMapper[E] is resolved automatically via the universal fallback; provide a specific instance to customise the Nats-Service-Error header value or status code. For multiple distinct error types, chain failsWith calls as described in the union error types section above.

Calling infallible endpoints

For infallible endpoints (Err = Nothing) there is no domain error to decode, so Nats#requestService does not apply. Use Nats#request with ServiceEndpoint#effectiveSubject to get the correct subject from the descriptor:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val stockEndpoint = ServiceEndpoint("stock-check")
.in[StockRequest]
.out[StockReply]

val checkStock: ZIO[Nats, NatsError, StockReply] =
ZIO.serviceWithZIO[Nats] { nats =>
nats.request[StockRequest, StockReply](
stockEndpoint.effectiveSubject,
StockRequest("item-456"),
timeout = 5.seconds
).payload
}

Untyped calls with Nats#request

Nats#request works with any subject, including service endpoints, and does not require knowledge of the endpoint's error type. When a service handler fails, it still detects the Nats-Service-Error header and fails with NatsError.ServiceCallFailed(message, code). Use this when you do not share the endpoint descriptor or when building lower-level tooling:

import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat

case class StockRequest(itemId: String)
case class StockReply(available: Int, reserved: Int)

object StockRequest { given Schema[StockRequest] = Schema.derived }
object StockReply { given Schema[StockReply] = Schema.derived }

val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived

val checkStock: ZIO[Nats, NatsError, Option[StockReply]] =
ZIO.serviceWithZIO[Nats] { nats =>
nats.request[StockRequest, StockReply](
subject"inventory.stock-check",
StockRequest("item-456"),
timeout = 5.seconds
)
.map(env => Some(env.value))
.catchSome {
case NatsError.ServiceCallFailed(msg, code) =>
ZIO.logWarning(s"Service error $code: $msg").as(None)
}
}

The code field mirrors the HTTP status convention used by the NATS Micro protocol - a custom ServiceErrorMapper can emit any code.

Service discovery

ServiceDiscovery queries all running instances of a service across the cluster by broadcasting to the $SRV.* subjects that NATS reserves for service metadata. It is a read-only client - use it from monitoring dashboards, health checks, or admin tooling, not from handlers themselves.

ServiceDiscovery.live builds the layer from an existing Nats connection. Call ServiceDiscovery#ping, ServiceDiscovery#info, or ServiceDiscovery#stats with or without a service name to scope the query:

import zio.*
import zio.nats.*

val discoverAll: ZIO[ServiceDiscovery, NatsError, Unit] =
ZIO.serviceWithZIO[ServiceDiscovery] { sd =>
for {
responses <- sd.info("inventory")
_ <- ZIO.foreach(responses) { r =>
ZIO.debug(s"${r.name} v${r.version} - ${r.endpoints.map(_.name).mkString(", ")}")
}
} yield ()
}

The three discovery operations each have three overloads:

CallReturns
ping()List[PingResponse] - all services in the cluster
ping(name)List[PingResponse] - all instances of the named service
ping(name, id)Option[PingResponse] - a specific instance by ID
info(...)Same shapes, returns InfoResponse with endpoint list
stats(...)Same shapes, returns StatsResponse with per-endpoint counters

NatsService#stats returns the same StatsResponse from the server side - useful for exposing metrics from within the service process itself.

Next steps

  • Key-Value guide - persistent key-value storage built on JetStream
  • Pub/Sub guide - the request-reply mechanics that service endpoints are built on