Architecting Multi-Agent AI Systems in Java with Quarkus and Langchain4j
How to orchestrate scalable AI workflows using message-driven agents and local LLMs with Kafka
In this tutorial, you’ll build a scalable, multi-agent AI system using Quarkus, Langchain4j, and Quarkus Messaging. We’ll walk through how to orchestrate multiple AI agents using a message-passing architecture without needing to spin up Kafka. Instead, we’ll use quarkus-messaging-kafka
extension to keep the feedback loop fast and local.
This pattern is a powerful approach for developers building AI-infused services that need to coordinate multiple specialized LLM prompts, tools, or agents while maintaining a clean separation of concerns.
What You'll Build
A Supervisor Service that orchestrates workflows by dispatching tasks to specialized AI agents
A set of AI Agent Workers that perform dedicated tasks (Research, Analysis, Summary)
An in-memory messaging backbone using Quarkus Messaging
A REST API to trigger workflows and collect results
You can grab the running example from my Github repository and start from there. But you are also more than welcome to just follow along: Let’s get started.
Project Setup
Generate your Quarkus project with the required extensions:
mvn io.quarkus.platform:quarkus-maven-plugin:create \
-DprojectGroupId=com.example \
-DprojectArtifactId=quarkus-supervisor-ai \
-Dextensions="quarkus-rest-jackson,quarkus-langchain4j-ollama, quarkus-messaging-kafka"
cd quarkus-supervisor-ai
This gives us REST support, AI integration via a local Ollama model, and a messaging layer using in-memory channels.
Define Message Types
Create AgentTask.java
and AgentResult.java
in com.example.messaging
. These represent the messages passed between components.
package com.example.messaging;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class AgentTask {
public String taskId;
public String agentType;
public String content;
public int priority;
public Map<String, Object> metadata = new HashMap<>();
public AgentTask() {
}
public AgentTask(String agentType, String content, int priority) {
this.taskId = UUID.randomUUID().toString();
this.agentType = agentType;
this.content = content;
this.priority = priority;
}
}
package com.example.messaging;
public class AgentResult {
public String taskId;
public String agentId;
public String agentType;
public String result;
public String status;
public long executionTime;
public String error;
public AgentResult() {
}
public AgentResult(String taskId, String agentId, String agentType, String result, String status) {
this.taskId = taskId;
this.agentId = agentId;
this.agentType = agentType;
this.result = result;
this.status = status;
}
}
Define AI Agent Interfaces
Each agent is a Langchain4j service that specializes in a different type of task. Create com/example/agents/ResearchAgent.java
package com.example.agents;
import dev.langchain4j.service.SystemMessage;
import dev.langchain4j.service.UserMessage;
import io.quarkiverse.langchain4j.RegisterAiService;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
@RegisterAiService
public interface ResearchAgent {
@SystemMessage("You are a research specialist. Analyze and explain topics in depth.")
String processTask(@UserMessage String content);
default String getAgentType() {
return "RESEARCH";
}
}
Repeat for AnalysisAgent
package com.example.agents;
import dev.langchain4j.service.SystemMessage;
import dev.langchain4j.service.UserMessage;
import io.quarkiverse.langchain4j.RegisterAiService;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
@RegisterAiService
public interface AnalysisAgent {
@SystemMessage("You are a data analysis expert. Your job is to analyze data, identify patterns, and provide insights. Focus on statistical analysis and trend identification.")
String processTask(@UserMessage String content);
default String getAgentType() {
return "ANALYSIS";
}
}
and SummaryAgent
:
package com.example.agents;
import dev.langchain4j.service.SystemMessage;
import dev.langchain4j.service.UserMessage;
import io.quarkiverse.langchain4j.RegisterAiService;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
@RegisterAiService
public interface SummaryAgent {
@SystemMessage("You are a summarization expert. Your job is to create concise, accurate summaries of complex information. Focus on key points and clarity.")
String processTask(@UserMessage String content);
default String getAgentType() {
return "SUMMARY";
}
}
Create Agent Workers
Each worker listens to a shared input channel but only processes tasks matching its agentType
. It then publishes results to the result channel. Create the package com.example.agents.impl
.
BaseAgentWorker.java
This abstract class contains the common logic for all workers. It filters messages by agent type and handles the core processing loop.
package com.example.agents.impl;
import java.util.UUID;
import org.eclipse.microprofile.reactive.messaging.Message;
import com.example.messaging.AgentResult;
import com.example.messaging.AgentTask;
import io.quarkus.logging.Log;
public abstract class BaseAgentWorker<T> {
protected final String agentId;
protected T agent;
public BaseAgentWorker(T agent) {
this.agentId = UUID.randomUUID().toString();
this.agent = agent;
}
// Abstract methods to be implemented by subclasses
protected abstract String getAgentType();
protected abstract String processTaskWithAgent(String content);
// Method to be overridden in concrete classes with messaging annotations
protected Message<AgentResult> process(Message<AgentTask> message) {
AgentTask task = message.getPayload();
// Only process tasks for this agent type
if (!getAgentType().equals(task.agentType)) {
message.ack(); // Acknowledge the message to avoid reprocessing
return null; // Don't forward to the outgoing channel
}
Log.infof("Agent %s (%s) processing task %s", getAgentType(), agentId, task.taskId);
long startTime = System.currentTimeMillis();
try {
String result = processTaskWithAgent(task.content);
long executionTime = System.currentTimeMillis() - startTime;
AgentResult agentResult = new AgentResult(task.taskId, agentId, getAgentType(), result, "COMPLETED");
agentResult.executionTime = executionTime;
Log.infof("Agent %s completed task %s in %dms", getAgentType(), task.taskId, executionTime);
return Message.of(agentResult).withAck(message::ack);
} catch (Exception e) {
Log.errorf(e, "Agent %s failed to process task %s", getAgentType(), task.taskId);
AgentResult errorResult = new AgentResult(task.taskId, agentId, getAgentType(), null, "FAILED");
errorResult.error = e.getMessage();
return Message.of(errorResult).withAck(message::ack);
}
}
}
Then implement concrete workers, for example:
package com.example.agents.impl;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import com.example.agents.ResearchAgent;
import com.example.messaging.AgentResult;
import com.example.messaging.AgentTask;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class ResearchAgentWorker extends BaseAgentWorker<ResearchAgent> {
@Inject
ResearchAgent agent;
public ResearchAgentWorker() {
super(null); // Will be set via field injection
}
@Override
protected String getAgentType() {
return agent.getAgentType();
}
@Override
protected String processTaskWithAgent(String content) {
return agent.processTask(content);
}
@Incoming("agent-tasks-in")
@Outgoing("agent-results-out")
@Override
public Message<AgentResult> process(Message<AgentTask> message) {
return super.process(message);
}
}
Do the same for AnalysisAgentWorker
package com.example.agents.impl;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import com.example.agents.AnalysisAgent;
import com.example.messaging.AgentResult;
import com.example.messaging.AgentTask;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class AnalysisAgentWorker extends BaseAgentWorker<AnalysisAgent> {
@Inject
AnalysisAgent agent;
public AnalysisAgentWorker() {
super(null); // Will be set via field injection
}
@Override
protected String getAgentType() {
return agent.getAgentType();
}
@Override
protected String processTaskWithAgent(String content) {
return agent.processTask(content);
}
@Incoming("agent-tasks-in")
@Outgoing("agent-results-out")
@Override
public Message<AgentResult> process(Message<AgentTask> message) {
return super.process(message);
}
}
and SummaryAgentWorker
.
package com.example.agents.impl;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import com.example.agents.SummaryAgent;
import com.example.messaging.AgentResult;
import com.example.messaging.AgentTask;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class SummaryAgentWorker extends BaseAgentWorker<SummaryAgent> {
@Inject
SummaryAgent agent;
public SummaryAgentWorker() {
super(null); // Will be set via field injection
}
@Override
protected String getAgentType() {
return agent.getAgentType();
}
@Override
protected String processTaskWithAgent(String content) {
return agent.processTask(content);
}
@Incoming("agent-tasks-in")
@Outgoing("agent-results-out")
@Override
public Message<AgentResult> process(Message<AgentTask> message) {
return super.process(message);
}
}
Supervisor Logic
Now build the supervisor service to send tasks and collect results. Create the package com.example.supervisor
.
package com.example.supervisor;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
import com.example.messaging.AgentResult;
import com.example.messaging.AgentTask;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class SupervisorService {
private static final Logger LOG = Logger.getLogger(SupervisorService.class);
@Inject
@Channel("agent-tasks-out")
Emitter<AgentTask> taskEmitter;
// In-memory store for tracking workflows. Key is workflowId.
private final Map<String, WorkflowExecution> activeWorkflows = new ConcurrentHashMap<>();
// This is a simplified workflow request.
public record WorkflowRequest(List<AgentTask> tasks) {
}
// Main method to start a workflow
public Uni<List<AgentResult>> executeWorkflow(WorkflowRequest request) {
String workflowId = UUID.randomUUID().toString();
LOG.infof("Starting workflow %s with %d tasks", workflowId, request.tasks().size());
WorkflowExecution execution = new WorkflowExecution(workflowId, request.tasks().size());
activeWorkflows.put(workflowId, execution);
// Dispatch all tasks to Kafka
request.tasks().forEach(task -> {
// Add workflowId to task metadata for tracking
task.metadata.put("workflowId", workflowId);
// Track this task in the execution
execution.trackTask(task.taskId);
// Send the task directly
taskEmitter.send(task);
LOG.infof("Dispatched task %s for agent %s", task.taskId, task.agentType);
});
// Return a Uni that will complete when results are ready or time out
return execution.getCompletionUni()
.ifNoItem().after(Duration.ofSeconds(60)).fail() // Timeout
.eventually(() -> activeWorkflows.remove(workflowId)); // Cleanup
}
// Kafka consumer for results
@Incoming("agent-results-in")
public void handleAgentResult(AgentResult result) {
if (result.taskId == null)
return; // Ignore invalid messages
// Find the workflow this result belongs to
activeWorkflows.values().stream()
.filter(exec -> exec.hasTask(result.taskId))
.findFirst()
.ifPresent(execution -> {
LOG.infof("Received result for task %s in workflow %s", result.taskId, execution.getWorkflowId());
execution.addResult(result);
});
}
}
We also need the helper class WorkflowExecution
to manage the state of each running workflow:
package com.example.supervisor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.example.messaging.AgentResult;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniEmitter;
public class WorkflowExecution {
private final String workflowId;
private final int expectedTaskCount;
private final AtomicInteger completedTasks = new AtomicInteger(0);
private final List<AgentResult> results = new ArrayList<>();
private final Map<String, Boolean> taskTracker = new ConcurrentHashMap<>();
private UniEmitter<? super List<AgentResult>> completionEmitter;
public WorkflowExecution(String workflowId, int expectedTaskCount) {
this.workflowId = workflowId;
this.expectedTaskCount = expectedTaskCount;
}
public String getWorkflowId() {
return workflowId;
}
public boolean hasTask(String taskId) {
return taskTracker.containsKey(taskId);
}
public void trackTask(String taskId) {
taskTracker.put(taskId, false);
}
public synchronized void addResult(AgentResult result) {
if (taskTracker.containsKey(result.taskId) && !taskTracker.get(result.taskId)) {
taskTracker.put(result.taskId, true);
results.add(result);
int completed = completedTasks.incrementAndGet();
if (completed >= expectedTaskCount && completionEmitter != null) {
completionEmitter.complete(new ArrayList<>(results));
}
}
}
public Uni<List<AgentResult>> getCompletionUni() {
return Uni.createFrom().emitter(emitter -> {
this.completionEmitter = emitter;
// Check if already completed
if (completedTasks.get() >= expectedTaskCount) {
emitter.complete(new ArrayList<>(results));
}
});
}
}
Note: The simplified
hasTask
method works for this example, but in a production system, you'd want to store the initial task IDs inWorkflowExecution
for precise matching.
Expose the API
Create a REST controller in com.example.rest
:
package com.example.rest;
import java.util.List;
import com.example.messaging.AgentTask;
import com.example.supervisor.SupervisorService;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
@Path("/supervisor")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class SupervisorResource {
@Inject
SupervisorService supervisorService;
// A simple request body for the /analyze endpoint
public record AnalysisRequest(String content) {
}
@POST
@Path("/analyze")
public Uni<Response> analyzeContent(AnalysisRequest request) {
var workflowRequest = new SupervisorService.WorkflowRequest(
List.of(
new AgentTask("RESEARCH", request.content(), 1),
new AgentTask("ANALYSIS", request.content(), 1),
new AgentTask("SUMMARY", request.content(), 1)));
return supervisorService.executeWorkflow(workflowRequest)
.map(results -> Response.ok(results).build());
}
@POST
@Path("/custom-workflow")
public Uni<Response> customWorkflow(SupervisorService.WorkflowRequest request) {
return supervisorService.executeWorkflow(request)
.map(results -> Response.ok(results).build());
}
}
Configure Langchain4j and Kafka
You can use Dev Services to auto-start Ollama, or configure it manually. Kafka Dev Services start a Redpanda instance locally and wire it into Quarkus without further configuration. You do need to define the channels though.
# application.properties
quarkus.langchain4j.ollama.chat-model.model-id=llama3.2:latest
quarkus.langchain4j.ollama.log-requests=true
quarkus.langchain4j.ollama.log-responses=true
quarkus.langchain4j.ollama.timeout=60s
# Outgoing channel for supervisor to dispatch tasks to agents
mp.messaging.outgoing.agent-tasks-out.connector=smallrye-kafka
mp.messaging.outgoing.agent-tasks-out.topic=agent-tasks
# Incoming channel for agents to receive tasks - BROADCAST to all agents
mp.messaging.incoming.agent-tasks-in.connector=smallrye-kafka
mp.messaging.incoming.agent-tasks-in.topic=agent-tasks
mp.messaging.incoming.agent-tasks-in.auto.offset.reset=earliest
mp.messaging.incoming.agent-tasks-in.broadcast=true
# Outgoing channel for agents to send results - MERGE from all agents
mp.messaging.outgoing.agent-results-out.connector=smallrye-kafka
mp.messaging.outgoing.agent-results-out.topic=agent-results
mp.messaging.outgoing.agent-results-out.merge=true
# Incoming channel for supervisor to receive results from agents
mp.messaging.incoming.agent-results-in.connector=smallrye-kafka
mp.messaging.incoming.agent-results-in.topic=agent-results
mp.messaging.incoming.agent-results-in.auto.offset.reset=earliest
Run and Test
Start the application:
./mvnw quarkus:dev
Then test with curl. I am using jq to format the results a little:
# Pretty print with colors
curl -s -X POST http://localhost:8080/supervisor/analyze \
-H "Content-Type: application/json" \
-d '{"content": "Explain the advantages of microservices architecture"}' \
| jq -C '.'
You'll get results from each AI agent, processed independently and assembled by the supervisor.
Show a summary of all agent results:
# Show a summary of all agent results
curl -s -X POST http://localhost:8080/supervisor/analyze \
-H "Content-Type: application/json" \
-d '{"content": "What are the benefits of using Apache Kafka for event streaming?"}' \
| jq 'map({agent: .agentType, status: .status, time: (.executionTime/1000 | tostring + "s")}) | sort_by(.agent)'
And the result:
[
{
"agent": "ANALYSIS",
"status": "COMPLETED",
"time": "8.497s"
},
{
"agent": "RESEARCH",
"status": "COMPLETED",
"time": "6.966s"
},
{
"agent": "SUMMARY",
"status": "COMPLETED",
"time": "10.843s"
}
]
Final Thoughts
This architecture gives you a strong foundation for building AI services that scale to Agent based systems. Using Kafka based orchestration and messaging allows you to use different agents and execute in depth research beyond a single LLM, Tool or Service..
You’ve now built a self-contained multi-agent AI orchestrator in Java with Quarkus, Langchain4j, and Kafka.
Next Steps: Where to Go from Here
You’ve built a functional, message-driven multi-agent AI system with Quarkus and Langchain4j. If you want to extend this further, think about:
Integrate OpenTelemetry with Quarkus to trace workflows end to end. This will help you monitor latency across agents, detect failures early, and improve system reliability.
Introduce more agent roles, such as validators, translators, or tool-using agents, to simulate a true AI task force. Use Langchain4j's tool calling to give agents access to search, code execution, or external APIs.
Use different models for specific agents.
Swap the message-passing pattern for a graph-based approach using LangGraph4j. It simplifies defining conditional flows, retries, and memory-sharing across agents.