REQUIREMENT contrib.dbwatcher |
Type | NEW |
Priority | HIGH |
Status | CLOSED |
Topic | DbWatcher polls a database for changes and publishes those to xmlBlaster. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Des cription |
DbWatcher overview
DbWatcher is a little framework of plugins which detects changes on a database table,
queries the changes, formats them as a XML message and publishes them to xmBlaster.
DbWatcher is ready to go, you only set some configuration variables in xmlBlasterPlugins.xml
and on next xmlBlaster startup the configured database table is observed. DbWatcher may be useful when running xmlBlaster to integrate legacy applications where often many code segments update a database and other code segments or foreign applications need to know about the changes. With the delivered JDBC ResultSet to XML converter plugin a typical database change message looks like this: <?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>INSERT</command> <ident>EDDI</ident> <colname type='DATE' nullable='0'>DATUM</colname> <colname type='NUMBER' precision='11' scale='0' signed='false'> CPU</colname> <colname type='VARCHAR2' precision='20' readOnly='true'>COL1</colname> <colname table='OMZ' schema='AA' catalog='CAT' type='VARCHAR2' precision='10'>ICAO_ID</colname> </desc> <row num='0'> <col name='DATUM'>2005-01-05 15:52:06.0</col> <col name='CPU'>238333</col> <col name='COL1'><![CDATA[<Bla]]></col> <col name='ICAO_ID'>EDDI</col> <attr name='LR'>SRANIL</attr> <attr name='SUBNET_ID'>TCP</attr> </row> <row num='1'> <col name='DATUM'>2005-01-05 15:52:07.0</col> <col name='CPU'>238340</col> <col name='COL1' encoding='base64'>QmxdXT5CbA==</col> <col name='ICAO_ID'>EDDI</col> </row> </sql>
The generated message has a header tag <desc> which contains the meta information
of the delivered data. This is self explaining, the <ident> tag delivers the changed
value of the database table column ICAO_ID which caused the change message. Now that you know what DbWatcher can do for you we should look into its design. DbWatcher designDbWatcher consists of a set of Interfaces which allows any thinkable customization. It is easily possible to run DbWatcher outside from xmlBlaster as a standalone program or even without any dependency on xmlBlaster.
The javadoc of each interface contains enough information to assist you writing your own customized plugin. Database change detectionThe most difficult point is how to get a change event when a foreign application changes data on the database. There are following ways to detect DB changes, in future we could add such additional detection plugins:
Configuration hintTo configure your own poller you can take the simple standalone example client xmlBlaster/src/java/org/xmlBlaster/contrib/dbwatcher/Example.java adjust your query configuration and start it interactively. The testlog.properties in the same directory switches on fine logging and you can manually see what happens. Logging hintDbWatcher is using the JDK 1.4 logging framework. When running natively as xmlBlaster plugin this logging output is redirected to xmlBlaster utils logging. To switch on detailed logging, add those lines to xmlBlaster.properties: logging/org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter FINE logging/org.xmlBlaster.contrib.db.DbPool FINE logging/org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector FINE logging/org.xmlBlaster.contrib.dbwatcher.detector.AlertScheduler FINE logging/org.xmlBlaster.contrib.dbwatcher.detector.TimestampChangeDetector FINE logging/org.xmlBlaster.contrib.dbwatcher.plugin.DbWatcherPlugin FINE logging/org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher FINE logging/org.xmlBlaster.contrib.dbwatcher.DbWatcher FINE LimitationsEvery change detector has pro and cons and limitations, the following table gives an overview for the currently supplied detector plugins.
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Example any |
Timestamp based polling exampleLets create a database table TEST_POLL which contains a TIMESTAMP column TS (tested on Oracle): set autocommit on; CREATE TABLE TEST_POLL (COLOR VARCHAR(10), TS TIMESTAMP, CAR VARCHAR(10)); INSERT INTO TEST_POLL VALUES ('green', CURRENT_TIMESTAMP, 'Fiat'); INSERT INTO TEST_POLL VALUES ('red', CURRENT_TIMESTAMP, 'Lada'); INSERT INTO TEST_POLL VALUES ('blue', CURRENT_TIMESTAMP, 'BMW'); INSERT INTO TEST_POLL VALUES ('pink', CURRENT_TIMESTAMP, 'Fiat'); INSERT INTO TEST_POLL VALUES ('white', CURRENT_TIMESTAMP, 'Fiat'); SELECT * FROM TEST_POLL ORDER BY CAR; Now we configure a xmlBlaster native DbWatcher plugin in xmlBlasterPlugins.xml <plugin id='DbWatcherPlugin.testPoll' className='org.xmlBlaster.contrib.dbwatcher.plugin.DbWatcherPlugin'> <attribute id='jdbc.drivers'>oracle.jdbc.driver.OracleDriver</attribute> <attribute id='db.url'>jdbc:oracle:thin:@localhost:1521:orcl</attribute> <attribute id='db.user'>system</attribute> <attribute id='db.password'>secret</attribute> <attribute id='mom.loginName'>DbWatcherPlugin.testPoll/1</attribute> <attribute id='mom.topicName'>db.test_poll.event.${groupColValue}</attribute> <attribute id='alertScheduler.pollInterval'>10000</attribute> <attribute id='changeDetector.class'> org.xmlBlaster.contrib.dbwatcher.detector.TimestampChangeDetector </attribute> <attribute id='changeDetector.groupColName'>CAR</attribute> <attribute id='changeDetector.detectStatement'> SELECT MAX(TO_CHAR(TS, 'YYYY-MM-DD HH24:MI:SSXFF')) from TEST_POLL </attribute> <attribute id='db.queryMeatStatement'> SELECT * FROM TEST_POLL WHERE TO_CHAR(TS, 'YYYY-MM-DD HH24:MI:SSXFF') > '${oldTimestamp}' ORDER BY CAR </attribute> <action do='LOAD' onStartupRunlevel='9' sequence='6' onFail='resource.configuration.pluginFailed'/> <action do='STOP' onShutdownRunlevel='6' sequence='5'/> </plugin>
To see what messages are published start the server and a client These are the three messages you receive on startup: --------------------------------------- <key oid='db.test_poll.event.BMW'/> <?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>UPDATE</command> <ident>BMW</ident> <colname type='VARCHAR2' precision='10'>COLOR</colname> <colname type='TIMESTAMP' scale='1'>TS</colname> <colname type='VARCHAR2' precision='10'>CAR</colname> </desc> <row num='0'> <col name='COLOR'>blue</col> <col name='TS'>2005-1-11.15.15. 30. 566126000</col> <col name='CAR'>BMW</col> </row> </sql> --------------------------------------- <key oid='db.test_poll.event.Fiat'/> <?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>UPDATE</command> <ident>Fiat</ident> <colname type='VARCHAR2' precision='10'>COLOR</colname> <colname type='TIMESTAMP' scale='1'>TS</colname> <colname type='VARCHAR2' precision='10'>CAR</colname> </desc> <row num='0'> <col name='COLOR'>green</col> <col name='TS'>2005-1-11.11.53. 34. 263515000</col> <col name='CAR'>Fiat</col> </row> <row num='1'> <col name='COLOR'>pink</col> <col name='TS'>2005-1-11.11.53. 34. 447328000</col> <col name='CAR'>Fiat</col> </row> <row num='2'> <col name='COLOR'>white</col> <col name='TS'>2005-1-11.11.53. 34. 456153000</col> <col name='CAR'>Fiat</col> </row> </sql> --------------------------------------- <key oid='db.test_poll.event.Lada'/> <?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>UPDATE</command> <ident>Lada</ident> <colname type='VARCHAR2' precision='10'>COLOR</colname> <colname type='TIMESTAMP' scale='1'>TS</colname> <colname type='VARCHAR2' precision='10'>CAR</colname> </desc> <row num='0'> <col name='COLOR'>red</col> <col name='TS'>2005-1-11.11.53. 34. 425024000</col> <col name='CAR'>Lada</col> </row> </sql> Now to verify the detection, we update one table row set autocommit on; UPDATE TEST_POLL SET COLOR='yellow', TS=CURRENT_TIMESTAMP WHERE CAR='BMW'; and your subscriber should receive this message <key oid='db.change.BMW'/> <?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>UPDATE</command> <ident>BMW</ident> <colname type='VARCHAR2' precision='10'>COLOR</colname> <colname type='TIMESTAMP' scale='1'>TS</colname> <colname type='VARCHAR2' precision='10'>CAR</colname> </desc> <row num='0'> <col name='COLOR'>yellow</col> <col name='TS'>2005-1-11.15.52. 1. 204668000</col> <col name='CAR'>BMW</col> </row> </sql> |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Example any |
MD5 based polling exampleThe same table used in the previous example can be polled with MD5 comparison as well: <plugin id='DbWatcherPlugin.testPoll_MD5' className='org.xmlBlaster.contrib.dbwatcher.plugin.DbWatcherPlugin'> <attribute id='jdbc.drivers'>oracle.jdbc.driver.OracleDriver</attribute> <attribute id='db.url'>jdbc:oracle:thin:@localhost:1521:orcl</attribute> <attribute id='db.user'>system</attribute> <attribute id='db.password'>secret</attribute> <attribute id='mom.loginName'>DbWatcherPlugin.testPoll/2</attribute> <attribute id='mom.topicName'>db.test_poll.event.${groupColValue}</attribute> <attribute id='alertScheduler.pollInterval'>2000</attribute> <attribute id='changeDetector.class'> org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector </attribute> <attribute id='changeDetector.groupColName'>CAR</attribute> <attribute id='changeDetector.detectStatement'> SELECT color, car from TEST_POLL order by CAR </attribute> <attribute id='db.queryMeatStatement'> SELECT * FROM TEST_POLL WHERE CAR = '${groupColValue}' </attribute> <action do='LOAD' onStartupRunlevel='9' sequence='6' onFail='resource.configuration.pluginFailed'/> <action do='STOP' onShutdownRunlevel='6' sequence='5'/> </plugin> The only difference to the timestamp based example is that we can detect DELETE statements: set autocommit on; DELETE FROM TEST_POLL WHERE CAR='BMW'; and your subscriber should receive this message <key oid='db.test_poll.event.BMW'/> <?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>DELETE</command> <ident>BMW</ident> </desc> </sql> </content> |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Example any |
Trigger the polling with xmlBlaster messagesThe alert scheduler plugin typically polls for changes on the database. There is also support to poll the database after getting a kick by a xmlBlaster message send by some client. Here is an example setup: <plugin id='DbWatcherPlugin.testPoll-MsgTriggered' className='org.xmlBlaster.contrib.dbwatcher.plugin.DbWatcherPlugin'> <attribute id='jdbc.drivers'>oracle.jdbc.driver.OracleDriver</attribute> <attribute id='db.url'>jdbc:oracle:thin:@localhost:1521:orcl</attribute> <attribute id='db.user'>system</attribute> <attribute id='db.password'>secret</attribute> <attribute id='db.queryMeatStatement'> SELECT * FROM TEST_POLL WHERE TO_CHAR(TS, 'YYYY-MM-DD HH24:MI:SSXFF') > '${oldTimestamp}' ORDER BY CAR </attribute> <attribute id='mom.loginName'>DbWatcherPlugin.testPoll/1</attribute> <attribute id='mom.topicName'>db.test_poll.event.${groupColValue}</attribute> <attribute id='alertProducer.class'>org.xmlBlaster.contrib.dbwatcher.mom.XmlBlasterPublisher</attribute> <attribute id='mom.alertSubscribeKey'><key oid='checkDb'/></attribute> <attribute id='mom.alertSubscribeQos'><qos/></attribute> <attribute id='changeDetector.class'> org.xmlBlaster.contrib.dbwatcher.detector.TimestampChangeDetector </attribute> <attribute id='changeDetector.groupColName'>CAR</attribute> <attribute id='changeDetector.detectStatement'> SELECT MAX(TO_CHAR(TS, 'YYYY-MM-DD HH24:MI:SSXFF')) from TEST_POLL </attribute> <action do='LOAD' onStartupRunlevel='9' sequence='6' onFail='resource.configuration.pluginFailed'/> <action do='STOP' onShutdownRunlevel='6' sequence='5'/> </plugin> You can now test a poll with a command line publish, first start a subscriber so we can see what happens: java javaclients.HelloWorldSubscribe -xpath //key -session.name subscriber/1 -maxContentLength 1000and then publish an alert message which triggers a database poll: java javaclients.HelloWorldPublish -oid checkDb -numPublish 10 -erase falseAn additional feature of the TimestampChangeDetector plugin allows you to send the oldTimestamp and the change detector will send all messages newer than your supplied timestamp, for example: java javaclients.HelloWorldPublish -oid checkDb -clientProperty[oldTimestamp] "2005-02-01 11:59:24.639575" -erase falseYou can also misuse this feature to send an oldTimestamp far in the future and like this set the DbWatcher on standby. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Example any |
Example for table creation change event message<?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>CREATE</command> <colname type='NUMBER' precision='10' scale='3'>COLKEY</colname> <colname type='VARCHAR2' precision='20'>COL1</colname> <colname type='NUMBER' precision='12'>COL2</colname> <colname type='VARCHAR2' precision='10'>ICAO_ID</colname> </desc> </sql> Example for change event message<?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>INSERT</command> <ident>EDDI</ident> <colname type='DATE' nullable='0'>DATUM</colname> <colname type='NUMBER' precision='11' scale='0' signed='false'>CPU</colname> <colname type='VARCHAR2' precision='20' readOnly='true'>COL1</colname> <colname table='OMZ' schema='AA' catalog='CAT' type='VARCHAR2' precision='10'>ICAO_ID</colname> </desc> <row num='0'> <col name='DATUM'>2005-01-05 15:52:06.0</col> <col name='CPU'>238333</col> <col name='COL1'><![CDATA[<Bla]]></col> <col name='ICAO_ID'>EDDI</col> <attr name='LR'>SRANIL</attr> <attr name='SUBNET_ID'>TCP</attr> </row> <row num='1'> <col name='DATUM'>2005-01-05 15:52:07.0</col> <col name='CPU'>238340</col> <col name='COL1' encoding='base64'>QmxdXT5CbA==</col> <col name='ICAO_ID'>EDDI</col> </row> <row num='2'> <col name='DATUM'>2005-01-05 15:52:08.0</col> <col name='CPU'>238343</col> <col name='COL1'>BOO</col> <col name='ICAO_ID'>EDDI</col> <attr name='SUBNET_ID'>X25</attr> </row> </sql> Data which would break the xml of being well-formed are enclosed with a CDATA section, data which contains a CDATA end-token is delivered base64 encoded. Example for a deleted entry messageThe topic name specifies the entry, for example 'db.change.event.EDDI', the content contains a DELETE command: <?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>DELETE</command> <ident>EDDI</ident> </desc> </sql> Use the mom.eraseOnDelete=true configuration to delete the topic on a table row DELETE event. Example for a drop entry message<?xml version='1.0' encoding='UTF-8' ?> <sql> <desc> <command>DROP</command> <ident>EDDI</ident> </desc> </sql> Use the mom.eraseOnDrop=true configuration to delete the topic on a table DROP event. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Example any |
Example for Microsoft SQLServer with rowversion column<plugin id='DbWatcherPlugin.testTS' className='org.xmlBlaster.contrib.dbwatcher.plugin.DbWatcherPlugin'> <attribute id='jdbc.drivers'>com.microsoft.jdbc.sqlserver.SQLServerDriver</attribute> <attribute id='db.url'> jdbc:microsoft:sqlserver://localhost:1433;SelectMethod=cursor;DatabaseName=mydb </attribute> <attribute id='db.user'>sa</attribute> <attribute id='db.password'>secret</attribute> <attribute id='mom.loginName'>DbWatcherPlugin.testPoll/1</attribute> <attribute id='mom.topicName'>db.test_poll.event</attribute> <attribute id='alertScheduler.pollInterval'>500</attribute> <attribute id='changeDetector.class'> org.xmlBlaster.contrib.dbwatcher.detector.TimestampChangeDetector </attribute> <!-- timestamp variable, we want to get each row seperately --> <attribute id='changeDetector.groupColName'>polterTS</attribute> <!-- check if there was any change --> <attribute id='changeDetector.detectStatement'> SELECT MAX(CAST(polterTS as datetime)) from polter </attribute> <!-- each result row will be one explicit publish --> <attribute id='db.queryMeatStatement'> SELECT * FROM polter WHERE CAST(polterTS as datetime) > '${oldTimestamp}' ORDER BY polterTS </attribute> <attribute id='mom.eraseOnDelete'>true</attribute> <attribute id='mom.eraseOnDrop'>true</attribute> <action do='LOAD' onStartupRunlevel='6' sequence='6' onFail='resource.configuration.pluginFailed'/> <action do='STOP' onShutdownRunlevel='5' sequence='6'/> </plugin> The polterTS column is of type rowVersion. You can add such a column with this command: alter table [xxx].[polter] add polterTS rowversion |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Example any |
Sequence diagram showing the usage sequence of the plugins: |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Configure |
The plugin is configured in the xmlBlasterPlugins.xml configuration file. If you run DbWatcher outside of xmlBlaster the I_Info interface must return those settings.
The plugin is configured in the xmlBlasterPlugins.xml configuration file. Here an example where most properties are used: <xmlBlaster> ... <plugin id='DbWatcherPlugin.MD5' className='org.xmlBlaster.contrib.dbwatcher.plugin.DbWatcherPlugin'> <attribute id='jdbc.drivers'>oracle.jdbc.driver.OracleDriver</attribute> <attribute id='db.url'>jdbc:oracle:thin:@localhost:1521:orcl</attribute> <attribute id='db.user'>system</attribute> <attribute id='db.password'>secret</attribute> <attribute id='db.queryMeatStatement'></attribute> <attribute id='mom.topicName'>db.change.event.${groupColValue}</attribute> <attribute id='mom.loginName'>dbWatcher/1</attribute> <attribute id='mom.password'>secret</attribute> <attribute id='mom.alertSubscribeKey'><key oid=''/></attribute> <attribute id='mom.alertSubscribeQos'><qos/></attribute> <attribute id='changeDetector.class'> org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector </attribute> <attribute id='alertScheduler.pollInterval'>5000</attribute> <attribute id='changeDetector.groupColName'>ICAO_ID</attribute> <attribute id='changeDetector.detectStatement'> SELECT col1, col2, ICAO_ID FROM TEST_POLL ORDER BY ICAO_ID </attribute> <attribute id='converter.class'> org.xmlBlaster.contrib.dbwatcher.convert.ResultSetToXmlConverter </attribute> <attribute id='transformer.class'></attribute> <action do='LOAD' onStartupRunlevel='9' sequence='6' onFail='resource.configuration.pluginFailed'/> <action do='STOP' onShutdownRunlevel='6' sequence='5'/> </plugin> ... </xmlBlaster> Note:Variables like ${db.user} are replaced on startup when the plugin is loaded. You could add db.user=system into xmlBlaster.properties or on command line and ${db.user} would be replaced to system.
Now we face the problem that we need to set ${...} constructs which we want to pass through
to the DbWatcher sub-plugins like MD5ChangeDetector. On plugin load they can't
be resolved and an exception is thrown. To escape such variable please write $_{...},
for example $_{changeDetector.detectStatement},
and they will be passed through as is. The currently available sub-plugins themselves are capable to replace
such constructs as well. Needed class filesThe complete DbWatcher code resides under
You can manually distribute it with cd xmlBlaster/build.tmp/classes jar cvf dbwatcher.jar org/xmlBlaster/contrib
NOTE: Configuration parameters are specified on command line (-someValue 17) or in the
xmlBlaster.properties file (someValue=17). See requirement "util.property" for details. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Todo |
Port testsuite to run with HSQLDB and MS-SQLServer, currently the SQL statements from the testsuite are Oracle specific, currently you need a running Oracle, a running xmlBlaster and start the tests separately: java -Ddb.password=secret junit.swingui.TestRunner -noloading org.xmlBlaster.test.contrib.dbwatcher.TestResultSetToXmlConverter java -Ddb.password=secret junit.swingui.TestRunner -noloading org.xmlBlaster.test.contrib.dbwatcher.TestTimestamp Add other I_ChangeDetector plugins which scale better for tables with huge amount of entries. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
See API | org.xmlBlaster.contrib.dbwatcher.DbWatcher | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
See API | org.xmlBlaster.contrib.dbwatcher.detector.MD5ChangeDetector | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
See API | org.xmlBlaster.contrib.dbwatcher.detector.TimestampChangeDetector | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
See REQ | engine.runlevel | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
See TEST | org.xmlBlaster.test.contrib.dbwatcher.TestTimestamp | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
See TEST | org.xmlBlaster.test.contrib.dbwatcher.TestResultSetToXmlConverter |
This page is generated from the requirement XML file xmlBlaster/doc/requirements/contrib.dbwatcher.xml