Listening to the Fediverse with Java: A Real-Time Quarkus Experiment
How I built a streaming ingestion pipeline with SSE, batching, and PostgreSQL to track Java-related conversations.
I spend a lot of time on the Fediverse. Not as a passive consumer, but as someone who genuinely enjoys the slower, more technical, more opinionated conversations that happen there, especially around Java, Quarkus, and modern backend architecture.
Every now and then, I’d notice a post about Java performance, a discussion about frameworks, or a short rant about enterprise complexity. Sometimes it felt like Java was everywhere. Other times, it felt oddly quiet. And I realized something: I had no real intuition for what was actually being talked about, at scale, across the Fediverse.
So I did what I usually do when I’m curious: I built something.
Not a product. Not a platform. Just a small system that listens.
The idea was simple: connect to the public Fediverse stream, watch what flows by, and get a rough, data-backed feeling for which hashtags appear, how often Java shows up, and what kind of conversations cluster around it. No dashboards full of vanity metrics. No AI hype. Just raw signals.
This tutorial is the result of that curiosity.
Along the way, it turned into a great excuse to explore a few things I care about deeply as a Java developer: streaming data ingestion, backpressure, batching, normalized persistence models, and building something observable and boring enough to actually trust.
If you’ve ever wondered “What is really being discussed out there?” or if you simply enjoy building small, purposeful systems with Quarkus, this one’s for you.
Let’s build it.
What you will learn
By the end of this tutorial, you will understand how to:
Consume Server-Sent Events (SSE) using the Vert.x HTTP client
Design a buffered ingestion pipeline to handle high-volume streams
Implement a normalized database schema with Hibernate Panache
Batch database inserts safely with duplicate handling
Expose monitoring endpoints using SmallRye Health
Structure a Quarkus application for streaming workloads
Architecture overview
At a high level, the system works like this:
Quarkus connects to https://fedi.buzz/api/v1/streaming/public
Incoming SSE update events contain Mastodon status JSON
Each status is inspected for configured hashtags (or all hashtags if none configured)
Matching statuses are buffered in memory with their hashtag mentions
A scheduled task flushes buffered events into PostgreSQL in batches
A dashboard UI exposes stored mentions and statistics
A health endpoint reports stream and database status
This separation is important: stream ingestion must never block on database writes.
Prerequisites
Java 17+ (Java 21 recommended)
Maven
A container runtime (Podman or Docker) for Quarkus Dev Services
Create the Quarkus project
We start with a minimal Quarkus application and explicitly choose extensions that match our needs. The full source for this tutorial is in my GitHub repository.
quarkus create app com.example:hashtag-monitor \
--extension='rest-jackson,rest-qute,jdbc-postgresql,hibernate-orm-panache,scheduler,smallrye-health' \
--no-code
cd hashtag-monitor
Why these extensions?
REST Jackson – JSON serialization for API responses
REST Qute – templating engine for the dashboard UI
Hibernate ORM Panache – simple, readable persistence
Scheduler – batch flushing from buffer to database
SmallRye Health – operational visibility
JDBC PostgreSQL – backed by Dev Services in dev mode
Configuration with Dev Services
Modify src/main/resources/application.properties:
# --- Database (Dev Services) ---
quarkus.datasource.db-kind=postgresql
quarkus.hibernate-orm.schema-management.strategy=drop-and-create
quarkus.hibernate-orm.log.sql=false
# --- Configure Hashtags. If none set, all hashtags will be collected. ---
app.hashtags=
# SSE endpoint
fedibuzz.streaming.url=https://fedi.buzz/api/v1/streaming/public
# Stream control
app.stream.auto-start=true
# Buffering
app.buffer.max-size=5000
app.buffer.flush-interval-seconds=5s
# Logging
quarkus.log.level=INFO
quarkus.log.category."com.example".level=INFO
Why no JDBC URL?
In dev and test mode, Quarkus Dev Services automatically:
Starts PostgreSQL
Creates credentials
Injects the datasource configuration
This removes all local environment friction.
Flexible hashtag filtering: Leave app.hashtags empty to collect all hashtags from the stream, or specify a comma-separated list to filter specific ones.
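To make the filtering rule concrete, here is a minimal plain-Java sketch of how a comma-separated value could be normalized and applied. The class and method names (`HashtagFilterSketch`, `parse`, `matches`) are hypothetical and exist only for illustration; the real application relies on Quarkus configuration injection instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the tutorial code base.
public class HashtagFilterSketch {

    /** Turns a value such as "Java, Quarkus" into a set of lower-case tags. */
    static Set<String> parse(String raw) {
        if (raw == null || raw.isBlank()) {
            return Set.of(); // an empty set means "collect every hashtag"
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(s -> s.toLowerCase(Locale.ROOT))
                .collect(Collectors.toUnmodifiableSet());
    }

    /** A post matches if it has tags and either no filter is set or one of its tags is configured. */
    static boolean matches(Set<String> configured, List<String> postTags) {
        if (postTags == null || postTags.isEmpty()) {
            return false;
        }
        if (configured.isEmpty()) {
            return true;
        }
        return postTags.stream()
                .map(t -> t.toLowerCase(Locale.ROOT))
                .anyMatch(configured::contains);
    }

    public static void main(String[] args) {
        Set<String> filter = parse("Java, Quarkus");
        System.out.println(matches(filter, List.of("JAVA", "news"))); // true
        System.out.println(matches(filter, List.of("rust")));         // false
        System.out.println(matches(parse(""), List.of("rust")));      // true
    }
}
```

Normalizing to lower case on both sides keeps the comparison case-insensitive, which mirrors what the matcher later in this tutorial does.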
Domain model: normalized schema
The implementation uses a normalized database schema with two entities to avoid data duplication.
Status entity
Create src/main/java/com/example/entity/Status.java:
package com.example.entity;
import java.time.Instant;
import io.quarkus.hibernate.orm.panache.PanacheEntity;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Index;
import jakarta.persistence.Table;
import jakarta.persistence.UniqueConstraint;
@Entity
@Table(name = "statuses", uniqueConstraints = @UniqueConstraint(columnNames = "statusId"), indexes = {
@Index(columnList = "instance"),
@Index(columnList = "createdAt"),
@Index(columnList = "accountUsername")
})
public class Status extends PanacheEntity {
@Column(nullable = false, unique = true, columnDefinition = "VARCHAR(1000)")
public String statusId;
@Column(nullable = false, columnDefinition = "VARCHAR(500)")
public String accountUsername;
@Column(nullable = false, columnDefinition = "VARCHAR(500)")
public String instance;
@Column(columnDefinition = "TEXT")
public String content;
@Column(columnDefinition = "TEXT")
public String url;
@Column(nullable = false)
public Instant createdAt;
@Column(nullable = false)
public Instant recordedAt;
}
HashtagMention entity
Create src/main/java/com/example/entity/HashtagMention.java:
package com.example.entity;
import java.time.Instant;
import io.quarkus.hibernate.orm.panache.PanacheEntity;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.Index;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.Table;
import jakarta.persistence.UniqueConstraint;
@Entity
@Table(name = "hashtag_mentions",
uniqueConstraints = @UniqueConstraint(columnNames = {"status_id", "hashtag"}),
indexes = {
@Index(columnList = "hashtag"),
@Index(columnList = "recordedAt"),
@Index(columnList = "status_id")
})
public class HashtagMention extends PanacheEntity {
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "status_id", nullable = false)
public Status status;
@Column(nullable = false)
public String hashtag;
@Column(nullable = false)
public Instant recordedAt;
}
Normalized schema: Status data is stored once, with multiple hashtag mentions referencing it
Composite unique constraint: Prevents duplicate hashtag mentions for the same status
Lazy loading: Status is loaded only when needed
Flexible column definitions: TEXT columns handle large content, VARCHAR with explicit lengths is used for indexed fields
Modeling the Mastodon payload
The SSE stream sends JSON payloads compatible with Mastodon’s API.
Create src/main/java/com/example/model/MastodonStatus.java:
package com.example.model;
import java.time.Instant;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@JsonIgnoreProperties(ignoreUnknown = true)
public record MastodonStatus(
String id,
String url,
@JsonProperty("created_at") Instant createdAt,
Account account,
String content,
List<Tag> tags) {
public record Account(String username, String acct) {
}
public record Tag(String name) {
}
public String instance() {
return acctContainsInstance()
? account.acct().split("@", 2)[1]
: "unknown";
}
private boolean acctContainsInstance() {
return account != null && account.acct() != null && account.acct().contains("@");
}
}
We keep the model minimal: only fields we actually use.
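The `instance()` helper derives the home server from the `acct` field. In the Mastodon API, remote accounts are reported as `username@domain`, while accounts local to the serving instance carry only the username. The following standalone sketch reproduces that rule; `InstanceSketch` and `instanceOf` are illustrative names, not part of the project.

```java
// Hypothetical standalone version of the instance-extraction rule.
public class InstanceSketch {

    /** Returns the domain part of "user@domain", or "unknown" when there is none. */
    static String instanceOf(String acct) {
        if (acct == null || !acct.contains("@")) {
            return "unknown";
        }
        // Limit 2 keeps everything after the first '@' in one piece.
        return acct.split("@", 2)[1];
    }

    public static void main(String[] args) {
        System.out.println(instanceOf("alice@mastodon.social")); // mastodon.social
        System.out.println(instanceOf("bob"));                   // unknown
        System.out.println(instanceOf(null));                    // unknown
    }
}
```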
SSE streaming with Vert.x
The implementation uses the Vert.x HTTP client for SSE streaming, which gives fine-grained control over connection handling.
Custom SSE parser
Create src/main/java/com/example/service/stream/SseStreamParser.java:
package com.example.service.stream;
import java.util.function.Consumer;
import io.vertx.core.buffer.Buffer;
public class SseStreamParser {
private final StringBuilder currentLine = new StringBuilder();
private String eventType;
private String eventData;
public void parseChunk(Buffer buffer, Consumer<String> onUpdateEvent) {
String chunk = buffer.toString();
for (char c : chunk.toCharArray()) {
if (c == '\n') {
processLine(currentLine.toString().trim(), onUpdateEvent);
currentLine.setLength(0);
} else if (c != '\r') {
currentLine.append(c);
}
}
}
private void processLine(String line, Consumer<String> onUpdateEvent) {
if (line.isEmpty()) {
if (eventData != null && "update".equals(eventType)) {
onUpdateEvent.accept(eventData);
}
eventType = null;
eventData = null;
} else if (line.startsWith("event:")) {
eventType = line.substring(6).trim();
} else if (line.startsWith("data:")) {
eventData = line.substring(5).trim();
}
}
}
This parser handles the SSE protocol format, extracting event types and data from the stream.
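To see why the parser accumulates characters instead of splitting each chunk into lines, it helps to feed it an event that is cut in the middle. The sketch below is a simplified, String-based variant (no Vert.x `Buffer`) under the hypothetical name `SseSketch`. As an extension that the tutorial parser does not have, it also skips comment lines starting with `:` and joins multiple `data:` lines with a newline, as the SSE format allows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, String-based illustration of incremental SSE parsing.
public class SseSketch {
    private final StringBuilder line = new StringBuilder();
    private final StringBuilder data = new StringBuilder();
    private String eventType;
    private boolean hasData;

    /** Consumes one network chunk; a chunk may end in the middle of a line. */
    void feed(String chunk, Consumer<String> onUpdate) {
        for (int i = 0; i < chunk.length(); i++) {
            char c = chunk.charAt(i);
            if (c == '\n') {
                handleLine(line.toString(), onUpdate);
                line.setLength(0);
            } else if (c != '\r') {
                line.append(c);
            }
        }
    }

    private void handleLine(String l, Consumer<String> onUpdate) {
        if (l.isEmpty()) {
            // A blank line terminates the current event.
            if (hasData && "update".equals(eventType)) {
                onUpdate.accept(data.toString());
            }
            eventType = null;
            data.setLength(0);
            hasData = false;
        } else if (l.startsWith(":")) {
            // Comment line, often used as a keep-alive: ignore.
        } else if (l.startsWith("event:")) {
            eventType = l.substring(6).trim();
        } else if (l.startsWith("data:")) {
            if (hasData) {
                data.append('\n'); // multi-line data is joined with newlines
            }
            data.append(l.substring(5).trim());
            hasData = true;
        }
    }

    public static void main(String[] args) {
        SseSketch parser = new SseSketch();
        List<String> updates = new ArrayList<>();
        parser.feed("event: upd", updates::add);                    // chunk ends mid-line
        parser.feed("ate\ndata: {\"id\":\"1\"}\n", updates::add);
        parser.feed("\n:thump\nevent: delete\ndata: 42\n\n", updates::add);
        System.out.println(updates); // [{"id":"1"}]
    }
}
```

Only the `update` event reaches the callback; the `delete` event and the comment line are ignored.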
Service layer architecture
The application separates concerns into focused service classes for better maintainability.
Status matching service
Create src/main/java/com/example/service/matching/StatusMatcher.java:
package com.example.service.matching;
import java.util.List;
import java.util.Optional;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import com.example.model.MastodonStatus;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class StatusMatcher {
private final Optional<List<String>> configuredHashtags;
@Inject
public StatusMatcher(@ConfigProperty(name = "app.hashtags", defaultValue = "")
Optional<List<String>> hashtagsConfig) {
this.configuredHashtags = hashtagsConfig
.filter(list -> list != null && !list.isEmpty())
.map(list -> list.stream()
.filter(s -> s != null && !s.isBlank())
.map(String::toLowerCase)
.toList())
.filter(list -> !list.isEmpty());
}
public boolean matches(MastodonStatus status) {
if (status.tags() == null || status.tags().isEmpty()) {
return false;
}
// If no hashtags configured, match any post with hashtags
if (configuredHashtags.isEmpty()) {
return true;
}
// Otherwise, only match configured hashtags
return status.tags().stream()
.map(t -> t.name().toLowerCase())
.anyMatch(h -> configuredHashtags.get().contains(h));
}
public boolean isFilteringEnabled() {
return configuredHashtags.isPresent();
}
public List<String> getConfiguredHashtags() {
return configuredHashtags.orElse(List.of());
}
}
Status conversion service
Create src/main/java/com/example/service/matching/StatusConverter.java:
package com.example.service.matching;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import com.example.entity.HashtagMention;
import com.example.entity.Status;
import com.example.model.MastodonStatus;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class StatusConverter {
@Inject
StatusMatcher statusMatcher;
public ConversionResult toEntities(MastodonStatus mastodonStatus) {
List<String> hashtagsToProcess;
if (statusMatcher.isFilteringEnabled()) {
List<String> configuredHashtags = statusMatcher.getConfiguredHashtags();
hashtagsToProcess = mastodonStatus.tags().stream()
.map(t -> t.name().toLowerCase())
.filter(configuredHashtags::contains)
.collect(Collectors.toList());
} else {
hashtagsToProcess = mastodonStatus.tags().stream()
.map(t -> t.name().toLowerCase())
.collect(Collectors.toList());
}
Status status = createStatus(mastodonStatus);
List<HashtagMention> mentions = hashtagsToProcess.stream()
.filter(hashtag -> hashtag != null && !hashtag.isBlank())
.map(hashtag -> createMention(status, hashtag))
.collect(Collectors.toList());
return new ConversionResult(status, mentions);
}
private Status createStatus(MastodonStatus mastodonStatus) {
Status status = new Status();
status.statusId = mastodonStatus.id();
status.accountUsername = mastodonStatus.account() != null
? mastodonStatus.account().username()
: "unknown";
status.instance = mastodonStatus.instance();
status.content = mastodonStatus.content();
status.url = mastodonStatus.url();
status.createdAt = mastodonStatus.createdAt() != null
? mastodonStatus.createdAt()
: Instant.now();
status.recordedAt = Instant.now();
return status;
}
private HashtagMention createMention(Status status, String hashtag) {
HashtagMention mention = new HashtagMention();
mention.status = status;
mention.hashtag = hashtag;
mention.recordedAt = Instant.now();
return mention;
}
public record ConversionResult(Status status, List<HashtagMention> mentions) {
}
}
Buffering service
Create src/main/java/com/example/service/buffer/MentionBuffer.java:
package com.example.service.buffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import com.example.entity.HashtagMention;
import com.example.entity.Status;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MentionBuffer {
public static class BufferedMention {
public final Status status;
public final HashtagMention mention;
public BufferedMention(Status status, HashtagMention mention) {
this.status = status;
this.mention = mention;
}
}
private final Queue<BufferedMention> buffer = new ConcurrentLinkedQueue<>();
private final AtomicInteger droppedCount = new AtomicInteger(0);
@ConfigProperty(name = "app.buffer.max-size")
int maxSize;
public boolean add(Status status, HashtagMention mention) {
if (buffer.size() >= maxSize) {
droppedCount.incrementAndGet();
return false;
}
buffer.offer(new BufferedMention(status, mention));
return true;
}
public List<BufferedMention> drain() {
List<BufferedMention> batch = new ArrayList<>();
BufferedMention m;
while ((m = buffer.poll()) != null) {
batch.add(m);
}
return batch;
}
public int size() {
return buffer.size();
}
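}
The buffer stores each mention together with its status, so the flush task has everything it needs to persist both. When the buffer is full, new mentions are dropped and counted rather than blocking the stream.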
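One possible variation, shown here only as a sketch, is to let a bounded queue enforce the limit. `ConcurrentLinkedQueue.size()` has to traverse the queue, and a separate size check followed by `offer` is not atomic, whereas `ArrayBlockingQueue.offer` rejects the element in a single step once the capacity is reached. `BoundedBufferSketch` is a hypothetical name and is not what the tutorial code uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical alternative to MentionBuffer, built on a bounded queue.
public class BoundedBufferSketch<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedBufferSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Never blocks: returns false and counts a drop when the buffer is full. */
    boolean add(T item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Moves everything currently buffered into a new list. */
    List<T> drain() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    long dropped() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedBufferSketch<String> buffer = new BoundedBufferSketch<>(2);
        System.out.println(buffer.add("a"));   // true
        System.out.println(buffer.add("b"));   // true
        System.out.println(buffer.add("c"));   // false, buffer is full
        System.out.println(buffer.drain());    // [a, b]
        System.out.println(buffer.dropped());  // 1
    }
}
```

Either way, the important property is the same: the producer side returns immediately and never waits for the database.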
Batch persistence with duplicate handling
Create src/main/java/com/example/service/persistence/BatchPersistenceService.java (simplified version):
package com.example.service.persistence;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.example.entity.HashtagMention;
import com.example.entity.Status;
import com.example.service.buffer.MentionBuffer;
import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class BatchPersistenceService {
@Inject
MentionBuffer buffer;
@Scheduled(every = "${app.buffer.flush-interval-seconds}")
@Transactional
void flush() {
List<MentionBuffer.BufferedMention> batch = buffer.drain();
if (batch.isEmpty()) {
return;
}
// Step 1: Persist or find existing Status entities
Map<String, Status> statusMap = persistOrFindStatuses(batch);
// Step 2: Check for existing mentions
Set<String> existingCombinations = findExistingMentions(batch, statusMap);
// Step 3: Create new mentions
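List<HashtagMention> toPersist = batch.stream()
.filter(bm -> !existingCombinations.contains(bm.status.statusId + "|" + bm.mention.hashtag))
// Keep one entry per (statusId, hashtag): a repeat inside the same batch
// would otherwise violate the composite unique constraint
.collect(Collectors.toMap(
bm -> bm.status.statusId + "|" + bm.mention.hashtag,
bm -> bm,
(first, duplicate) -> first))
.values().stream()
.map(bm -> {
HashtagMention mention = new HashtagMention();
mention.status = statusMap.get(bm.status.statusId);
mention.hashtag = bm.mention.hashtag;
mention.recordedAt = bm.mention.recordedAt;
return mention;
})
.collect(Collectors.toList());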
if (!toPersist.isEmpty()) {
HashtagMention.persist(toPersist);
}
}
private Map<String, Status> persistOrFindStatuses(List<MentionBuffer.BufferedMention> batch) {
Set<String> statusIds = batch.stream()
.map(bm -> bm.status.statusId)
.collect(Collectors.toSet());
Map<String, Status> statusMap = new HashMap<>();
List<Status> existing = Status.find("statusId in ?1", statusIds).list();
existing.forEach(s -> statusMap.put(s.statusId, s));
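// Deduplicate by statusId: Status does not override equals(), so distinct() would not
// merge two instances of the same post that arrive in one batch
List<Status> newStatuses = List.copyOf(batch.stream()
.map(bm -> bm.status)
.filter(s -> !statusMap.containsKey(s.statusId))
.collect(Collectors.toMap(s -> s.statusId, s -> s, (first, duplicate) -> first))
.values());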
if (!newStatuses.isEmpty()) {
Status.persist(newStatuses);
newStatuses.forEach(s -> statusMap.put(s.statusId, s));
}
return statusMap;
}
private Set<String> findExistingMentions(List<MentionBuffer.BufferedMention> batch,
Map<String, Status> statusMap) {
List<Long> statusEntityIds = batch.stream()
.map(bm -> statusMap.get(bm.status.statusId))
.filter(s -> s != null && s.id != null)
.map(s -> s.id)
.distinct()
.collect(Collectors.toList());
if (statusEntityIds.isEmpty()) {
return Set.of();
}
return HashtagMention.<HashtagMention>find("status.id in ?1", statusEntityIds)
.list()
.stream()
.map(m -> m.status.statusId + "|" + m.hashtag)
.collect(Collectors.toSet());
}
}
This service handles the complex task of persisting normalized data while avoiding duplicates.
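Stripped of the persistence calls, the duplicate handling comes down to a composite key of status ID and hashtag. The sketch below shows that idea in plain Java, assuming a set of keys already known to the database. The names `DedupSketch`, `Mention`, and `newMentions` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, database-free illustration of composite-key deduplication.
public class DedupSketch {

    record Mention(String statusId, String hashtag) {
        String key() {
            return statusId + "|" + hashtag;
        }
    }

    /** Returns mentions that are neither stored already nor repeated within the batch. */
    static List<Mention> newMentions(List<Mention> batch, Set<String> existingKeys) {
        Map<String, Mention> unique = new LinkedHashMap<>(); // keeps arrival order
        for (Mention m : batch) {
            if (!existingKeys.contains(m.key())) {
                unique.putIfAbsent(m.key(), m);
            }
        }
        return List.copyOf(unique.values());
    }

    public static void main(String[] args) {
        List<Mention> batch = List.of(
                new Mention("1", "java"),
                new Mention("1", "java"),     // repeated inside the batch
                new Mention("1", "quarkus"),
                new Mention("2", "java"));    // already stored
        List<Mention> fresh = newMentions(batch, Set.of("2|java"));
        System.out.println(fresh.stream().map(Mention::key).toList()); // [1|java, 1|quarkus]
    }
}
```

The unique constraint in the database remains the final safeguard; the pre-check only avoids failing a whole batch because of a known duplicate.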
Stream service with Vert.x
Create src/main/java/com/example/service/stream/FediBuzzStreamService.java (simplified version):
package com.example.service.stream;
import com.example.model.MastodonStatus;
import com.example.service.buffer.MentionBuffer;
import com.example.service.matching.StatusConverter;
import com.example.service.matching.StatusMatcher;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class FediBuzzStreamService {
@Inject
Vertx vertx;
@Inject
ObjectMapper objectMapper;
@Inject
MentionBuffer buffer;
@Inject
StatusMatcher statusMatcher;
@Inject
StatusConverter statusConverter;
private volatile boolean running;
public void start() {
running = true;
// Keep certificate verification enabled; setTrustAll(true) would accept any certificate
HttpClientOptions options = new HttpClientOptions()
.setSsl(true);
HttpClient httpClient = vertx.createHttpClient(options);
httpClient
.request(io.vertx.core.http.HttpMethod.GET, 443, "fedi.buzz",
"/api/v1/streaming/public")
.onSuccess(request -> {
request.putHeader("Accept", "text/event-stream")
.response()
.onSuccess(response -> {
SseStreamParser parser = new SseStreamParser();
response.handler(chunk -> {
parser.parseChunk(chunk, eventData -> {
vertx.executeBlocking(() -> {
processEvent(eventData);
return null;
});
});
});
});
request.end();
});
}
private void processEvent(String eventData) {
if (!running) return;
try {
MastodonStatus status = objectMapper.readValue(eventData, MastodonStatus.class);
if (statusMatcher.matches(status)) {
var result = statusConverter.toEntities(status);
for (var mention : result.mentions()) {
buffer.add(result.status(), mention);
}
}
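} catch (Exception e) {
// Skip payloads that cannot be parsed, but leave a trace in the log
io.quarkus.logging.Log.warn("Skipping unparseable SSE event", e);
}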
}
}
Application lifecycle
Auto-start on boot
Create src/main/java/com/example/lifecycle/StreamStartup.java:
package com.example.lifecycle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import com.example.service.stream.FediBuzzStreamService;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
@ApplicationScoped
public class StreamStartup {
@Inject
FediBuzzStreamService streamService;
@Inject
Vertx vertx;
@ConfigProperty(name = "app.stream.auto-start", defaultValue = "true")
boolean autoStart;
void onStart(@Observes StartupEvent event) {
vertx.exceptionHandler(throwable -> {
// Handle SSE processing exceptions gracefully
});
if (autoStart) {
streamService.start();
}
}
}
Health checks
Create src/main/java/com/example/health/StreamHealthCheck.java:
package com.example.health;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.Readiness;
import com.example.entity.HashtagMention;
import com.example.service.buffer.MentionBuffer;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@Readiness
@ApplicationScoped
public class StreamHealthCheck implements HealthCheck {
@Inject
MentionBuffer buffer;
@Override
public HealthCheckResponse call() {
int bufferSize = buffer.size();
long totalRecords = HashtagMention.count();
return HealthCheckResponse.named("fedi-stream")
.withData("bufferSize", bufferSize)
.withData("totalRecords", totalRecords)
.up()
.build();
}
}
Access the health endpoint:
curl http://localhost:8080/q/health/ready
Dashboard UI
The application includes a live dashboard showing hashtag statistics.
Create src/main/java/com/example/web/HashtagDashboardResource.java:
package com.example.web;
import java.util.List;
import com.example.entity.HashtagMention;
import io.quarkus.qute.Template;
import io.quarkus.qute.TemplateInstance;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/dashboard")
public class HashtagDashboardResource {
@Inject
Template dashboard;
@GET
@Produces(MediaType.TEXT_HTML)
public TemplateInstance dashboard() {
List<HashtagStats> stats = getHashtagStats();
long totalMentions = stats.stream().mapToLong(HashtagStats::count).sum();
long totalHashtags = getTotalHashtagCount();
return dashboard.data("stats", stats)
.data("totalHashtags", totalHashtags)
.data("totalMentions", totalMentions);
}
private List<HashtagStats> getHashtagStats() {
List<Object[]> results = HashtagMention.getEntityManager()
.createQuery("SELECT h.hashtag, COUNT(h) FROM HashtagMention h " +
"GROUP BY h.hashtag ORDER BY COUNT(h) DESC", Object[].class)
.setMaxResults(50)
.getResultList();
return results.stream()
.map(row -> new HashtagStats((String) row[0], ((Number) row[1]).longValue()))
.toList();
}
private long getTotalHashtagCount() {
return HashtagMention.getEntityManager()
.createQuery("SELECT COUNT(DISTINCT h.hashtag) FROM HashtagMention h", Long.class)
.getSingleResult();
}
public record HashtagStats(String hashtag, long count) {
}
}
The dashboard provides real-time visibility into collected hashtags.
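For readers less familiar with JPQL, the statistics query is a group-count-sort-limit operation. The following in-memory sketch computes the same kind of result with Java streams. `HashtagStatsSketch` and `top` are hypothetical names, and the alphabetical tie-breaker is an addition for deterministic output that the JPQL query does not specify.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory equivalent of the dashboard's GROUP BY query.
public class HashtagStatsSketch {

    record HashtagStats(String hashtag, long count) {
    }

    /** Counts occurrences per hashtag and returns the most frequent ones first. */
    static List<HashtagStats> top(List<String> hashtags, int limit) {
        Map<String, Long> counts = hashtags.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
                .map(e -> new HashtagStats(e.getKey(), e.getValue()))
                .sorted(Comparator.comparingLong(HashtagStats::count).reversed()
                        .thenComparing(HashtagStats::hashtag)) // tie-breaker: assumption
                .limit(limit)
                .toList();
    }

    public static void main(String[] args) {
        List<String> seen = List.of("java", "quarkus", "java", "kotlin", "java", "quarkus");
        for (HashtagStats s : top(seen, 2)) {
            System.out.println(s.hashtag() + " " + s.count());
        }
        // prints:
        // java 3
        // quarkus 2
    }
}
```

In the real application the database does this work, which avoids loading every mention into memory.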
Run the application
Start the application in Dev Mode:
quarkus dev
What happens:
PostgreSQL starts automatically via Dev Services
SSE stream connects to fedi.buzz
Mentions are ingested and buffered in memory
Batch persistence flushes to database every 5 seconds
Dashboard is available at http://localhost:8080/dashboard
Health check at http://localhost:8080/q/health/ready
Why this architecture works
Normalized schema: Avoids data duplication for posts with multiple hashtags
Vert.x streaming: Provides fine-grained control over SSE connection handling
Separated concerns: Matching, conversion, buffering, and persistence are independent
Duplicate handling: Composite unique constraints and pre-check queries prevent errors
Flexible filtering: Collect all hashtags or filter specific ones
Non-blocking ingestion: Stream processing never waits for database writes
Batch persistence: Reduces transaction overhead and handles duplicates gracefully
Dev Services: Zero-configuration PostgreSQL for development
This pattern is broadly applicable to event-driven ingestion pipelines requiring high throughput and data normalization.
What you could add
Add instance filtering: Allow/deny lists for Fediverse instances
Persist raw JSON: Store original payloads for reprocessing
Add sentiment analysis: Analyze post content
Export aggregated stats: Generate reports and analytics
Enhance dashboard: Add real-time charts and filtering
Add REST API: Expose data via JSON endpoints
Implement backpressure: Handle extreme traffic spikes
Add metrics: Integrate with Prometheus/Grafana
What this was really about
At the end of the day, this project isn’t really about hashtags, dashboards, or even the Fediverse itself. It’s about building small systems that help us understand the world we’re part of as developers. Java has been around for a long time, but the conversations around it keep evolving, and sometimes the only way to get a clear picture is to stop guessing and start listening. Quarkus makes this kind of exploratory, data-driven work pleasantly straightforward, which lowers the barrier to turning curiosity into something concrete. If this tutorial nudges you to instrument your own questions, observe real systems in motion, or build something just to see what’s going on out there, then it has done its job.



