Quarkus Signals: Build In-Process Messaging Without a Broker
A hands-on NebulaTrack tutorial that shows publish, unicast work dispatch, typed request-reply, qualifiers, and tests in one small Quarkus app.
Quarkus has Signals now. That is easy to say, but it does not tell you why you would use them.
I was curious about what Signals replaces. Y’all probably ask the same question right away: why not CDI events?
If the answer were “CDI events, but async,” this would be a short article and not a very interesting feature. CDI already gives us multicast events and async observers. Signals matter because they pull three in-process messaging patterns into one API:
publish()for multicast notificationsend()for unicast work dispatchrequest()for typed request-reply
That is the gap. CDI events are still great for observer-style fan-out. Reactive Messaging is still the right tool when a broker, connectors, or backpressure are part of the problem. Vert.x EventBus can cover the same ground too, but with string addresses and a lower-level model. Signals sit in the middle: in-process, type-safe, async by default, and explicit about whether you are broadcasting, dispatching work, or asking another component for an answer.
Signals ship as a new experimental extension in Quarkus 3.36.0.
What we build
We build NebulaTrack, a small cloud cost monitor in a command-mode Quarkus app (no REST). It detects cost anomalies, fans out alerts, dispatches remediation work to one worker, and asks a pricing component for estimates.
When you finish, you have:
three signal patterns wired with
@Receivesreceiversqualifier lanes (
@Default,@Critical,@Any)metadata on emissions
programmatic receiver registration
@QuarkusTestproof for every behavior
Signals are not an HTTP feature. A command-mode app keeps that obvious.
Prerequisites
You need a current JDK, Maven or the Quarkus CLI, and about one ☕️.
JDK 25 (this project targets Java 25)
Maven 3.9+ or the Quarkus CLI
Familiarity with CDI injection
You can grab the full source code from my repository if you don’t want to follow along.
Project setup
Create the application without codestarts so we stay out of the web stack:
quarkus create app dev.quarkex:nebulatrack-signals \
--extension='quarkus-signals' \
--java=25 \
--no-codeUnder src/main/java, create the package dev.quarkex.nebulatrack and the subpackages used below (model, qualifier, service, support).
Add two test dependencies for AssertJ and Awaitility:
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.27.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>The only production extension we need is quarkus-signals. You do not need Vert.x on the classpath for blocking receivers. Vert.x only enters the story when you move into non-blocking execution or Uni-based receivers.
Experimental: treat API and semantics as subject to change until the extension graduates.
Verify
From the project root:
./mvnw testAn empty or placeholder test should compile and pass before we add behavior.
Domain records
Create src/main/java/dev/quarkex/nebulatrack/model/Severity.java:
package dev.quarkex.nebulatrack.model;
public enum Severity {
NORMAL,
CRITICAL
}Create CostAnomaly.java, RemediationRequest.java, EstimateRequest.java, and CostEstimate.java:
package dev.quarkex.nebulatrack.model;
public record CostAnomaly(String region, double hourlyDelta, Severity severity) {
}package dev.quarkex.nebulatrack.model;
public record RemediationRequest(String region, String action) {
}package dev.quarkex.nebulatrack.model;
public record EstimateRequest(String service, int units) {
}package dev.quarkex.nebulatrack.model;
import java.math.BigDecimal;
public record CostEstimate(String service, int units, BigDecimal monthlyCost) {
}For the “no matching receiver” test later, add UnmatchedEstimateRequest.java with no receivers:
package dev.quarkex.nebulatrack.model;
/**
* Signal type with no registered receivers — used to prove {@code request()} returns {@code null}.
*/
public record UnmatchedEstimateRequest(String service, int units) {
}Test ledger
Receivers run asynchronously, so tests need a place they can poll without guessing about timing. This bean is plain on purpose. Each receiver just records what happened:
package dev.quarkex.nebulatrack.support;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.enterprise.context.ApplicationScoped;
import dev.quarkex.nebulatrack.model.CostAnomaly;
@ApplicationScoped
public class InMemoryLedger {
private final CopyOnWriteArrayList<CostAnomaly> anomalyEvents = new CopyOnWriteArrayList<>();
private final AtomicInteger alertCount = new AtomicInteger();
private final AtomicInteger auditCount = new AtomicInteger();
private final AtomicInteger dashboardCount = new AtomicInteger();
private final AtomicInteger workerACount = new AtomicInteger();
private final AtomicInteger workerBCount = new AtomicInteger();
private final AtomicInteger defaultLaneCount = new AtomicInteger();
private final AtomicInteger criticalLaneCount = new AtomicInteger();
private final AtomicInteger catchAllCount = new AtomicInteger();
private final AtomicInteger pluginCount = new AtomicInteger();
private final CopyOnWriteArrayList<Map<String, Object>> metadataSnapshots = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<UUID> requestScopeIds = new CopyOnWriteArrayList<>();
public void recordAnomaly(CostAnomaly anomaly) {
anomalyEvents.add(anomaly);
}
public void recordAlert() {
alertCount.incrementAndGet();
}
public void recordAudit() {
auditCount.incrementAndGet();
}
public void recordDashboardRefresh() {
dashboardCount.incrementAndGet();
}
public void recordWorkerA() {
workerACount.incrementAndGet();
}
public void recordWorkerB() {
workerBCount.incrementAndGet();
}
public void recordDefaultLane() {
defaultLaneCount.incrementAndGet();
}
public void recordCriticalLane() {
criticalLaneCount.incrementAndGet();
}
public void recordCatchAll() {
catchAllCount.incrementAndGet();
}
public void recordPlugin() {
pluginCount.incrementAndGet();
}
public void recordMetadata(Map<String, Object> metadata) {
metadataSnapshots.add(Map.copyOf(metadata));
}
public void recordRequestScopeId(UUID id) {
requestScopeIds.add(id);
}
public List<CostAnomaly> anomalyEvents() {
return Collections.unmodifiableList(new ArrayList<>(anomalyEvents));
}
public int alertCount() {
return alertCount.get();
}
public int auditCount() {
return auditCount.get();
}
public int dashboardCount() {
return dashboardCount.get();
}
public int workerACount() {
return workerACount.get();
}
public int workerBCount() {
return workerBCount.get();
}
public int defaultLaneCount() {
return defaultLaneCount.get();
}
public int criticalLaneCount() {
return criticalLaneCount.get();
}
public int catchAllCount() {
return catchAllCount.get();
}
public int pluginCount() {
return pluginCount.get();
}
public List<Map<String, Object>> metadataSnapshots() {
return Collections.unmodifiableList(new ArrayList<>(metadataSnapshots));
}
public List<UUID> requestScopeIds() {
return Collections.unmodifiableList(new ArrayList<>(requestScopeIds));
}
public void reset() {
anomalyEvents.clear();
alertCount.set(0);
auditCount.set(0);
dashboardCount.set(0);
workerACount.set(0);
workerBCount.set(0);
defaultLaneCount.set(0);
criticalLaneCount.set(0);
catchAllCount.set(0);
pluginCount.set(0);
metadataSnapshots.clear();
requestScopeIds.clear();
}
}Pattern 1: Publish for multicast
CostMonitor emits an anomaly on the default lane:
package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.model.Severity;
import io.quarkus.signals.Signal;
@ApplicationScoped
public class CostMonitor {
private final Signal<CostAnomaly> anomalySignal;
@Inject
public CostMonitor(Signal<CostAnomaly> anomalySignal) {
this.anomalySignal = anomalySignal;
}
public void detect() {
anomalySignal.publish(new CostAnomaly("us-east-1", 340.0, Severity.NORMAL));
}
}Add three receivers. AlertService and DashboardRefresher take the signal directly; AuditTrail already uses SignalContext, which pays off again when we attach metadata later:
package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class AlertService {
private final InMemoryLedger ledger;
@Inject
public AlertService(InMemoryLedger ledger) {
this.ledger = ledger;
}
void onAnomaly(@Receives CostAnomaly anomaly) {
ledger.recordAlert();
}
}package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import io.quarkus.signals.Receives;
import io.quarkus.signals.SignalContext;
@ApplicationScoped
public class AuditTrail {
private final InMemoryLedger ledger;
@Inject
public AuditTrail(InMemoryLedger ledger) {
this.ledger = ledger;
}
void onAnomaly(@Receives SignalContext<dev.quarkex.nebulatrack.model.CostAnomaly> ctx) {
ledger.recordAudit();
ledger.recordMetadata(ctx.metadata());
}
}package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class DashboardRefresher {
private final InMemoryLedger ledger;
@Inject
public DashboardRefresher(InMemoryLedger ledger) {
this.ledger = ledger;
}
void onAnomaly(@Receives CostAnomaly anomaly) {
ledger.recordDashboardRefresh();
}
}publish() delivers to every matching receiver, asynchronously. Each receiver gets its own CDI request context. That is a bigger shift than “observer on another thread.”
Verify
Add src/test/java/dev/quarkex/nebulatrack/NebulaTrackSignalsTest.java with @QuarkusTest, inject CostMonitor and InMemoryLedger, and implement publishNotifiesAllReceivers():
@Test
void publishNotifiesAllReceivers() {
costMonitor.detect();
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(ledger.alertCount()).isGreaterThanOrEqualTo(1);
assertThat(ledger.auditCount()).isGreaterThanOrEqualTo(1);
assertThat(ledger.dashboardCount()).isGreaterThanOrEqualTo(1);
});
}Before you run the test, predict the shape of the result. If publish() behaved like queue dispatch, only one counter would move. Here all three should move.
Run ./mvnw test -Dtest=NebulaTrackSignalsTest#publishNotifiesAllReceivers. All three counters should move.
Qualifier primer: @Default is not catch-all
Before more patterns, one CDI trap: a receiver with no qualifier is @Default, not “any signal of this type.”
void onAnomaly(@Receives CostAnomaly anomaly) {
// default lane only
}For every lane, use @Any:
void onAnyAnomaly(@Receives @Any CostAnomaly anomaly) {
// all qualifier lanes
}That difference matters enough to model directly:
package dev.quarkex.nebulatrack.qualifier;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Qualifier
@Retention(RUNTIME)
@Target({ FIELD, METHOD, PARAMETER, TYPE })
public @interface Critical {
final class Literal extends AnnotationLiteral<Critical> implements Critical {
public static final Literal INSTANCE = new Literal();
}
}package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class DefaultLaneReceiver {
private final InMemoryLedger ledger;
@Inject
public DefaultLaneReceiver(InMemoryLedger ledger) {
this.ledger = ledger;
}
void general(@Receives CostAnomaly anomaly) {
ledger.recordDefaultLane();
}
}package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.qualifier.Critical;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class CriticalLaneReceiver {
private final InMemoryLedger ledger;
@Inject
public CriticalLaneReceiver(InMemoryLedger ledger) {
this.ledger = ledger;
}
void critical(@Receives @Critical CostAnomaly anomaly) {
ledger.recordCriticalLane();
}
}package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class CatchAllAnomalyReceiver {
private final InMemoryLedger ledger;
@Inject
public CatchAllAnomalyReceiver(InMemoryLedger ledger) {
this.ledger = ledger;
}
void catchAll(@Receives @Any CostAnomaly anomaly) {
ledger.recordCatchAll();
}
}Emit critical anomalies from an emitter that injects both Signal<CostAnomaly> (default lane) and @Any Signal<CostAnomaly> (for select(Critical.Literal.INSTANCE)):
package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.model.Severity;
import dev.quarkex.nebulatrack.qualifier.Critical;
import io.quarkus.signals.Signal;
@ApplicationScoped
public class CriticalAnomalyEmitter {
private final Signal<CostAnomaly> defaultAnomalySignal;
private final Signal<CostAnomaly> anyAnomalySignal;
@Inject
public CriticalAnomalyEmitter(
Signal<CostAnomaly> defaultAnomalySignal,
@Any Signal<CostAnomaly> anyAnomalySignal) {
this.defaultAnomalySignal = defaultAnomalySignal;
this.anyAnomalySignal = anyAnomalySignal;
}
public void publishCritical() {
anyAnomalySignal.select(Critical.Literal.INSTANCE)
.publish(new CostAnomaly("eu-west-1", 900.0, Severity.CRITICAL));
}
public void publishDefault() {
defaultAnomalySignal.publish(new CostAnomaly("us-west-2", 120.0, Severity.NORMAL));
}
}Publishing only on the @Any bean does not hit @Default receivers. That is the behavior we test next.
Verify
Before you run defaultReceiverIgnoresCriticalLane(), decide which counter should stay flat. If the answer is not defaultLaneCount, the CDI mental model is still in the driver’s seat.
defaultReceiverIgnoresCriticalLane() publishes only on the critical lane and asserts the default-lane counter stays at zero while the critical counter moves.
Pattern 2: Send for unicast work
Sometimes you want one worker, not fan-out. RemediationDispatcher uses send():
package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.RemediationRequest;
import io.quarkus.signals.Signal;
@ApplicationScoped
public class RemediationDispatcher {
private final Signal<RemediationRequest> remediationSignal;
@Inject
public RemediationDispatcher(Signal<RemediationRequest> remediationSignal) {
this.remediationSignal = remediationSignal;
}
public void dispatch(String region, String action) {
remediationSignal.send(new RemediationRequest(region, action));
}
}The workers are deliberately boring:
package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.RemediationRequest;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class RemediationWorkerA {
private final InMemoryLedger ledger;
@Inject
public RemediationWorkerA(InMemoryLedger ledger) {
this.ledger = ledger;
}
void handle(@Receives RemediationRequest request) {
ledger.recordWorkerA();
}
}package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.RemediationRequest;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class RemediationWorkerB {
private final InMemoryLedger ledger;
@Inject
public RemediationWorkerB(InMemoryLedger ledger) {
this.ledger = ledger;
}
void handle(@Receives RemediationRequest request) {
ledger.recordWorkerB();
}
}send() picks one receiver in round-robin order. This is the first pattern CDI events do not model cleanly.
Receivers default to blocking execution when they return a plain value. That keeps the “no Vert.x required” story honest for this walkthrough.
Verify
Before you run sendRoundRobinsBetweenWorkers(), predict the split. Six sends should not wake both workers six times. It should land close to 3/3.
sendRoundRobinsBetweenWorkers() sends six remediations and asserts worker A and B counts differ by at most one.
Pattern 3: Request for typed replies
BudgetService asks PricingEngine for a CostEstimate:
package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostEstimate;
import dev.quarkex.nebulatrack.model.EstimateRequest;
import io.quarkus.signals.Signal;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class BudgetService {
private final Signal<EstimateRequest> estimateSignal;
@Inject
public BudgetService(Signal<EstimateRequest> estimateSignal) {
this.estimateSignal = estimateSignal;
}
public CostEstimate estimateBlocking(String service, int units) {
return estimateSignal.request(new EstimateRequest(service, units), CostEstimate.class);
}
public Uni<CostEstimate> estimateReactive(String service, int units) {
return estimateSignal.reactive()
.request(new EstimateRequest(service, units), CostEstimate.class);
}
}Receiver:
package dev.quarkex.nebulatrack.service;
import java.math.BigDecimal;
import jakarta.enterprise.context.ApplicationScoped;
import dev.quarkex.nebulatrack.model.CostEstimate;
import dev.quarkex.nebulatrack.model.EstimateRequest;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class PricingEngine {
CostEstimate onEstimate(@Receives EstimateRequest request) {
BigDecimal monthlyCost = BigDecimal.valueOf(request.units()).multiply(BigDecimal.valueOf(0.12));
return new CostEstimate(request.service(), request.units(), monthlyCost);
}
}I use a blocking return type here so the app runs without Vert.x. A Uni-returning receiver defaults to non-blocking execution. When Vert.x is part of the runtime, that usually means the event loop.
Resolution also considers the response type. If nothing matches, request() returns null — worth testing explicitly, because publish() and send() stay silent in the same situation.
Verify
requestReturnsTypedEstimate()— blocking path, 500 units at 0.12 →60.00requestReturnsTypedEstimateReactive()—reactive().request(...)requestReturnsNullWhenNoReceiver()— injectSignal<UnmatchedEstimateRequest>with no receivers
Qualifiers in depth
The three lane receivers already show the rule. The last missing piece is the emitter side. Inject @Any Signal<CostAnomaly> when you need select() without carrying @Default:
anyAnomalySignal.select(Critical.Literal.INSTANCE)
.publish(new CostAnomaly("eu-west-1", 900.0, Severity.CRITICAL));Publish on the plain Signal<CostAnomaly> injection for the default lane only.
Verify
Before you run criticalLaneAndCatchAllReceiver(), count the expected catch-all hits first. It should see both emissions, not just the critical one.
criticalLaneAndCatchAllReceiver() publishes once on default and once on critical; default, critical, and catch-all counters all move.
Metadata with SignalContext
Attach metadata at emission time:
anomalySignal.withMetadata("traceId", traceId)
.withMetadata("tenant", tenant)
.publish(new CostAnomaly("us-east-1", 340.0, Severity.NORMAL));Read it in a receiver that takes SignalContext<CostAnomaly>:
void onAnomaly(@Receives SignalContext<CostAnomaly> ctx) {
String traceId = (String) ctx.metadata().get("traceId");
CostAnomaly anomaly = ctx.signal();
}Metadata is the honest built-in context story today. If you want automatic enrichment or interception, the extension already exposes SPI hooks such as SignalMetadataEnricher and ReceiverInterceptor. What it does not give you out of the box is automatic tracing or security propagation.
Verify
metadataVisibleInReceiver() after costMonitor.detectWithMetadata("abc-123", "acme").
Programmatic receivers
Runtime registration via Receivers:
package dev.quarkex.nebulatrack.service;
import java.util.function.Consumer;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.qualifier.Critical;
import dev.quarkex.nebulatrack.support.CostPlugin;
import io.quarkus.signals.Receivers;
import io.quarkus.signals.SignalContext;
@ApplicationScoped
public class PluginReceiverRegistrar {
private final Receivers receivers;
@Inject
public PluginReceiverRegistrar(Receivers receivers) {
this.receivers = receivers;
}
public Receivers.Registration register(CostPlugin plugin) {
return receivers.newReceiver(CostAnomaly.class)
.setQualifiers(Critical.Literal.INSTANCE)
.setExecutionModel(Receivers.ExecutionModel.BLOCKING)
.notify((Consumer<SignalContext<CostAnomaly>>) ctx -> plugin.process(ctx.signal()));
}
}The explicit Consumer cast avoids ambiguity between notify(Consumer) and notify(Function) overloads.
The guide documents runtime registration and unregistration, but it does not promise stronger concurrency semantics than that. For plugin-style infrastructure, treat registration as something you should test under load instead of assuming atomic visibility during concurrent emissions.
Verify
programmaticRegisterAndUnregister() — register, publish critical, assert delivery; unregister(), publish again, count unchanged.
Request context per receiver
Each receiver invocation activates a new CDI request context. That is easy to prove with two tiny classes:
package dev.quarkex.nebulatrack.support;
import java.util.UUID;
import jakarta.enterprise.context.RequestScoped;
@RequestScoped
public class InvocationTrace {
private final UUID id = UUID.randomUUID();
public UUID id() {
return id;
}
}package dev.quarkex.nebulatrack.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.model.CostAnomaly;
import dev.quarkex.nebulatrack.support.InMemoryLedger;
import dev.quarkex.nebulatrack.support.InvocationTrace;
import io.quarkus.signals.Receives;
@ApplicationScoped
public class RequestScopeProbe {
private final InMemoryLedger ledger;
private final InvocationTrace trace;
@Inject
public RequestScopeProbe(InMemoryLedger ledger, InvocationTrace trace) {
this.ledger = ledger;
this.trace = trace;
}
void onAnomaly(@Receives CostAnomaly anomaly) {
ledger.recordRequestScopeId(trace.id());
}
}Verify
receiversGetIsolatedRequestScope() — two publish() calls yield two different UUIDs in the ledger.
Optional command-mode entry point
NebulaTrackMain implements QuarkusApplication with @QuarkusMain and runs a short demo sequence:
package dev.quarkex.nebulatrack;
import jakarta.inject.Inject;
import dev.quarkex.nebulatrack.service.BudgetService;
import dev.quarkex.nebulatrack.service.CostMonitor;
import dev.quarkex.nebulatrack.service.RemediationDispatcher;
import io.quarkus.runtime.QuarkusApplication;
import io.quarkus.runtime.annotations.QuarkusMain;
@QuarkusMain
public class NebulaTrackMain implements QuarkusApplication {
private final CostMonitor costMonitor;
private final RemediationDispatcher remediationDispatcher;
private final BudgetService budgetService;
@Inject
public NebulaTrackMain(
CostMonitor costMonitor,
RemediationDispatcher remediationDispatcher,
BudgetService budgetService) {
this.costMonitor = costMonitor;
this.remediationDispatcher = remediationDispatcher;
this.budgetService = budgetService;
}
@Override
public int run(String... args) throws Exception {
costMonitor.detect();
remediationDispatcher.dispatch("us-east-1", "scale-down-idle-nodes");
var estimate = budgetService.estimateBlocking("s3", 500);
System.out.printf("NebulaTrack demo finished; sample estimate for %s: %s%n",
estimate.service(), estimate.monthlyCost());
return 0;
}
}Quarkus command-mode testing usually works best as a mix: @QuarkusMainTest for CLI behavior and @QuarkusTest for internals. This article stays on the internal side, so we inject beans directly.
Make it survive
Experimental extension — pin the Quarkus version in the article and in CI; expect API tweaks.
Silent fan-out — publish() and send() succeed when no receiver matches. Fine for notifications; dangerous if you assume someone handled the work.
Receiver failures — blocking publish() and send() log receiver failures instead of throwing them back to the caller. Blocking request() is different: it throws the receiver failure on the calling thread. Reactive emissions fail the returned Uni.
request() returns null — treat as part of the contract; test it.
Qualifier resolution — @Default vs @Any trips people coming from CDI observers. Emit from the injection point that matches the lane you intend.
Execution models — blocking receivers work without Vert.x. Uni receivers and NON_BLOCKING pull Vert.x into the execution story. Virtual-thread execution models need the right runtime support; in a minimal app, prefer blocking for receivers and programmatic hooks.
Programmatic registration — runtime registration is useful for plugins, but the public guide does not promise stronger visibility guarantees than register/unregister itself. If this matters under load, test it under load.
Metadata is not tracing — correlation IDs yes; automatic OpenTelemetry or security propagation no.
Prove it
From the module root:
./mvnw testAll tests in NebulaTrackSignalsTest should pass. That covers publish fan-out, send round-robin, blocking and reactive request, null request, qualifiers, metadata, programmatic register/unregister, and request-scope isolation.
When to reach for Signals
Use CDI events for classic observer multicast, especially when synchronous delivery or transactional observers matter.
Use Signals for in-process async coordination when you need publish, send, or typed request-reply between decoupled components.
Use Reactive Messaging when Kafka, AMQP, Pulsar, backpressure, or connectors are in scope.
Use Vert.x EventBus when you want address-based routing and the Vert.x programming model.
Signals are not “another event bus.” They are the layer between CDI convenience and broker-backed messaging.
That is the gap from the opening. CDI events still own classic observer fan-out. Signals give you one small in-process API for fan-out, unicast work, and typed request-reply without dragging in broker concerns.


