NATS Messaging Patterns


NATS

NATS is a streaming message broker.

Their tagline is:

The Real-Time Communication Fabric for Distributed Applications

with a subtitle of:

From cloud to edge, NATS unifies messaging, streaming, and state into a single real-time system that runs anywhere

Let’s see what it is, how to use it, and some of the useful patterns it enables.

Runtime

NATS is “just a Go binary”. In the simplest form, it’s a messaging service that is very easy to run.

From there, clustering “just works” out of the box. Given a route to at least one existing server, a new instance joins the cluster and discovers the rest of its members on its own.

NATS servers achieve this by gossiping about, and connecting to, all of the servers they know, dynamically forming a full mesh.

It is truly “cloud native” from the first install.
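To get a feel for how gossip produces a full mesh, here is a toy simulation in Python. It is a sketch, not NATS’s actual route protocol: form_mesh and the server names are hypothetical, and the model assumes only that each new server is configured with a route to one seed, and that connected servers keep sharing the peers they know until nobody learns anything new.

```python
# Toy model of gossip-based cluster discovery (not the real NATS route protocol).


def form_mesh(seeds: dict[str, set[str]]) -> set[frozenset[str]]:
    """Each server starts knowing only its configured seed routes.

    Servers gossip repeatedly: every server tells each peer about all the
    servers it knows. Once nobody learns anything new, every server connects
    to every server it knows, which yields a full mesh.
    """
    known = {name: set(peers) for name, peers in seeds.items()}
    # Routes are bidirectional: a seed learns about the server dialing it.
    for name, peers in seeds.items():
        for peer in peers:
            known[peer].add(name)

    changed = True
    while changed:
        changed = False
        for name in known:
            for peer in list(known[name]):
                # Gossip: share everything `name` knows with `peer`.
                before = len(known[peer])
                known[peer] |= known[name] - {peer}
                if len(known[peer]) != before:
                    changed = True

    # Each server opens a route to every peer it knows about.
    return {frozenset((a, b)) for a in known for b in known[a]}


# Four servers; every new one is only configured with a route to "s1".
routes = form_mesh({"s1": set(), "s2": {"s1"}, "s3": {"s1"}, "s4": {"s1"}})
print(len(routes))  # 6 = 4 * 3 / 2: every pair of servers is connected
```

For n servers a full mesh needs n(n-1)/2 routes, which is cheap for typical cluster sizes and is also why very large deployments reach for superclusters.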

JetStream

The newest and most powerful feature of NATS is JetStream.[1]

From the docs:

NATS has a built-in persistence engine called JetStream which enables messages to be stored and replayed at a later time. Unlike NATS Core which requires you to have an active subscription to process messages as they happen, JetStream allows the NATS server to capture messages and replay them to consumers as needed.

This is similar to what other message brokers, like Kafka or RabbitMQ, provide.[2]

This feature allows for message persistence, and is deeply configurable. Think Kafka, but as a super lightweight Go binary.

With persistence enabled for messages, additional advanced features, quality-of-service, and use cases are unlocked.
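The key difference from Core NATS is that a stream keeps messages around, indexed by sequence number. The deliberately simplified model below (ToyStream is hypothetical, not the JetStream API) shows a consumer reading messages that were published before it existed, and resuming from a known sequence.

```python
class ToyStream:
    """Hypothetical, simplified model of a persisted stream."""

    def __init__(self) -> None:
        # (sequence, subject, payload), kept in publish order
        self.messages: list[tuple[int, str, bytes]] = []

    def publish(self, subject: str, data: bytes) -> int:
        # Captured even if nobody is consuming right now.
        seq = len(self.messages) + 1
        self.messages.append((seq, subject, data))
        return seq  # comparable to the sequence in a publish ack

    def replay(self, start_seq: int = 1) -> list[tuple[int, str, bytes]]:
        # A consumer created later can start from the beginning or any sequence.
        return [m for m in self.messages if m[0] >= start_seq]


stream = ToyStream()
for n in range(3):
    stream.publish("orders.created", f"order-{n}".encode())

# A consumer that starts *after* publishing can still read everything...
print([seq for seq, _, _ in stream.replay()])     # [1, 2, 3]
# ...or resume from a known position.
print([data for _, _, data in stream.replay(3)])  # [b'order-2']
```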

How to use it

There are a few core messaging patterns that are natural on NATS as a message broker.

Let’s see an overview of some of these, to hint at powerful new paradigms.

Pub/Sub

Pub/Sub (Publisher/Subscriber) is a traditional “broadcast” messaging pattern.

A producer publishes a message on a subject, and every subscriber currently listening on that subject receives it.

Here are some example usages of this pattern:

Publisher

Posting a basic message to NATS is easy:

JavaScript:
nc.publish("messages", sc.encode("js"));

Go:
nc.Publish("messages", []byte("go"))

Python:
await nc.publish("messages", b"python")

Rust:
client.publish("messages", "rust".into()).await.ok();

PHP:
$client->publish('messages', 'php');

In its most basic form, publishing is “fire and forget”: if no subscriber is listening when a message is published, that message is simply dropped.

Subscribers

Message receivers connect and subscribe to a subject.

The server will push messages to all listening clients.

JavaScript:
const sub = nc.subscribe("messages");
for await (const msg of sub) {
  console.log(`pubsub - consumer - received ${sc.decode(msg.data)}`);
}

Go:
nc.Subscribe("messages", func(msg *nats.Msg) {
	log.Printf("pubsub - consumer - received %s", string(msg.Data))
})

Python:
sub = await nc.subscribe("messages")
async for msg in sub.messages:
    print(f"pubsub - consumer - received {msg.data.decode()}", flush=True)

Rust:
let mut sub = client.subscribe("messages").await.unwrap();
while let Some(msg) = sub.next().await {
    println!(
        "pubsub - consumer - received {}",
        std::str::from_utf8(&msg.payload).unwrap()
    );
}

PHP:
$client = new Client($config);
$client->subscribe('messages', function ($message) {
    echo "pubsub - consumer - received " . $message->body . PHP_EOL;
});
while (true) {
    $client->process(1);
}

Most client libraries will use some kind of callback or hook to process messages.
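Core pub/sub semantics can be modeled with an in-process toy broker. ToyCoreBroker is hypothetical and only handles exact subject matches, but it shows the two properties described above: each current subscriber’s callback receives every message, and a message published with no subscribers is gone.

```python
from collections import defaultdict
from typing import Callable


class ToyCoreBroker:
    """Hypothetical in-process model of core pub/sub (exact subjects only)."""

    def __init__(self) -> None:
        self.subs: dict[str, list[Callable[[bytes], None]]] = defaultdict(list)

    def subscribe(self, subject: str, callback: Callable[[bytes], None]) -> None:
        self.subs[subject].append(callback)

    def publish(self, subject: str, data: bytes) -> int:
        # Fire and forget: deliver to whoever listens right now, store nothing.
        callbacks = self.subs.get(subject, [])
        for callback in callbacks:
            callback(data)
        return len(callbacks)  # number of deliveries, for illustration only


broker = ToyCoreBroker()
broker.publish("messages", b"too early")  # no subscribers yet: dropped

received_a: list[bytes] = []
received_b: list[bytes] = []
broker.subscribe("messages", received_a.append)
broker.subscribe("messages", received_b.append)
broker.publish("messages", b"hello")

print(received_a, received_b)  # [b'hello'] [b'hello']
```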

Work Queues

Next, let’s look at a very standard messaging pattern: work queues.

Jobs are produced, and are expected to be picked up by downstream worker instances.

With work queue semantics, each job is taken and processed by exactly one worker.

Producer

Producers can create a stream with the work-queue retention policy, which has exactly the semantics we need:

JavaScript:
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
  name: "JOBS",
  subjects: ["job.*"],
  retention: RetentionPolicy.Workqueue,
});
console.log("workqueue - producer - ready");
const js = nc.jetstream();
while (true) {
  for (let i = 0; i < 5; i++) {
    const ack = await js.publish("job.js", sc.encode("js"));
    console.log(`workqueue - producer - published ${ack.seq}`);
  }
  await new Promise((resolve) => setTimeout(resolve, 10000));
}

Go:
js, err := jetstream.New(nc)
if err != nil {
	log.Fatalf("Failed to create JetStream context: %v", err)
}
_, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
	Name:      "JOBS",
	Subjects:  []string{"job.*"},
	Retention: jetstream.WorkQueuePolicy,
})
if err != nil {
	log.Fatalf("Failed to create JOBS stream: %v", err)
}
log.Println("workqueue - producer - ready")
go func() {
	for {
		for i := 0; i < 5; i++ {
			ack, err := js.Publish(context.Background(), "job.go", []byte("go"))
			if err != nil {
				log.Printf("workqueue - producer - error %v", err)
			} else {
				log.Printf("workqueue - producer - published %d", ack.Sequence)
			}
		}
		time.Sleep(10 * time.Second)
	}
}()

Python:
async def _workqueue_produce(js):
    while True:
        for _ in range(5):
            ack = await js.publish("job.python", b"python")
            print(f"workqueue - producer - published {ack.seq}", flush=True)
        await asyncio.sleep(10)

js = nc.jetstream()
await js.add_stream(
    StreamConfig(
        name="JOBS",
        subjects=["job.*"],
        retention=RetentionPolicy.WORK_QUEUE,
    )
)
print("workqueue - producer - ready", flush=True)
asyncio.create_task(_workqueue_produce(js))

Rust:
let js = jetstream::new(client);
js.get_or_create_stream(jetstream::stream::Config {
    name: "JOBS".to_string(),
    subjects: vec!["job.*".to_string()],
    retention: RetentionPolicy::WorkQueue,
    ..Default::default()
})
.await
.expect("Failed to create JOBS stream");
println!("workqueue - producer - ready");
loop {
    for _ in 0..5 {
        match js.publish("job.rust", "rust".into()).await {
            Ok(ack_future) => match ack_future.await {
                Ok(ack) => println!("workqueue - producer - published {}", ack.sequence),
                Err(e) => println!("workqueue - producer - error {}", e),
            },
            Err(e) => println!("workqueue - producer - error {}", e),
        }
    }
    sleep(Duration::from_secs(10)).await;
}

PHP:
$stream = $client->getApi()->getStream('JOBS');
$stream->getConfiguration()
    ->setRetentionPolicy(RetentionPolicy::WORK_QUEUE)
    ->setSubjects(['job.*'])
    ->setDenyDelete(false)
    ->setAllowRollupHeaders(false);
try {
    $stream->create();
} catch (\Exception $e) {
    // Stream may already exist from another service
}
echo "workqueue - producer - ready" . PHP_EOL;
while (true) {
    for ($i = 0; $i < 5; $i++) {
        $ack = $stream->publish('job.php', 'php');
        echo "workqueue - producer - published " . $ack->seq . PHP_EOL;
    }
    sleep(10);
}

Workers

We can create worker instances as consumers of the work queue.

The consumer itself is defined on the server, as a durable consumer (for example, “js-workers”). Worker instances that bind to the same durable consumer share it as a pool, and each job in the work queue is delivered to only one member of that pool at a time.

Once a job is complete, we can ACK it, and the job will be removed from the queue.

JavaScript:
const jsm = await nc.jetstreamManager();
await jsm.consumers.add("JOBS", {
  durable_name: "js-workers",
  filter_subject: "job.js",
  ack_policy: "explicit",
});
const js = nc.jetstream();
const consumer = await js.consumers.get("JOBS", "js-workers");
const messages = await consumer.consume();
for await (const msg of messages) {
  // simulate doing some work
  await new Promise((resolve) => setTimeout(resolve, 1000));
  console.log(`workqueue - worker ${id} - processed ${msg.seq}`);
  msg.ack();
}

Go:
js, err := jetstream.New(nc)
if err != nil {
	log.Fatalf("Failed to create JetStream context: %v", err)
}
cons, err := js.CreateOrUpdateConsumer(context.Background(), "JOBS", jetstream.ConsumerConfig{
	Durable:       "go-workers",
	FilterSubject: "job.go",
	AckPolicy:     jetstream.AckExplicitPolicy,
})
if err != nil {
	log.Fatalf("Failed to create consumer: %v", err)
}
go func() {
	iter, err := cons.Messages()
	if err != nil {
		log.Fatalf("Failed to start consuming: %v", err)
	}
	for {
		msg, err := iter.Next()
		if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
			return // the iterator was stopped; exit instead of spinning forever
		}
		if err != nil {
			continue
		}
		// simulate doing some work
		time.Sleep(1 * time.Second)
		meta, _ := msg.Metadata()
		log.Printf("workqueue - worker %d - processed %d", id, meta.Sequence.Stream)
		msg.Ack()
	}
}()

Python:
async def _workqueue_consume(sub, id):
    while True:
        try:
            msgs = await sub.fetch(1, timeout=5)
            for msg in msgs:
                # simulate doing some work
                await asyncio.sleep(1)
                print(
                    f"workqueue - worker {id} - processed {msg.metadata.sequence.stream}",
                    flush=True,
                )
                await msg.ack()
        except Exception:
            # fetch raises a timeout when no job is waiting; back off and retry
            await asyncio.sleep(1)

jsm = nc.jsm()
await jsm.add_consumer("JOBS", ConsumerConfig(
    durable_name="python-workers",
    filter_subject="job.python",
    ack_policy=AckPolicy.EXPLICIT,
))
js = nc.jetstream()
sub = await js.pull_subscribe_bind("python-workers", "JOBS")
asyncio.create_task(_workqueue_consume(sub, id))

Rust:
let js = jetstream::new(client);
let stream = js
    .get_stream("JOBS")
    .await
    .expect("Failed to get JOBS stream");
let consumer = stream
    .get_or_create_consumer(
        "rust-workers",
        jetstream::consumer::pull::Config {
            durable_name: Some("rust-workers".to_string()),
            filter_subject: "job.rust".to_string(),
            ..Default::default()
        },
    )
    .await
    .expect("Failed to create consumer");
let mut messages = consumer.messages().await.expect("Failed to get messages");
while let Some(Ok(msg)) = messages.next().await {
    // simulate doing some work
    sleep(Duration::from_secs(1)).await;
    let info = msg.info().unwrap();
    println!(
        "workqueue - worker {} - processed {}",
        id, info.stream_sequence
    );
    msg.ack().await.ok();
}

PHP:
$stream = $client->getApi()->getStream('JOBS');
$consumer = $stream->getConsumer('php-workers');
$consumer->getConfiguration()->setSubjectFilter('job.php');
// Consumer may already exist from another worker
try {
    $consumer->create();
} catch (\Exception $e) {}
$consumer->handle(function ($payload, $replyTo) use ($id) {
    // simulate doing some work
    sleep(1);
    // Token 5 of the JetStream ack reply subject is the stream sequence
    $parts = explode('.', $replyTo);
    $seq = $parts[5] ?? '?';
    echo "workqueue - worker $id - processed $seq" . PHP_EOL;
});

Each message is delivered to only one member of the pool. Because these are pull consumers, work spreads across the workers as each one fetches its next job, rather than in a strict round-robin. Explicit ACKs allow for retries and redeliveries. And, as an advanced pattern, a single stream can even be dispatched to multiple consumers.
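The ack-and-redeliver behavior can be sketched with a small in-memory model. ToyWorkQueue and Pending are hypothetical, and time is passed in explicitly to keep the example deterministic. The ack_wait deadline plays the role of a consumer’s AckWait setting in JetStream: a job handed to a worker that never acknowledges it becomes available again once the deadline passes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Pending:
    seq: int
    data: bytes
    deadline: Optional[float] = None  # set while a worker holds the message
    deliveries: int = 0


class ToyWorkQueue:
    """Hypothetical model of work-queue retention with explicit acks."""

    def __init__(self, ack_wait: float) -> None:
        self.ack_wait = ack_wait
        self.pending: dict[int, Pending] = {}
        self.next_seq = 1

    def publish(self, data: bytes) -> int:
        seq = self.next_seq
        self.pending[seq] = Pending(seq, data)
        self.next_seq += 1
        return seq

    def fetch(self, now: float) -> Optional[Pending]:
        # Hand out the oldest message that no worker currently holds,
        # including messages whose ack deadline has already passed.
        for msg in sorted(self.pending.values(), key=lambda m: m.seq):
            if msg.deadline is None or msg.deadline <= now:
                msg.deadline = now + self.ack_wait
                msg.deliveries += 1
                return msg
        return None

    def ack(self, seq: int) -> None:
        # With work-queue retention, an acknowledged message is removed.
        self.pending.pop(seq, None)


q = ToyWorkQueue(ack_wait=30.0)
q.publish(b"job-1")
q.publish(b"job-2")

a = q.fetch(now=0.0)       # worker A receives job 1
b = q.fetch(now=0.0)       # worker B receives job 2, not job 1 again
q.ack(a.seq)               # job 1 is done and leaves the queue
retry = q.fetch(now=31.0)  # B never acked; after ack_wait, job 2 is redelivered
print(a.seq, b.seq, retry.seq, retry.deliveries, len(q.pending))  # 1 2 2 2 1
```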

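A note: the work queue uses subject-based messaging here. The JOBS stream captures every subject matching job.*, and each worker pool filters on its own subject (job.js, job.go, and so on). This matters because a work-queue stream does not allow consumers with overlapping filters. We won’t go into further detail here, but subject-based messaging is a powerful core feature.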
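A rough sketch of the subject-matching rules: tokens are separated by dots, * matches exactly one token, and > matches one or more trailing tokens. subject_matches is a simplified helper written for illustration (it skips validation of malformed subjects), not code taken from NATS.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Sketch of NATS-style subject matching.

    '*' matches exactly one token; '>' (valid only as the last token)
    matches one or more remaining tokens.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # Must be the final token and must cover at least one token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


# The JOBS stream captures "job.*"; each worker pool filters its own subject.
print(subject_matches("job.*", "job.go"))        # True
print(subject_matches("job.*", "job.go.retry"))  # False: * is a single token
print(subject_matches("job.>", "job.go.retry"))  # True
print(subject_matches("job.go", "job.python"))   # False
```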

Streams

Streams are the most flexible and powerful feature of NATS and JetStream.

Messages are stored within JetStream (in memory or on disk) and can be replayed on demand.

Producers

An example producer creates a stream of state messages.

JavaScript:
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
  name: "STATE",
  subjects: ["state.*"],
  max_msgs_per_subject: 10,
});
console.log("state - producer - ready");
const js = nc.jetstream();
while (true) {
  const ack = await js.publish(
    "state.js",
    sc.encode(String(Math.floor(Date.now() / 1000))),
  );
  console.log(`state - producer - published ${ack.seq}`);
  await new Promise((resolve) => setTimeout(resolve, 60000));
}

Go:
js, err := jetstream.New(nc)
if err != nil {
	log.Fatalf("Failed to create JetStream context: %v", err)
}
_, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
	Name:              "STATE",
	Subjects:          []string{"state.*"},
	MaxMsgsPerSubject: 10,
})
if err != nil {
	log.Fatalf("Failed to create STATE stream: %v", err)
}
log.Println("state - producer - ready")
go func() {
	for {
		payload := fmt.Sprintf("%d", time.Now().Unix())
		ack, err := js.Publish(context.Background(), "state.go", []byte(payload))
		if err != nil {
			log.Printf("state - producer - error %v", err)
		} else {
			log.Printf("state - producer - published %d", ack.Sequence)
		}
		time.Sleep(60 * time.Second)
	}
}()

Python:
async def _state_produce(js):
    while True:
        ack = await js.publish("state.python", str(int(time.time())).encode())
        print(f"state - producer - published {ack.seq}", flush=True)
        await asyncio.sleep(60)

js = nc.jetstream()
await js.add_stream(
    StreamConfig(
        name="STATE",
        subjects=["state.*"],
        max_msgs_per_subject=10,
    )
)
print("state - producer - ready", flush=True)
asyncio.create_task(_state_produce(js))

Rust:
let js = jetstream::new(client);
js.get_or_create_stream(jetstream::stream::Config {
    name: "STATE".to_string(),
    subjects: vec!["state.*".to_string()],
    max_messages_per_subject: 10,
    ..Default::default()
})
.await
.expect("Failed to create STATE stream");
println!("state - producer - ready");
loop {
    let payload: String = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs()
        .to_string();
    match js.publish("state.rust", payload.into()).await {
        Ok(ack_future) => match ack_future.await {
            Ok(ack) => println!("state - producer - published {}", ack.sequence),
            Err(e) => println!("state - producer - error {}", e),
        },
        Err(e) => println!("state - producer - error {}", e),
    }
    sleep(Duration::from_secs(60)).await;
}

PHP:
$stream = $client->getApi()->getStream('STATE');
$stream->getConfiguration()
    ->setSubjects(['state.*'])
    ->setMaxMessagesPerSubject(10)
    ->setDenyDelete(false)
    ->setAllowRollupHeaders(false);
try {
    $stream->create();
} catch (\Exception $e) {
    // Stream may already exist from another service
}
echo "state - producer - ready" . PHP_EOL;
while (true) {
    $ack = $stream->publish('state.php', (string)time());
    echo "state - producer - published " . $ack->seq . PHP_EOL;
    sleep(60);
}

A stream of state messages is produced with a payload of the current timestamp.

To make sure the demo does not use too much memory, the stream is configured to keep only the last ten state messages per subject.
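The per-subject limit behaves roughly like a bounded queue per subject. Below is a hypothetical model, assuming the default discard policy in which the oldest message on a subject is dropped to make room. It is not how the server actually stores data, and the timestamps are arbitrary sample values.

```python
from collections import defaultdict, deque


class ToyLimitedStream:
    """Hypothetical model of a stream with a per-subject message limit.

    Mirrors the idea of max_msgs_per_subject: once a subject holds `limit`
    messages, the oldest message on *that subject* is discarded.
    """

    def __init__(self, max_msgs_per_subject: int) -> None:
        self.seq = 0
        self.by_subject = defaultdict(lambda: deque(maxlen=max_msgs_per_subject))

    def publish(self, subject: str, data: str) -> int:
        self.seq += 1
        self.by_subject[subject].append((self.seq, data))  # deque drops the oldest
        return self.seq

    def messages(self, subject: str) -> list:
        return list(self.by_subject[subject])


stream = ToyLimitedStream(max_msgs_per_subject=10)
for ts in range(1000, 1025):          # 25 sample timestamps on one subject
    stream.publish("state.go", str(ts))
stream.publish("state.rust", "2000")  # other subjects keep their own ten

kept = stream.messages("state.go")
print(len(kept), kept[0], kept[-1])        # 10 (16, '1015') (25, '1024')
print(len(stream.messages("state.rust")))  # 1
```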

State Replay

Services can start up from an empty state, and replay state messages from the stream.

JavaScript:
const consumer = await jsm.consumers.add("STATE", {
  ack_policy: "none",
  deliver_policy: "all",
  inactive_threshold: 5000000000, // nanoseconds (5s)
  filter_subject: "state.js",
});
const c = await js.consumers.get("STATE", consumer.name);
const msgs = await c.fetch({ max_messages: 10, expires: 2000 });
let count = 0;
for await (const msg of msgs) {
  console.log(
    `state - consumer - replay ${msg.seq} ${sc.decode(msg.data)}`,
  );
  count++;
}
console.log(`state - consumer - replay complete ${count}`);
try {
  await jsm.consumers.delete("STATE", consumer.name);
} catch (_) {}

Go:
cons, err := js.CreateConsumer(context.Background(), "STATE", jetstream.ConsumerConfig{
	AckPolicy:         jetstream.AckNonePolicy,
	DeliverPolicy:     jetstream.DeliverAllPolicy,
	InactiveThreshold: 5 * time.Second,
	FilterSubject:     "state.go",
})
if err != nil {
	log.Printf("state - consumer - error %v", err)
	continue
}
msgs, err := cons.Fetch(10, jetstream.FetchMaxWait(2*time.Second))
if err != nil {
	log.Printf("state - consumer - error %v", err)
	continue
}
count := 0
for msg := range msgs.Messages() {
	meta, _ := msg.Metadata()
	log.Printf("state - consumer - replay %d %s", meta.Sequence.Stream, string(msg.Data()))
	count++
}
log.Printf("state - consumer - replay complete %d", count)
if info := cons.CachedInfo(); info != nil {
	js.DeleteConsumer(context.Background(), "STATE", info.Name)
}

Python:
consumer_info = await jsm.add_consumer("STATE", ConsumerConfig(
    ack_policy=AckPolicy.NONE,
    deliver_policy=DeliverPolicy.ALL,
    inactive_threshold=5,  # nats-py takes seconds here and converts to nanoseconds
    filter_subject="state.python",
))
sub = await js.pull_subscribe_bind(consumer_info.name, "STATE")
try:
    msgs = await sub.fetch(10, timeout=2)
    for msg in msgs:
        print(
            f"state - consumer - replay {msg.metadata.sequence.stream} {msg.data.decode()}",
            flush=True,
        )
except Exception:
    pass
await sub.unsubscribe()
try:
    await jsm.delete_consumer("STATE", consumer_info.name)
except Exception:
    pass

Rust:
let stream = match js.get_stream("STATE").await {
    Ok(s) => s,
    Err(e) => {
        println!("state - consumer - error {}", e);
        continue;
    }
};
let consumer = match stream
    .create_consumer(jetstream::consumer::pull::Config {
        deliver_policy: jetstream::consumer::DeliverPolicy::All,
        ack_policy: jetstream::consumer::AckPolicy::None,
        inactive_threshold: Duration::from_secs(5),
        filter_subject: "state.rust".to_string(),
        ..Default::default()
    })
    .await
{
    Ok(c) => c,
    Err(e) => {
        println!("state - consumer - error {}", e);
        continue;
    }
};
let mut msgs = match consumer
    .fetch()
    .max_messages(10)
    .expires(Duration::from_secs(2))
    .messages()
    .await
{
    Ok(m) => m,
    Err(e) => {
        println!("state - consumer - error {}", e);
        continue;
    }
};
let consumer_name = consumer.cached_info().name.clone();
let mut count = 0;
while let Some(Ok(msg)) = msgs.next().await {
    let info = msg.info().unwrap();
    println!(
        "state - consumer - replay {} {}",
        info.stream_sequence,
        std::str::from_utf8(&msg.payload).unwrap()
    );
    count += 1;
}
println!("state - consumer - replay complete {}", count);
stream.delete_consumer(&consumer_name).await.ok();

PHP:
$stream = $client->getApi()->getStream('STATE');
$consumerConfig = new \Basis\Nats\Consumer\Configuration('STATE');
$consumerConfig
    ->setSubjectFilter('state.php')
    ->setDeliverPolicy('all')
    ->setAckPolicy('none')
    ->setInactiveThreshold(5_000_000_000);
$consumer = $stream->createEphemeralConsumer($consumerConfig);
$count = 0;
$consumer->setExpires(2);
$consumer->setIterations(1);
$consumer->handle(function ($payload, $replyTo) use (&$count) {
    $parts = explode('.', $replyTo);
    $seq = $parts[5] ?? '?';
    $count++;
    echo "state - consumer - replay $seq $payload" . PHP_EOL;
    if ($count >= 10) {
        return true;
    }
}, null, false);
echo "state - consumer - replay complete $count" . PHP_EOL;
try {
    $consumer->delete();
} catch (\Exception $e) {
    // Cleanup best-effort
}

This is a trivial example, but it shows how state values can be updated, and how a service can converge on the current system state by replaying them.
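Convergence comes from replay being deterministic: folding the same messages in stream order always yields the same state, so every instance that replays the stream ends up agreeing. A minimal sketch, using a hypothetical converge helper and made-up timestamps in place of a real fetch:

```python
def converge(messages: list[tuple[int, str, str]]) -> dict[str, int]:
    """Fold replayed (sequence, subject, payload) messages into current state.

    Messages are applied in stream-sequence order, so the latest value for
    each subject wins.
    """
    state: dict[str, int] = {}
    for _seq, subject, payload in sorted(messages):
        state[subject] = int(payload)
    return state


# Sample replay; the order of arrival does not matter, the sequence does.
replayed = [
    (3, "state.go", "1700000120"),
    (1, "state.go", "1700000000"),
    (2, "state.rust", "1700000060"),
]
print(converge(replayed))  # {'state.go': 1700000120, 'state.rust': 1700000060}
```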

Event Sourcing

With a persistent stream of messages, we can perform Event Sourcing. This approach allows for stateless and scalable services.

Event Sourcing ensures that all changes to application state are stored as a sequence of events. Not only can we query these events, we can also use the event log to reconstruct past states, and as a foundation to automatically adjust the state to cope with retroactive changes.

Many advanced patterns and approaches are enabled by Event Sourcing. We are able to replay the entire history of a service, perform auditing, edit history, monitor state, and even migrate event structures and schemas. And that’s not even the half of it.
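To make the idea concrete, here is a minimal event-sourcing sketch in Python. The bank-account domain, the Event type, and the event kinds are hypothetical. With NATS, each event would be a message on a stream, and rebuilding state would mean replaying that stream from the start, or only up to a given sequence to see a past state.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Event:
    seq: int
    kind: str  # "deposited" or "withdrew" (hypothetical event types)
    amount: int


def apply(balance: int, event: Event) -> int:
    # Each event type describes how it changes the state.
    if event.kind == "deposited":
        return balance + event.amount
    if event.kind == "withdrew":
        return balance - event.amount
    raise ValueError(f"unknown event kind: {event.kind}")


def state_at(events: list[Event], upto_seq: Optional[int] = None) -> int:
    """Rebuild the balance by folding events in order, optionally stopping early."""
    balance = 0
    for event in sorted(events, key=lambda e: e.seq):
        if upto_seq is not None and event.seq > upto_seq:
            break
        balance = apply(balance, event)
    return balance


log = [
    Event(1, "deposited", 100),
    Event(2, "withdrew", 30),
    Event(3, "deposited", 50),
]
print(state_at(log))              # 120: current state
print(state_at(log, upto_seq=2))  # 70: state as of event 2
```

The service holds no state of its own beyond what it can rebuild from the log, which is what makes Event Sourcing a good fit for stateless, horizontally scaled services.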

The Demo

I’ve published the examples from these messaging paradigms as a Demo/Sandbox: djmetzle/nats-demo

Demo Screen Cap

Feel free to explore, operate, and “play with” these implementations at your leisure.

The Technology

NATS has some very interesting technical features included as well.

These extra features can empower further integration patterns across distributed systems and the web.

Let’s look at a few.

Data Storage

JetStream is a persistence mechanism.

Messages are stored within NATS, and message persistence is configurable, per stream.

As such, we can build stateful storage on top of this persistence.

NATS comes with a key-value (KV) store and an Object Store (think S3) built into it “for free”, both based on this persistence.

Consider that many databases are, at their core, materializations of a log (a write-ahead log or binary log). That symmetry is not accidental: a persisted stream of events is a robust foundation for building distributed databases.
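NATS’s KV store follows this idea: a bucket is backed by a stream, each key maps to its own subject (of the form $KV.<bucket>.<key>), and a key’s revision is the stream sequence of its latest write. The sketch below is a simplified in-memory model of that design, not the real client API; ToyKV and its methods are hypothetical.

```python
from typing import Optional


class ToyKV:
    """Simplified model of a KV bucket materialized from an append-only log.

    Every put or delete appends to the log; reads are a view over it.
    """

    def __init__(self, bucket: str) -> None:
        self.bucket = bucket
        # (revision, subject, value); a value of None is a delete tombstone
        self.log: list[tuple[int, str, Optional[str]]] = []

    def _subject(self, key: str) -> str:
        return f"$KV.{self.bucket}.{key}"

    def _append(self, key: str, value: Optional[str]) -> int:
        revision = len(self.log) + 1
        self.log.append((revision, self._subject(key), value))
        return revision

    def put(self, key: str, value: str) -> int:
        return self._append(key, value)

    def delete(self, key: str) -> int:
        return self._append(key, None)

    def get(self, key: str) -> Optional[str]:
        # The latest entry for the key wins; a tombstone means "not found".
        subject = self._subject(key)
        for _rev, subj, value in reversed(self.log):
            if subj == subject:
                return value
        return None

    def history(self, key: str) -> list[tuple[int, Optional[str]]]:
        subject = self._subject(key)
        return [(rev, value) for rev, subj, value in self.log if subj == subject]


kv = ToyKV("config")
kv.put("color", "red")
kv.put("size", "L")
kv.put("color", "blue")
print(kv.get("color"))      # blue
print(kv.history("color"))  # [(1, 'red'), (3, 'blue')]
kv.delete("color")
print(kv.get("color"))      # None
```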

Websockets

The NATS server can expose a websocket interface.

This allows for clients like the browser to connect to NATS.

We can use pubsub and stateful messaging directly from and to web applications.

Realtime streaming, anyone?

Edge/embedded

Websockets enable browser-to-NATS integration, but even deeper integrations are possible for services and IoT.

Since the NATS server is a small-ish Go binary, it can be embedded within Go applications.

Applications running on the Edge or IoT can connect into the server mesh directly, in process. This can simplify application integration, skipping service discovery and the network entirely. Instead, connecting an embedded server into a cluster can become a deployment concern.

The ability for Edge services to run a local server can allow for powerful integration patterns. For example, an edge worker might replay recent realtime messages in order to respond to client requests.

Superclusters

Since NATS forms a “full mesh” cluster by default, there could be scaling and security concerns for very large deployments.

To address this, NATS allows for creating “Super-clusters” by using Gateways.

Gateways connect one or more clusters together into a full mesh of clusters; this is how superclusters are formed from smaller clusters.

Gateways allow for a configurable bridge between deployment environments. Multi-cluster, multi-region, multi-environment, and multi-cloud are all possible scaling frontiers.

Functionality across clusters works exactly the same from the perspective of applications. Messages from one cloud can transparently be delivered to another, distant, deployment of a service.

Note that the use of Gateways prevents “full mesh” chatter between clusters. This simplifies service discovery and security, and the segmentation minimizes bandwidth usage between environments.
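A back-of-the-envelope count shows why this matters at scale. The sketch assumes a full mesh of routes inside each cluster, plus one outbound gateway connection from every server to each other cluster. That connection model is a simplification for illustration, and real deployments may differ.

```python
def full_mesh_routes(servers: int) -> int:
    # Every pair of servers in a single cluster keeps a route: n(n-1)/2.
    return servers * (servers - 1) // 2


def supercluster_links(clusters: int, servers_per_cluster: int) -> int:
    # Assumption: full mesh inside each cluster, plus one outbound gateway
    # connection from every server to each *other* cluster.
    routes = clusters * full_mesh_routes(servers_per_cluster)
    gateways = clusters * servers_per_cluster * (clusters - 1)
    return routes + gateways


for clusters, size in [(3, 5), (10, 10)]:
    flat = full_mesh_routes(clusters * size)
    print(clusters, size, flat, supercluster_links(clusters, size))
# 3 5 105 60
# 10 10 4950 1350
```

Under these assumptions, a single flat mesh grows quadratically with the total number of servers, while the supercluster keeps the quadratic term confined to each (smaller) cluster.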

Services

Another feature built on NATS messaging is service infrastructure.

That refers to “capital-S” Service infrastructure.

First-class Service abstractions are built on top of NATS request/reply and implemented in the client libraries. Applications can register services into a cluster, including metadata and endpoints. Each endpoint can document its own inputs and outputs.

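Services answer discovery requests on well-known subjects ($SRV.PING, $SRV.INFO, and $SRV.STATS), which gives a cluster-wide service catalog along with service- and endpoint-level statistics and info.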
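The bookkeeping behind those statistics can be sketched in plain Python. This is a hypothetical in-process model: the real service API runs over request/reply on NATS subjects, and the field names here (num_requests, num_errors, processing_time) are only loosely modeled on the stats it reports.

```python
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class EndpointStats:
    num_requests: int = 0
    num_errors: int = 0
    processing_time: float = 0.0  # total seconds spent handling requests

    @property
    def average_processing_time(self) -> float:
        if not self.num_requests:
            return 0.0
        return self.processing_time / self.num_requests


@dataclass
class ToyService:
    """Hypothetical in-process model of a service with per-endpoint stats."""

    name: str
    version: str
    endpoints: dict[str, Callable[[bytes], bytes]] = field(default_factory=dict)
    stats: dict[str, EndpointStats] = field(default_factory=dict)

    def add_endpoint(self, subject: str, handler: Callable[[bytes], bytes]) -> None:
        self.endpoints[subject] = handler
        self.stats[subject] = EndpointStats()

    def request(self, subject: str, data: bytes) -> bytes:
        # Count, time, and tally errors for every request to this endpoint.
        stats = self.stats[subject]
        stats.num_requests += 1
        start = time.perf_counter()
        try:
            return self.endpoints[subject](data)
        except Exception:
            stats.num_errors += 1
            raise
        finally:
            stats.processing_time += time.perf_counter() - start

    def info(self) -> dict:
        # Roughly the kind of answer a discovery (INFO) request returns.
        return {"name": self.name, "version": self.version,
                "endpoints": sorted(self.endpoints)}


svc = ToyService("greeter", "1.0.0")
svc.add_endpoint("greet.hello", lambda data: b"hello, " + data)
svc.add_endpoint("greet.fail", lambda data: bytes(1 // 0))  # always fails
reply = svc.request("greet.hello", b"nats")
try:
    svc.request("greet.fail", b"")
except ZeroDivisionError:
    pass
print(reply, svc.info()["endpoints"], svc.stats["greet.fail"].num_errors)
# b'hello, nats' ['greet.fail', 'greet.hello'] 1
```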

Cloud Hosted

Synadia, the main company behind the development of NATS, hosts Synadia Cloud, among other services.

This is a globally deployed, highly available Super-cluster. It’s a great option for running NATS as a managed service, and it even lets you connect your own private clusters.

Takeaways

NATS is proposed as a “dial tone” for applications. And the messaging paradigms naturally facilitate Event-Driven Architecture.

Arbitrary messaging patterns between application services can be created. What this means in practice is that NATS can be a “universal platform” for distributed messaging. By using embedded servers and websockets, this messaging mesh can extend all the way to the edge, IoT, embedded, and clients (browsers).

An example is wasmCloud, another CNCF project, which is built on NATS as a connection layer.[3]

We can always start with a single application using a single NATS instance to prototype. Any communication through the broker will automatically be distributed in a robust way. With the “full mesh” and “Super-cluster” scaling abilities, your application can scale from “a single instance on my laptop” practically “to infinity”, without substantial changes.

The next time you need to build an application, NATS can be an enabler for getting started in a scalable way. If every “system design” talk eventually leads to Kafka, maybe consider NATS.

Happy hacking!

Footnotes

  1. Historical note: NATS previously offered a separate persistence layer called NATS Streaming (also known as STAN). It has since been deprecated, and JetStream, which is built into the NATS server itself, replaces it.

  2. Unlike RabbitMQ, it does not use AMQP. NATS has its own simple, text-based client protocol (commands such as PUB, SUB, and MSG), rather than a binary protocol like Kafka’s.

  3. However, wasmCloud does seem to be changing its NATS integration on its roadmap.
