MessageBus

The MessageBus interface is used for communication between different components, for example to synchronize between endpoints. Therefore, the MessageBus is primarily designed for internal use but as it might also be useful for some applications and scenarios there might be implementations that expose the MessageBus to the outside world.

Events

The MessageBus works according to the publish/subscribe principle based on different types of events or event messages (which are subclasses of the abstract class EventMessage). Subscriptions are made to a kind of event, i.e. a subclass of EventMessage or even EventMessage itself (to receive all events). When subscribing to a class, all events of this class or any subclass are received.

This is the class hierarchy of available event classes/types

  • EventMessage (abstract): Superclass for all events, payload: a Reference to the subject element

    • AccessEventMessage (abstract): Superclass for all types of access-based events

      • ReadEventMessage (abstract): Superclass for all types of read-events, triggered each time an element is read via API

        • ElementReadEventMessage: Triggered when a Referable is read via API, payload: the referable (serialized according to the request, i.e. using the requested SerializationModifier)

        • ValueReadEventMessage: Triggered when the value of an element is read via API, payload: the element value

      • ExecuteEventMessage (abstract): Superclass for all events related to executing operations

        • OperationInvokeEventMessage: Triggered when an operation is invoked/started, payload: input and inoutput parameters

        • OperationFinishEventMessage: Triggered when an operation is finished, payload: output and inoutput parameters

    • ChangeEventMessage (abstract): Superclass for all types of changes

      • ElementChangeEventMessage (abstract): Superclass for all types of structural changes, payload: the updated element

        • ElementCreateEventMessage: Triggered when an element is created

        • ElementDeleteEventMessage: Triggered when an element is deleted

        • ElementUpdateEventMessage: Triggered when an element is updated

        • ValueChangeEventMessage: Triggered when the value of an element is updated, payload: old value, new value

    • ErrorEventMessage: Triggered when an error occurred, payload: message, error level (INFO, WARN, ERROR)

Internal

This is the default implementation of the MessageBus interface which is implemented using Java method calls. Therefore, it can only be accessed from code when FA³ST Service is used as an embedded library.

Configuration

This implementation does not offer any configuration properties.

Example configuration for Internal MessageBus.
1{
2	"messageBus": {
3		"@class": "de.fraunhofer.iosb.ilt.faaast.service.messagebus.internal.MessageBusInternal"
4	},
5	//...
6}

MQTT

This implementation of the MessageBus interface publishes messages via MQTT either by hosting its own MQTT server or by using an externally hosted one.

Topics & Payload

Each message type is published on its own topic in the form of [topicPrefix]/[className], e.g. events/ValueChangeEventMessage. The payload is a JSON serialization of the corresponding Java class with the following base structure

JSON structure of serialized MessageBus events.
1{
2	"@type": "[event type]",
3	"element": { 
4		// [default JSON serialization of Reference] 
5	},
6	// [event-specific properties]
7}

An example ValueChangeEvent might look like this:

JSON serialization of an example ValueChangeEvent.
 1{
 2    "@type": "ValueChangeEvent",
 3    "element": {
 4        "keys": [
 5            {
 6                "idType": "Iri",
 7                "type": "Submodel",
 8                "value": "http://example.org/submodel"
 9            },
10            {
11                "idType": "IdShort",
12                "type": "Property",
13                "value": "property"
14            }
15    ] },
16    "oldValue": {
17        "modelType": "Property",
18        "dataType": "int",
19        "value": 0
20    },
21    "newValue": {
22        "modelType": "Property",
23        "dataType": "int",
24        "value": 1
25    }
26}

For deserialization of events the class JsonEventDeserializer in module dataformat-json can be used.

Configuration

Configuration properties of MQTT MessageBus.

Name

Allowed Value

Description

Default Value

clientCertificate
(optional)

CertificateInfo

The client certificate to use. If not set, SSL will be disabled.

clientId
(optional)

String

ClientId to use when connecting to the MQTT server.

FAST MQTT MessageBus

host
(optional)

String

The host name of the MQTT server without prefix, e.g., 192.168.0.1.

localhost

password
(optional)

String

Password used to connect to the MQTT server.

port
(optional)

Integer

The port to use for TCP communication.

1883

serverCertificate
(optional)

CertificateInfo

The server certificate to use. If not set, SSL will be disabled.

sslPort
(optional)

Integer

The port to use for secure TCP communication.

8883

sslWebsocketPort
(optional)

Integer

The port to use for secure websocket communication.

443

topicPrefix
(optional)

String

Prefix to use for the topic names.

events/

useInternalServer
(optional)

Boolean

If true, FA³ST Service starts its own MQTT server.
If false, FA³ST Service uses external MQTT server.

true

username
(optional)

String

Username used to connect to the MQTT server.

users
(optional)

Map<String, String>

Map of usernames and passwords of users that are allowed to connect to the MQTT server.
This is only used when useInternalServer is true

empty list

useWebsocket
(optional)

Boolean

If true uses websocket, otherwise TCP.

false

websocketPort
(optional)

Integer

The port to use for TCP communication

9001

Example configuration for MQTT MessageBus.
 1{
 2	"messageBus": {
 3		"@class": "de.fraunhofer.iosb.ilt.faaast.service.messagebus.mqtt.MessageBusMqtt",
 4		"useInternalServer": true,
 5		"port": 1883,
 6		"sslPort": 8883,
 7		"host": "localhost",
 8		"websocketPort": 9001,
 9		"sslWebsocketPort": 443,
10		"serverCertificate": {
11			"keyStoreType": "PKCS12",
12			"keyStorePath": "C:\faaast\MyKeyStore.p12",
13			"keyStorePassword": "changeit",
14			"keyAlias": "server-key",
15			"keyPassword": "changeit"
16		},
17		"clientCertificate": {
18			"keyStoreType": "PKCS12",
19			"keyStorePath": "C:\faaast\MyKeyStore.p12",
20			"keyStorePassword": "changeit",
21			"keyAlias": "client-key",
22			"keyPassword": "changeit"
23		},
24		"users": {
25			"user1": "password1"
26		},
27		"username": "messagebus-user",
28		"password": "messagebus-password",
29		"clientId": "CustomClientId",
30		"topicPrefix": "faaast/events/"
31	},
32	//...
33}