00001 /*------------------------------------------------------------------------------ 00002 Name: PublishQueueEntry.cpp 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 ------------------------------------------------------------------------------*/ 00006 00007 #include <util/queue/PublishQueueEntry.h> 00008 #include <util/dispatch/I_ConnectionsHandler.h> 00009 #include <util/msgUtil.h> // from xmlBlaster C library 00010 #include <socket/xmlBlasterSocket.h> // from xmlBlaster C library ::encodeMsgUnit(&msgUnit, debug); 00011 00012 namespace org { namespace xmlBlaster { namespace util { namespace queue { 00013 00014 using namespace std; 00015 using namespace org::xmlBlaster::util::dispatch; 00016 using namespace org::xmlBlaster::client::qos; 00017 using namespace org::xmlBlaster::client::key; 00018 00019 PublishQueueEntry::PublishQueueEntry(Global& global, const MessageUnit& msgUnit, 00020 int priority, Timestamp uniqueId) 00021 : MsgQueueEntry(global, msgUnit, 00022 org::xmlBlaster::util::Constants::ENTRY_TYPE_MSG_RAW + "|" + org::xmlBlaster::util::MethodName::PUBLISH, 00023 priority, msgUnit.getQos().isPersistent(), uniqueId) 00024 { 00025 ME = "PublishQueueEntry"; 00026 if (log_.call()) log_.call(ME, "ctor ..."); 00027 if (priority < 0) priority_ = msgUnit.getQos().getPriority(); 00028 } 00029 00030 PublishQueueEntry::~PublishQueueEntry() { 00031 } 00032 00034 PublishQueueEntry::PublishQueueEntry(const PublishQueueEntry& rhs) 00035 //: MsgQueueEntry((MsgQueueEntry)rhs) 00036 : MsgQueueEntry(rhs.getGlobal(), rhs.getMsgUnit(), rhs.getEmbeddedType(), rhs.getPriority(), rhs.isPersistent(), rhs.getUniqueId()) 00037 { 00038 } 00039 00041 PublishQueueEntry& PublishQueueEntry::operator=(const PublishQueueEntry& rhs) 00042 { 00043 if (this == &rhs) 00044 return *this; 00045 return *this; 00046 } 00047 00048 MsgQueueEntry *PublishQueueEntry::getClone() const 00049 { 00050 return new PublishQueueEntry(*this); 00051 } 00052 00053 const MsgQueueEntry& PublishQueueEntry::send(I_ConnectionsHandler& connectionsHandler) const 00054 { 00055 if (log_.call()) log_.call(ME, "send"); 00056 if (publishReturnQos_) { 00057 delete publishReturnQos_; 00058 publishReturnQos_ = NULL; 00059 } 00060 if (log_.dump()) log_.dump(ME, string("send: ") + PublishQueueEntry::toXml()); 00061 publishReturnQos_ = new PublishReturnQos(connectionsHandler.getConnection().publish(*msgUnit_)); 00062 return *this; 00063 } 00064 00065 const PublishReturnQos* PublishQueueEntry::getPublishReturnQos() const 00066 { 00067 return publishReturnQos_; 00068 } 00069 00070 bool PublishQueueEntry::isPublish() const { 00071 return true; 00072 } 00073 00074 size_t PublishQueueEntry::getSizeInBytes() const 00075 { 00076 if (msgUnit_) return msgUnit_->getSizeInBytes(); 00077 return 0; 00078 } 00079 00080 string PublishQueueEntry::toXml(const string& indent) const 00081 { 00082 string extraOffset = " " + indent; 00083 string ret = indent + "<publishQueueEntry>\n"; 00084 if (msgUnit_) { 00085 ret += extraOffset + msgUnit_->toXml(indent); 00086 } 00087 ret += indent + "</publishQueueEntry>\n"; 00088 return ret; 00089 } 00090 00091 }}}} // namespace 00092 00093