Serialization
All Nats#publish, Nats#subscribe, and Nats#request methods are generic over [A: NatsCodec]. The codec is resolved at compile time - no casting, no runtime lookup, and no separate encode/decode step in your code. zio-nats comes batteries-included with zio-blocks, jsoniter-scala, and play-json integrations, and if none of those fit you can bring your own in a few lines.
Built-in codecs
Two codecs are always available with no setup - they cover the cases where encoding is handled externally or the payload is already in wire form:
| Type | Behaviour |
|---|---|
String | UTF-8 encode/decode |
Chunk[Byte] | Identity - bytes pass through unchanged |
Use them to publish raw events or forward payloads without touching the content:
import zio.*
import zio.nats.*
val builtins: ZIO[Nats, NatsError, Unit] =
ZIO.serviceWithZIO[Nats] { nats =>
for {
_ <- nats.publish(subject"shop.events", "order-placed")
_ <- nats.publish(subject"shop.events", Chunk.fromArray("order-placed".getBytes))
} yield ()
}
For domain types like OrderPlaced or PriceUpdate, read on.
zio-blocks
zio-blocks derives codecs from a compile-time Schema. It is included in the batteries-included zio-nats artifact and is the recommended approach for domain types - define the schema once and every format (JSON, Avro, MsgPack) becomes available with no extra code per type.
zio-blocks is not yet stable. The schema derivation API that zio-nats uses appears to be settling, and our surface with the library is intentionally small, but breaking changes are possible as zio-blocks evolves toward a stable release. Check the zio-blocks releases when upgrading.
Setup
Derive a Schema for your type and build a codec from a format. We use OrderPlaced as the domain type throughout this section:
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat
case class OrderPlaced(orderId: String, customerId: String, total: Double)
object OrderPlaced {
given Schema[OrderPlaced] = Schema.derived
}
val codecs = NatsCodec.fromFormat(JsonFormat)
import codecs.derived // NatsCodec[OrderPlaced] is now in implicit scope
What's happening:
Schema.derived- zio-blocks derives a schema forOrderPlacedat compile time.NatsCodec.fromFormat(JsonFormat)- creates aBuilderthat can deriveNatsCodec[A]for anyAwith aSchema[A]in scope.import codecs.derived- this is where the codec forOrderPlacedis compiled and cached. If theSchemais missing or the format cannot handle the type, you get an error here - not buried in a laterpublishcall.
Publish and subscribe
With the codec in scope, Nats#publish and Nats#subscribe accept OrderPlaced directly:
import zio.*
import zio.nats.*
val typedPubSub: ZIO[Nats, NatsError, Unit] =
ZIO.serviceWithZIO[Nats] { nats =>
for {
_ <- nats.publish(subject"shop.orders", OrderPlaced("ord-1", "cust-42", 59.99))
_ <- nats.subscribe[OrderPlaced](subject"shop.orders")
.map(_.value)
.tap(o => ZIO.debug(s"Order ${o.orderId} from ${o.customerId}: £${o.total}"))
.runDrain
} yield ()
}
Renaming fields
The @Modifier.rename annotation overrides the serialized key name for a specific field at the schema level, so it applies across all formats (JSON, Avro, MsgPack) without any codec-specific configuration. This is useful when publishing events to an external system that expects snake_case field names - a shipping carrier's webhook, a payment gateway, or a third-party analytics platform - while keeping your Scala field names in camelCase.
Annotate the fields that need a different wire name:
import zio.blocks.schema.{Schema, Modifier}
case class ShipmentEvent(
@Modifier.rename("order_id") orderId: String,
@Modifier.rename("carrier_code") carrierCode: String,
@Modifier.rename("tracking_no") trackingNo: String
)
object ShipmentEvent {
given Schema[ShipmentEvent] = Schema.derived
}
Since import codecs.derived is already in scope from the setup above, NatsCodec[ShipmentEvent] is derived automatically. We can confirm the wire format:
val encoded = NatsCodec[ShipmentEvent].encode(
ShipmentEvent("ord-1", "DHL", "Z123")
)
// encoded: Chunk[Byte] = IndexedSeq(
// 123,
// 34,
// 111,
// 114,
// 100,
// 101,
// 114,
// 95,
// 105,
// 100,
// 34,
// 58,
// 34,
// 111,
// 114,
// 100,
// 45,
// 49,
// 34,
// 44,
// 34,
// 99,
// 97,
// 114,
// 114,
// 105,
// 101,
// 114,
// 95,
// 99,
// 111,
// 100,
// 101,
// 34,
// 58,
// 34,
// 68,
// 72,
// 76,
// 34,
// 44,
// 34,
// 116,
// 114,
// 97,
// 99,
// 107,
// 105,
// ...
new String(encoded.toArray)
// res0: String = "{\"order_id\":\"ord-1\",\"carrier_code\":\"DHL\",\"tracking_no\":\"Z123\"}"
The JSON keys are snake_case; the Scala fields remain camelCase. Subscribers with the same schema decode the snake_case JSON back into the camelCase Scala type automatically. For the full range of schema modifiers and format options, see the zio-blocks documentation.
Available formats
zio-nats (batteries-included) brings in JSON transitively. For other formats, add the corresponding artifact:
libraryDependencies += "dev.zio" %% "zio-blocks-schema-avro" % "<zio-blocks-version>"
libraryDependencies += "dev.zio" %% "zio-blocks-schema-msgpack" % "<zio-blocks-version>"
libraryDependencies += "dev.zio" %% "zio-blocks-schema-thrift" % "<zio-blocks-version>"
libraryDependencies += "dev.zio" %% "zio-blocks-schema-bson" % "<zio-blocks-version>"
libraryDependencies += "dev.zio" %% "zio-blocks-schema-toon" % "<zio-blocks-version>"
libraryDependencies += "dev.zio" %% "zio-blocks-schema-xml" % "<zio-blocks-version>"
Each format follows the same pattern - swap out the import and pass the format object to NatsCodec.fromFormat. For example, XML uses import zio.blocks.schema.xml.XmlFormat in place of import zio.blocks.schema.json.JsonFormat. See the zio-blocks reference for format-specific configuration.
If you do not want the zio-blocks dependency, replace zio-nats with zio-nats-core in your build.sbt. The built-in String and Chunk[Byte] codecs are still available; you provide the rest. See Custom codecs below.
jsoniter-scala
jsoniter-scala is a high-performance JSON library that generates codecs at compile time with minimal overhead. Use zio-nats-jsoniter when a subject carries high-frequency messages - such as live price updates - where every microsecond of serialization overhead matters, or when you are already using jsoniter elsewhere and want to reuse existing codecs.
Add to build.sbt:
libraryDependencies += "io.github.pietersp" %% "zio-nats-jsoniter" % "0.1.1"
Automatic bridging
Place a given JsonValueCodec[A] in scope and import with import zio.nats.{given, *}. The library bridges it to NatsCodec[A] automatically - no builder step required:
import zio.*
import zio.nats.{given, *}
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
case class PriceUpdate(itemId: String, price: Double)
object PriceUpdate {
given JsonValueCodec[PriceUpdate] = JsonCodecMaker.make
}
val priceUpdates: ZIO[Nats, NatsError, Unit] =
ZIO.serviceWithZIO[Nats] { nats =>
for {
_ <- nats.publish(subject"shop.pricing", PriceUpdate("item-456", 12.99))
_ <- nats.subscribe[PriceUpdate](subject"shop.pricing").map(_.value).runDrain
} yield ()
}
// priceUpdates: ZIO[Nats, NatsError, Unit] = Stateful(
// trace = "repl.MdocSession.MdocApp1.priceUpdates(02-serialization.md:133)",
// onState = zio.FiberRef$unsafe$PatchFiber$$Lambda/0x00007fad4605c358@39fe503f
// )
A NotGiven[NatsCodec[A]] guard ensures the bridge never shadows built-in codecs or any explicit given NatsCodec[A] already in scope.
Explicit one-off codec
Use NatsCodecJsoniter.fromJsoniter when you need a codec for a single call without placing it in implicit scope:
import zio.nats.*
import zio.nats.NatsCodecJsoniter
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
val priceUpdateCodec: NatsCodec[PriceUpdate] =
NatsCodecJsoniter.fromJsoniter(JsonCodecMaker.make[PriceUpdate])
If jsoniter is your only serialization layer, you can drop the zio-blocks transitive dependency entirely. Pair zio-nats-jsoniter with zio-nats-core instead of zio-nats and your build stays lean.
play-json
play-json works the same way as jsoniter - place a given Format[A] in scope and it is bridged to NatsCodec[A] automatically. This is useful when your project already defines play-json Format instances and you want to reuse them without any additional codec setup.
Add to build.sbt:
libraryDependencies += "io.github.pietersp" %% "zio-nats-play-json" % "0.1.1"
Place the Format in scope and import with import zio.nats.{given, *}:
import zio.*
import zio.nats.{given, *}
import play.api.libs.json.{Format, Json}
case class ShipmentStatus(orderId: String, status: String)
object ShipmentStatus {
given Format[ShipmentStatus] = Json.format[ShipmentStatus]
}
val shipmentEvents: ZIO[Nats, NatsError, Unit] =
ZIO.serviceWithZIO[Nats] { nats =>
nats.publish(subject"shop.shipments", ShipmentStatus("ord-1", "dispatched"))
}
// shipmentEvents: ZIO[Nats, NatsError, Unit] = Stateful(
// trace = "repl.MdocSession.MdocApp2.shipmentEvents(02-serialization.md:183)",
// onState = zio.FiberRef$unsafe$PatchFiber$$Lambda/0x00007fad4605c358@6477cb9f
// )
Use NatsCodecPlayJson.fromPlayJson(format) for an explicit one-off codec.
Already on play-json and not planning to add zio-blocks? Swap zio-nats for zio-nats-core + zio-nats-play-json and keep the dependency tree minimal.
zio-json
zio-json is the ZIO ecosystem's own JSON library. It uses compile-time derivation via Scala 3 Mirror and is co-published alongside ZIO, making it a natural fit if your project is already in the ZIO stack. Use zio-nats-zio-json when your project defines JsonEncoder[A] and JsonDecoder[A] instances for its domain types and you want to reuse them without any additional codec setup.
Add to build.sbt:
libraryDependencies += "io.github.pietersp" %% "zio-nats-zio-json" % "0.1.1"
Automatic bridging
Derive JsonEncoder[A] and JsonDecoder[A] for your type — or use a combined JsonCodec[A] — and import with import zio.nats.{given, *}. The library bridges them to NatsCodec[A] automatically:
import zio.*
import zio.nats.{given, *}
import zio.json.{DeriveJsonEncoder, DeriveJsonDecoder, JsonEncoder, JsonDecoder}
case class StockQuote(ticker: String, price: Double)
object StockQuote {
given JsonEncoder[StockQuote] = DeriveJsonEncoder.gen[StockQuote]
given JsonDecoder[StockQuote] = DeriveJsonDecoder.gen[StockQuote]
}
val stockQuotes: ZIO[Nats, NatsError, Unit] =
ZIO.serviceWithZIO[Nats] { nats =>
for {
_ <- nats.publish(subject"shop.quotes", StockQuote("ACME", 42.50))
_ <- nats.subscribe[StockQuote](subject"shop.quotes").map(_.value).runDrain
} yield ()
}
// stockQuotes: ZIO[Nats, NatsError, Unit] = Stateful(
// trace = "repl.MdocSession.MdocApp3.stockQuotes(02-serialization.md:218)",
// onState = zio.FiberRef$unsafe$PatchFiber$$Lambda/0x00007fad4605c358@7621f19b
// )
A combined JsonCodec[A] (which extends both JsonEncoder[A] and JsonDecoder[A]) works equally well:
import zio.json.{DeriveJsonCodec, JsonCodec}
case class Heartbeat(serviceId: String, ts: Long)
object Heartbeat {
given JsonCodec[Heartbeat] = DeriveJsonCodec.gen[Heartbeat]
}
// NatsCodec[Heartbeat] resolved automatically via the fromZioJson bridge
A NotGiven[NatsCodec[A]] guard ensures the bridge never shadows built-in codecs or any explicit given NatsCodec[A] already in scope.
Explicit one-off codec
Use NatsCodecZioJson.fromZioJson when you need a codec for a single call without placing it in implicit scope:
import zio.nats.*
import zio.nats.NatsCodecZioJson
import zio.json.{DeriveJsonEncoder, DeriveJsonDecoder}
val stockQuoteCodec: NatsCodec[StockQuote] =
NatsCodecZioJson.fromZioJson(
DeriveJsonEncoder.gen[StockQuote],
DeriveJsonDecoder.gen[StockQuote]
)
Already on zio-json and not planning to add zio-blocks? Swap zio-nats for zio-nats-core + zio-nats-zio-json and keep the dependency tree minimal.
Mixing codecs
All four integration styles can coexist in the same project. Each type resolves its codec independently - zio-blocks for most types, jsoniter for the ones that need maximum throughput:
import zio.*
import zio.nats.*
import zio.blocks.schema.Schema
import zio.blocks.schema.json.JsonFormat
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
// Most domain types use zio-blocks
case class OrderConfirmed(orderId: String, ts: Long)
object OrderConfirmed {
given Schema[OrderConfirmed] = Schema.derived
}
val defaultCodecs = NatsCodec.fromFormat(JsonFormat)
import defaultCodecs.derived
// High-frequency type uses jsoniter for performance
case class InventoryTick(itemId: String, delta: Int)
object InventoryTick {
given JsonValueCodec[InventoryTick] = JsonCodecMaker.make
}
// NatsCodec[InventoryTick] resolved via the jsoniter bridge
// NatsCodec[OrderConfirmed] resolved via zio-blocks
Custom codecs
NatsCodec[A] is a two-method trait that lives in zio-nats-core with no external dependencies. Implementing it directly is the right choice when you are heavily invested in a serialization library not covered by the built-in integrations - circe, µPickle, Protocol Buffers, a proprietary binary format - and want to bridge it without adopting zio-blocks. It is also the right choice when you want the smallest possible dependency footprint.
In either case, replace zio-nats with zio-nats-core in build.sbt. The two artifacts are mutually exclusive - zio-nats-core has no zio-blocks dependency, while zio-nats (batteries-included) brings it in transitively:
libraryDependencies += "io.github.pietersp" %% "zio-nats-core" % "0.1.1"
The two methods to implement are encode, which produces bytes, and decode, which recovers the value or returns a NatsDecodeError. To show how compact this can be, here is a binary protocol for high-frequency stock ticks - each message is exactly 10 bytes:
import java.nio.ByteBuffer
import zio.Chunk
import zio.nats.*
case class StockTick(itemId: Int, priceCents: Int, quantity: Short)
given NatsCodec[StockTick] = new NatsCodec[StockTick] {
private val Size = 10 // 4 + 4 + 2 bytes
def encode(t: StockTick): Chunk[Byte] = {
val buf = ByteBuffer.allocate(Size)
buf.putInt(t.itemId)
buf.putInt(t.priceCents)
buf.putShort(t.quantity)
Chunk.fromArray(buf.array())
}
def decode(bytes: Chunk[Byte]): Either[NatsDecodeError, StockTick] =
if (bytes.length != Size)
Left(NatsDecodeError(s"Expected $Size bytes, got ${bytes.length}"))
else {
val buf = ByteBuffer.wrap(bytes.toArray)
Right(StockTick(buf.getInt(), buf.getInt(), buf.getShort()))
}
}
We can verify the payload size stays at exactly 10 bytes regardless of the values:
NatsCodec[StockTick].encode(StockTick(42, 1299, 100)).length
// res4: Int = 10
With the given in scope, StockTick works exactly like any other type:
import java.nio.ByteBuffer
import zio.*
import zio.Chunk
import zio.nats.*
case class StockTick(itemId: Int, priceCents: Int, quantity: Short)
given NatsCodec[StockTick] = new NatsCodec[StockTick] {
private val Size = 10
def encode(t: StockTick): Chunk[Byte] = {
val buf = ByteBuffer.allocate(Size)
buf.putInt(t.itemId)
buf.putInt(t.priceCents)
buf.putShort(t.quantity)
Chunk.fromArray(buf.array())
}
def decode(bytes: Chunk[Byte]): Either[NatsDecodeError, StockTick] =
if (bytes.length != Size) Left(NatsDecodeError(s"Expected $Size bytes, got ${bytes.length}"))
else { val buf = ByteBuffer.wrap(bytes.toArray); Right(StockTick(buf.getInt(), buf.getInt(), buf.getShort())) }
}
val ticks: ZIO[Nats, NatsError, Unit] =
ZIO.serviceWithZIO[Nats] { nats =>
for {
_ <- nats.publish(subject"shop.ticks", StockTick(42, 1299, 100))
_ <- nats.subscribe[StockTick](subject"shop.ticks").map(_.value).runDrain
} yield ()
}
Next steps
- JetStream guide - persistent messaging with typed payloads
- Modules reference - which artifact to add for each integration