MessageBus
The MessageBus interface is used for communication between different components, for example to synchronize between endpoints. It is therefore designed primarily for internal use. Because it can also be useful in some applications and scenarios, some implementations expose the MessageBus to the outside world.
Events
The MessageBus works according to the publish/subscribe principle based on different types of events or event messages (which are subclasses of the abstract class EventMessage).
Subscriptions are made to a kind of event, i.e. a subclass of EventMessage or even EventMessage itself (to receive all events).
When subscribing to a class, all events of this class or any subclass are received.
This is the class hierarchy of available event classes/types:

- EventMessage (abstract): Superclass for all events, payload: a Reference to the subject element
    - AccessEventMessage (abstract): Superclass for all types of access-based events
        - ReadEventMessage (abstract): Superclass for all types of read events, triggered each time an element is read via API
            - ElementReadEventMessage: Triggered when a Referable is read via API, payload: the referable (serialized according to the request, i.e. using the requested SerializationModifier)
            - ValueReadEventMessage: Triggered when the value of an element is read via API, payload: the element value
        - ExecuteEventMessage (abstract): Superclass for all events related to executing operations
            - OperationInvokeEventMessage: Triggered when an operation is invoked/started, payload: input and inoutput parameters
            - OperationFinishEventMessage: Triggered when an operation is finished, payload: output and inoutput parameters
    - ChangeEventMessage (abstract): Superclass for all types of changes
        - ElementChangeEventMessage (abstract): Superclass for all types of structural changes, payload: the updated element
            - ElementCreateEventMessage: Triggered when an element is created
            - ElementDeleteEventMessage: Triggered when an element is deleted
            - ElementUpdateEventMessage: Triggered when an element is updated
        - ValueChangeEventMessage: Triggered when the value of an element is updated, payload: old value, new value
    - ErrorEventMessage: Triggered when an error occurred, payload: message, error level (INFO, WARN, ERROR)
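The subscription rule described above (subscribing to a class also delivers all of its subclasses) can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: `SimpleBus`, its `subscribe` and `publish` methods, and the payload-free event classes are simplified stand-ins for illustration only, not the actual FA³ST MessageBus API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class SubscriptionSketch {

    // Simplified stand-ins for part of the event hierarchy (payloads omitted).
    static abstract class EventMessage {}
    static abstract class ChangeEventMessage extends EventMessage {}
    static class ValueChangeEventMessage extends ChangeEventMessage {}
    static class ErrorEventMessage extends EventMessage {}

    // Hypothetical minimal bus, NOT the FA³ST MessageBus interface.
    static class SimpleBus {

        // Pairs the subscribed class with the handler to call.
        private static final class Subscription {
            final Class<? extends EventMessage> type;
            final Consumer<EventMessage> handler;

            Subscription(Class<? extends EventMessage> type, Consumer<EventMessage> handler) {
                this.type = type;
                this.handler = handler;
            }
        }

        private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();

        // Registers a handler for the given class and, implicitly, all subclasses.
        <T extends EventMessage> void subscribe(Class<T> type, Consumer<? super T> handler) {
            subscriptions.add(new Subscription(type, event -> handler.accept(type.cast(event))));
        }

        // Delivers the event to every subscription whose class the event is an instance of.
        void publish(EventMessage event) {
            for (Subscription subscription : subscriptions) {
                if (subscription.type.isInstance(event)) {
                    subscription.handler.accept(event);
                }
            }
        }
    }

    // Publishes two events and records which subscriber received which event.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        SimpleBus bus = new SimpleBus();
        bus.subscribe(EventMessage.class,
                e -> received.add("all:" + e.getClass().getSimpleName()));
        bus.subscribe(ChangeEventMessage.class,
                e -> received.add("change:" + e.getClass().getSimpleName()));
        bus.publish(new ValueChangeEventMessage());
        bus.publish(new ErrorEventMessage());
        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the subscriber on EventMessage receives both events, while the subscriber on ChangeEventMessage receives only the ValueChangeEventMessage.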
Internal
This is the default implementation of the MessageBus interface. It is implemented using Java method calls and can therefore only be accessed from code, i.e. when FA³ST Service is used as an embedded library.
Configuration
This implementation does not offer any configuration properties.
    {
        "messageBus": {
            "@class": "de.fraunhofer.iosb.ilt.faaast.service.messagebus.internal.MessageBusInternal"
        },
        //...
    }
MQTT
This implementation of the MessageBus interface publishes messages via MQTT, either by hosting its own MQTT server or by using an externally hosted one.
Topics & Payload
Each message type is published on its own topic in the form of [topicPrefix]/[className], e.g. events/ValueChangeEventMessage.
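As a rough illustration of the topic scheme, the following sketch derives a topic name from a prefix and an event class name, and recovers the class name from a received topic. The helper names `topicFor` and `eventClassNameOf` are hypothetical. Collapsing a trailing slash in the prefix is an assumption made here so that a prefix such as events/ yields a single separator; whether the MQTT MessageBus normalizes the prefix in this way is not confirmed by this documentation.

```java
class TopicNameSketch {

    // Hypothetical helper: joins prefix and class name with exactly one '/'.
    // Assumption: a trailing '/' in the prefix is dropped before joining.
    static String topicFor(String topicPrefix, String eventClassName) {
        String prefix = topicPrefix.endsWith("/")
                ? topicPrefix.substring(0, topicPrefix.length() - 1)
                : topicPrefix;
        return prefix + "/" + eventClassName;
    }

    // Hypothetical helper: returns the last topic level, i.e. the event class name.
    static String eventClassNameOf(String topic) {
        return topic.substring(topic.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        String topic = topicFor("events/", "ValueChangeEventMessage");
        System.out.println(topic);
        System.out.println(eventClassNameOf(topic));
    }
}
```

A consumer could use the last topic level to decide which event type to expect before parsing the payload.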
The payload is a JSON serialization of the corresponding Java class with the following base structure:

    {
        "@type": "[event type]",
        "element": {
            // [default JSON serialization of Reference]
        },
        // [event-specific properties]
    }
An example ValueChangeEvent might look like this:

    {
        "@type": "ValueChangeEvent",
        "element": {
            "keys": [
                {
                    "idType": "Iri",
                    "type": "Submodel",
                    "value": "http://example.org/submodel"
                },
                {
                    "idType": "IdShort",
                    "type": "Property",
                    "value": "property"
                }
            ]
        },
        "oldValue": {
            "modelType": "Property",
            "dataType": "int",
            "value": 0
        },
        "newValue": {
            "modelType": "Property",
            "dataType": "int",
            "value": 1
        }
    }
To deserialize events, the class JsonEventDeserializer in module dataformat-json can be used.
Configuration
| Name | Allowed Value | Description | Default Value |
|---|---|---|---|
| clientCertificate | | The client certificate to use. If not set, SSL will be disabled. | |
| clientId | String | Client ID to use when connecting to the MQTT server. | FAST MQTT MessageBus |
| host | String | The host name of the MQTT server without prefix, e.g., 192.168.0.1. | localhost |
| password | String | Password used to connect to the MQTT server. | |
| port | Integer | The port to use for TCP communication. | 1883 |
| serverCertificate | | The server certificate to use. If not set, SSL will be disabled. | |
| sslPort | Integer | The port to use for secure TCP communication. | 8883 |
| sslWebsocketPort | Integer | The port to use for secure websocket communication. | 443 |
| topicPrefix | String | Prefix to use for the topic names. | events/ |
| useInternalServer | Boolean | If true, FA³ST Service starts its own MQTT server. | true |
| username | String | Username used to connect to the MQTT server. | |
| users | Map<String, String> | Map of usernames and passwords of users that are allowed to connect to the MQTT server. | empty map |
| useWebsocket | Boolean | If true, uses websocket, otherwise TCP. | false |
| websocketPort | Integer | The port to use for websocket communication. | 9001 |
    {
        "messageBus": {
            "@class": "de.fraunhofer.iosb.ilt.faaast.service.messagebus.mqtt.MessageBusMqtt",
            "useInternalServer": true,
            "port": 1883,
            "sslPort": 8883,
            "host": "localhost",
            "websocketPort": 9001,
            "sslWebsocketPort": 443,
            "serverCertificate": {
                "keyStoreType": "PKCS12",
                "keyStorePath": "C:\\faaast\\MyKeyStore.p12",
                "keyStorePassword": "changeit",
                "keyAlias": "server-key",
                "keyPassword": "changeit"
            },
            "clientCertificate": {
                "keyStoreType": "PKCS12",
                "keyStorePath": "C:\\faaast\\MyKeyStore.p12",
                "keyStorePassword": "changeit",
                "keyAlias": "client-key",
                "keyPassword": "changeit"
            },
            "users": {
                "user1": "password1"
            },
            "username": "messagebus-user",
            "password": "messagebus-password",
            "clientId": "CustomClientId",
            "topicPrefix": "faaast/events/"
        },
        //...
    }
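For an external MQTT client that wants to connect to the same server, the host and port settings have to be combined into a broker address. The sketch below shows one plausible mapping from the configuration properties to such an address. The helper `brokerUri` is hypothetical, the `tcp`, `ssl`, `ws`, and `wss` schemes are the ones accepted by the Eclipse Paho Java client, and the assumption that SSL is active exactly when a certificate is configured is inferred from the property descriptions. Whether FA³ST Service builds its address this way internally is not stated in this documentation.

```java
class BrokerUriSketch {

    // Hypothetical helper: picks scheme and port from the transport settings.
    // Assumption: sslEnabled is true exactly when a certificate is configured.
    static String brokerUri(String host, boolean useWebsocket, boolean sslEnabled,
                            int port, int sslPort, int websocketPort, int sslWebsocketPort) {
        String scheme;
        int selectedPort;
        if (useWebsocket) {
            scheme = sslEnabled ? "wss" : "ws";
            selectedPort = sslEnabled ? sslWebsocketPort : websocketPort;
        } else {
            scheme = sslEnabled ? "ssl" : "tcp";
            selectedPort = sslEnabled ? sslPort : port;
        }
        return scheme + "://" + host + ":" + selectedPort;
    }

    public static void main(String[] args) {
        // Default values from the configuration table: plain TCP on localhost.
        System.out.println(brokerUri("localhost", false, false, 1883, 8883, 9001, 443));
    }
}
```

The host value is passed without a scheme, matching the description of the host property as a host name without prefix.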