1 /*------------------------------------------------------------------------------
2 Name: ConnectQueueEntry.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 #include <util/queue/ConnectQueueEntry.h>
7 #include <util/dispatch/I_ConnectionsHandler.h>
8 #include <util/msgUtil.h> // from xmlBlaster C library
9 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug);
10 #include <cstring> // memset()
11
12 namespace org { namespace xmlBlaster { namespace util { namespace queue {
13
14 using namespace std;
15 using namespace org::xmlBlaster::util::qos;
16 using namespace org::xmlBlaster::util::dispatch;
17
18 ConnectQueueEntry::ConnectQueueEntry(Global& global, const ConnectQosRef& connectQos, int priority, Timestamp uniqueId)
19 : MsgQueueEntry(global, connectQos,
20 org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT,
21 priority,
22 (connectQos.isNull() ? false : connectQos->isPersistent()),
23 uniqueId)
24 {
25 ME = "ConnectQueueEntry";
26 if (log_.call()) log_.call(ME, "ctor ...");
27 memset(&blobHolder_, 0, sizeof(BlobHolder));
28 }
29
30 /** copy constructor */
31 ConnectQueueEntry::ConnectQueueEntry(const ConnectQueueEntry& rhs)
32 : MsgQueueEntry(rhs.getGlobal(), rhs.getConnectQos(), rhs.getEmbeddedType(), rhs.getPriority(), rhs.isPersistent(), rhs.getUniqueId())
33 {
34 memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
35 }
36
37 /** assignment constructor */
38 ConnectQueueEntry& ConnectQueueEntry::operator=(const ConnectQueueEntry& rhs)
39 {
40 if (this == &rhs)
41 return *this;
42 memset(&blobHolder_, 0, sizeof(BlobHolder)); // reset cache
43 return *this;
44 }
45
46 MsgQueueEntry *ConnectQueueEntry::getClone() const
47 {
48 return new ConnectQueueEntry(*this);
49 }
50
51 ConnectQueueEntry::~ConnectQueueEntry() {
52 ::BlobHolder blob;
53 blob.data = blobHolder_.data;
54 blob.dataLen = blobHolder_.dataLen;
55 ::freeBlobHolderContent(&blob);
56 memset(&blobHolder_, 0, sizeof(BlobHolder));
57 }
58
59 bool ConnectQueueEntry::isConnect() const {
60 return true;
61 }
62
63 const void* ConnectQueueEntry::getEmbeddedObject() const
64 {
65 if (log_.call()) log_.call(ME, "getEmbeddedObject() ...");
66 if (connectQos_.isNull()) {
67 return 0;
68 }
69 if (embeddedType_ != (org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT)) // "MSG_RAW|connect"
70 throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + ".getEmbeddedObject()", string("We only support embeddedType '") + org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::CONNECT + "', '" + embeddedType_ + "' is not known");
71
72 if (blobHolder_.data != 0) // Cached
73 return &blobHolder_;
74
75 //log_.info(ME+".getEmbeddedObject()", string("C++ connectQos=")+connectQos_->toXml());
76
77 // dump MsgQueueEntry->connectQos_ with SOCKET protocol into C ::MsgUnit
78 ::MsgUnit mu;
79 memset(&mu, 0, sizeof(::MsgUnit));
80 mu.key = "";
81 string qosXml = connectQos_->toXml();
82 mu.qos = qosXml.c_str();
83
84 /*
85 if (log_.dump()) {
86 char *p = ::messageUnitToXmlLimited(&mu, 100);
87 log_.dump(ME+".getEmbeddedObject()", string("C msgUnit:") + p);
88 ::xmlBlasterFree(p);
89 }
90 */
91
92 // Serialize the message identical to the SOCKET protocol serialization
93 // We use the functionality from our xmlBlaster C library
94 ::BlobHolder blob = ::encodeMsgUnit(&mu, 0);
95
96 blobHolder_.data = blob.data;
97 blobHolder_.dataLen = blob.dataLen;
98
99 if (log_.dump()) {
100 char *p = ::blobDump(&blob);
101 log_.dump(ME+".getEmbeddedObject()", string("Putting connect entry into queue:") + p);
102 ::freeBlobDump(p);
103 }
104
105 return &blobHolder_;
106 }
107
108 const MsgQueueEntry& ConnectQueueEntry::send(I_ConnectionsHandler& connectionsHandler) const
109 {
110 if (log_.call()) log_.call(ME, "send");
111 if (log_.dump()) log_.dump(ME, string("send: ") + toXml());
112 connectReturnQos_ = connectionsHandler.connectRaw(connectQos_);
113 // connectionsHandler.setConnectReturnQos(*connectReturnQos_);
114 return *this;
115 }
116
117 size_t ConnectQueueEntry::getSizeInBytes() const
118 {
119 if (!connectQos_.isNull()) {
120 return sizeof(*connectQos_); // TODO: use toXml().size() ?
121 }
122 return 1024;
123 }
124
125 ConnectQosRef ConnectQueueEntry::getConnectQos() const
126 {
127 return connectQos_;
128 }
129
130 ConnectReturnQosRef ConnectQueueEntry::getConnectReturnQos() const
131 {
132 return connectReturnQos_;
133 }
134
135
136 string ConnectQueueEntry::toXml(const string& indent) const
137 {
138 string extraOffset = " " + indent;
139 string ret = indent + "<connectQueueEntry>\n";
140 if (!connectQos_.isNull()) {
141 ret += extraOffset + connectQos_->toXml(indent);
142 }
143 ret += indent + "</connectQueueEntry>\n";
144 return ret;
145 }
146
147 }}}} // namespace
syntax highlighted by Code2HTML, v. 0.9.1