Radhakrishnan Nariangadu

Accolite Digital

This article walks through a working code demo that showcases the concepts discussed in Part 1.

In Part 1, I discussed two approaches for use cases 2 and 3. The code shared in this article showcases both approaches so that you can better understand their pros and cons.

For Approach 1, I have deliberately not applied a distributed lock, so that I can demonstrate the potential data-integrity issues that arise when individual clients attempt to update a shared object.

Please note that the code is a very rudimentary timeline management service and omits many use cases. It’s meant to showcase how a distributed cache enables atomic operations across a cluster in a very efficient manner.

The code example uses Java data objects just to keep it simple, but in a system targeting high throughput there’s a very good probability that the data would be passed along the pipeline as a string format (e.g., a JSON string) to eliminate object SerDe processing.

The diagram below shows the proposed architecture of this demo app:

Architecture

The class diagram for the main classes in this demo:

Class Diagram

The abstract class above carries the bulk of the processing. I use Guava’s EvictingQueue as the timeline repository.
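Guava’s EvictingQueue is a fixed-capacity queue that silently evicts its oldest element when a new one is added at capacity, which is exactly the “keep the N most recent messages” behavior a timeline needs. To show the semantics without pulling in Guava, here is a stdlib-only sketch that mimics EvictingQueue with an ArrayDeque; the class and method names are illustrative, not the demo’s actual code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Mimics Guava's EvictingQueue: bounded, evicts the oldest entry when full.
public class BoundedTimeline {
    private final int capacity;
    private final ArrayDeque<String> queue = new ArrayDeque<>();

    public BoundedTimeline(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void append(String message) {
        if (queue.size() == capacity) {
            queue.pollFirst(); // evict the oldest message, like EvictingQueue.add()
        }
        queue.addLast(message);
    }

    // Newest first, matching the demo's console output.
    public synchronized List<String> snapshot() {
        List<String> copy = new ArrayList<>(queue);
        Collections.reverse(copy);
        return copy;
    }

    public static void main(String[] args) {
        BoundedTimeline timeline = new BoundedTimeline(10);
        for (int i = 1; i <= 12; i++) {
            timeline.append("message " + i);
        }
        // Only the 10 most recent messages survive; messages 1 and 2 were evicted.
        System.out.println(timeline.snapshot());
    }
}
```

With Guava on the classpath, the real thing is simply `EvictingQueue.<String>create(10)`.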

Setting up Hazelcast:

Download the Hazelcast (HZ) 5 platform from https://hazelcast.com/open-source-projects/downloads/

Set up your JAVA_HOME and PATH to point to the respective JDK/JRE folders. Ensure that you’re using the same JRE version as the code that you’ll deploy to the cluster. I’ve used JDK version 1.8.

The following steps are based on a Windows setup. Replace paths based on your environment.

Add the following ‘timeline_cache’ map configuration to your HZ cluster config (<HZ installation folder>\config\hazelcast.xml). The cache’s in-memory format is OBJECT to make cluster-side processing more efficient.
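The original configuration screenshot isn’t reproduced here, so below is a minimal sketch of what that map entry looks like; it goes inside the root `<hazelcast>` element, and the demo’s actual config may set additional properties (backup counts, eviction, etc.):

```xml
<!-- inside the root <hazelcast> element of <HZ installation folder>\config\hazelcast.xml -->
<map name="timeline_cache">
    <!-- OBJECT keeps entries deserialized on the member, so an EntryProcessor
         avoids a serialize/deserialize round trip on every invocation -->
    <in-memory-format>OBJECT</in-memory-format>
</map>
```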

Timeline cache map

 

Starting up your HZ cluster

 starting HZ cluster
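The cluster startup screenshot isn’t reproduced here. As a sketch of the typical Hazelcast 5 startup (script names from the standard distribution; adjust paths to your install), each member is started from the distribution’s bin folder, once per terminal:

```
:: from <HZ installation folder>\bin on Windows — run once per member
hz-start.bat

:: on Linux/macOS the equivalent script is:
::   bin/hz-start
```

Members started this way on the same machine discover each other and form the cluster with the default configuration.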

Notes on the classes:

Here’s my package hierarchy. You can follow your own; just ensure that you make the appropriate changes to the code.

 

 class hierarchy

 

The codebase depends on two libraries; the Maven dependencies are provided below.

Dependency Libraries
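The dependency screenshot isn’t reproduced here; based on the libraries named in this article (Hazelcast 5 and guava-31.0.1-jre), the pom.xml fragment would look roughly like this. The Hazelcast version shown is an example — align it with your installed HZ 5 platform:

```xml
<dependencies>
    <dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast</artifactId>
        <version>5.1</version> <!-- example version; match your HZ installation -->
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.0.1-jre</version>
    </dependency>
</dependencies>
```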

    

Hazelcast’s runtime user code deployment feature doesn’t seem to work well with inner classes, which I’ve used. To avoid any runtime class-discovery issues, I’ve copied the two required jars to the HZ installation’s lib folder as a shortcut. You can also configure a classpath in the HZ configuration so that you leave the installation’s lib folder clean.

Since the Hazelcast jars are already present in the HZ installation, you’ll only need to copy guava-31.0.1-jre.jar from your local maven repo to <HZ installation folder>\lib

After you build the codebase in this article, you’ll need to create a jar containing only the com.demo.cache package. Ensure that you deploy this into the lib folder as well, just like the Guava library.
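One way to build that jar from the compiled classes with the JDK’s jar tool (the jar name and output paths here are illustrative — adjust them to your build layout):

```
:: from the project root, after compiling (e.g., mvn package)
jar cf demo-cache.jar -C target\classes com\demo\cache
copy demo-cache.jar "<HZ installation folder>\lib"
```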

 

TimelineUpdaterDemo => The main class of this demo. The console messages should be self-explanatory. You pass it either:

“client_update” => timeline updates are performed by the client nodes, i.e., RecommendationsMessageDistributor and FolloweeUserMessageDistributor.

“cluster_update” => timeline updates are performed by TimelineAppender (an EntryProcessor), which RecommendationsMessageDistributor and FolloweeUserMessageDistributor push to the cache.

 TimelineUpdaterDemo.java
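The TimelineUpdaterDemo listing isn’t reproduced here; its command-line toggle handling might look roughly like the following. This is a hypothetical sketch — the real main class also wires up the Hazelcast client and the distributor instances:

```java
// Hypothetical sketch of TimelineUpdaterDemo's command-line toggle handling.
public class UpdateModeDemo {
    enum UpdateMode { CLIENT_UPDATE, CLUSTER_UPDATE }

    static UpdateMode parseMode(String arg) {
        switch (arg) {
            case "client_update":  return UpdateMode.CLIENT_UPDATE;  // clients read-modify-write the cache
            case "cluster_update": return UpdateMode.CLUSTER_UPDATE; // EntryProcessor updates on the member
            default:
                throw new IllegalArgumentException(
                        "Usage: TimelineUpdaterDemo <client_update|cluster_update>");
        }
    }

    public static void main(String[] args) {
        UpdateMode mode = parseMode(args.length > 0 ? args[0] : "");
        System.out.println("Running in mode: " + mode);
    }
}
```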

 

TimelineMessageListener => A Hazelcast event listener implementation that listens for changes in the timeline cache. It prints the timeline to the console so that you can see what’s happening.

TimelineMessageListener.java

 

TimelineAppender => An implementation of EntryProcessor that updates the subscriber’s timeline directly on the cache node, in an atomic manner.

 TimelineAppender.java
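Conceptually, an EntryProcessor is the cluster-wide analogue of ConcurrentHashMap.compute(): the mutation runs where the entry lives, under the entry’s lock, so concurrent appenders can’t clobber each other. Here is a single-JVM, stdlib-only sketch of that pattern (the real TimelineAppender implements Hazelcast’s EntryProcessor interface instead; names here are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;

public class AtomicAppendSketch {
    static final int TIMELINE_SIZE = 10;
    static final ConcurrentHashMap<String, Deque<String>> timelines = new ConcurrentHashMap<>();

    // The whole read-modify-write runs atomically for this key,
    // just as an EntryProcessor runs atomically on the owning member.
    static void append(String userId, String message) {
        timelines.compute(userId, (key, timeline) -> {
            if (timeline == null) timeline = new ArrayDeque<>();
            if (timeline.size() == TIMELINE_SIZE) timeline.pollFirst(); // evict oldest
            timeline.addLast(message);
            return timeline;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        // Two "distributor" threads appending to the same subscriber's timeline.
        Runnable distributor = () -> {
            for (int i = 0; i < 1000; i++) {
                append("User-6", Thread.currentThread().getName() + "-" + i);
            }
        };
        Thread a = new Thread(distributor, "Reco");
        Thread b = new Thread(distributor, "Followee");
        a.start(); b.start(); a.join(); b.join();
        // The timeline is still exactly TIMELINE_SIZE long; no append was lost mid-flight.
        System.out.println(timelines.get("User-6").size()); // prints 10
    }
}
```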

 

Message, User and UserTimeline => Data objects

 Message.java

 

User.java

 

 UserTimeline.java

 

MessageDistributorService => An interface defining the message distributor service instances. An instance manages the timelines of the subscribers it’s assigned to; it receives messages from its respective source and updates each subscriber’s timeline in an atomic manner.

               MessageDistributorService.java
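The interface listing isn’t reproduced here; based on the description above, its shape is roughly the following. The method names and signatures are my guesses for illustration, not the article’s exact code:

```java
import java.util.List;

// Hypothetical shape of the distributor-service contract described above.
interface MessageDistributorServiceSketch {
    // The subscribers whose timelines this instance manages.
    List<String> assignedSubscribers();

    // Receive a message from this distributor's source (recommendations,
    // a followee's posts, ...) and atomically append it to each
    // assigned subscriber's timeline.
    void distributeMessage(String message);
}
```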

 

RecommendationsMessageDistributor => A recommendation service that pushes recommendation messages to its subscriber list. Multiple nodes of the same recommendation system manage distinct sets of subscribers.

         RecommendationsMessageDistributor.java

 

FolloweeUserMessageDistributor => A node that listens to messages from, say, an influencer, and pushes the messages to their subscribers. You could have multiple nodes listening to the same influencer but managing distinct sets of subscribers.

FolloweeUserMessageDistributor.java

 

In this demo codebase, RecommendationsMessageDistributor and FolloweeUserMessageDistributor each start as a single instance and send messages to their subscriber lists with random timing. They extend AbstractMessageDistributorHZCacheImpl, which handles the actual processing.

           AbstractMessageDistributorHZCacheImpl.java

 

The main message-publishing code is located in AbstractMessageDistributorHZCacheImpl.distributeMessage(). This method uses either the Client or Cluster update mode based on the command-line toggle passed when running the demo.

Familiarize yourself with the code and then give it a run. I’ve explained the output you’ll see in the console below. Note that TimelineMessageListener prints the timeline on stderr.

The output snapshot below is for approach 1 – “client_update”

 

You can see that the message User:User-6=>"message at 46.107" is lost between updates – note that it never appears in the 46.112 timeline below. Never a good thing! This would not occur had we incorporated a distributed lock, but that brings a whole set of additional headaches, i.e., management of the granular locks themselves.

User:User-3=>"message at 46.095" → printed by the Followee instance when it generates a message

User:User-6=>"message at 46.107"

Reco:Reco-0=>"message at 46.107" → printed by the Recommendation instance when it generates a message

The TimelineListener prints the logs below when it registers a timeline change.

The format is <second.millisecond> => timeline messages in reverse chronological order

46.108 => User:User-3=>"message at 46.095" | User:User-3=>"message at 00.946" | User:User-6=>"message at 00.900" | User:User-4=>"message at 00.837" | User:User-0=>"message at 00.790" | User:User-7=>"message at 00.744" | User:User-7=>"message at 00.682" | User:User-8=>"message at 00.635" | User:User-5=>"message at 00.589" | User:User-9=>"message at 00.528" |

46.110 => User:User-6=>"message at 46.107" | User:User-3=>"message at 46.095" | User:User-3=>"message at 00.946" | User:User-6=>"message at 00.900" | User:User-4=>"message at 00.837" | User:User-0=>"message at 00.790" | User:User-7=>"message at 00.744" | User:User-7=>"message at 00.682" | User:User-8=>"message at 00.635" | User:User-5=>"message at 00.589" |

46.112 => Reco:Reco-0=>"message at 46.107" | User:User-3=>"message at 46.095" | User:User-3=>"message at 00.946" | User:User-6=>"message at 00.900" | User:User-4=>"message at 00.837" | User:User-0=>"message at 00.790" | User:User-7=>"message at 00.744" | User:User-7=>"message at 00.682" | User:User-8=>"message at 00.635" | User:User-5=>"message at 00.589" |

 

 client_update
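The lost update above is the classic unsynchronized read-modify-write race. The interleaving can be reproduced deterministically in a few lines; this single-JVM sketch stands in for the two client nodes and the cache map (names and the subscriber key are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LostUpdateSketch {

    // Deterministically replays the race: both clients read the timeline
    // before either writes back, so the second put() wins and one message is lost.
    static List<String> simulateRace() {
        Map<String, List<String>> cache = new HashMap<>(); // stand-in for the distributed map
        cache.put("subscriber", new ArrayList<>());

        // Both client nodes take a snapshot of the SAME (empty) timeline...
        List<String> followeeView = new ArrayList<>(cache.get("subscriber"));
        List<String> recoView = new ArrayList<>(cache.get("subscriber"));

        followeeView.add("User:User-6=>\"message at 46.107\"");
        recoView.add("Reco:Reco-0=>\"message at 46.107\"");

        // ...and write back; the Reco write clobbers the Followee write.
        cache.put("subscriber", followeeView);
        cache.put("subscriber", recoView);
        return cache.get("subscriber");
    }

    public static void main(String[] args) {
        System.out.println(simulateRace());
        // prints [Reco:Reco-0=>"message at 46.107"] — the Followee message never made it
    }
}
```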

 

The output snapshot below is for approach 2 – “cluster_update”

 

The EntryProcessor is pushed to the cluster to update the timeline in an atomic manner. The logs clearly demonstrate that timeline updates are properly sequenced, without any message loss. The EntryProcessor guarantees the atomic update of the distributed cache because the Hazelcast engine synchronizes the write across all the nodes that need to be updated.

Reco:Reco-3=>"message at 47.383"

47.388 => Reco:Reco-3=>"message at 47.383" | User:User-3=>"message at 47.352" | Reco:Reco-5=>"message at 47.338" | User:User-8=>"message at 47.306" | User:User-9=>"message at 47.257" | User:User-2=>"message at 47.213" |

User:User-8=>"message at 47.399"

Reco:Reco-2=>"message at 47.399"

47.404 => User:User-8=>"message at 47.399" | Reco:Reco-3=>"message at 47.383" | User:User-3=>"message at 47.352" | Reco:Reco-5=>"message at 47.338" | User:User-8=>"message at 47.306" | User:User-9=>"message at 47.257" | User:User-2=>"message at 47.213" |

47.406 => Reco:Reco-2=>"message at 47.399" | User:User-8=>"message at 47.399" | Reco:Reco-3=>"message at 47.383" | User:User-3=>"message at 47.352" | Reco:Reco-5=>"message at 47.338" | User:User-8=>"message at 47.306" | User:User-9=>"message at 47.257" | User:User-2=>"message at 47.213" |

cluster_update

 

I hope this deep dive into a very important feature of distributed caches, i.e., distributed compute, has made you eager to learn more about distributed caches and the features that go beyond mere distributed data storage!

 

Read the full blog here:
https://medium.com/@AccoliteDigital/distributed-caches-the-guide-you-need-part-ii-61928e0c614b
