Real-Time Log Streaming in Java: Compress SSE Logs with Quarkus and LZ4
Build a production-style pipeline with batching, live metrics, and bandwidth savings you can measure
Most developers think log compression is a storage problem. Logs get written to disk, rotated, compressed later, and shipped somewhere else. That model works when traffic is low and latency does not matter.
This mental model breaks as soon as logs become part of a real-time system. Centralized logging, live debugging, security monitoring, and streaming analytics all depend on moving logs continuously, not hours later. At that point, bandwidth becomes the bottleneck, not disk.
In production, uncompressed log streams fail in very boring ways. Network links saturate. Log pipelines fall behind. Buffers grow. Eventually something drops data. Nobody notices until the incident review, when the most important logs are missing.
The fix is not “more compression later.” The fix is compressing logs as they are produced, without slowing down the application. That means fast algorithms, streaming APIs, and backpressure-aware systems.
That is exactly what we build here.
Prerequisites
You need a working Java and Quarkus setup. This tutorial assumes you already know basic REST endpoints.
Java 17 or newer
Quarkus CLI or Maven
Basic understanding of reactive streams
Project Setup
Create the project from scratch, or start from the ready-made project on my GitHub repository.
quarkus create app com.example:lz4-streaming \
    --extension=rest-jackson,smallrye-health,micrometer-registry-prometheus
cd lz4-streaming
Add the LZ4 dependency.
<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
</dependency>
Start dev mode once to verify everything works:
quarkus dev
Log Generation
Before compression, we need data. This service simulates a busy microservice producing structured logs.
Log Model
src/main/java/com/example/model/LogEntry.java
package com.example.model;

import java.time.Instant;

import com.fasterxml.jackson.annotation.JsonFormat;

public record LogEntry(
        @JsonFormat(shape = JsonFormat.Shape.STRING) Instant timestamp,
        String level,
        String service,
        String traceId,
        String message,
        Integer responseTimeMs,
        String userId) {

    public static LogEntry generateRandom() {
        return new LogEntry(
                Instant.now(),
                randomLevel(),
                "order-service",
                generateTraceId(),
                randomMessage(),
                randomResponseTime(),
                randomUserId());
    }

    private static String randomLevel() {
        double r = Math.random();
        if (r < 0.7)
            return "INFO";
        if (r < 0.9)
            return "WARN";
        if (r < 0.98)
            return "ERROR";
        return "DEBUG";
    }

    private static String generateTraceId() {
        return Long.toHexString(Double.doubleToLongBits(Math.random()));
    }

    private static String randomMessage() {
        String[] messages = {
                "Order processed",
                "Cache miss",
                "Retrying payment",
                "User authenticated",
                "Circuit breaker opened",
                "Background job started"
        };
        return messages[(int) (Math.random() * messages.length)];
    }

    private static Integer randomResponseTime() {
        return (int) (Math.random() * 800);
    }

    private static String randomUserId() {
        return "user-" + (int) (Math.random() * 1000);
    }
}
This is intentionally repetitive data. Compression needs redundancy to work well, and real logs behave the same way.
Log Generator Service
src/main/java/com/example/service/LogGeneratorService.java
package com.example.service;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import com.example.model.LogEntry;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class LogGeneratorService {

    @Inject
    ObjectMapper mapper;

    private final AtomicLong totalLogs = new AtomicLong();

    public Multi<String> stream(int logsPerSecond) {
        // Clamp the rate and the interval: a rate of zero would divide
        // by zero, and a rate above 1000/s would truncate to a
        // zero-millisecond tick interval.
        int rate = Math.max(1, logsPerSecond);
        long interval = Math.max(1, 1000L / rate);
        return Multi.createFrom().ticks()
                .every(Duration.ofMillis(interval))
                .map(tick -> {
                    totalLogs.incrementAndGet();
                    try {
                        return mapper.writeValueAsString(LogEntry.generateRandom()) + "\n";
                    } catch (Exception e) {
                        return "";
                    }
                })
                .filter(s -> !s.isEmpty());
    }

    public String batch(int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            try {
                sb.append(mapper.writeValueAsString(LogEntry.generateRandom())).append("\n");
            } catch (Exception ignored) {
            }
        }
        totalLogs.addAndGet(count);
        return sb.toString();
    }

    public long totalLogs() {
        return totalLogs.get();
    }

    public void reset() {
        totalLogs.set(0);
    }
}
This service has two modes: streaming and batching. Streaming shows latency; batching shows compression efficiency.
LZ4 Compression Service
This is the core of the system.
Compression Logic
src/main/java/com/example/service/CompressionService.java
package com.example.service;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

import jakarta.enterprise.context.ApplicationScoped;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;

@ApplicationScoped
public class CompressionService {

    private final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();

    private final AtomicLong rawBytes = new AtomicLong();
    private final AtomicLong compressedBytes = new AtomicLong();
    private final AtomicLong operations = new AtomicLong();

    public byte[] compressWithHeader(String data) {
        byte[] input = data.getBytes(StandardCharsets.UTF_8);
        rawBytes.addAndGet(input.length);

        byte[] compressed = new byte[compressor.maxCompressedLength(input.length)];
        int size = compressor.compress(input, 0, input.length, compressed, 0);

        // Prefix each frame with the original length (4 bytes, big-endian)
        // so the receiver can size its output buffer before decompressing.
        byte[] result = new byte[size + 4];
        result[0] = (byte) (input.length >>> 24);
        result[1] = (byte) (input.length >>> 16);
        result[2] = (byte) (input.length >>> 8);
        result[3] = (byte) input.length;
        System.arraycopy(compressed, 0, result, 4, size);

        compressedBytes.addAndGet(result.length);
        operations.incrementAndGet();
        return result;
    }

    public double ratio() {
        long raw = rawBytes.get();
        return raw == 0 ? 1.0 : (double) raw / compressedBytes.get();
    }

    public void reset() {
        rawBytes.set(0);
        compressedBytes.set(0);
        operations.set(0);
    }

    public long rawBytes() {
        return rawBytes.get();
    }

    public long compressedBytes() {
        return compressedBytes.get();
    }

    public long operations() {
        return operations.get();
    }
}
The length header is critical. Without it, the receiver has no safe way to size its output buffer, and decompression becomes unsafe. In production, missing this detail causes corrupted streams and memory explosions.
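The consumer side has to read that header back, validate it, and only then allocate the output buffer. The JDK does not ship LZ4, so this sketch stands in `java.util.zip`'s `Deflater`/`Inflater` for the codec; the 4-byte big-endian framing and the size-cap check are the parts that carry over unchanged. `FrameCodec`, `MAX_DECOMPRESSED`, and the 10 MB limit are illustrative names and values, not part of the project above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class FrameCodec {

    // Assumed cap: reject frames that claim more than 10 MB of output.
    static final int MAX_DECOMPRESSED = 10 * 1024 * 1024;

    // Same framing as compressWithHeader(): 4-byte big-endian original
    // length, then the compressed payload.
    static byte[] compressWithHeader(String data) throws Exception {
        byte[] input = data.getBytes(StandardCharsets.UTF_8);
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(ByteBuffer.allocate(4).putInt(input.length).array());
        byte[] buf = new byte[8192];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    static String decompress(byte[] frame) throws Exception {
        // Validate the declared length BEFORE allocating anything:
        // this is the guard against decompression bombs.
        int originalLength = ByteBuffer.wrap(frame, 0, 4).getInt();
        if (originalLength < 0 || originalLength > MAX_DECOMPRESSED) {
            throw new IllegalArgumentException("declared size out of bounds: " + originalLength);
        }
        byte[] restored = new byte[originalLength];
        Inflater inflater = new Inflater();
        inflater.setInput(frame, 4, frame.length - 4);
        int off = 0;
        while (off < originalLength) {
            int n = inflater.inflate(restored, off, originalLength - off);
            if (n == 0) {
                break; // truncated or corrupt input
            }
            off += n;
        }
        inflater.end();
        if (off != originalLength) {
            throw new IllegalStateException("header did not match actual payload size");
        }
        return new String(restored, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        String logs = "{\"level\":\"INFO\",\"message\":\"Order processed\"}\n".repeat(100);
        byte[] frame = compressWithHeader(logs);
        System.out.println(decompress(frame).equals(logs)); // prints true
    }
}
```

With lz4-java itself, the inflate loop collapses to a single `fastDecompressor().decompress(frame, 4, restored, 0, originalLength)` call; the header parsing and the cap stay exactly the same.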
Real-Time Streaming API
We expose both raw and compressed streams over SSE.
Streaming Resource
src/main/java/com/example/resource/LogStreamResource.java
package com.example.resource;

import java.time.Duration;
import java.util.Base64;

import org.jboss.resteasy.reactive.RestStreamElementType;

import com.example.service.CompressionService;
import com.example.service.LogGeneratorService;

import io.smallrye.mutiny.Multi;
import jakarta.inject.Inject;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public class LogStreamResource {

    @Inject
    LogGeneratorService generator;

    @Inject
    CompressionService compression;

    @GET
    @Path("/raw")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> raw(@QueryParam("rate") @DefaultValue("10") int rate) {
        return generator.stream(rate);
    }

    @GET
    @Path("/compressed")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> compressed(
            @QueryParam("rate") @DefaultValue("10") int rate,
            @QueryParam("batch") @DefaultValue("50") int batch) {
        // Emit one compressed frame per batch; clamp to avoid a zero
        // interval when callers pass rate=0 or a very high rate.
        long interval = Math.max(1, (batch * 1000L) / Math.max(1, rate));
        return Multi.createFrom().ticks()
                .every(Duration.ofMillis(interval))
                .map(tick -> {
                    String logs = generator.batch(batch);
                    byte[] data = compression.compressWithHeader(logs);
                    // SSE is a text protocol, so the binary frame is Base64-encoded.
                    return Base64.getEncoder().encodeToString(data);
                });
    }
}
Batching is the difference between a 1.3× ratio and a 2.5× ratio. In production, that difference pays for entire clusters.
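A rough way to see why, without running the server: compress one log line and then fifty concatenated lines and compare the ratios. LZ4 is not in the JDK, so this sketch substitutes java.util.zip.Deflater as the codec; the absolute numbers differ from LZ4, and identical repeated lines exaggerate the effect, but the shape is the same: tiny payloads barely compress, batches do.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

public class BatchRatioDemo {

    static int compressedSize(String data) {
        byte[] input = data.getBytes(StandardCharsets.UTF_8);
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.size();
    }

    public static void main(String[] args) {
        String line = "{\"timestamp\":\"2024-01-01T00:00:00Z\",\"level\":\"INFO\","
                + "\"service\":\"order-service\",\"message\":\"Order processed\"}\n";

        // One message per frame: codec overhead dominates.
        double single = (double) line.length() / compressedSize(line);

        // Fifty messages per frame: repeated keys give the codec redundancy.
        String batch = line.repeat(50);
        double batched = (double) batch.length() / compressedSize(batch);

        System.out.printf("single=%.2fx batched=%.2fx%n", single, batched);
    }
}
```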
Metrics and Verification
Metrics Resource
src/main/java/com/example/resource/MetricsResource.java
package com.example.resource;

import com.example.service.CompressionService;
import com.example.service.LogGeneratorService;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/metrics")
@Produces(MediaType.APPLICATION_JSON)
public class MetricsResource {

    @Inject
    CompressionService compression;

    @Inject
    LogGeneratorService generator;

    @GET
    public Snapshot snapshot() {
        return new Snapshot(
                generator.totalLogs(),
                compression.operations(),
                compression.rawBytes(),
                compression.compressedBytes(),
                compression.ratio());
    }

    @POST
    @Path("/reset")
    public void reset() {
        compression.reset();
        generator.reset();
    }

    public record Snapshot(
            long logs,
            long operations,
            long rawBytes,
            long compressedBytes,
            double ratio) {
    }
}
Manual Verification
Raw stream:
curl 'http://localhost:8080/stream/raw?rate=20'
Compressed stream:
curl 'http://localhost:8080/stream/compressed?rate=20&batch=100'
Metrics:
curl http://localhost:8080/metrics | jq
Results:
{
  "logs": 345,
  "operations": 2,
  "rawBytes": 36904,
  "compressedBytes": 7713,
  "ratio": 4.784649293400752
}
You should see compression ratios between 3.0× and 5.0× for JSON logs.
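The reported ratio is simply rawBytes divided by compressedBytes, so any snapshot can be sanity-checked by hand (the figures below are from the example output above):

```java
public class RatioCheck {
    public static void main(String[] args) {
        long rawBytes = 36904;        // from the snapshot above
        long compressedBytes = 7713;
        double ratio = (double) rawBytes / compressedBytes;
        System.out.printf("%.4f%n", ratio); // prints 4.7846
    }
}
```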
Production Hardening
What Happens Under Load
Compression itself does not block request threads; Mutiny emits data on a scheduler. But if a client reads slowly, SSE buffers grow without bound, and that is the dangerous part.
Use an overflow strategy:
.onOverflow().drop()
Dropping logs is better than killing the JVM.
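Mutiny applies that policy inside the stream; the same idea expressed with plain JDK types is a bounded queue whose non-blocking offer() discards instead of growing. The capacity and counts below are made up purely for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class DropOnOverflow {
    public static void main(String[] args) {
        // Hypothetical scenario: the consumer drained nothing while the
        // producer emitted 5 log lines into a capacity-2 buffer.
        ArrayBlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);
        int dropped = 0;
        for (int i = 0; i < 5; i++) {
            // offer() is non-blocking: it returns false instead of
            // blocking the producer (or growing memory) when full.
            if (!buffer.offer("log-" + i)) {
                dropped++;
            }
        }
        System.out.println("buffered=" + buffer.size() + " dropped=" + dropped);
        // prints: buffered=2 dropped=3
    }
}
```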
Security Boundaries
Never trust a declared decompressed size blindly. Always cap the maximum message size before allocating buffers. LZ4 is fast enough that a decompression bomb expands almost instantly.
When This Pattern Fits
This pattern works well for:
Log shipping
Telemetry
Metrics streaming
Event pipelines
It does not work well for:
Already compressed data
Very small payloads
CPU-starved environments
Conclusion
We built a real-time log compression pipeline that survives production pressure. Logs are generated continuously, compressed in batches with LZ4, streamed reactively, and measured with real metrics. Most importantly, we understand what breaks, why batching matters, and where the safety boundaries are.
If you run high-volume systems, this pattern is not optional anymore.