This is a follow-up to the post on consistency. Specifically, this post is about measuring consistency in a distributed system within different consistency scopes. Event- or message-driven systems using CQRS present the most consistency challenges, so I am going to assume that we are trying to measure consistency for an asynchronous, event-driven system.

[Figure: CQRS event service consistency]

In CQRS, requests that change the state of a system are called commands. Commands are received by the command processor [1]. Processing a command generates a state change that is stored in a local store, and an event is published [2]. Once this processing is complete, control returns to the caller.

The event bus transports the event to the query processor [3]. The query processor receives the event and updates its own store. At this point a Query [4] is consistent with the command [1].
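As a minimal sketch of this flow (assuming in-memory atoms in place of real stores and an event bus, with an illustrative event shape), the handlers below mirror the numbered steps:

  ;; Sketch only: atoms stand in for durable stores and the event transport.
  (def command-store (atom {}))   ; command processor's local store
  (def query-store   (atom {}))   ; query processor's read store
  (def event-bus     (atom []))   ; stand-in for the event bus

  (defn handle-command [{:keys [id] :as command}]    ; [1] command received
    (swap! command-store assoc id command)           ; state change stored locally
    (swap! event-bus conj {:event    :state-changed  ; [2] event published
                           :event-id id
                           :data     command}))

  (defn handle-event [{:keys [event-id data]}]       ; [3] query processor consumes
    (swap! query-store assoc event-id data))         ; queries [4] now consistent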

Transaction Consistency

[Figure: transaction consistency scope]

The command defines the transaction boundary. Once the command has been processed the command processor achieves transaction consistency.

Service/Cluster Consistency

[Figure: service consistency scope]

For service-level consistency the event has to be transported from the command processor to the query processor and processed successfully. Quite often the command and query processors are in different processes. High availability means multiple instances, so even if the command and query processors are in the same process it is likely that the event will be picked up by another member of the cluster.

For transaction and service consistency measurement, the service itself can be responsible for monitoring its consistency: activity from the command and query processors can be reconciled, as sketched below. The service can also be responsible for alerting when service SLAs are not met.
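As a sketch of that reconciliation (the {:event-id ...} record shape is an assumption for illustration), diff the events the command side reports as published against the events the query side reports as processed; anything left over is not yet consistent:

  (defn unreconciled-events [published processed]
    (let [done (set (map :event-id processed))]
      (remove #(done (:event-id %)) published)))

  ;; "e2" was published by the command processor but not yet
  ;; processed by the query processor.
  (unreconciled-events [{:event-id "e1"} {:event-id "e2"}]
                       [{:event-id "e1"}])
  ;; => ({:event-id "e2"})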

Capability Consistency

[Figure: capability consistency scope]

Once beyond a single service it is not possible for each service to monitor its own consistency. Service B does not know that Service A has published an event if it has not received one.

At this point we need to introduce a third party to take care of the measurement.

[Figure: consistency observer]

The third-party observer is a system-level implementation of the OO Observer pattern. The observer relies on activity data published by all the components. In the example above, Service A tells the observer that it processed a command [1] and published an event [2]. This activity 'report' sets up an expectation in the observer that Service B will take action. Capability consistency is reached when an activity report is received from Service B stating that it processed the event [3] and updated the external service [4].

Activity reports include details of the data being processed and the data being produced.

I processed [command/event] and produced [event(s)]

'I' refers to the system that processed the data. This would typically be a unique service name and version. The version is important during updates that introduce a new event dependency.

'command/event' refers to the type and identity of the data being processed.

'events' lists the event types and identities of the events produced.

Activities signal to the observer that there is a consistency expectation that needs to be met.

Enterprise Consistency

Enterprise consistency is a catch-all level and includes all systems implemented and used by the enterprise. It is also the hardest to measure because it includes both real-time user-facing systems and warehousing systems.

Consistency measurements at this level are typically computed offline and infrequently.

The Observer

The observer outlined above is a form of distributed tracing. Zipkin and Spring Cloud Sleuth are examples of libraries and tools that support distributed tracing. There are some key differences, though. Both Zipkin and Spring Cloud Sleuth are measurement tools that provide a way to investigate latency and execution-path issues. For consistency measurement we need to add expectations so that each execution path can be compared against them.

Expectations

Each component in a distributed messaging system has a set of messages/events that it is interested in, e.g. as an order management system I am interested in new customer registrations.

Given that we have an interest list from each service or component we can infer that if a new customer registration event happens then we should see an activity report from the order management system.

When processing an event it is perfectly reasonable for a service to generate its own event. In this case the activity report would include the events produced from handling the received event. This leads to expectation chaining. An activity report can extend the 'flow' to include new expectations for the produced event.

Single service expectation
  [{:component :order-management
    :version   "1.0"
    :expects   [:new-customer-registration]}]

The expectation declaration above indicates that the order-management system expects to process new-customer-registration events. If a component publishes a new-customer-registration then the observer expects to receive an activity report from the order-management system.

[Figure: consistency measurement expectation setting]

When we receive the activity data from the customer management system, this sets up an expectation that, to be consistent, the order management system must report that it has processed the event.

Customer Management activity report
  {:timestamp   "Sun Jun 18 21:59:07 UTC 2017"
   :activity-id "uuid-1"
   :flow-id     "uuid-2"
   :component   {:name    "customer-management"
                 :version "1.0"}
   :consumed    [{:command "register-customer"
                  :id      "uuid-3"}]
   :produced    [{:event    "new-customer-registration"
                  :event-id "uuid-4" }]}

Receiving this activity report sets up an expectation that the order-management system will report that it has processed the new-customer-registration event as part of the flow identified by the flow-id.
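A hedged sketch of that expectation setting, reusing the declaration and report shapes above (the function and field names are illustrative): for each event in the report's :produced list, look up which components expect that event type and register a pending expectation against the flow. Because the lookup runs over :produced, expectation chaining falls out naturally.

  (def expectations
    [{:component :order-management
      :version   "1.0"
      :expects   [:new-customer-registration]}])

  (defn pending-expectations [expectations report]
    (for [{:keys [event event-id]}    (:produced report)
          {:keys [component expects]} expectations
          :when (some #{(keyword event)} expects)]
      {:flow-id   (:flow-id report)
       :event-id  event-id
       :component component}))

  (pending-expectations expectations
                        {:flow-id  "uuid-2"
                         :produced [{:event    "new-customer-registration"
                                     :event-id "uuid-4"}]})
  ;; => ({:flow-id "uuid-2", :event-id "uuid-4", :component :order-management})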

Order Management activity report
  {:timestamp   "Sun Jun 18 21:59:57 UTC 2017"
   :activity-id "uuid-5"
   :flow-id     "uuid-2"
   :component   {:name    "order-management"
                 :version "1.0"}
   :consumed    [{:event    "new-customer-registration"
                  :event-id "uuid-4" }]
   :produced    []}

Once the activity report has been received we can conclude that consistency was achieved in 50 seconds.
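As a worked check of that number, a small sketch that parses the java.util.Date-style timestamps used in the reports and takes the difference:

  (import '(java.text SimpleDateFormat)
          '(java.util Locale))

  (defn consistency-seconds [from to]
    (let [fmt   (SimpleDateFormat. "EEE MMM dd HH:mm:ss zzz yyyy" Locale/US)
          parse #(.getTime (.parse fmt %))]
      (/ (- (parse to) (parse from)) 1000)))

  (consistency-seconds "Sun Jun 18 21:59:07 UTC 2017"
                       "Sun Jun 18 21:59:57 UTC 2017")
  ;; => 50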

If the activity report is not delivered within an agreed SLA the observer can send an alert or initiate some other action.
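A sketch of that check, assuming each pending expectation records when it was registered (epoch millis) and with alerting stubbed out as println:

  (defn overdue [pending sla-ms now-ms]
    (filter #(> (- now-ms (:registered-at %)) sla-ms) pending))

  (doseq [e (overdue [{:flow-id       "uuid-2"
                       :component     :order-management
                       :registered-at 0}]           ; long overdue
                     60000                          ; 60-second SLA
                     (System/currentTimeMillis))]
    (println "SLA breached for" (:component e) "in flow" (:flow-id e)))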

Wrapping up

Both Zipkin and Spring Cloud Sleuth use log messages generated by the applications. To be effective at consistency measurement, your logging infrastructure needs to be robust and available. Typically, logging has not been considered a high-availability service. For consistency measurement to be effective, the tools need to be highly available and resilient.

Consistency measurement needs to focus both on individual transactions (or flows) through the system and on the flows in aggregate. There are likely to be anomalies, e.g. event messages that go through extended retries during a redeployment, so that consistency is achieved much later than usual. Quite often consistency requirements are expressed in the form: X% of transactions are capability consistent within Y seconds.
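A sketch of that aggregate check over a collection of per-flow consistency latencies in seconds (the threshold values are illustrative):

  (defn meets-sla? [latencies pct secs]
    (let [in-time (count (filter #(<= % secs) latencies))]
      (>= (* 100.0 (/ in-time (count latencies))) pct)))

  ;; One flow hit extended retries and took 240s, dragging us below 99%.
  (meets-sla? [50 12 3 240 55] 99 60)
  ;; => false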

Next Up: Compensating actions.