util/queue/ConnectQueueEntry.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      ConnectQueueEntry.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 ------------------------------------------------------------------------------*/
00006 #include <util/queue/ConnectQueueEntry.h>
00007 #include <util/dispatch/I_ConnectionsHandler.h>
00008 #include <util/msgUtil.h> // from xmlBlaster C library
00009 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug);
00010 
00011 namespace org { namespace xmlBlaster { namespace util { namespace queue {
00012 
00013 using namespace std;
00014 using namespace org::xmlBlaster::util::qos;
00015 using namespace org::xmlBlaster::util::dispatch;
00016 
00017 ConnectQueueEntry::ConnectQueueEntry(Global& global, const ConnectQosRef& connectQos, int priority, Timestamp uniqueId)
00018    : MsgQueueEntry(global, connectQos,
00019                    org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT,
00020                    priority,
00021                    (connectQos.isNull() ? false : connectQos->isPersistent()),
00022                    uniqueId)
00023 {
00024    ME = "ConnectQueueEntry";
00025    if (log_.call()) log_.call(ME, "ctor ...");
00026    memset(&blobHolder_, 0, sizeof(BlobHolder));
00027 }
00028 
00030 ConnectQueueEntry::ConnectQueueEntry(const ConnectQueueEntry& rhs)
00031     : MsgQueueEntry(rhs.getGlobal(), rhs.getConnectQos(), rhs.getEmbeddedType(), rhs.getPriority(), rhs.isPersistent(), rhs.getUniqueId())
00032 {
00033    memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
00034 }
00035 
00037 ConnectQueueEntry& ConnectQueueEntry::operator=(const ConnectQueueEntry& rhs)
00038 {
00039    if (this == &rhs)
00040       return *this;
00041    memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
00042    return *this;
00043 }
00044 
00045 MsgQueueEntry *ConnectQueueEntry::getClone() const
00046 {
00047    return new ConnectQueueEntry(*this);
00048 }
00049 
00050 ConnectQueueEntry::~ConnectQueueEntry() {
00051    ::BlobHolder blob;
00052    blob.data    = blobHolder_.data;
00053    blob.dataLen = blobHolder_.dataLen;
00054    ::freeBlobHolderContent(&blob);
00055    memset(&blobHolder_, 0, sizeof(BlobHolder));
00056 }
00057 
00058 bool ConnectQueueEntry::isConnect() const {
00059    return true;
00060 }
00061 
00062 const void* ConnectQueueEntry::getEmbeddedObject() const
00063 {
00064    if (log_.call()) log_.call(ME, "getEmbeddedObject() ...");
00065    if (connectQos_.isNull()) {
00066       return 0;
00067    }
00068    if (embeddedType_ != (org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT)) // "MSG_RAW|connect"
00069       throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + ".getEmbeddedObject()", string("We only support embeddedType '") + org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT + "', '" + embeddedType_ + "' is not known");
00070 
00071    if (blobHolder_.data != 0) // Cached
00072       return &blobHolder_;
00073 
00074    //log_.info(ME+".getEmbeddedObject()", string("C++ connectQos=")+connectQos_->toXml());
00075 
00076    // dump MsgQueueEntry->connectQos_ with SOCKET protocol into C ::MsgUnit
00077    ::MsgUnit mu;
00078    memset(&mu, 0, sizeof(::MsgUnit));
00079    mu.key = "";
00080    string qosXml = connectQos_->toXml();
00081    mu.qos = qosXml.c_str();
00082 
00083    /*
00084    if (log_.dump()) {
00085       char *p = ::messageUnitToXmlLimited(&mu, 100);
00086       log_.dump(ME+".getEmbeddedObject()", string("C msgUnit:") + p);
00087       ::xmlBlasterFree(p);
00088    }
00089    */
00090 
00091    // Serialize the message identical to the SOCKET protocol serialization
00092    // We use the functionality from our xmlBlaster C library
00093    ::BlobHolder blob = ::encodeMsgUnit(&mu, 0);
00094 
00095    blobHolder_.data = blob.data;
00096    blobHolder_.dataLen = blob.dataLen;
00097 
00098    if (log_.dump()) {
00099       char *p = ::blobDump(&blob);
00100       log_.dump(ME+".getEmbeddedObject()", string("Putting connect entry into queue:") + p);
00101       ::freeBlobDump(p);
00102    }
00103 
00104    return &blobHolder_;
00105 }
00106 
00107 const MsgQueueEntry& ConnectQueueEntry::send(I_ConnectionsHandler& connectionsHandler) const
00108 {
00109    if (log_.call()) log_.call(ME, "send");
00110    if (log_.dump()) log_.dump(ME, string("send: ") + toXml());
00111    connectReturnQos_ = connectionsHandler.connectRaw(connectQos_);
00112 //   connectionsHandler.setConnectReturnQos(*connectReturnQos_);
00113    return *this;
00114 }
00115 
00116 size_t ConnectQueueEntry::getSizeInBytes() const
00117 {
00118    if (!connectQos_.isNull()) {
00119       return sizeof(*connectQos_); // TODO: use toXml().size() ?
00120    }
00121    return 1024;
00122 }
00123 
00124 ConnectQosRef ConnectQueueEntry::getConnectQos() const
00125 {
00126    return connectQos_;
00127 }
00128 
00129 ConnectReturnQosRef ConnectQueueEntry::getConnectReturnQos() const
00130 {
00131    return connectReturnQos_;
00132 }
00133 
00134 
00135 string ConnectQueueEntry::toXml(const string& indent) const
00136 {
00137    string extraOffset = "   " + indent;
00138    string ret = indent + "<connectQueueEntry>\n";
00139    if (!connectQos_.isNull()) {
00140       ret += extraOffset + connectQos_->toXml(indent);
00141    }
00142    ret += indent + "</connectQueueEntry>\n";
00143    return ret;
00144 }
00145 
00146 }}}} // namespace
00147 
00148