1 #!/usr/bin/perl
2 # Invoke
3 # perl subscribePoller.pl http://myHost:8080/
4 # if xmlBlaster runs on 'myHost'
5 #
6 # Workaround for not having a callback server,
7 # so that we do not lose messages arriving asynchronously for us.
8 #
9 # Connects with a persistent session 'joe/3',
10 # subscribes on topic 'myTopic'
11 # and synchronously polls the callback queue
12 # with get() for arrived messages.
13 # The session/subscription is persistent and
14 # we never lose any message even if the script
15 # terminates for a while.
16 #
17 # Test setup:
18 #
19 # Start server
20 # java -Dcom.sun.management.jmxremote org.xmlBlaster.Main
21 # (you can use JDK's 'jconsole' to observe the server status)
22 #
23 # Start a publisher to send test messages
24 # java javaclients.HelloWorldPublish -oid myTopic -numPublish 20
25 #
26 # @author Marcel Ruff
27
28 use Frontier::Client;
29 use MIME::Base64;
30
use strict;
use warnings;
use Sys::Hostname;    # core module; replaces shelling out to `uname -n`

# ---------------------------------------------------------------------------
# Poll-based xmlBlaster subscriber over XML-RPC.
#
#   1. Log in with a persistent session whose callback dispatcher is switched
#      off, so the server only queues messages for us.
#   2. Subscribe once to a topic.
#   3. Periodically drain the session's callback queue with a synchronous
#      get() and print each message.
#
# The loop runs until SIGINT/SIGTERM arrives. The script never logs out, so
# the session and its queued messages are kept for the next run.
# ---------------------------------------------------------------------------

# The session name is used both in the login QoS and in the callback queue
# query key; keep it in one place so the two can never drift apart.
my $session_name  = 'joe/3';
my $topic_id      = 'myTopic';
my $poll_interval = 2;           # seconds between two polls

# Server URL: first command line argument, otherwise guess that xmlBlaster
# listens on port 8080 of the local host.
my $server_url = @ARGV ? $ARGV[0] : 'http://' . hostname() . ':8080/';
print "\nTrying to connect to xmlBlaster server on $server_url ...\n";

# NOTE(review): new() presumably only builds the client object; the first
# real round trip to the server is the login call below -- confirm.
my $server = Frontier::Client->new(url => $server_url);
print "Connected to xmlBlaster server on $server_url \n";

# Login and set dispatcherActive='false' as we have no callback server.
# Use a fake EMAIL callback protocol to satisfy xmlBlaster; we are only
# interested in the callback queue holding the messages.
# The '@' of the fake address is escaped: unescaped, Perl would interpolate
# the (empty) array @b and send just "a".
# NOTE(review): the login name is 'ben' while the session is named 'joe/3';
# verify that this mismatch is intended.
my $login_qos = qq{<qos>
   <session name='$session_name' timeout='-1'/>
   <persistent/>
   <queue relating='callback' maxEntries='1000'>
      <callback type='EMAIL' retries='-1' pingInterval='0' dispatcherActive='false'>
         a\@b
      </callback>
   </queue>
</qos>};
my $session_id = $server->call('authenticate.login', 'ben', 'secret', $login_qos, '');

print "Login success, got secret sessionId=$session_id \n";

# Subscribe with persistence flag to survive server restart.
# Subscribing once is enough as the session is persistent. To avoid
# duplicate subscriptions on restart of this script multiSubscribe is false.
my $subscribe_qos = q{<qos>
   <multiSubscribe>false</multiSubscribe>
   <persistent>true</persistent>
</qos>};
my $return_qos = $server->call('xmlBlaster.subscribe', $session_id,
                               "<key oid='" . $topic_id . "'/>",
                               $subscribe_qos);
print "\nResult for a subscribe(" . $topic_id . "):\n------------", $return_qos, "\n------------\n";

# Administrative command key addressing our own callback queue, and a query
# asking for all entries, consuming them, without blocking.
my $query_key = "<key oid='__cmd:client/$session_name/?cbQueueEntries'/>";
my $query_qos = q{<qos>
   <querySpec type='QueueQuery'>
      <![CDATA[maxEntries=-1&maxSize=-1&consumable=true&waitingDelay=0]]>
   </querySpec>
</qos>};

# Leave the poll loop cleanly on Ctrl-C / kill so the final message is
# printed; the handler only flips a flag and the loop notices it.
my $keep_polling = 1;
$SIG{INT} = $SIG{TERM} = sub { $keep_polling = 0 };

# Poll for messages
while ($keep_polling) {
    # Access the callback queue and consume all messages from there.
    # A failed call (transport error or XML-RPC fault) is reported and
    # retried on the next cycle instead of killing the poller.
    my @results;
    my $ok = eval {
        @results = $server->call('xmlBlaster.get', $session_id, $query_key, $query_qos);
        1;
    };

    if (!$ok) {
        my $error = defined $@ && length $@ ? $@ : "unknown error\n";
        warn "get($query_key) failed, retrying in $poll_interval s: $error";
    }
    else {
        print "\nResults for a get($query_key):";
        for my $batch (@results) {
            # Each result is expected to be an array of message units.
            next unless ref $batch eq 'ARRAY';

            for my $j (0 .. $#{$batch}) {
                # A message unit is a [ key, base64 content object, qos ] triple.
                my ($key, $content_b64, $qos) = @{ $batch->[$j] }[0 .. 2];
                my $content = decode_base64($content_b64->value());

                print "\n-------------#$j-------------------";
                print $key;
                print "\n<content>" . $content . "</content>\n";
                print $qos;    # TODO: re-subscribe if an ERASE arrives
                print "\n-------------#$j-------------------\n";
            }
        }
    }

    sleep($poll_interval) if $keep_polling;    # Poll interval
}

# No logout from xmlBlaster to keep the session
#$server->call('authenticate.logout', $session_id);

print "\nKeeping session, bye.\n";
# syntax highlighted by Code2HTML, v. 0.9.1