00001 /*---------------------------------------------------------------------------- 00002 Name: DefaultCallback.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Default implementation of the POA_serverIdl::BlasterCallback. 00006 -----------------------------------------------------------------------------*/ 00007 00008 #ifndef _CLIENT_PROTOCOL_CORBA_DEFAULTCALLBACK_C 00009 #define _CLIENT_PROTOCOL_CORBA_DEFAULTCALLBACK_C 00010 00011 #include <client/protocol/corba/DefaultCallback.h> 00012 #include <util/Global.h> 00013 00014 using namespace std; 00015 using namespace org::xmlBlaster::util; 00016 using namespace org::xmlBlaster::client; 00017 using namespace org::xmlBlaster::client::protocol::corba; 00018 using namespace org::xmlBlaster::client::qos; 00019 using namespace org::xmlBlaster::client::key; 00020 00021 00022 DefaultCallback::DefaultCallback(Global& global, const string &name, I_Callback *boss, 00023 /*BlasterCache*/ void* /*cache*/) 00024 :global_(global), log_(global.getLog("org.xmlBlaster.client.protocol.corba")), msgKeyFactory_(global), msgQosFactory_(global) 00025 { 00026 boss_ = boss; 00027 loginName_ = name; 00028 // cache_ = cache; 00029 if (log_.call()) log_.call(me(),"Entering constructor with argument"); 00030 } 00031 00048 serverIdl::XmlTypeArr* DefaultCallback::update(const char* sessionId, 00049 const serverIdl::MessageUnitArr& msgUnitArr) UPDATE_THROW_SPECIFIER 00050 { 00051 serverIdl::XmlTypeArr *res = new serverIdl::XmlTypeArr(msgUnitArr.length()); 00052 res->length(msgUnitArr.length()); 00053 00054 if (log_.call()) { log_.call(me(), "Receiving update of " + lexical_cast<std::string>(msgUnitArr.length()) + " message ..."); } 00055 00056 if (msgUnitArr.length() == 0) { 00057 log_.warn(me(), "Entering update() with 0 messages"); 00058 return res; 00059 } 00060 for (string::size_type i=0; i < msgUnitArr.length(); i++) { 00061 const serverIdl::MessageUnit &msgUnit = msgUnitArr[i]; 00062 UpdateKey *updateKey = 0; 00063 UpdateQos *updateQos = 0; 00064 try { 00065 if (log_.dump()) { 00066 log_.dump(me(), string("update: the key: ") + corbaWStringToString(msgUnit.xmlKey)); 00067 log_.dump(me(), string("update: the qos: ") + corbaWStringToString(msgUnit.qos)); 00068 } 00069 updateKey = new UpdateKey(global_, msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey))); 00070 updateQos = new UpdateQos(global_, msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos))); 00071 // Now we know all about the received msg, dump it or do 00072 // some checks 00073 if (log_.dump()) log_.dump("UpdateKey", string("\n") + updateKey->toXml()); 00074 if (log_.dump()) { 00075 string msg = "\n"; 00076 for (string::size_type j=0; j < msgUnit.content.length(); j++) 00077 msg += (char)msgUnit.content[j]; 00078 log_.dump("content", "Message received '" + msg + "' with size=" + lexical_cast<std::string>(msgUnit.content.length())); 00079 } 00080 if (log_.dump()) log_.dump("UpdateQos", "\n" + updateQos->toXml()); 00081 if (log_.trace()) log_.trace(me(), "Received message [" + updateKey->getOid() + "] from publisher " + updateQos->getSender()->getAbsoluteName()); 00082 00083 //Checking whether the Update is for the Cache or for the boss 00084 //The boss should not be interested in cache updates 00085 bool forCache = false; 00086 // if( cache_ != null ) { 00087 // forCache = cache_.update(updateQos.getSubscriptionId(), 00088 // updateKey.toXml(), msgUnit.content); 00089 // } 00090 string oneRes = "<qos><state id='OK'/></qos>"; 00091 if (!forCache) { 00092 if (boss_) { 00093 int size = 0; 00094 size = msgUnit.content.length(); 00095 const unsigned char *content = NULL; 00096 if (size > 0) content = (const unsigned char*)&msgUnit.content[0]; 00097 if (log_.trace()) log_.trace(me(), "going to invoke client specific update"); 00098 oneRes = boss_->update(sessionId, *updateKey, content, size, *updateQos); 00099 // Call my boss 00100 } 00101 else log_.warn(me(), "can not update: no callback defined"); 00102 } 00103 (*res)[i] = toCorbaWString(oneRes); 00104 } 00105 catch (serverIdl::XmlBlasterException &e) { 00106 log_.error(me(), string(e.message) + " message is on error state: " + updateKey->toXml()); 00107 string oneRes = "<qos><state id='ERROR'/></qos>"; 00108 (*res)[i] = toCorbaWString(oneRes); 00109 } 00110 catch(...) { 00111 string tmp = "Exception caught in update() " + lexical_cast<std::string>(msgUnitArr.length()) + " messages are handled as not delivered"; 00112 log_.error(me(), tmp); 00113 throw serverIdl::XmlBlasterException("user.update.error", "org.xmlBlaster.client", 00114 "client update failed", "en", 00115 tmp.c_str(), "", "", "", "", 00116 "", ""); 00117 } 00118 00119 delete updateKey; 00120 delete updateQos; 00121 } // for every message 00122 return res; 00123 } 00124 00129 void DefaultCallback::updateOneway(const char* sessionId, 00130 const serverIdl::MessageUnitArr& msgUnitArr) PING_THROW_SPECIFIER 00131 { 00132 if (log_.call()) { log_.call(me(), "Receiving updateOneway of " + lexical_cast<std::string>(msgUnitArr.length()) + " message ..."); } 00133 00134 if (msgUnitArr.length() == 0) { 00135 log_.warn(me(), "Entering updateOneway() with 0 messages"); 00136 return; 00137 } 00138 00139 for (string::size_type i=0; i < msgUnitArr.length(); i++) { 00140 UpdateKey *updateKey = 0; 00141 UpdateQos *updateQos = 0; 00142 try { 00143 const serverIdl::MessageUnit &msgUnit = msgUnitArr[i]; 00144 try { 00145 updateKey = new UpdateKey(global_, msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey))); 00146 updateQos = new UpdateQos(global_, msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos))); 00147 } 00148 catch (serverIdl::XmlBlasterException &e) { 00149 log_.error(me(), string(e.message) ); 00150 } 00151 00152 if (log_.trace()) log_.trace(me(), "Received oneway message [" + updateKey->getOid() + "] from publisher " + updateQos->getSender()->getAbsoluteName()); 00153 00154 if (boss_) { 00155 boss_->update(sessionId, *updateKey, 00156 (const unsigned char*)&msgUnit.content[0], 00157 msgUnit.content.length(), *updateQos); 00158 } 00159 else 00160 log_.warn(me(), "can not update: no callback defined"); 00161 } 00162 catch (const exception& e) { 00163 log_.error(me(), string("Exception caught in updateOneway(), it is not transferred to server: ") + e.what()); 00164 } 00165 catch(...) { 00166 log_.error(me(), "Exception caught in updateOneway(), it is not transferred to server"); 00167 } 00168 00169 delete updateKey; 00170 delete updateQos; 00171 } // for each message 00172 00173 } // updateOneway 00174 00179 char* DefaultCallback::ping(const char *qos) PING_THROW_SPECIFIER 00180 { 00181 if (log_.call()) log_.call(me(), "ping(" + string(qos) + ") ..."); 00182 return CORBA::string_dup(""); 00183 } // ping 00184 00185 00186 #endif