1 /*------------------------------------------------------------------------------
  2 Name:      FileWriter.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 
  7 #include <contrib/FileWriter.h>
  8 #include <util/Global.h>
  9 #include <util/qos/ConnectQosFactory.h>
 10 #include <util/lexical_cast.h>
 11 #include <util/Timestamp.h>
 12 #include <util/dispatch/DispatchManager.h>
 13 #include <util/parser/ParserFactory.h>
 14 #include <stdio.h>
 15 #include <fstream>
 16 
 17 namespace org { namespace xmlBlaster { namespace contrib {
 18 
 19 using namespace std;
 20 using namespace org::xmlBlaster::util;
 21 using namespace org::xmlBlaster::util::qos;
 22 using namespace org::xmlBlaster::util::dispatch;
 23 using namespace org::xmlBlaster::util::dispatch;
 24 using namespace org::xmlBlaster::util::qos::storage;
 25 using namespace org::xmlBlaster::util::qos::address;
 26 using namespace org::xmlBlaster::authentication;
 27 using namespace org::xmlBlaster::client::protocol;
 28 using namespace org::xmlBlaster::client::key;
 29 using namespace org::xmlBlaster::client::qos;
 30 
 31 FileWriter::FileWriter(org::xmlBlaster::util::Global &global, std::string &name)
 32    : ME("FileWriter"),
 33      global_(global),
 34      log_(global.getLog("org.xmlBlaster.contrib")),
 35      subscribeKey_(""),
 36      subscribeQos_(""),
 37      name_(name),
 38      momAdministered_(true),
 39      connectQos_((org::xmlBlaster::util::qos::ConnectQos*)0)
 40 {
 41    access_ = NULL;
 42    callback_ = NULL;
 43    
 44 }
 45 
 46 void FileWriter::init() 
 47 {
 48    try {
 49       org::xmlBlaster::util::Property props = global_.getProperty();
 50       std::string key("mom.connectQos");
 51       if (props.propertyExists(key)) {
 52          std::string connectQosLiteral = props.get(key, "");
 53          org::xmlBlaster::util::qos::ConnectQosFactory factory(global_);
 54          connectQos_ = factory.readObject(connectQosLiteral);
 55       }
 56       else {
 57          std::string userId = props.get("mom.loginName", "_" + name_);
 58          std::string password = props.get("mom.password", "");
 59          connectQos_ = ConnectQosRef(new ConnectQos(global_, userId, password));
 60       }
 61       
 62       // momAdministered = this.global.get("mom.administered", false, null, pluginConfig);
 63       
 64       // tmp = this.global.get("mom.subscribeKey", (String)null, null, pluginConfig);
 65       // String topicName =  this.global.get("mom.topicName", (String)null, null, pluginConfig);
 66       if (momAdministered_) {
 67          std::string replDispatchPlugin("ReplManager,1.0");
 68          org::xmlBlaster::util::qos::address::CallbackAddress *cbAddr = new org::xmlBlaster::util::qos::address::CallbackAddress(global_);
 69          cbAddr->setRetries(-1);
 70          cbAddr->setDispatchPlugin(replDispatchPlugin);
 71          org::xmlBlaster::util::qos::address::AddressBaseRef ref(cbAddr);
 72          connectQos_->addCbAddress(ref);
 73       }
 74       key = std::string ("filewriter.directoryName");
 75       if (!props.propertyExists(key)) {
 76          std::string location(ME + "::init");
 77          std::string txt("prop '" + key + "' must be set");
 78          throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
 79       }
 80       std::string directoryName = props.get(key, ".");
 81       key = std::string("filewriter.tmpDirectoryName");
 82       std::string tmpDirectoryName = props.get(key, directoryName + FILE_SEP + "tmp");
 83       bool overwrite = props.get("filewriter.overwrite", true);
 84       std::string lockExtention = props.get("filewriter.lockExtention", ".lck");
 85       bool keepDumpFiles = false;
 86       callback_ = new FileWriterCallback(global_, directoryName, tmpDirectoryName, lockExtention, overwrite, keepDumpFiles);
 87       initConnection();
 88    }
 89    catch (XmlBlasterException &ex) {
 90       throw ex;
 91    }
 92    catch (exception &ex) {
 93       throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::init", ex.what());
 94    }
 95    catch (...) {
 96       throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::init", "unknown exception");
 97    }
 98    
 99 }
100 
101 void FileWriter::initConnection()
102 {
103    log_.trace(ME, "init");
104    access_ = new org::xmlBlaster::client::XmlBlasterAccess(global_);
105    access_->connect(*connectQos_, this);
106    if (!momAdministered_) {
107       // access_->subscribe(subscribeKey_, subscribeQos_);
108       std::string location(ME + "::putAllChunksTogether");
109       std::string txt("momAdministered 'false' is not implemented");
110       throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
111    }
112       
113 }
114 
115 
116 void FileWriter::shutdown()
117 {
118    if (access_ != NULL) {
119       DisconnectQos qos(global_);
120       // access_->disconnect(qos);
121       access_ = NULL;
122    }
123    callback_ = NULL;
124 }
125 
126 // TODO The COMMUNICATION_USER_HOLDBACK Exceptions should be moved to the library
127 std::string FileWriter::update(const std::string &sessionId,
128                        org::xmlBlaster::client::key::UpdateKey &updateKey,
129                        const unsigned char *content, long contentSize,
130                        org::xmlBlaster::client::qos::UpdateQos &updateQos)
131 {
132    try {
133          // InputStream is = MomEventEngine.decompress(new ByteArrayInputStream(content), updateQos.getClientProperties());
134          std::string timestamp = "" + updateQos.getRcvTime();
135          std::map<std::string, org::xmlBlaster::util::qos::ClientProperty> props = updateQos.getClientProperties();
136          //std::map<std::string, org::xmlBlaster::util::qos::ClientProperty>::const_iterator iter = props.end();
137          org::xmlBlaster::util::qos::ClientProperty property(Constants::TIMESTAMP_ATTR, timestamp, Constants::TYPE_STRING, "UTF-8");
138          props.insert(pair<std::string, org::xmlBlaster::util::qos::ClientProperty>(Constants::TIMESTAMP_ATTR, property));
139          return callback_->update(sessionId, updateKey, content, contentSize, updateQos);
140    }
141    catch (XmlBlasterException &ex) {
142       throw XmlBlasterException(USER_UPDATE_HOLDBACK, ME + ":update", ex.getMessage());
143    }
144    catch (exception &ex) {
145       throw XmlBlasterException(USER_UPDATE_HOLDBACK, ME + "::update", ex.what());
146    }
147    catch (...) {
148       throw XmlBlasterException(USER_UPDATE_HOLDBACK, ME + "::update", "unknown exception");
149    }
150 }
151 
152 FileWriter::~FileWriter() 
153 {
154 }
155 
156 
157 }}} // namespaces


// syntax highlighted by Code2HTML, v. 0.9.1