1 /*----------------------------------------------------------------------------
2 Name: DefaultCallback.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Default implementation of the POA_serverIdl::BlasterCallback.
6 -----------------------------------------------------------------------------*/
7
8 #ifndef _CLIENT_PROTOCOL_CORBA_DEFAULTCALLBACK_C
9 #define _CLIENT_PROTOCOL_CORBA_DEFAULTCALLBACK_C
10
11 #include <client/protocol/corba/DefaultCallback.h>
12 #include <util/Global.h>
13
14 using namespace std;
15 using namespace org::xmlBlaster::util;
16 using namespace org::xmlBlaster::client;
17 using namespace org::xmlBlaster::client::protocol::corba;
18 using namespace org::xmlBlaster::client::qos;
19 using namespace org::xmlBlaster::client::key;
20
21
22 DefaultCallback::DefaultCallback(Global& global, const string &name, I_Callback *boss,
23 /*BlasterCache*/ void* /*cache*/)
24 :global_(global), log_(global.getLog("org.xmlBlaster.client.protocol.corba")), msgKeyFactory_(global), msgQosFactory_(global)
25 {
26 boss_ = boss;
27 loginName_ = name;
28 // cache_ = cache;
29 if (log_.call()) log_.call(me(),"Entering constructor with argument");
30 }
31
32 /**
33 * This is the callback method invoked from the server
34 * informing the client in an asynchronous mode about new messages.
35 * <p />
36 * You don't need to use this little method, but it nicely converts
37 * the raw CORBA BlasterCallback.update() with raw Strings and arrays
38 * in corresponding objects and calls for every received message
39 * the I_Callback.update().
40 * <p />
41 * So you should implement in your client the I_Callback interface -
42 * suppling the update() method.
43 *
44 * @param loginName The name to whom the callback belongs
45 * @param msgUnit Contains a MessageUnit structs (your message)
46 * @param qos Quality of Service of the MessageUnit
47 */
48 serverIdl::XmlTypeArr* DefaultCallback::update(const char* sessionId,
49 const serverIdl::MessageUnitArr& msgUnitArr) UPDATE_THROW_SPECIFIER
50 {
51 serverIdl::XmlTypeArr *res = new serverIdl::XmlTypeArr(msgUnitArr.length());
52 res->length(msgUnitArr.length());
53
54 if (log_.call()) { log_.call(me(), "Receiving update of " + lexical_cast<std::string>(msgUnitArr.length()) + " message ..."); }
55
56 if (msgUnitArr.length() == 0) {
57 log_.warn(me(), "Entering update() with 0 messages");
58 return res;
59 }
60 for (string::size_type i=0; i < msgUnitArr.length(); i++) {
61 const serverIdl::MessageUnit &msgUnit = msgUnitArr[i];
62 UpdateKey *updateKey = 0;
63 UpdateQos *updateQos = 0;
64 try {
65 if (log_.dump()) {
66 log_.dump(me(), string("update: the key: ") + corbaWStringToString(msgUnit.xmlKey));
67 log_.dump(me(), string("update: the qos: ") + corbaWStringToString(msgUnit.qos));
68 }
69 updateKey = new UpdateKey(global_, msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey)));
70 updateQos = new UpdateQos(global_, msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos)));
71 // Now we know all about the received msg, dump it or do
72 // some checks
73 if (log_.dump()) log_.dump("UpdateKey", string("\n") + updateKey->toXml());
74 if (log_.dump()) {
75 string msg = "\n";
76 for (string::size_type j=0; j < msgUnit.content.length(); j++)
77 msg += (char)msgUnit.content[j];
78 log_.dump("content", "Message received '" + msg + "' with size=" + lexical_cast<std::string>(msgUnit.content.length()));
79 }
80 if (log_.dump()) log_.dump("UpdateQos", "\n" + updateQos->toXml());
81 if (log_.trace()) log_.trace(me(), "Received message [" + updateKey->getOid() + "] from publisher " + updateQos->getSender()->getAbsoluteName());
82
83 //Checking whether the Update is for the Cache or for the boss
84 //The boss should not be interested in cache updates
85 bool forCache = false;
86 // if( cache_ != null ) {
87 // forCache = cache_.update(updateQos.getSubscriptionId(),
88 // updateKey.toXml(), msgUnit.content);
89 // }
90 string oneRes = "<qos><state id='OK'/></qos>";
91 if (!forCache) {
92 if (boss_) {
93 int size = 0;
94 size = msgUnit.content.length();
95 const unsigned char *content = NULL;
96 if (size > 0) content = (const unsigned char*)&msgUnit.content[0];
97 if (log_.trace()) log_.trace(me(), "going to invoke client specific update");
98 oneRes = boss_->update(sessionId, *updateKey, content, size, *updateQos);
99 // Call my boss
100 }
101 else log_.warn(me(), "can not update: no callback defined");
102 }
103 (*res)[i] = toCorbaWString(oneRes);
104 }
105 catch (serverIdl::XmlBlasterException &e) {
106 log_.error(me(), string(e.message) + " message is on error state: " + updateKey->toXml());
107 string oneRes = "<qos><state id='ERROR'/></qos>";
108 (*res)[i] = toCorbaWString(oneRes);
109 }
110 catch(...) {
111 string tmp = "Exception caught in update() " + lexical_cast<std::string>(msgUnitArr.length()) + " messages are handled as not delivered";
112 log_.error(me(), tmp);
113 throw serverIdl::XmlBlasterException("user.update.error", "org.xmlBlaster.client",
114 "client update failed", "en",
115 tmp.c_str(), "", "", "", "",
116 "", "");
117 }
118
119 delete updateKey;
120 delete updateQos;
121 } // for every message
122 return res;
123 }
124
125 /**
126 * This is the oneway variant, not returning a value (no application level ACK).
127 * @see update()
128 */
129 void DefaultCallback::updateOneway(const char* sessionId,
130 const serverIdl::MessageUnitArr& msgUnitArr) PING_THROW_SPECIFIER
131 {
132 if (log_.call()) { log_.call(me(), "Receiving updateOneway of " + lexical_cast<std::string>(msgUnitArr.length()) + " message ..."); }
133
134 if (msgUnitArr.length() == 0) {
135 log_.warn(me(), "Entering updateOneway() with 0 messages");
136 return;
137 }
138
139 for (string::size_type i=0; i < msgUnitArr.length(); i++) {
140 UpdateKey *updateKey = 0;
141 UpdateQos *updateQos = 0;
142 try {
143 const serverIdl::MessageUnit &msgUnit = msgUnitArr[i];
144 try {
145 updateKey = new UpdateKey(global_, msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey)));
146 updateQos = new UpdateQos(global_, msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos)));
147 }
148 catch (serverIdl::XmlBlasterException &e) {
149 log_.error(me(), string(e.message) );
150 }
151
152 if (log_.trace()) log_.trace(me(), "Received oneway message [" + updateKey->getOid() + "] from publisher " + updateQos->getSender()->getAbsoluteName());
153
154 if (boss_) {
155 boss_->update(sessionId, *updateKey,
156 (const unsigned char*)&msgUnit.content[0],
157 msgUnit.content.length(), *updateQos);
158 }
159 else
160 log_.warn(me(), "can not update: no callback defined");
161 }
162 catch (const exception& e) {
163 log_.error(me(), string("Exception caught in updateOneway(), it is not transferred to server: ") + e.what());
164 }
165 catch(...) {
166 log_.error(me(), "Exception caught in updateOneway(), it is not transferred to server");
167 }
168
169 delete updateKey;
170 delete updateQos;
171 } // for each message
172
173 } // updateOneway
174
175 /**
176 * Check the callback server.
177 * @see xmlBlaster.idl
178 */
179 char* DefaultCallback::ping(const char *qos) PING_THROW_SPECIFIER
180 {
181 if (log_.call()) log_.call(me(), "ping(" + string(qos) + ") ...");
182 return CORBA::string_dup("");
183 } // ping
184
185
186 #endif
syntax highlighted by Code2HTML, v. 0.9.1