1 """
2 __version__ = '$Revision: 1.7 $'
3 __date__ = '$Date: 2004-11-24 20:15:11 +0000 (Wed, 24 Nov 2004) $'
4 __author__ = 'spex66@gmx.net'
5 __license__ = 'pyBlaster is under LGPL, see http://www.xmlBlaster.org/license.html'
6
7 last change by $Author: ruff $
8
9 """
10
11 # Copyright (c) 2003 Peter Arwanitis
12 # mailto:spex66 @ gmx . net
13 # (=PA=)
14
15
16 """
17
18 8888888b. 888888b. 888 888
19 888 Y88b 888 '88b 888 888
20 888 888 888 .88P 888 888
21 888 d88P 888 888 8888888K. 888 8888b. .d8888b 888888 .d88b. 888d888
22 8888888P' 888 888 888 'Y88b 888 '88b 88K 888 d8P Y8b 888P'
23 888 888 888 888 888 888 .d888888 'Y8888b. 888 88888888 888
24 888 Y88b 888 888 d88P 888 888 888 X88 Y88b. Y8b. 888
25 888 'Y88888 8888888P' 888 'Y888888 88888P' 'Y888 'Y8888 888
26 888
27 Y8b d88P
28 'Y88P'
29
30
31 =======================================================
32 THE ABSTRACT
33
34 pyBlaster
35 The Python way ("The first steps" :-)) to use www.XMLBLASTER.org
36
37 A Python module that provides the complete XMLBLASTER interface for XMLRPC
38 This means for asynchronous updates (callbacks), too!
39
40 Fredrik Lundh has provided the excellent XMLRPC library for Python.
41 http://www.pythonware.com/products/xmlrpc/
42
43
44
45 Have fun and thanks to the XMLBLASTER-team!
46 http://www.xmlblaster.org
47
48
49 Peter Arwanitis
50 spex66 @ gmx . net
51 (=PA=)
52
53 =======================================================
54 THE DETAILS
55
56 Core file
57 pyBlaster.py
58
59 My 1st Step:
60 class XmlBlasterClient
61 Implementation of the complete(?) XMLRPC client interface
62 With just a little beautifying of the method-signatures
63
64 My 2nd Step:
65 class XmlBlasterCallbackClient
66 Specialisation of XmlBlasterClient with additional
67 threaded XMLRPC server implementation
68
69
70 Based on (if you have an uptodate installation, delete the provided files)
71 xmlrpclib.py / SimpleXMLRPCServer.py ( Version 1.0.1 )
72
73 Additional core files
74 BaseService.py class to comfortable handle threads
75 found in the Narval project from LOGILAB
76 http://www.logilab.org
77 ThreadedXMLRPCServer.py mixin class SimpleXMLRPCServer & BaseService
78 to build an threaded XMLRPCServer
79
80 ResponsiveThreadedXMLRPCServer thanks to
81 Robin Munn, I've figured out a XMLRPCServer
82 cooperative with threading, look at the
83 comments in new ResponsiveThreadingTCPServer.py
84
85 Optional files
86 ShellService.py mixin class BaseService & InteractiveConsole
87 to serve an interactive Python prompt (shell)
88 for debugging and testing
89
90
91 =======================================================
92 THE INSTALLATION
93
94 Put the pyBlaster directory (its an python package) into your
95 python/Lib/site-packages (or use it direct from the directory)
96
97
98 =======================================================
99 THE USAGE
100
101 Read the XMLBLASTER documentation / requirements, especially for the
102 "quality of service QoS" options.
103
104 In your python project:
105
106 # import
107 from pyBlaster import pyBlaster
108
109 # build an instance
110 xb = pyBlaster.XmlBlasterCallbackClient()
111
112 # start server / use client
113 # its up to you, thats all!
114
115 =======================================================
116 THE TEST
117
118 Developed under Python 2.2.2 with the XMLRPC update from pythonware
119
120 Success stories from Jython and other CPython version are appreciated!
121
122 Test (batteries included):
123 start pyBlaster.py in a shell and have a look at the help text
124 start pyBlaster.py in more than one shell and experiment interactive
125 with publish / subcribe / get <-- this is the python way :)
126
127
128
129 (=PA=)
130 """
131
132
133 from ThreadedXMLRPCServer import ResponsiveThreadedXMLRPCServer
134 import xmlrpclib
135 import sys
136 from ShellService import ShellService
137 from socket import gethostname
138
139 true, false = 1, 0 # Bool init
140
141 class XmlBlasterClient:
142 """Implementation of a client interface for XMLBLASTER
143 (docstrings copied from the java version)
144 """
145 def __init__(self, xmlblaster_url=None):
146 "Optional xmlblaster_url for direct connection"
147 self.xmlblaster_url = xmlblaster_url
148 self.proxy = None
149 self.sessionId = None
150
151 if xmlblaster_url:
152 self.connect(xmlblaster_url)
153
154 # CLIENT Interface #################################################
155
156 def connect(self, xmlblaster_url):
157 self.proxy = xmlrpclib.ServerProxy(xmlblaster_url)
158 print "\n==> ::CONNECT to XmlBlaster:: <=="
159 print ' Sucessful Server connect on ', xmlblaster_url
160 #self.proxy = xmlrpclib.Server(xmlblaster_url) # for old xmlrpclib versions
161
162 # XMLBLASTER
163
164 def login(self, username='guest', password='guest', callback_url=None, additionalConnectQos=''):
165 """
166 Do login to xmlBlaster.
167 @param additionalConnectQos For example "<session timeout='3600000' maxSessions='10'/>"
168 @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.connect.html
169 @deprecated Use connect() instead
170 @return The secret sessionId as a raw string
171 """
172
173 if callback_url:
174 _cb = "<callback type='XMLRPC'>%s</callback>" % callback_url
175 else:
176 _cb = ""
177
178 qos = "<qos>" + _cb + additionalConnectQos + "</qos>"
179
180 # remember the return secret value for further usage
181 self.sessionId = self.proxy.authenticate.login(username, password, qos, "")
182
183 print "==> ::LOGIN:: <=="
184 print ' Success with sessionId= ', self.sessionId
185
186 def logout(self):
187
188 print "==> ::LOGOUT:: <=="
189 self.proxy.authenticate.logout(self.sessionId)
190
191 def publish(self, xmlKey, content, qos):
192 """
193 Publish messages.
194
195 This variant allows to pass an array of MsgUnitRaw object, for performance reasons and
196 probably in future as an entity for transactions.
197
198 @see org.xmlBlaster.engine.RequestBroker
199 @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html">The interface.publish requirement</a>
200 """
201
202 print "==> ::PUBLISH:: <=="
203 return self.proxy.xmlBlaster.publish(self.sessionId, xmlKey, content, qos)
204
205 def publishOneway(self, msgUnitArr):
206 """
207 Publish an array of messages.
208
209 The oneway variant may be used for better performance,
210 it is not returning a value (no application level ACK)
211 and there are no exceptions supported over the connection to the client.
212
213 @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html">The interface.publish requirement</a>
214 """
215 print "==> ::PUBLISHONEWAY:: <=="
216 """
217 Hack: I don't think the oneway is supported by XmlRpc because http requests always
218 return something, for now we fake it using a publishArr() and ignore the returned
219 value (Marcel 2004-08-12)
220 """
221 #self.proxy.xmlBlaster.publishOneway(self.sessionId, msgUnitArr)
222 ignoreReturn = self.proxy.xmlBlaster.publishArr(self.sessionId, msgUnitArr)
223
224 def publishArr(self, msgUnitArr):
225 """
226 Publish an array of messages.
227
228 This variant may be used for better performance.
229
230 @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html">The interface.publish requirement</a>
231 """
232 print "==> ::PUBLISHARR:: <=="
233 return self.proxy.xmlBlaster.publishArr(self.sessionId, msgUnitArr)
234
235 def subscribe(self, xmlKey, qos):
236 """
237 Subscribe to messages.
238
239 @param xmlKey_literal Depending on the security plugin this key is encrypted
240 @param subscribeQoS_literal Depending on the security plugin this qos is encrypted
241 @see org.xmlBlaster.engine.RequestBroker
242 @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.subscribe.html">The interface.subscribe requirement</a>
243 """
244
245 print "==> ::SUBSCRIBE:: <=="
246 return self.proxy.xmlBlaster.subscribe(self.sessionId, xmlKey, qos)
247
248 def unSubscribe(self, xmlKey, qos):
249 """
250 unSubscribe from messages.
251
252 To pass the raw xml ASCII strings, use this method.
253
254 @param xmlKey_literal Depending on the security plugin this key is encrypted
255 @param unSubscribe QoS_literal Depending on the security plugin this qos is encrypted
256 @see org.xmlBlaster.engine.RequestBroker
257 """
258 print "==> ::unSubscribe :: <=="
259 return self.proxy.xmlBlaster.unSubscribe (self.sessionId, xmlKey, qos)
260
261 def get(self, xmlKey, qos):
262 """
263 Synchronous access a message.
264
265 @see org.xmlBlaster.engine.RequestBroker
266 @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.get.html">The interface.get requirement</a>
267 """
268 print "==> ::GET:: <=="
269 return self.proxy.xmlBlaster.get(self.sessionId, xmlKey, qos)
270
271 def erase(self, xmlKey, qos):
272 """
273 Delete messages.
274 @see org.xmlBlaster.engine.RequestBroker
275 @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.erase.html">The interface.erase requirement</a>
276 """
277
278 print "==> ::ERASE:: <=="
279 return self.proxy.xmlBlaster.erase(self.sessionId, xmlKey, qos)
280
281 def printMessages(self, messages):
282 print " Received ", len(messages), " messages:"
283 for msg in messages:
284 key = msg[0]
285 content = msg[1] # content is of type xmlrpclib.Binary
286 qos = msg[2]
287 print " key=", key
288 print " content=", content.data, " bytes"
289 print " qos=", qos
290
291
292 class XmlBlasterCallbackClient(XmlBlasterClient):
293 """Specialication of the client class with the additional
294 implementation of the server interface
295
296 To use asynchronous update() or updateOneway() you can
297 subtype this class and override the methods with your own
298 project specific dispatchers.
299
300 Hint:
301 Remember that the calls occur in a seperate thread,
302 so usage of a threadsafe queue is always a good idea
303
304 Look for python cookbook hints on threaded programming
305
306 The XMLRPC callback server runs on an own port as a seperate thread
307 (docstrings copied from the java version)
308 """
309
310 def __init__(self, xmlblaster_url=None):
311
312 # INIT client part
313 XmlBlasterClient.__init__(self, xmlblaster_url)
314
315 self.callback_url = None
316 self.callback_server = None
317 self.shell = None
318
319 # Dispatcher Class (XMLRPC Server Interface) ###############################
320
321 class XB_CallbackDispatcher:
322 def __init__(self, xb_CallbackInstance):
323 self.xb_CallbackInstance=xb_CallbackInstance
324 def update(self, *attrs):
325 return self.xb_CallbackInstance.update(*attrs)
326 def updateOneway(self, *attrs):
327 return self.xb_CallbackInstance.updateOneway(*attrs)
328 def ping(self, *attrs):
329 return self.xb_CallbackInstance.ping(*attrs)
330
331 # Start / Stop Server ######################################################
332
333 def startCallbackServer(self, port=0) :
334 # Automatic port is allocated if port is 0
335
336 self.callback_server = ResponsiveThreadedXMLRPCServer(port,
337 dispatcherClass=self.XB_CallbackDispatcher,
338 callbackInstance=self
339 )
340
341 # thanks to Doug Palmer
342 allocated_port = self.callback_server.getConnectedPort()
343
344 #print 'autoport acquired:: ', allocated_port
345
346 self.callback_url = 'http://%s:%i/RPC2' % (gethostname(), allocated_port)
347
348 print "\n==> ::STARTCALLBACKSERVER:: <=="
349 print ' Success with callback_url= ', self.callback_url
350
351 self.callback_server.start()
352
353 def stopCallbackServer(self) :
354 print "\n==> ::STOPCALLBACKSERVER:: <=="
355 print " I'm dying... "
356 self.callback_server.stop()
357
358 print 'CBServer is alive? ', self.callback_server.isAlive()
359 print " ...good bye!"
360
361
362 # Start / Stop SHELL Service ##############################################
363
364 def startShellService(self):
365 if not self.shell:
366 print "\n==> ::STARTSHELLSERVICE:: <=="
367 self.shell = ShellService(engine=self, name='ShellService')
368 self.shell.start()
369
370 def stopShellService(self):
371 if self.shell:
372 print "\n==> ::STOPSHELLSERVICE:: <=="
373 self.shell.stop()
374
375 # Total Shutdown ##############################################
376 def shutdown(self):
377 "Closes all servers (joining all threads) and connections"
378
379 # XXX 080403 PA ? thread stopping isn't workink smooth yet... help appreciated!
380 # XXX 080503 PA ? callbackserver joins now, but not perfectly smooth
381 # with stopping the shellservice :-/
382
383 # from time to time shellservice ends automagically ?!?
384
385 print "\n==> ::SHUTDOWN Initiated:: <=="
386 if self.sessionId: self.logout()
387 if self.callback_server: self.stopCallbackServer()
388 if self.shell: self.stopShellService()
389 print "\n==> ::SHUTDOWN Completed:: <=="
390 #sys.exit(1)
391
392
393
394
395 # SERVER Interface ##########################################################
396
397 def update(self, sessionId, key, content, qos):
398 """ This is the callback method invoked from the server
399 informing the client in an asynchronous mode about new messages
400
401 You have to override this method in a specialication, to establish
402 your own logic.
403 """
404
405 print "==> ::UPDATE:: <=="
406 print " SessionId:: ", sessionId
407 print " Key:: ", key
408 print " Content:: ", content.data
409 print " QoS:: ", qos
410
411 return ""
412
413
414 def updateOneway(self, sessionId, key, content, qos):
415 """ This oneway method does not return something, it is high performing but
416 you loose the application level hand shake.
417 @see <a href="http://www.xmlBlaster.org/xmlBlaster/src/java/org/xmlBlaster/protocol/corba/xmlBlaster.idl" target="others">CORBA xmlBlaster.idl</a>
418
419 You have to override this method in a specialication, to establish
420 your own logic.
421 """
422 print "==> ::UPDATEONEWAY:: <=="
423 print " SessionId:: ", sessionId
424 print " Key:: ", key
425 print " Content:: ", content.data
426 print " QoS:: ", qos
427
428 def ping(self, qos):
429 """ Ping to check if the callback server is alive.
430 @param qos ""
431 @return ""
432 """
433 #print "==>::PING:: ", qos
434
435 return ""
436
437 # TEST with interaction #####################################################
438
439 __usage__ = """Usage from the shell:
440 python pyBlaster.py [xmlblaster_url,
441 callbackport,
442 username, password,
443 XPath subscription testphrase
444 ]
445
446 Example:
447 java -jar lib/xmlBlaster.jar
448 <-- starts XmlBlaster
449
450 pyBlaster.py http://<the-xb-machine>:8080 8081 me too first
451 <-- starts your first client with callbacks
452
453 pyBlaster.py http://<the-xb-machine>:8080 8082 you too second
454 <-- starts another one in another shell (on another computer)
455
456 XPath subscription testphrase: 'first' , 'second' or 'third'
457 look at the result of the five publish() calls, copy them for
458 further testing
459
460 try to publish something on your own :-)
461 the python interactive shell is your friend
462
463 Pythonshell:
464 _ is the instance of your XmlBlasterCallbackClient,
465 >>> dir(_)
466 shows the interface
467
468 >>> print _.publish.__doc__
469 gives a interactive look at the docstring
470 (it's just an example :-))
471
472 cause callbacks are mixed on the same output,
473 it's a bit messy sometimes :-)
474
475 <ctrl><pause> is killing the program, without ending
476 each thread and connection by hand
477
478 Hint:
479 Situation:
480 A client got a message with an oid (i.e. '3')
481 cause it's fitting an appropriate subsribe/XPath
482 Effect:
483 If the message with exactly _this_ oid is changed/altered,
484 the client recieves an update() on this, ignoring the
485 subscribe/XPath!
486
487 If you have an OID once, you have an subscription to all
488 changes, nice!
489
490 """
491
492 if __name__ == '__main__':
493 # ok, it is a raw usage of sys.argv, but there is no confusion
494 # through all the perfect modules to parse the parameters :-)
495 #
496 # for the beauty of option-parsing look elsewhere, thanks
497 try:
498 xmlblaster_url = sys.argv[1]
499 callbackport = int(sys.argv[2])
500 user = sys.argv[3]
501 passwd = sys.argv[4]
502 phrase = sys.argv[5]
503 except:
504 print __usage__
505 sys.exit()
506
507 xb = XmlBlasterCallbackClient(xmlblaster_url)
508 xb.startCallbackServer(callbackport)
509 xb.startShellService()
510
511
512 print """ _.login('%s', '%s', '%s')""" % (user, passwd, xb.callback_url)
513 additionalConnectQos = "<session timeout='3600000' maxSessions='10'/>"
514 xb.login(user, passwd, xb.callback_url, additionalConnectQos)
515
516 print """ _.subscribe("<key oid='' queryType='XPATH'>//%s</key>", "<qos/>")""" % phrase
517 xb.subscribe("<key oid='' queryType='XPATH'>//%s</key>" % phrase, "<qos/>")
518
519
520 print """ _.publish("<key oid='1'><first/></key>", 'First Type Message', "<qos></qos>")"""
521 publishRetQos = xb.publish("<key oid='1'><first/></key>", 'First Type Message', "<qos></qos>")
522 #print publishRetQos
523
524 print """ _.publish("<key oid='2'><second/></key>", 'Second Type Message', "<qos></qos>")"""
525 xb.publish("<key oid='2'><second/></key>", 'Second Type Message', "<qos></qos>")
526
527 print """ _.publish("<key oid='3'><first/></key>", 'First Type Message', "<qos></qos>")"""
528 xb.publish("<key oid='3'><first/></key>", 'First Type Message', "<qos></qos>")
529
530 print """ _.publish("<key oid='4'><third/></key>", 'Third Type Message', "<qos></qos>")"""
531 xb.publish("<key oid='4'><third/></key>", 'Third Type Message', "<qos></qos>")
532
533 print """ _.publish("<key oid='5'><third/></key>", 'Third Type Message', "<qos></qos>")"""
534 xb.publish("<key oid='5'><third/></key>", 'Third Type Message', "<qos></qos>")
535
536 #print """ _.publishArr([["<key oid='6'><first/></key>", '', "<qos/>"]]) ..."""
537 #publishArrRetQos = xb.publishArr([["<key oid='6'><first/></key>", '', "<qos/>"]])
538 #print publishArrRetQos
539
540 print """\n\nNow it's on you, the python-prompt is yours! Yes it's an python prompt !-)
541
542 _ <-- is the running instance
543
544 maybe a first attempt is to copy the printed calls and alter them...
545 """
546
547
syntax highlighted by Code2HTML, v. 0.9.1