1 /*------------------------------------------------------------------------------
  2 Name:      ConnectQueueEntry.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 #include <util/queue/ConnectQueueEntry.h>
  7 #include <util/dispatch/I_ConnectionsHandler.h>
  8 #include <util/msgUtil.h> // from xmlBlaster C library
  9 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug);
 10 #include <cstring> // memset()
 11 
 12 namespace org { namespace xmlBlaster { namespace util { namespace queue {
 13 
 14 using namespace std;
 15 using namespace org::xmlBlaster::util::qos;
 16 using namespace org::xmlBlaster::util::dispatch;
 17 
 18 ConnectQueueEntry::ConnectQueueEntry(Global& global, const ConnectQosRef& connectQos, int priority, Timestamp uniqueId)
 19    : MsgQueueEntry(global, connectQos,
 20                    org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT,
 21                    priority,
 22                    (connectQos.isNull() ? false : connectQos->isPersistent()),
 23                    uniqueId)
 24 {
 25    ME = "ConnectQueueEntry";
 26    if (log_.call()) log_.call(ME, "ctor ...");
 27    memset(&blobHolder_, 0, sizeof(BlobHolder));
 28 }
 29 
 30 /** copy constructor */
 31 ConnectQueueEntry::ConnectQueueEntry(const ConnectQueueEntry& rhs)
 32     : MsgQueueEntry(rhs.getGlobal(), rhs.getConnectQos(), rhs.getEmbeddedType(), rhs.getPriority(), rhs.isPersistent(), rhs.getUniqueId())
 33 {
 34    memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
 35 }
 36 
 37 /** assignment constructor */
 38 ConnectQueueEntry& ConnectQueueEntry::operator=(const ConnectQueueEntry& rhs)
 39 {
 40    if (this == &rhs)
 41       return *this;
 42    memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
 43    return *this;
 44 }
 45 
 46 MsgQueueEntry *ConnectQueueEntry::getClone() const
 47 {
 48    return new ConnectQueueEntry(*this);
 49 }
 50 
 51 ConnectQueueEntry::~ConnectQueueEntry() {
 52    ::BlobHolder blob;
 53    blob.data    = blobHolder_.data;
 54    blob.dataLen = blobHolder_.dataLen;
 55    ::freeBlobHolderContent(&blob);
 56    memset(&blobHolder_, 0, sizeof(BlobHolder));
 57 }
 58 
 59 bool ConnectQueueEntry::isConnect() const {
 60         return true;
 61 }
 62 
 63 const void* ConnectQueueEntry::getEmbeddedObject() const
 64 {
 65    if (log_.call()) log_.call(ME, "getEmbeddedObject() ...");
 66    if (connectQos_.isNull()) {
 67       return 0;
 68    }
 69    if (embeddedType_ != (org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT)) // "MSG_RAW|connect"
 70       throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + ".getEmbeddedObject()", string("We only support embeddedType '") + org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT + "', '" + embeddedType_ + "' is not known");
 71 
 72    if (blobHolder_.data != 0) // Cached
 73       return &blobHolder_;
 74 
 75    //log_.info(ME+".getEmbeddedObject()", string("C++ connectQos=")+connectQos_->toXml());
 76 
 77    // dump MsgQueueEntry->connectQos_ with SOCKET protocol into C ::MsgUnit
 78    ::MsgUnit mu;
 79    memset(&mu, 0, sizeof(::MsgUnit));
 80    mu.key = "";
 81    string qosXml = connectQos_->toXml();
 82    mu.qos = qosXml.c_str();
 83 
 84    /*
 85    if (log_.dump()) {
 86       char *p = ::messageUnitToXmlLimited(&mu, 100);
 87       log_.dump(ME+".getEmbeddedObject()", string("C msgUnit:") + p);
 88       ::xmlBlasterFree(p);
 89    }
 90    */
 91 
 92    // Serialize the message identical to the SOCKET protocol serialization
 93    // We use the functionality from our xmlBlaster C library
 94    ::BlobHolder blob = ::encodeMsgUnit(&mu, 0);
 95 
 96    blobHolder_.data = blob.data;
 97    blobHolder_.dataLen = blob.dataLen;
 98 
 99    if (log_.dump()) {
100       char *p = ::blobDump(&blob);
101       log_.dump(ME+".getEmbeddedObject()", string("Putting connect entry into queue:") + p);
102       ::freeBlobDump(p);
103    }
104 
105    return &blobHolder_;
106 }
107 
108 const MsgQueueEntry& ConnectQueueEntry::send(I_ConnectionsHandler& connectionsHandler) const
109 {
110    if (log_.call()) log_.call(ME, "send");
111    if (log_.dump()) log_.dump(ME, string("send: ") + toXml());
112    connectReturnQos_ = connectionsHandler.connectRaw(connectQos_);
113 //   connectionsHandler.setConnectReturnQos(*connectReturnQos_);
114    return *this;
115 }
116 
117 size_t ConnectQueueEntry::getSizeInBytes() const
118 {
119    if (!connectQos_.isNull()) {
120       return sizeof(*connectQos_); // TODO: use toXml().size() ?
121    }
122    return 1024;
123 }
124 
125 ConnectQosRef ConnectQueueEntry::getConnectQos() const
126 {
127    return connectQos_;
128 }
129 
130 ConnectReturnQosRef ConnectQueueEntry::getConnectReturnQos() const
131 {
132    return connectReturnQos_;
133 }
134 
135 
136 string ConnectQueueEntry::toXml(const string& indent) const
137 {
138    string extraOffset = "   " + indent;
139    string ret = indent + "<connectQueueEntry>\n";
140    if (!connectQos_.isNull()) {
141       ret += extraOffset + connectQos_->toXml(indent);
142    }
143    ret += indent + "</connectQueueEntry>\n";
144    return ret;
145 }
146 
147 }}}} // namespace


syntax highlighted by Code2HTML, v. 0.9.1