[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[xmlblaster] Re: There is Memory leak on xmlblaster serverside?
Hi,
is your topic 'lpszKey,' changing for each publish?
If yes, a new topic is created for each message
and not destoyed as xmlBlaster keeps the message
in the history queue (max 10 as default).
To avoid this switch off the history queue
or use messages with expire or erase the topic
actively.
Or better: Use the same topic for all messages
if your use case allows it.
best regards
Marcel
>
>
> DuXiPeng(杜希鹏) wrote:
>
>> Error happens when I send a lot of volatile message to xmlblaster(1.6.2 vision). The publisher send volatile messages (1k per second)and the
>>
>> Subscriber received it. The memory’s consuming on serverside is
>> increasing constantly. Why is not the memory release?
>>
>>
>> thanks
>>
>> my publish code
>>
>>
>>
>>
>>
>> bool Publish(LPCTSTR lpszKey, LPBYTE pContent, int nSize, int optional,
>> LPCTSTR lpszTarget)
>>
>> {
>>
>> PublishKey key(global_, string(lpszKey), "text/xml", "1.0");
>>
>>
>>
>> string clientTags;
>>
>>
>>
>> clientTags = "<test Flag=”123”></ test >
>>
>>
>>
>> key.setClientTags(clientTags);
>>
>>
>>
>> if (domain != "")
>>
>> key.setDomain(domain);
>>
>>
>>
>> PublishQos pq(global_);
>>
>>
>>
>>
>>
>> pq.setPriority(MAX_PRIORITY);
>>
>>
>>
>> if (optional & MSG_VOLATILE)// volatile
>>
>> {
>>
>> pq.setVolatile(true);
>>
>> //pq.setLifeTime(lifeTime);
>>
>> pq.setForceDestroy(true);
>>
>>
>>
>> }
>>
>> Else //non- volatile
>>
>> {
>>
>> pq.setLifeTime(-1);
>>
>> }
>>
>>
>>
>> if (optional & MSG_PERSISTENT)
>>
>> pq.setPersistent(true);
>>
>> pq.setForceUpdate(forceUpdate);
>>
>> pq.setSubscribable(subscribable);
>>
>>
>>
>> TopicProperty topicProperty(global_);
>>
>> topicProperty.setDestroyDelay(0);
>>
>> topicProperty.setCreateDomEntry(false);
>>
>> if (historyMaxMsg >= 0L)
>>
>> {
>>
>> HistoryQueueProperty prop(global_, "");
>>
>> prop.setMaxEntries(historyMaxMsg);
>>
>> prop.setMaxEntriesCache(historyMaxEntriesCache);
>>
>> prop.setMaxBytesCache(historyMaxBytesCache);
>>
>> prop.setMaxBytes(historyMaxBytes);
>>
>> prop.setExpires(historyExpiresTimes);
>>
>>
>>
>> topicProperty.setHistoryQueueProperty(prop);
>>
>> }
>>
>> pq.setTopicProperty(topicProperty);
>>
>> if (lpszTarget != NULL)
>>
>> {
>>
>> pq.setSubscribable(false);
>>
>>
>>
>> SessionName sessionName(global_, lpszTarget);
>>
>> Destination dest(global_, sessionName);
>>
>> dest.forceQueuing(forceQueuing);
>>
>> pq.addDestination(dest);
>>
>> }
>>
>>
>>
>> stringstream sout;
>>
>>
>>
>> std::vector< unsigned char > contentVec;
>>
>> for (int i = 0 ;i < nSize; i++)
>>
>> contentVec.push_back(*(pContent + i));
>>
>>
>>
>> MessageUnit msgUnit(key, contentVec, pq);
>>
>>
>>
>> bool bSucceeded = false;
>>
>> if (oneway)
>>
>> {
>>
>> //log_.trace(ME, string("publishOneway() message unit: ") +
>> msgUnit.toXml());
>>
>> vector<MessageUnit> msgUnitArr;
>>
>> msgUnitArr.push_back(msgUnit);
>>
>> try
>>
>> {
>>
>> connection_.publishOneway(msgUnitArr);
>>
>> log_.trace(ME, "publishOneway() done");
>>
>> bSucceeded = true;
>>
>> }
>>
>> catch (XmlBlasterException e)
>>
>> {
>>
>> lastError = "publishOneway failed. Error info: " + e.toXml();
>>
>> log_.error(ME, lastError);
>>
>> }
>>
>> }
>>
>> else
>>
>> {
>>
>> //log_.trace(ME, string("publish() message unit: ") +
>> msgUnit.toXml());
>>
>> for (int i = 0; i < publishRetryCount + 1; i++)
>>
>> {
>>
>> try
>>
>> {
>>
>> PublishReturnQos tmp = connection_.publish(msgUnit);
>>
>> log_.trace(ME, string("publish return qos: ") + tmp.toXml());
>>
>> bSucceeded = true;
>>
>> break;
>>
>> }
>>
>> catch (XmlBlasterException e)
>>
>> {
>>
>> lastError = "publish failed. Error info: " + e.toXml();
>>
>> log_.error(ME, lastError);
>>
>> }
>>
>> }
>>
>> }
>>
>>
>>
>> contentVec.clear();
>>
>>
>>
>> return bSucceeded;
>>
>> }
>>
>>
>>
>>
>
>
>