00001 /*------------------------------------------------------------------------------ 00002 Name: ConnectQueueEntry.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 ------------------------------------------------------------------------------*/ 00006 #include <util/queue/ConnectQueueEntry.h> 00007 #include <util/dispatch/I_ConnectionsHandler.h> 00008 #include <util/msgUtil.h> // from xmlBlaster C library 00009 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug); 00010 00011 namespace org { namespace xmlBlaster { namespace util { namespace queue { 00012 00013 using namespace std; 00014 using namespace org::xmlBlaster::util::qos; 00015 using namespace org::xmlBlaster::util::dispatch; 00016 00017 ConnectQueueEntry::ConnectQueueEntry(Global& global, const ConnectQosRef& connectQos, int priority, Timestamp uniqueId) 00018 : MsgQueueEntry(global, connectQos, 00019 org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT, 00020 priority, 00021 (connectQos.isNull() ? false : connectQos->isPersistent()), 00022 uniqueId) 00023 { 00024 ME = "ConnectQueueEntry"; 00025 if (log_.call()) log_.call(ME, "ctor ..."); 00026 memset(&blobHolder_, 0, sizeof(BlobHolder)); 00027 } 00028 00030 ConnectQueueEntry::ConnectQueueEntry(const ConnectQueueEntry& rhs) 00031 : MsgQueueEntry(rhs.getGlobal(), rhs.getConnectQos(), rhs.getEmbeddedType(), rhs.getPriority(), rhs.isPersistent(), rhs.getUniqueId()) 00032 { 00033 memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache 00034 } 00035 00037 ConnectQueueEntry& ConnectQueueEntry::operator=(const ConnectQueueEntry& rhs) 00038 { 00039 if (this == &rhs) 00040 return *this; 00041 memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache 00042 return *this; 00043 } 00044 00045 MsgQueueEntry *ConnectQueueEntry::getClone() const 00046 { 00047 return new ConnectQueueEntry(*this); 00048 } 00049 00050 ConnectQueueEntry::~ConnectQueueEntry() { 00051 ::BlobHolder blob; 00052 blob.data = blobHolder_.data; 00053 blob.dataLen = blobHolder_.dataLen; 00054 ::freeBlobHolderContent(&blob); 00055 memset(&blobHolder_, 0, sizeof(BlobHolder)); 00056 } 00057 00058 bool ConnectQueueEntry::isConnect() const { 00059 return true; 00060 } 00061 00062 const void* ConnectQueueEntry::getEmbeddedObject() const 00063 { 00064 if (log_.call()) log_.call(ME, "getEmbeddedObject() ..."); 00065 if (connectQos_.isNull()) { 00066 return 0; 00067 } 00068 if (embeddedType_ != (org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT)) // "MSG_RAW|connect" 00069 throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + ".getEmbeddedObject()", string("We only support embeddedType '") + org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT + "', '" + embeddedType_ + "' is not known"); 00070 00071 if (blobHolder_.data != 0) // Cached 00072 return &blobHolder_; 00073 00074 //log_.info(ME+".getEmbeddedObject()", string("C++ connectQos=")+connectQos_->toXml()); 00075 00076 // dump MsgQueueEntry->connectQos_ with SOCKET protocol into C ::MsgUnit 00077 ::MsgUnit mu; 00078 memset(&mu, 0, sizeof(::MsgUnit)); 00079 mu.key = ""; 00080 string qosXml = connectQos_->toXml(); 00081 mu.qos = qosXml.c_str(); 00082 00083 /* 00084 if (log_.dump()) { 00085 char *p = ::messageUnitToXmlLimited(&mu, 100); 00086 log_.dump(ME+".getEmbeddedObject()", string("C msgUnit:") + p); 00087 ::xmlBlasterFree(p); 00088 } 00089 */ 00090 00091 // Serialize the message identical to the SOCKET protocol serialization 00092 // We use the functionality from our xmlBlaster C library 00093 ::BlobHolder blob = ::encodeMsgUnit(&mu, 0); 00094 00095 blobHolder_.data = blob.data; 00096 blobHolder_.dataLen = blob.dataLen; 00097 00098 if (log_.dump()) { 00099 char *p = ::blobDump(&blob); 00100 log_.dump(ME+".getEmbeddedObject()", string("Putting connect entry into queue:") + p); 00101 ::freeBlobDump(p); 00102 } 00103 00104 return &blobHolder_; 00105 } 00106 00107 const MsgQueueEntry& ConnectQueueEntry::send(I_ConnectionsHandler& connectionsHandler) const 00108 { 00109 if (log_.call()) log_.call(ME, "send"); 00110 if (log_.dump()) log_.dump(ME, string("send: ") + toXml()); 00111 connectReturnQos_ = connectionsHandler.connectRaw(connectQos_); 00112 // connectionsHandler.setConnectReturnQos(*connectReturnQos_); 00113 return *this; 00114 } 00115 00116 size_t ConnectQueueEntry::getSizeInBytes() const 00117 { 00118 if (!connectQos_.isNull()) { 00119 return sizeof(*connectQos_); // TODO: use toXml().size() ? 00120 } 00121 return 1024; 00122 } 00123 00124 ConnectQosRef ConnectQueueEntry::getConnectQos() const 00125 { 00126 return connectQos_; 00127 } 00128 00129 ConnectReturnQosRef ConnectQueueEntry::getConnectReturnQos() const 00130 { 00131 return connectReturnQos_; 00132 } 00133 00134 00135 string ConnectQueueEntry::toXml(const string& indent) const 00136 { 00137 string extraOffset = " " + indent; 00138 string ret = indent + "<connectQueueEntry>\n"; 00139 if (!connectQos_.isNull()) { 00140 ret += extraOffset + connectQos_->toXml(indent); 00141 } 00142 ret += indent + "</connectQueueEntry>\n"; 00143 return ret; 00144 } 00145 00146 }}}} // namespace 00147 00148