RSS

Oracle Service Bus duplicate message check using Coherence

20 Aug

In a situation where you need some sort of duplicate message check for an Oracle Service Bus project you would need some custom code. Since the Oracle Service Bus is stateless, when it handles a proxy service  call it will not know if this specific message was handled before. So there needs to be some sort of logic in your service for validating it’s a new unique message id.

Giving the fact that every message on our ESB has an unique messageID element in the SOAP header we could store this on disk, database or in memory. With the help of Oracle Coherence this last option, in memory, is relatively simple.

remark: However these changes mentioned above (either disk, database or coherence) makes your stateless OSB a bit more statefull. Be carefull as developer or architect when you encounter requirements like these. The OSB wasn’t designed as a fast and stateless message handling ESB for nothing. We use this mechanism in a process flow with low message load where an old mainframe system with a custom (old) adapter occasionally (and unwanted) triggers duplicate messages. So in our case memory load is not a huge issue and we can proceed with our default monitoring tools).

Ok warnings there, so now let’s check our example custom SOAP header:


<tns:RubixHeader xmlns:tns="http://www.rubix.nl/header">
 <CorrelationId>AF1204DD-17D3-28A3-09A2-0888F2FFC123</CorrelationId>
 <MessageId>3F2504E0-4F89-11D3-9A0C-0305E82C3301</MessageId>
 <MessageType/>
 <Timestamp>2012-08-18T12:10:00</Timestamp>
 <Source/>
</tns:RubixHeader>

Every SOAP message through our OSB contains such a custom SOAP header and every messageID is generated as GUID and can be considered unique. So then a few lines of Java using the Coherence lib:

/**
 *
 */
package nl.rubix.coherence;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import weblogic.logging.LoggingHelper;
import java.util.logging.Logger;
// import com.tangosol.util.ConverterCollections.ConverterCollection;

/**
 * @author JvZoggel
 *
 */
public class OsbCoherenceHelper
{

 public OsbCoherenceHelper()
 {
 }

public static boolean CheckDuplicate(String messageid, String datetime)
 {
 NamedCache myCache = CacheFactory.getCache("mycache");
 boolean resultaat;
 Logger logger = LoggingHelper.getServerLogger();

 if (myCache.containsKey(messageid))
 {
 logger.warning(">> MessageID=" + messageid + " already exists with value=" + myCache.get(messageid));
 resultaat = true;
 }
 else
 {
 logger.warning(">> MessageID=" + messageid + " doesn't exists yet, now storing with value=" + datetime);
 myCache.put(messageid,datetime);
 resultaat = false;
 }
 return resultaat;
 }

public static String getValue(String messageid)
 {
 NamedCache myCache = CacheFactory.getCache("mycache");
 return myCache.get(messageid).toString();
 }

 public static String showValues()
 {
 NamedCache myCache = CacheFactory.getCache("mycache");
 return myCache.values().toString();
 }

 /**
 * @param args
 */
 public static void main(String[] args)
 {
 // CacheFactory.ensureCluster();
 // System.out.println(">>>> Cluster = " + CacheFactory.getCluster());
 // boolean x1 = CheckDuplicate("id0001","2001-10-26T21:00:00");
 // System.out.println("X1 = " + x1);
 // x1 = CheckDuplicate("id0001","2001-10-27T22:00:00");
 // System.out.println("X1 = " + x1);
 }
}

These few lines of code (ab)use the Coherence memory cache to store the XML message element MessageID as the Key and stores the element timestamp of the XML message as value. You could also store correlationID, or any other element, which might not be unique but this is not an issue for the Coherence values as long as the key is unique. The fact I use timestamp is to be able to create an overview of old messages (and maybe delete old keys with an additional Java method).

So let’s see how this looks like in the Oracle Service Bus:

Input for the callout are these 2 elements from our custom soap header. The result boolean variable is checked in the IF-THEN action. If TRUE then the proxy will throw an error because the message is a duplicate already passed earlier on.

Firing the service request to both managed servers results in the following logging:

Managed Server 1:

####<Aug 18 ..> <Warning> <> <server1.local> <rbx_osb_dev_01> <[ACTIVE] ExecuteThread: '2' for queue: 'weblogic.kernel.Default (self-tuning)'> <<anonymous>> <> <..> <..> <BEA-000000> <>> MessageID=3D2504E0-4F89-11D3-9A0C-0305E82C3301 doesn't exists yet, now storing with value=2012-08-18T12:10:00>

and firing again to Managed Server 2:

####<Aug 18 ..> <Error> <> <server2.local> <rbx_osb_dev_02> <[ACTIVE] ExecuteThread: '6' for queue: 'weblogic.kernel.Default (self-tuning)'> <<anonymous>> <> <..> <..> <BEA-000000> <>> MessageID=3D2504E0-4F89-11D3-9A0C-0305E82C3301 already exists with value=2012-08-18T12:10:00>

Remind yourself that by default we are using the default Coherence caching strategy which means we store these cache entries in the default Weblogic JVM which hosts the Oracle Service Bus services, messages, transformations, jdbc connection & jms modules. To protect your JVM you can use an external coherence server which is explained by William Markito Oliveira in his blog on OTN.

I really love the fact how Oracle integrated Coherence into OSB by extending the Business Service capabilities to use result caching (see blog from Mark Smith if you don’t know this feature). The code used in this blogpost is not rocket science and I hope to see features like this out-of-the-box in future upgrades of the Oracle Service Bus.

References:

About these ads
 
6 Comments

Posted by on 20-08-2012 in Coherence, Java, Oracle, OSB

 

Tags: , , ,

6 responses to “Oracle Service Bus duplicate message check using Coherence

  1. Steffen

    05-10-2012 at 10:44

    Hi Jan,
    thanks for the post, helped me with exactly the same customer question ;-) I have an improvement to reduce the time between lookup and put. Use directly put and check the return value, which is “old” object if already in the cache. And “null” if the object is new.

    if (myCache.put(messageid, “”) != null) {
    logger.warning(“>> MessageID=” + messageid + ” already exists.”);
    result = true;
    } else {
    logger.warning(“>> MessageID=” + messageid + ” doesn’t exists yet”);
    result = false;
    }
    return result;

    Regards
    Steffen

     
    • jvzoggel

      07-10-2012 at 06:57

      Hi Steffen, really appreciate your reply and think your improvement will indeed perform better. Cheers Jan

       
  2. Fabio

    08-11-2012 at 13:37

    Hi guys,
    probably i have a really simple question how could i address the default cache used from Oracle Service Bus, if i use your code it seems to get a new instance of the Coherence cache.

    Thanks in advance
    F.

     

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

 
Follow

Get every new post delivered to your Inbox.

Join 343 other followers

%d bloggers like this: