1 /*----------------------------------------------------------------------------
  2 Name:      DefaultCallback.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Default implementation of the POA_serverIdl::BlasterCallback.
  6 -----------------------------------------------------------------------------*/
  7 
  8 #ifndef _CLIENT_PROTOCOL_CORBA_DEFAULTCALLBACK_C
  9 #define _CLIENT_PROTOCOL_CORBA_DEFAULTCALLBACK_C
 10 
 11 #include <client/protocol/corba/DefaultCallback.h>
 12 #include <util/Global.h>
 13 
 14 using namespace std;
 15 using namespace org::xmlBlaster::util;
 16 using namespace org::xmlBlaster::client;
 17 using namespace org::xmlBlaster::client::protocol::corba;
 18 using namespace org::xmlBlaster::client::qos;
 19 using namespace org::xmlBlaster::client::key;
 20 
 21 
 22 DefaultCallback::DefaultCallback(Global& global, const string &name, I_Callback *boss,
 23                 /*BlasterCache*/ void* /*cache*/) 
 24 :global_(global), log_(global.getLog("org.xmlBlaster.client.protocol.corba")), msgKeyFactory_(global), msgQosFactory_(global)
 25 {
 26    boss_         = boss;
 27    loginName_    = name;
 28    // cache_ = cache;
 29    if (log_.call()) log_.call(me(),"Entering constructor with argument");
 30 }
 31 
 32 /**
 33  * This is the callback method invoked from the server
 34  * informing the client in an asynchronous mode about new messages.
 35  * <p />
 36  * You don't need to use this little method, but it nicely converts
 37  * the raw CORBA BlasterCallback.update() with raw Strings and arrays
 38  * in corresponding objects and calls for every received message
 39  * the I_Callback.update().
 40  * <p />
 41  * So you should implement in your client the I_Callback interface -
 42  * suppling the update() method.
 43  *
 44  * @param loginName        The name to whom the callback belongs
 45  * @param msgUnit      Contains a MessageUnit structs (your message)
 46  * @param qos              Quality of Service of the MessageUnit
 47  */
 48 serverIdl::XmlTypeArr* DefaultCallback::update(const char* sessionId,
 49                        const serverIdl::MessageUnitArr& msgUnitArr) UPDATE_THROW_SPECIFIER
 50 {
 51    serverIdl::XmlTypeArr *res = new serverIdl::XmlTypeArr(msgUnitArr.length());
 52    res->length(msgUnitArr.length());
 53 
 54    if (log_.call()) { log_.call(me(), "Receiving update of " + lexical_cast<std::string>(msgUnitArr.length()) + " message ..."); }
 55    
 56    if (msgUnitArr.length() == 0) {
 57       log_.warn(me(), "Entering update() with 0 messages");
 58       return res;
 59    }
 60    for (string::size_type i=0; i < msgUnitArr.length(); i++) {
 61       const serverIdl::MessageUnit &msgUnit = msgUnitArr[i];
 62       UpdateKey *updateKey = 0;
 63       UpdateQos *updateQos = 0;
 64       try {
 65          if (log_.dump()) {
 66             log_.dump(me(), string("update: the key: ") + corbaWStringToString(msgUnit.xmlKey));
 67             log_.dump(me(), string("update: the qos: ") + corbaWStringToString(msgUnit.qos));
 68          }
 69          updateKey = new UpdateKey(global_, msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey)));
 70          updateQos = new UpdateQos(global_, msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos)));
 71          // Now we know all about the received msg, dump it or do 
 72          // some checks
 73          if (log_.dump()) log_.dump("UpdateKey", string("\n") + updateKey->toXml());
 74          if (log_.dump()) {
 75             string msg = "\n";
 76             for (string::size_type j=0; j < msgUnit.content.length(); j++) 
 77                msg += (char)msgUnit.content[j];
 78             log_.dump("content", "Message received '" + msg + "' with size=" + lexical_cast<std::string>(msgUnit.content.length()));
 79          }
 80          if (log_.dump()) log_.dump("UpdateQos", "\n" + updateQos->toXml());
 81          if (log_.trace()) log_.trace(me(), "Received message [" + updateKey->getOid() + "] from publisher " + updateQos->getSender()->getAbsoluteName());
 82 
 83          //Checking whether the Update is for the Cache or for the boss
 84          //The boss should not be interested in cache updates
 85          bool forCache = false;
 86          //          if( cache_ != null ) {
 87          //             forCache = cache_.update(updateQos.getSubscriptionId(), 
 88          //                                      updateKey.toXml(), msgUnit.content);
 89          //          }
 90          string oneRes = "<qos><state id='OK'/></qos>";
 91          if (!forCache) {
 92             if (boss_) {
 93                int size = 0;
 94                size = msgUnit.content.length();
 95                const unsigned char *content = NULL;
 96                if (size > 0) content = (const unsigned char*)&msgUnit.content[0];
 97                if (log_.trace()) log_.trace(me(), "going to invoke client specific update");
 98                oneRes = boss_->update(sessionId, *updateKey, content, size, *updateQos); 
 99                // Call my boss
100             }
101             else log_.warn(me(), "can not update: no callback defined");
102          }
103          (*res)[i] = toCorbaWString(oneRes);
104       } 
105       catch (serverIdl::XmlBlasterException &e) {
106          log_.error(me(), string(e.message) + " message is on error state: " + updateKey->toXml());
107          string oneRes = "<qos><state id='ERROR'/></qos>";
108          (*res)[i] = toCorbaWString(oneRes);
109       }
110       catch(...) {
111          string tmp = "Exception caught in update() " + lexical_cast<std::string>(msgUnitArr.length()) + " messages are handled as not delivered";
112          log_.error(me(), tmp);
113          throw serverIdl::XmlBlasterException("user.update.error", "org.xmlBlaster.client", 
114                                               "client update failed", "en",
115                                               tmp.c_str(), "", "", "", "", 
116                                               "", "");
117       }
118 
119       delete updateKey;
120       delete updateQos;
121    } // for every message
122    return res;
123 }
124 
125       /**
126        * This is the oneway variant, not returning a value (no application level ACK). 
127        * @see update()
128        */
129 void DefaultCallback::updateOneway(const char* sessionId,
130                       const serverIdl::MessageUnitArr& msgUnitArr) PING_THROW_SPECIFIER
131 {
132    if (log_.call()) { log_.call(me(), "Receiving updateOneway of " + lexical_cast<std::string>(msgUnitArr.length()) + " message ..."); }
133    
134    if (msgUnitArr.length() == 0) {
135       log_.warn(me(), "Entering updateOneway() with 0 messages");
136       return;
137    }
138 
139    for (string::size_type i=0; i < msgUnitArr.length(); i++) {
140       UpdateKey *updateKey = 0;
141       UpdateQos *updateQos = 0;
142       try {
143          const serverIdl::MessageUnit &msgUnit = msgUnitArr[i];
144          try {
145             updateKey = new UpdateKey(global_, msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey)));
146             updateQos = new UpdateQos(global_, msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos)));
147          } 
148          catch (serverIdl::XmlBlasterException &e) {
149             log_.error(me(), string(e.message) );
150          }
151 
152          if (log_.trace()) log_.trace(me(), "Received oneway message [" + updateKey->getOid() + "] from publisher " + updateQos->getSender()->getAbsoluteName());
153 
154          if (boss_) {
155             boss_->update(sessionId, *updateKey,
156                            (const unsigned char*)&msgUnit.content[0], 
157                            msgUnit.content.length(), *updateQos); 
158          }
159          else
160             log_.warn(me(), "can not update: no callback defined");
161       }
162       catch (const exception& e) {
163          log_.error(me(), string("Exception caught in updateOneway(), it is not transferred to server: ") + e.what());
164       }
165       catch(...) {
166          log_.error(me(), "Exception caught in updateOneway(), it is not transferred to server");
167       }
168 
169       delete updateKey;
170       delete updateQos;
171    } // for each message
172 
173 } // updateOneway
174 
175 /**
176  * Check the callback server.
177  * @see xmlBlaster.idl
178  */
179 char* DefaultCallback::ping(const char *qos) PING_THROW_SPECIFIER
180 {
181    if (log_.call()) log_.call(me(), "ping(" + string(qos) + ") ...");
182    return CORBA::string_dup("");
183 } // ping
184 
185 
186 #endif


// syntax highlighted by Code2HTML, v. 0.9.1