XmlBlaster Logo

REQUIREMENT

contrib.filewatcher



Type NEW
Priority HIGH
Status CLOSED
Topic You can publish messages by putting files on a certain location in the file system.
Description
This is a native plugin that listens on a directory in the file system. It polls the directory to detect newly arrived files. Once the plugin assumes that a file has been completely written, it reads the file's contents and publishes them.
  • The Watcher is running as a native xmlBlaster plugin
  • Every registered Watcher is listening on exactly one single directory
  • The polling interval is expressed in ms and is configurable
  • All files in the directory on which a Watcher instance is listening are published on a single configurable topic.
  • It is possible to specify a filter for the file name; only the files matching the filter are then processed by the current plugin instance.
  • It is possible to have several plugins polling the same directory, provided they all have a filter specified. The filters must be chosen so that no file matches more than one filter. If the user does not ensure this, the behavior is unpredictable.
  • After a successful publish of the content of a file, the file is removed from the file system. Alternatively, if a 'sent' directory is specified in the configuration, processed files are moved to that destination. If for some reason it is not possible to move the file to that destination (for example because the necessary rights are missing), the plugin continues to poll but does not process any further files until the problem has been resolved manually.
  • It is possible to configure a maximum file size. Files bigger than the specified size are not published. If a 'discarded' directory is specified in the properties, then the file will be moved to that directory, otherwise it will be deleted.
  • For large or huge files it is possible to send them over xmlBlaster in a chunkwise manner, that is they are divided into smaller messages. The maximum size of these chunks is configurable.
  • When moving files, whether to the 'sent' or the 'discarded' directory, a file with the same relative name that already exists there is overwritten.
  • Detection of 'file ready for publishing':

    If lockExtention is specified in the plugin configuration, then publishing of a file is controlled by a lock file. This means that if you specified *.lck as the lockExtention and want to publish a file called someFile.gif, then the publishing of the file is inhibited until a file someFile.gif.lck is present in the directory.

    The plugin will wait delaySinceLastFileChange milliseconds since the last modification of the file to be published (date and size are checked) before processing anyway. This way, by choosing a sufficiently long delaySinceLastFileChange it is possible to work in an optimistic way without any lock file.

    Note however that this optimistic publishing is not 100% safe. If the writing of the file to be published stalls for longer than the time specified in delaySinceLastFileChange, the file is published in its incomplete state.

  • If permissions to read the directory are missing, if xmlBlaster has no read rights on a specific file, or if an IO exception occurs when reading, deleting or moving a file, an error is reported (logged) and the plugin temporarily stops working in order to preserve the correct sequence. The plugin continues to poll, however, and resumes working once the rights have been fixed manually.
  • To be able to track the message origin, filename and date are sent in the client properties as:
    • _filename (the relative file name as a String)
    • _fileDate (the timestamp of the file in ms as a long)
  • To listen to different directories and/or to publish on different topics you need to configure different plugins. When doing so assign to each plugin a different id (this is actually a general requirement for the plugins).
  • The files to be published are sorted chronologically first; files with the same last-change timestamp are sorted alphabetically. The best possible timestamp precision is one millisecond; if the operating system offers only a coarser precision, that precision applies. Note that the timestamp used is the last-change timestamp. Depending on the operating system, this may be the time at which the write operation started, the time at which it ended, or some time in between. Take this into account when the applications writing to the watched directory are multithreaded.
  • It is possible to make the filewatcher be part of a file replication workflow. Its counterpart, i.e. the instance which writes the files on reception is the file writer. It also can be configured to be monitored and administered using the monitor for the replication.
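
The ordering rule described in the list above (oldest last-change timestamp first, ties broken by name) can be sketched with a comparator. This is only an illustration under assumptions, not the plugin's actual code: the class FileOrderSketch and its FileInfo helper are hypothetical names, and "alphabetically" is assumed here to mean Java's natural, case-sensitive String order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: not part of xmlBlaster.
class FileOrderSketch {

    // Minimal stand-in for a directory entry: relative name and last-change time in ms.
    static final class FileInfo {
        final String name;
        final long lastModifiedMs;

        FileInfo(String name, long lastModifiedMs) {
            this.name = name;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    // Oldest first; entries with the same timestamp are ordered by name
    // (assumption: natural, case-sensitive String order).
    static final Comparator<FileInfo> PUBLISH_ORDER =
            Comparator.comparingLong((FileInfo f) -> f.lastModifiedMs)
                      .thenComparing(f -> f.name);

    // Returns the file names in the order in which they would be published.
    static List<String> publishOrder(List<FileInfo> files) {
        List<FileInfo> sorted = new ArrayList<>(files);
        sorted.sort(PUBLISH_ORDER);
        List<String> names = new ArrayList<>();
        for (FileInfo f : sorted) {
            names.add(f.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<FileInfo> files = new ArrayList<>();
        files.add(new FileInfo("b.xml", 2000L));
        files.add(new FileInfo("c.xml", 1000L));
        files.add(new FileInfo("a.xml", 2000L));
        System.out.println(publishOrder(files)); // [c.xml, a.xml, b.xml]
    }
}
```

Because a.xml and b.xml share a timestamp, only their names decide their relative order. If the file system reports timestamps with one-second precision, many more files fall into such ties.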
Example
any

Testing your filter pattern

You have two options to test your regular expression or your 'simple' filter in your configuration.

First, you can use the jconsole (see admin.jmx requirement) to test your filter pattern. Go to the MBean -> org.xmlBlaster -> node -> [your cluster name] -> service -> FileWatcherPlugin[xxx] area. Here you can adjust all settings dynamically and test the settings under the tab Operations by clicking the triggerScan button.

You can also test the pattern on the command line:

 java org.xmlBlaster.contrib.filewatcher.DirectoryManager -path /tmp/filewatcher -filter "*.xml" -filterType simple 

 java org.xmlBlaster.contrib.filewatcher.DirectoryManager -path /tmp/filewatcher -filter "hi.*\.xml" -filterType regex 
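
To make the difference between the two filter types more concrete, here is a minimal sketch of how a 'simple' filter and a 'regex' filter might be evaluated against a file name. It does not reproduce the DirectoryManager code. The class FilterSketch and its methods are hypothetical, and two things are assumed: in a 'simple' filter only '*' is special and matches any run of characters, and a filter must match the whole file name.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: not the xmlBlaster implementation.
class FilterSketch {

    // Translates a 'simple' filter into a regex. Assumed semantics: '*' matches
    // any run of characters, every other character is matched literally.
    static Pattern simpleToPattern(String simpleFilter) {
        String[] literals = simpleFilter.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*"); // one '*' stood between two literal pieces
            }
            regex.append(Pattern.quote(literals[i]));
        }
        return Pattern.compile(regex.toString());
    }

    // Returns true if the whole file name matches the filter of the given type.
    static boolean matches(String filterType, String filter, String fileName) {
        Pattern pattern = "regex".equals(filterType)
                ? Pattern.compile(filter)
                : simpleToPattern(filter);
        return pattern.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("simple", "*.xml", "hello.xml"));        // true
        System.out.println(matches("simple", "*.xml", "hello.xml.tmp"));    // false
        System.out.println(matches("regex", "hi.*\\.xml", "hi_there.xml")); // true
        System.out.println(matches("regex", "hi.*\\.xml", "oh_hi.xml"));    // false
    }
}
```

The two filters used here are the ones from the command lines above. Note that in the 'simple' filter the dot is a literal dot, whereas in the 'regex' filter it must be escaped to be taken literally.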
Configure

The plugin is configured in the xmlBlasterPlugins.xml configuration file. Here is an example in which all properties are used:


  <xmlBlaster>
    ...
     <plugin id='FileWatcherPlugin' className='org.xmlBlaster.contrib.filewatcher.FileWatcherPlugin'>
        <attribute id='mom.topicName'>watcher.test</attribute>
        <attribute id='mom.publishKey'><key oid='watcher.test'><watcher><test/></watcher></key></attribute>
        <attribute id='mom.publishQos'><qos><expiration lifeTime='4000'/></qos></attribute>
        <attribute id='mom.connectQos'></attribute>
        <attribute id='mom.loginName'>fritz</attribute>
        <attribute id='mom.password'>secret</attribute>
        <attribute id='filewatcher.directoryName'>/tmp/fileWatcher</attribute>
        <attribute id='filewatcher.maximumFileSize'>10000000</attribute>
        <attribute id='filewatcher.delaySinceLastFileChange'>10000</attribute>
        <attribute id='filewatcher.pollInterval'>2000</attribute>
        <attribute id='filewatcher.sent'>Sent</attribute>
        <attribute id='filewatcher.discarded'>Discarded</attribute>
        <attribute id='filewatcher.fileFilter'></attribute>
        <attribute id='filewatcher.lockExtention'>*.lck</attribute>
        <attribute id='filewatcher.filterType'>simple</attribute>
        <action do='LOAD' onStartupRunlevel='9' sequence='6' onFail='resource.configuration.pluginFailed'/>
        <action do='STOP' onShutdownRunlevel='6' sequence='5'/>
     </plugin>
   ...

 </xmlBlaster>

and here a configuration for a filewatcher which has to be part of a file replication (i.e. which has to be monitored by a replication monitor):

     <plugin create="true" id='fileSet02' className='org.xmlBlaster.contrib.filewatcher.FileWatcherPlugin'>
        <attribute id='filewatcher.directoryName'>/tmp/fileWatcher</attribute>
        <attribute id='mom.publishQos'><qos><persistent/></qos></attribute>
        <attribute id='mom.loginName'>watcher01/1</attribute>
        <attribute id='mom.password'>secret</attribute>
        <!-- maximum file size would be 2 GB in this case -->
        <attribute id='filewatcher.maximumFileSize'>2147483648</attribute>
        <!-- maximum chunk size would be 256 kB in this case -->
        <attribute id='filewatcher.maximumChunkSize'>262144</attribute>
        <!-- optimistically waits 10 minutes after last change in file size -->
        <attribute id='filewatcher.delaySinceLastFileChange'>600000</attribute>
        <!-- polls every 5 minutes to see if there are new files -->
        <attribute id='filewatcher.pollInterval'>300000</attribute>
        <attribute id='filewatcher.sent'>Sent</attribute>
        <attribute id='filewatcher.discarded'>Discarded</attribute>
        <attribute id='filewatcher.filterType'>simple</attribute>
        <!-- this token will be replaced with the id of the plugin: here 'fileSet02' -->
        <attribute id='replication.prefix'>${id}</attribute>
        <action do='LOAD' onStartupRunlevel='7' sequence='3' onFail='resource.configuration.pluginFailed'/>
        <action do='STOP' onShutdownRunlevel='6' sequence='7'/>
     </plugin>
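
As a worked example of the sizes used in the configuration above, the following sketch computes how many messages a file of the maximum size would produce. It is a sketch under assumptions, not the plugin's code: the class ChunkSketch is hypothetical, and it assumes that a file is cut into chunks of exactly maximumChunkSize bytes with one shorter final chunk for the remainder. How the plugin treats an empty file is not stated in this requirement, so the value 0 returned for it is an assumption as well.

```java
// Hypothetical sketch: not part of xmlBlaster.
class ChunkSketch {

    // Number of chunks needed for a file, i.e. fileSize / maxChunkSize rounded up.
    static long chunkCount(long fileSize, long maxChunkSize) {
        if (fileSize < 0 || maxChunkSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and maxChunkSize > 0");
        }
        return (fileSize + maxChunkSize - 1) / maxChunkSize;
    }

    // Size in bytes of the final chunk (equal to maxChunkSize if the file divides evenly).
    static long lastChunkSize(long fileSize, long maxChunkSize) {
        if (fileSize < 0 || maxChunkSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and maxChunkSize > 0");
        }
        long rest = fileSize % maxChunkSize;
        return (rest == 0 && fileSize > 0) ? maxChunkSize : rest;
    }

    public static void main(String[] args) {
        long maximumFileSize = 2147483648L; // value from the configuration above
        long maximumChunkSize = 262144L;    // value from the configuration above
        System.out.println(chunkCount(maximumFileSize, maximumChunkSize));    // 8192
        System.out.println(lastChunkSize(maximumFileSize, maximumChunkSize)); // 262144
    }
}
```

With these settings a file of the maximum allowed size is therefore spread over 8192 messages.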

and here a minimalistic one:

  <xmlBlaster>
    ...

     <plugin id='FileWatcherPlugin' className='org.xmlBlaster.contrib.filewatcher.FileWatcherPlugin'>
        <attribute id='filewatcher.directoryName'>/tmp/fileWatcher</attribute>
        <attribute id='mom.topicName'>watcher.test</attribute>
        <!-- determines on which level to start/shutdown the plugin and what to do in case of an exception -->
        <action do='LOAD' onStartupRunlevel='9' sequence='6' onFail='resource.configuration.pluginFailed'/>
        <action do='STOP' onShutdownRunlevel='6' sequence='5'/>
     </plugin>
   ...

 </xmlBlaster>

and here a more detailed description of each attribute:
id (name) | type | default value | Comment
filewatcher.directoryName | String | null | mandatory: The name of the directory on which this plugin instance will listen. If the directory does not exist, it is created. You can specify an absolute name or a relative one. A relative name is resolved against ${user.home}.
mom.topicName | String | null | mandatory unless publishKey is defined. If both are specified, this attribute is ignored and a warning is logged. This is the name of the topic on which to publish (the oid).
mom.publishKey | String | null | optional if replication.prefix has been set; mandatory if neither replication.prefix nor mom.topicName has been set. You should not set it if you have set mom.topicName: if you do, a warning is written to the logs and this key takes precedence over mom.topicName. If you need a more complex key for publishing the messages (for example to make them searchable), you can specify it here. Note that you can only specify one single key, which is used for all messages published by this plugin. If you need to differentiate, you can use several plugins.
mom.publishQos | String | null | optional. If you need a more complex qos for publishing, you can specify it here. Note that you can only specify one single qos, which is used for all messages published by this plugin. If you need to differentiate, you can use several plugins.
mom.connectQos | String | null | optional. If you need more complex connection behavior, you can specify it here. Note however that this plugin uses the local protocol by default.
mom.loginName | String | '_' + id of plugin | optional. If the id of the plugin is watcher1, then the default loginName is _watcher1. If you specify a connectQos, this attribute is ignored.
mom.password | String | null | optional. This is the password used to authenticate the loginName. If you specify a connectQos, this attribute is ignored.
filewatcher.pollInterval | long | 2000 | optional: The time in milliseconds the plugin sleeps between two poll sweeps. This time is also used to retry an action which has failed (reading, deleting or moving a file).
filewatcher.delaySinceLastFileChange | long | 10000 | optional: The time in milliseconds which the plugin waits after the latest detected change of a file before publishing it. The plugin detects changes at discrete times (every time it polls), so the accuracy is never better than the value specified in pollInterval. A file is considered to have changed if either its size or its date has changed. The plugin always waits for the time specified here, even when a lock file is used. So if you always control your publishes with a lock file, you can set this attribute to zero to avoid the delay.
filewatcher.maximumFileSize | long | 10000000 | optional: The maximum size in bytes a file may have to be published. If the file exceeds this size, it is not published. It is either deleted or moved to the directory specified in the discarded attribute.
filewatcher.maximumChunkSize | long | | optional: The maximum size in bytes of a chunk published in a single message. If the total size of the file to be published is bigger than this number, the file is published in several such chunks.
filewatcher.fileFilter | String | null | optional: If you want this plugin to process only files with a certain file name extension, you can specify it here. For example, if you only want files with the gif extension to be sent, specify *.gif. If something is specified here, files in the directory which do not match this pattern are left untouched and are not published.
filewatcher.lockExtention | String | null | optional: If you want to use the locking approach, you need to specify the extension of the lock files here. The syntax is of the kind *.lck, i.e. the first character is a '*'.
filewatcher.sent | String | null | optional: If nothing is specified here, files are deleted after having been published. If something is specified here, it is assumed to be a directory, and published files are moved to this location. If the directory does not exist, it is created when the plugin is initialized. You can specify either an absolute or a relative directory name. Relative names are resolved against the directory specified in the directoryName attribute.
filewatcher.discarded | String | null | optional: If nothing is specified here, files which are too big to be published are deleted. If something is specified, it is assumed to be a directory, and the files which are too big are moved to this location. If the directory does not exist, it is created when the plugin is initialized. You can specify either an absolute or a relative directory name. Relative names are resolved against the directory specified in the directoryName attribute.
filewatcher.filterType | String | simple | optional: If nothing is specified here, 'simple' is assumed. Valid types are 'simple' and 'regex'. This attribute decides which filter syntax is used when processing the fileFilter. If you choose 'simple', you can specify filters of the kind '*.xml', which are very easy to use but somewhat limited if you need more advanced matching. The type 'regex' allows you to use regex syntax for the fileFilter. Note that lockExtention is not affected by this flag and is always treated as type 'simple'.
filewatcher.copyOnMove | boolean | true | optional: This flag only has an effect if sent or discarded has been defined. If this flag is set to false, the file is simply moved (or renamed). This is fast but does not work on all operating systems; in particular it is known to fail on non-local file systems. Leaving this flag true (the default) forces a complete copy followed by a delete of the file. This is slower but works even on remotely mounted file systems such as nfs, samba or cifs.
filewatcher.recursive | boolean | false | optional: If you set this flag to true, the watcher looks into all subdirectories (children, grandchildren and so on) of the directory. The directories specified for sent and discarded are not scanned, to avoid recursion. In the client properties of the published message, the _filename attribute always contains the relative filename. An additional client property _subdir contains the name of the subdirectory relative to the directory specified in the configuration. For example, if you configured your FileWatcher to look into /tmp/someBaseDir and you have a file /tmp/someBaseDir/one/two/three.dat, then _filename='three.dat' and _subdir='one/two'. Note that _subdir is only provided for files in subdirectories, not for files in the base directory.
replication.prefix | String | | optional: This identifier (which must be unique) tells xmlBlaster that this filewatcher wants to be monitored via the replication monitor. This parameter currently has no effect on the replication of files.
replication.version | String | 1.0 | optional: This identifier only has an effect if you have set replication.prefix. It tells the replication mechanism which version of the replication data you are using. It can normally be left untouched. If you do not specify anything and have chosen replication, it is set to "1.0".
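
The interplay of pollInterval and delaySinceLastFileChange described in the table can be sketched as follows. This is a minimal illustration under assumptions and not the plugin's implementation: the class StabilitySketch is hypothetical, and it assumes that the waiting period is measured from the poll at which a change in size or date was detected, which is why the accuracy cannot be better than the poll interval.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: not part of xmlBlaster.
class StabilitySketch {

    // What is remembered about one file between two polls.
    private static final class Seen {
        final long size;
        final long lastModifiedMs;
        final long changeDetectedAtMs;

        Seen(long size, long lastModifiedMs, long changeDetectedAtMs) {
            this.size = size;
            this.lastModifiedMs = lastModifiedMs;
            this.changeDetectedAtMs = changeDetectedAtMs;
        }
    }

    private final long delaySinceLastFileChangeMs;
    private final Map<String, Seen> seenByName = new HashMap<>();

    StabilitySketch(long delaySinceLastFileChangeMs) {
        this.delaySinceLastFileChangeMs = delaySinceLastFileChangeMs;
    }

    // Called once per poll for each file found. Returns true once the file has
    // been observed unchanged (same size and date) for at least the configured delay.
    boolean isReady(String name, long size, long lastModifiedMs, long nowMs) {
        Seen seen = seenByName.get(name);
        if (seen == null || seen.size != size || seen.lastModifiedMs != lastModifiedMs) {
            // New file, or size or date changed: restart the waiting period.
            seen = new Seen(size, lastModifiedMs, nowMs);
            seenByName.put(name, seen);
        }
        return nowMs - seen.changeDetectedAtMs >= delaySinceLastFileChangeMs;
    }

    public static void main(String[] args) {
        StabilitySketch watcher = new StabilitySketch(10000L); // default delay from the table
        System.out.println(watcher.isReady("a.dat", 100L, 1L, 0L));     // false: just detected
        System.out.println(watcher.isReady("a.dat", 200L, 2L, 2000L));  // false: file grew
        System.out.println(watcher.isReady("a.dat", 200L, 2L, 10000L)); // false: only 8 s unchanged
        System.out.println(watcher.isReady("a.dat", 200L, 2L, 12000L)); // true: 10 s unchanged
    }
}
```

With a delay of zero, a file is reported as ready at the first poll that sees it, which corresponds to the case where publishing is controlled by lock files alone.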

NOTE: Configuration parameters are specified on command line (-someValue 17) or in the xmlBlaster.properties file (someValue=17). See requirement "util.property" for details.
Columns named Impl tell you whether the feature is implemented.
Columns named Hot tell you whether the configuration can be changed during hot operation.

See REQ engine.runlevel
See TEST org.xmlBlaster.test.contrib.TestFileWatcherPlugin

This page is generated from the requirement XML file xmlBlaster/doc/requirements/contrib.filewatcher.xml
