1 /*------------------------------------------------------------------------------
2 Name: FileWriter.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 #include <contrib/FileWriter.h>
8 #include <util/Global.h>
9 #include <util/qos/ConnectQosFactory.h>
10 #include <util/lexical_cast.h>
11 #include <util/Timestamp.h>
12 #include <util/dispatch/DispatchManager.h>
13 #include <util/parser/ParserFactory.h>
14 #include <stdio.h>
15 #include <fstream>
16
17 namespace org { namespace xmlBlaster { namespace contrib {
18
19 using namespace std;
20 using namespace org::xmlBlaster::util;
21 using namespace org::xmlBlaster::util::qos;
22 using namespace org::xmlBlaster::util::dispatch;
23 using namespace org::xmlBlaster::util::dispatch;
24 using namespace org::xmlBlaster::util::qos::storage;
25 using namespace org::xmlBlaster::util::qos::address;
26 using namespace org::xmlBlaster::authentication;
27 using namespace org::xmlBlaster::client::protocol;
28 using namespace org::xmlBlaster::client::key;
29 using namespace org::xmlBlaster::client::qos;
30
31 FileWriter::FileWriter(org::xmlBlaster::util::Global &global, std::string &name)
32 : ME("FileWriter"),
33 global_(global),
34 log_(global.getLog("org.xmlBlaster.contrib")),
35 subscribeKey_(""),
36 subscribeQos_(""),
37 name_(name),
38 momAdministered_(true),
39 connectQos_((org::xmlBlaster::util::qos::ConnectQos*)0)
40 {
41 access_ = NULL;
42 callback_ = NULL;
43
44 }
45
46 void FileWriter::init()
47 {
48 try {
49 org::xmlBlaster::util::Property props = global_.getProperty();
50 std::string key("mom.connectQos");
51 if (props.propertyExists(key)) {
52 std::string connectQosLiteral = props.get(key, "");
53 org::xmlBlaster::util::qos::ConnectQosFactory factory(global_);
54 connectQos_ = factory.readObject(connectQosLiteral);
55 }
56 else {
57 std::string userId = props.get("mom.loginName", "_" + name_);
58 std::string password = props.get("mom.password", "");
59 connectQos_ = ConnectQosRef(new ConnectQos(global_, userId, password));
60 }
61
62 // momAdministered = this.global.get("mom.administered", false, null, pluginConfig);
63
64 // tmp = this.global.get("mom.subscribeKey", (String)null, null, pluginConfig);
65 // String topicName = this.global.get("mom.topicName", (String)null, null, pluginConfig);
66 if (momAdministered_) {
67 std::string replDispatchPlugin("ReplManager,1.0");
68 org::xmlBlaster::util::qos::address::CallbackAddress *cbAddr = new org::xmlBlaster::util::qos::address::CallbackAddress(global_);
69 cbAddr->setRetries(-1);
70 cbAddr->setDispatchPlugin(replDispatchPlugin);
71 org::xmlBlaster::util::qos::address::AddressBaseRef ref(cbAddr);
72 connectQos_->addCbAddress(ref);
73 }
74 key = std::string ("filewriter.directoryName");
75 if (!props.propertyExists(key)) {
76 std::string location(ME + "::init");
77 std::string txt("prop '" + key + "' must be set");
78 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
79 }
80 std::string directoryName = props.get(key, ".");
81 key = std::string("filewriter.tmpDirectoryName");
82 std::string tmpDirectoryName = props.get(key, directoryName + FILE_SEP + "tmp");
83 bool overwrite = props.get("filewriter.overwrite", true);
84 std::string lockExtention = props.get("filewriter.lockExtention", ".lck");
85 bool keepDumpFiles = false;
86 callback_ = new FileWriterCallback(global_, directoryName, tmpDirectoryName, lockExtention, overwrite, keepDumpFiles);
87 initConnection();
88 }
89 catch (XmlBlasterException &ex) {
90 throw ex;
91 }
92 catch (exception &ex) {
93 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::init", ex.what());
94 }
95 catch (...) {
96 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::init", "unknown exception");
97 }
98
99 }
100
101 void FileWriter::initConnection()
102 {
103 log_.trace(ME, "init");
104 access_ = new org::xmlBlaster::client::XmlBlasterAccess(global_);
105 access_->connect(*connectQos_, this);
106 if (!momAdministered_) {
107 // access_->subscribe(subscribeKey_, subscribeQos_);
108 std::string location(ME + "::putAllChunksTogether");
109 std::string txt("momAdministered 'false' is not implemented");
110 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
111 }
112
113 }
114
115
116 void FileWriter::shutdown()
117 {
118 if (access_ != NULL) {
119 DisconnectQos qos(global_);
120 // access_->disconnect(qos);
121 access_ = NULL;
122 }
123 callback_ = NULL;
124 }
125
126 // TODO The COMMUNICATION_USER_HOLDBACK Exceptions should be moved to the library
127 std::string FileWriter::update(const std::string &sessionId,
128 org::xmlBlaster::client::key::UpdateKey &updateKey,
129 const unsigned char *content, long contentSize,
130 org::xmlBlaster::client::qos::UpdateQos &updateQos)
131 {
132 try {
133 // InputStream is = MomEventEngine.decompress(new ByteArrayInputStream(content), updateQos.getClientProperties());
134 std::string timestamp = "" + updateQos.getRcvTime();
135 std::map<std::string, org::xmlBlaster::util::qos::ClientProperty> props = updateQos.getClientProperties();
136 //std::map<std::string, org::xmlBlaster::util::qos::ClientProperty>::const_iterator iter = props.end();
137 org::xmlBlaster::util::qos::ClientProperty property(Constants::TIMESTAMP_ATTR, timestamp, Constants::TYPE_STRING, "UTF-8");
138 props.insert(pair<std::string, org::xmlBlaster::util::qos::ClientProperty>(Constants::TIMESTAMP_ATTR, property));
139 return callback_->update(sessionId, updateKey, content, contentSize, updateQos);
140 }
141 catch (XmlBlasterException &ex) {
142 throw XmlBlasterException(USER_UPDATE_HOLDBACK, ME + ":update", ex.getMessage());
143 }
144 catch (exception &ex) {
145 throw XmlBlasterException(USER_UPDATE_HOLDBACK, ME + "::update", ex.what());
146 }
147 catch (...) {
148 throw XmlBlasterException(USER_UPDATE_HOLDBACK, ME + "::update", "unknown exception");
149 }
150 }
151
152 FileWriter::~FileWriter()
153 {
154 }
155
156
157 }}} // namespaces
// syntax highlighted by Code2HTML, v. 0.9.1