Messaging¶
Morphium provides a MongoDB‑backed message queue with topic‑based listeners.
Concepts
- Topic: string category for messages (e.g.,
user.created
) - Exclusive vs non‑exclusive:
- Exclusive (
Msg.setExclusive(true)
): exactly one listener processes the message (one‑of‑n); implemented using a lock collection - Non‑exclusive (default): every registered listener for the topic processes the message (broadcast)
- Answers: listeners may return a
Msg
as response; senders can wait synchronously or asynchronously - Implementations: choose between Standard and Advanced; see Messaging Implementations for differences and migration.
Setup
import de.caluga.morphium.messaging.*;
// Create via Morphium factory (preferred)
MorphiumMessaging messaging = morphium.createMessaging();
// Or with overrides: morphium.createMessaging(cfg.messagingSettings())
messaging.start();
// Listen to a topic
messaging.addListenerForTopic("user.created", (mm, m) -> {
System.out.println("Got: " + m.getMsg());
return null; // no answer
});
// Send a message
Msg msg = new Msg("user.created", "User alice created", "userId:123");
messaging.sendMessage(msg);
Request/Response
// Listener answering a request
messaging.addListenerForTopic("user.lookup", (mm, m) -> {
var response = new Msg(m.getTopic(), "ok", "");
response.setMapValue(Map.of("userId", m.getValue()));
return response;
});
// Sender waiting for first answer
Msg req = new Msg("user.lookup", "find", "user123");
Msg resp = messaging.sendAndAwaitFirstAnswer(req, 5000);
Configuration (via MessagingSettings
)
- Queue name:
setMessageQueueName(String)
: collection suffix used for the queue. - Window size:
setMessagingWindowSize(int)
: number of messages processed per batch. Messaging marks up to this many messages and processes them as one window. - Multithreading:
setMessagingMultithreadded(boolean)
: process multiple messages in parallel using (virtual) threads;false
enforces single‑threaded, sequential handling. - Change streams:
setUseChangeStream(boolean)
: use MongoDB Change Streams to get push‑style notifications for new messages; whenfalse
, messaging uses polling. Requires a replica set for Change Streams. - Poll pause:
setMessagingPollPause(int)
: pause (in ms) between polling requests when not using Change Streams. Also used as a heartbeat to check for messages outside the current processing window (e.g., if new messages arrive and the queue holds more thanwindowSize
, a poll is triggered once after this pause).
Example
var ms = new MessagingSettings();
ms.setMessageQueueName("default");
ms.setMessagingWindowSize(100);
ms.setMessagingMultithreadded(true);
ms.setUseChangeStream(true);
ms.setMessagingPollPause(250);
MorphiumMessaging mq = morphium.createMessaging(ms);
mq.start();
Examples and behavior
- Sequential processing:
multithreadded=false
,windowSize=1
→ exactly one message is processed at a time, in order. - Batched parallelism:
multithreadded=true
,windowSize=100
→ up to 100 messages are fetched and processed concurrently per window.
Notes
- When Change Streams are disabled, polling respects
messagingPollPause
to reduce load but still peeks for messages beyond the current window so bursts are noticed promptly.
Benefits & Trade‑offs¶
Benefits
- Persistent queue: messages are stored in MongoDB by default (durability across restarts); use in‑memory storage only when persistence is not needed.
- Queryable messages: run ad‑hoc queries for statistics, audits, or status checks without interfering with processing.
- Change streams: combine with MongoDB change streams to react to new messages transparently (no polling; requires replica set).
- No extra infrastructure: reuse your existing MongoDB setup—no separate broker or runtime dependency when you already operate a replica set.
Trade‑offs
- Throughput: slower than purpose‑built brokers; every message is a document write/read.
- Load: very high message rates will add notable database load—plan capacity accordingly or choose a different transport when ultra‑high throughput is critical.
V6.0 Improvements¶
Change Stream Reliability¶
Morphium 6.0 significantly improved change stream handling in messaging:
No More Re-reads
- v5: messaging layer re-read documents after change stream events
- v6: uses evt.getFullDocument()
directly from change stream snapshots
- More efficient, no dirty reads, no race conditions
Document Snapshots
// Change stream events now contain immutable snapshots
// Messages are processed from the exact state at insert time
messaging.addListenerForTopic("events", (m, msg) -> {
// msg is from immutable snapshot, safe to process
// No concurrent modifications possible
return null;
});
InMemoryDriver Support¶
Full messaging support with InMemoryDriver for testing:
MorphiumConfig cfg = new MorphiumConfig();
cfg.driverSettings().setDriverName("InMemDriver");
cfg.connectionSettings().setDatabase("testdb");
try (Morphium morphium = new Morphium(cfg)) {
MorphiumMessaging sender = morphium.createMessaging();
MorphiumMessaging receiver = morphium.createMessaging();
receiver.addListenerForTopic("test", (m, msg) -> {
// Process message
return null;
});
sender.start();
receiver.start();
sender.sendMessage(new Msg("test", "Hello", "World", 30000));
}
Multi-Instance Support
// Multiple Morphium instances sharing same database
Morphium m1 = new Morphium(cfg);
Morphium m2 = new Morphium(cfg);
MorphiumMessaging msg1 = m1.createMessaging();
MorphiumMessaging msg2 = m2.createMessaging();
// Both share the same InMemoryDriver
// Change streams work correctly
// Exclusive messages properly distributed
// Broadcast messages delivered to all
Virtual Threads¶
Java 21 virtual threads for lightweight concurrency: - Change stream callbacks run on virtual threads - Each change stream watcher has its own virtual thread executor - Minimal memory overhead for thousands of concurrent listeners
See also
Notes and best practices
- No wildcard/global listeners: register explicit topics via
addListenerForTopic(topic, listener)
- Non‑exclusive messages are broadcast to all listeners of a topic
- For delayed/scheduled handling, add your own not‑before timestamp field and have the listener re‑queue or skip until due;
Msg.timestamp
is used for ordering, not scheduling - For retries and DLQ, implement logic in listeners (inspect payload, track retry count, re‑queue or redirect to a DLQ topic)
- For distributed cache synchronization, see Caching Examples and Cache Patterns; Morphium provides
MessagingCacheSynchronizer
.