1 /*------------------------------------------------------------------------------
2 Name: FileWriterCallback.cpp
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 #include <contrib/FileWriterCallback.h>
8 #include <util/Global.h>
9 #include <util/lexical_cast.h>
10 #include <util/Timestamp.h>
11 #include <util/dispatch/DispatchManager.h>
12 #include <util/parser/ParserFactory.h>
13 #include <cstdio>
14 #include <cstring>
15 #if defined(_WINDOWS)
16 # ifdef WIN32_LEAN_AND_MEAN
17 # undef WIN32_LEAN_AND_MEAN
18 # endif
19 # define WIN32_LEAN_AND_MEAN 1
20 # include <windows.h>
21 #else
22 # include <dirent.h>
23 # include <sys/stat.h> //mkdir()
24 #endif
25 #include <fstream>
26
27 static void create_directorys(const std::string &fnfull);
28 static void add_fn_part(std::string &fnpath, const char *part);
29
30 namespace org { namespace xmlBlaster { namespace contrib {
31
32 using namespace std;
33 using namespace org::xmlBlaster::util;
34 using namespace org::xmlBlaster::util::qos;
35 using namespace org::xmlBlaster::util::dispatch;
36 using namespace org::xmlBlaster::util::dispatch;
37 using namespace org::xmlBlaster::util::qos::storage;
38 using namespace org::xmlBlaster::util::qos::address;
39 using namespace org::xmlBlaster::authentication;
40 using namespace org::xmlBlaster::client::protocol;
41 using namespace org::xmlBlaster::client::key;
42 using namespace org::xmlBlaster::client::qos;
43
44 FileWriterCallback::FileWriterCallback(org::xmlBlaster::util::Global &global, std::string &dirName, std::string &tmpDirName, std::string &lockExtention, bool overwrite, bool keepDumpFiles)
45 : ME("FileWriterCallback"),
46 global_(global),
47 BUF_SIZE(300000),
48 dirName_(dirName),
49 lockExtention_(lockExtention),
50 overwrite_(overwrite),
51 tmpDirName_(tmpDirName),
52 keepDumpFiles_(keepDumpFiles),
53 directory_(dirName),
54 tmpDirectory_(tmpDirName),
55 log_(global.getLog("org.xmlBlaster.contrib"))
56 {
57 bufSize_ = 120000;
58 buf_ = new char[bufSize_];
59
60 try {
61 // test creation of files in the directory and temporary directory and throw an exception if it is not possible to
62 // create a file
63 std::string completeFilename;
64 completeFilename.append(dirName).append(FILE_SEP).append("__checkCreation.dat");
65 std::ofstream out(completeFilename.c_str());
66 if (!out.is_open()) {
67 std::string txt("can not open files in directory '" + dirName + "'");
68 std::string location(ME + "::" + ME);
69 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
70 }
71 out.close();
72 if (::remove(completeFilename.c_str()) != 0) {
73 std::string location(ME + "::" + ME);
74 std::string txt("can not remove open files in directory '" + dirName + "'");
75 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
76 }
77
78 std::string completeTmpFilename;
79 completeTmpFilename.append(tmpDirName).append(FILE_SEP).append("__checkTmpCreation.dat");
80 std::ofstream outTmp(completeTmpFilename.c_str());
81 if (!outTmp.is_open()) {
82 std::string txt("can not open files in temporary directory '" + tmpDirName + "'");
83 std::string location(ME + "::" + ME);
84 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
85 }
86 outTmp.close();
87 if (::remove(completeTmpFilename.c_str()) != 0) {
88 std::string location(ME + "::" + ME);
89 std::string txt("can not remove open files in temporary directory '" + tmpDirName + "'");
90 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
91 }
92 }
93 catch (XmlBlasterException &ex) {
94 throw ex;
95 }
96 catch (exception &ex) {
97 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::" + ME, ex.what());
98 }
99 catch (...) {
100 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::" + ME, "unknown exception");
101 }
102
103 }
104
105 FileWriterCallback::~FileWriterCallback()
106 {
107 delete[] buf_;
108 }
109
110
111 void FileWriterCallback::storeChunk(std::string &tmpDir, std::string &fileName, long chunkNumber, std::string &sep, bool overwrite, const char *content, long length)
112 {
113
114 // private static void storeChunk(File tmpDir, String fileName, long chunkNumber, char sep, boolean overwrite, InputStream is) throws Exception {
115
116 std::string completeFileName;
117 completeFileName.append(tmpDir).append(FILE_SEP).append(fileName).append(sep).append(lexical_cast<std::string>(chunkNumber));
118
119 if (!overwrite) {
120 std::ofstream file(completeFileName.c_str(), ios::in | ios::binary);
121 if (file.is_open()) {
122 std::string txt("file '" + completeFileName + "' exists already and 'overwrite' is set to 'true', can not continue until you manually remove this file");
123 std::string location(ME + "::storeChunk");
124 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
125 }
126 file.close();
127 }
128
129 std::ofstream file(completeFileName.c_str(), ios::out | ios::binary);
130 if (!file.is_open()) {
131 std::string txt("chunk '" + lexical_cast<std::string>(chunkNumber) + "' for file '" + completeFileName + "' could not be written: check the write permissions on the directory");
132 std::string location(ME + "::storeChunk");
133 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
134 }
135
136 try {
137 // log.info(ME, "storing file '" + completeFileName + "'");
138 file.write(content, length);
139 file.close();
140 }
141 catch (exception &ex) {
142 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", ex.what());
143 }
144 catch (...) {
145 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", "unknown exception");
146 }
147
148 }
149
150 long FileWriterCallback::extractNumberPostfixFromFile(std::string &filename, std::string &prefix)
151 {
152 if (filename.length() < 1)
153 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::extractNumberPostfixFromFile", "The filename is empty");
154
155 int prefixLength = prefix.length();
156 if (filename.find(prefix) == 0) {
157 std::string postfix = filename.substr(prefixLength);
158 if (postfix.length() < 1)
159 return -1L;
160 try {
161 return lexical_cast<long>(postfix);
162 }
163 catch (...) {
164 return -1L;
165 }
166 }
167 return -1L;
168 }
169
170 using namespace std;
171 void FileWriterCallback::getdir(std::string &dir, std::string &prefix, vector<string> &files)
172 {
173 #ifdef _WIN32
174 // return all files in dir, prefix?
175 HANDLE hFile = NULL; /* Find file handle */
176 WIN32_FIND_DATA FileData; /* Find file info structure */
177
178 char buf[256];
179 int buf_len = 255;
180 if (!GetCurrentDirectory(buf_len,buf)) { return; } // error !
181
182 SetCurrentDirectory(dir.c_str()); // ev. \ / handling?
183
184 hFile = FindFirstFile( "*", &FileData );
185 if ( hFile == INVALID_HANDLE_VALUE) return;
186 while (1) {
187 if (!(FileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) && std::string(FileData.cFileName).find_first_of(prefix) == 0)
188 files.push_back(FileData.cFileName);
189 if ( !FindNextFile( hFile, &FileData) ) break;
190 }
191 FindClose(hFile);
192
193 SetCurrentDirectory(buf);
194 #else
195 DIR *dp;
196 struct dirent *dirp;
197 if((dp = opendir(dir.c_str())) == NULL) {
198 std::string txt("could not retrieve files from directory '" + dir + "'");
199 std::string location(ME + "::getDir");
200 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
201 }
202 while ((dirp = readdir(dp)) != NULL && std::string(dirp->d_name).find_first_of(prefix) == 0) {
203 files.push_back(string(dirp->d_name));
204 }
205 closedir(dp);
206 #endif
207 }
208
209 void FileWriterCallback::getChunkFilenames(std::string &fileName, std::string &sep, std::vector<std::string> &filenames)
210 {
211 // scan for all chunks:
212 std::string prefix("");
213 prefix.append(fileName).append(sep);
214
215 std::vector<std::string> files;
216 getdir(tmpDirectory_, prefix, files);
217
218 std::vector<std::string>::iterator iter = files.begin();
219 std::map<int, std::string> fileMap;
220
221 while (iter != files.end()) {
222 long postfix = extractNumberPostfixFromFile(*iter, prefix);
223 if (postfix > -1L)
224 fileMap.insert(pair<const int, std::string>(postfix, *iter));
225 // else
226 // log.warning("");
227 iter++;
228 }
229 std::map<int, std::string>::const_iterator mapIter = fileMap.begin();
230 while (mapIter != fileMap.end()) {
231 std::string tmp((*mapIter).second);
232 filenames.insert(filenames.end(), tmp);
233 mapIter++;
234 }
235 }
236
237 void FileWriterCallback::putAllChunksTogether(std::string &fileName, std::string &subDir, long expectedChunks, const char *buf, long bufSize, bool isCompleteMsg)
238 {
239 log_.info(ME + "::putAllChunksTogether", "file='" + fileName + "' expectedChunks='" + lexical_cast<std::string>(expectedChunks) + "'");
240 std::string completeFileName(directory_);
241
242 if (subDir != "") {
243 add_fn_part(completeFileName, subDir.c_str());
244 add_fn_part(completeFileName, fileName.c_str());
245 # ifdef _WIN32
246 for (int i = 0; i < completeFileName.size() ; i++ ) {
247 if ( completeFileName[i] == '/' ) completeFileName[i] = '\\';
248 }
249 # endif
250 // check if directories exists, if not create, argument must include the filename
251 create_directorys(completeFileName);
252 }
253 else {
254 completeFileName.append(FILE_SEP).append(fileName);
255 }
256
257 if (!overwrite_) {
258 std::ofstream file(completeFileName.c_str(), ios::in | ios::binary);
259 if (file.is_open()) {
260 std::string txt("file '" + completeFileName + "' exists already and 'overwrite' is set to 'true', can not continue until you manually remove this file");
261 std::string location(ME + "::putAllChunksTogether");
262 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
263 }
264 file.close();
265 }
266
267 try {
268 // first create the lock file
269 std::string lockName;
270 lockName.append(completeFileName).append(lockExtention_);
271 fstream lock(lockName.c_str(), ios::out);
272 lock << "lock" << std::endl;
273 lock.close();
274
275 std::ofstream file(completeFileName.c_str(), ios::out | ios::binary);
276 if (!file.is_open()) {
277 std::string txt("file '" + completeFileName + "' could not be opended");
278 std::string location(ME + "::putAllChunksTogether");
279 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
280 }
281
282 std::vector<std::string> filenames;
283 if (expectedChunks > 0) { // the last one is already added at the end directly
284 std::string sep(".");
285 getChunkFilenames(fileName, sep, filenames); // retrieves the chunks in correct order
286 long length = filenames.size();
287 if (length > expectedChunks)
288 log_.warn(ME, "Too many chunks belonging to '" + fileName + "' are found. They are '" + lexical_cast<std::string>(filenames.size()) + "' but should be '" + lexical_cast<std::string>(expectedChunks) + "'");
289 else if (length < expectedChunks) {
290 std::string tmp;
291 fstream fileTmp(completeFileName.c_str());
292 if (fileTmp.is_open()) { // check if file exists
293 log_.warn(ME, "The number of chunks is '" + lexical_cast<std::string>(filenames.size()) + "' which is less than the expected '" + lexical_cast<std::string>(expectedChunks) + "' but the file '" + completeFileName + "' exists. So we will use the exisiting file (the chunks where probably already deleted)");
294 ::remove(lockName.c_str());
295 return;
296 }
297 else {
298 std::string txt("Too few chunks belonging to '" + fileName + "' are found. They are '" + lexical_cast<std::string>(filenames.size()) + "' but should be '" + lexical_cast<std::string>(expectedChunks) + "'");
299 std::string location(ME + "::putAllChunksTogether");
300 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
301 }
302 }
303 }
304 std::vector<std::string>::const_iterator iter = filenames.begin();
305 // to first create the file (without adding anything since it is closed after each tmp file)
306 fstream tmp(completeFileName.c_str(), ios::out | ios::binary);
307 tmp.close();
308
309 while (iter != filenames.end()) {
310 fstream dest(completeFileName.c_str(), ios::out | ios::app | ios::binary);
311 std::string inName("");
312 inName.append(tmpDirectory_).append(FILE_SEP).append(*iter);
313 fstream in(inName.c_str(), ios::in | ios::binary);
314 if (!in)
315 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", "could not read the content of the chunk (exception when opening the file): " + inName + "'");
316 if (!in.is_open())
317 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", "the file is not open for reading");
318
319 int numRead = 1;
320 while ( !in.eof() ) {
321 in.read(buf_, bufSize_);
322 numRead = in.gcount();
323 if (numRead > 0)
324 dest.write(buf_, numRead);
325 }
326 in.close();
327 dest.close();
328 iter++;
329 }
330
331 fstream dest(completeFileName.c_str(), ios::out | ios::app | ios::binary);
332 dest.write(buf, bufSize);
333 dest.close();
334
335 // clean up all chunks since complete file created
336 if (!isCompleteMsg && ! keepDumpFiles_) {
337 iter = filenames.begin();
338 while (iter != filenames.end()) {
339 std::string fileToDelete("");
340 fileToDelete.append(tmpDirectory_).append(FILE_SEP).append(*iter);
341 deleteFile(fileToDelete);
342 iter++;
343 }
344 }
345
346 if (::remove(lockName.c_str()) != 0) {
347 std::string location(ME + "::putAllChunksTogether");
348 std::string txt("can not remove lock file '" + lockName + "'");
349 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
350 }
351 }
352 catch (XmlBlasterException &ex) {
353 throw ex;
354 }
355 catch (exception &ex) {
356 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", ex.what());
357 }
358 catch (...) {
359 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", "unknown exception");
360 }
361
362 }
363
364 bool FileWriterCallback::deleteFile(std::string &file)
365 {
366 return ::remove(file.c_str()) != 0;
367 }
368
369
370
371 std::string FileWriterCallback::update(const std::string&,
372 org::xmlBlaster::client::key::UpdateKey &updateKey,
373 const unsigned char *content, long contentSize,
374 org::xmlBlaster::client::qos::UpdateQos &updateQos)
375 {
376
377 std::map<std::string, org::xmlBlaster::util::qos::ClientProperty> props =
378 updateQos.getClientProperties();
379
380 std::string filename("");
381 std::string exMsg("");
382 long chunkCount = 0;
383 std::string subDir;
384 bool isLastMsg = true;
385 std::string topic = updateKey.getOid();
386
387 std::map<std::string, org::xmlBlaster::util::qos::ClientProperty>::const_iterator iter;
388
389 iter = props.find(Constants::FILENAME_ATTR);
390 if (iter != props.end())
391 filename = ((*iter).second).getStringValue();
392 else {
393 iter = props.find(Constants::TIMESTAMP_ATTR);
394 if (iter != props.end()) {
395 std::string timestamp = ((*iter).second).getStringValue();
396 filename.append("xbl").append(timestamp).append(".msg");
397 }
398 else {
399 std::string txt("update: the message '" + topic + "' should contain either the filename or the timestamp in the properties, but none was found. Can not create a filename to store the data on.");
400 std::string location(ME + "::putAllChunksTogether");
401 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
402 }
403 }
404
405 iter = props.find("_subdir"); // or define Constants::SUBDIR_ATTR in Constants.h .cpp
406 if (iter != props.end())
407 subDir = ((*iter).second).getStringValue();
408
409 iter = props.find(Constants::JMSX_GROUP_SEQ);
410 if (iter != props.end()) {
411 isLastMsg = false;
412 chunkCount = lexical_cast<int>(((*iter).second).getStringValue());
413 iter = props.find(Constants::JMSX_GROUP_EOF);
414 if (iter != props.end()) {
415 isLastMsg = lexical_cast<bool>(((*iter).second).getStringValue());
416 iter = props.find(Constants::JMSX_GROUP_EX);
417 if (iter != props.end())
418 exMsg = ((*iter).second).getStringValue();
419 }
420 }
421 else
422 isLastMsg = true;
423
424 if (filename.length() < 1) {
425 filename = topic;
426 log_.warn(ME, "The message did not contain any filename nor timestamp. Will write to '" + filename + "'");
427 }
428 log_.trace(ME, "storing file '" + filename + "' on directory '" + directory_ + "'");
429
430 bool isCompleteMsg = isLastMsg && chunkCount == 0L;
431 if (exMsg.length() < 1) { // no exception
432 if (isLastMsg)
433 putAllChunksTogether(filename, subDir, chunkCount, (const char*)content, contentSize, isCompleteMsg);
434 else {
435 std::string sep(".");
436 storeChunk(tmpDirectory_, filename, chunkCount, sep, overwrite_, (const char*)content, contentSize);
437 }
438 }
439 else if (!isCompleteMsg) { // clean up old chunks
440 std::vector<std::string> filenames;
441 std::string sep(".");
442 getChunkFilenames(filename, sep, filenames); // retrieves the chunks in correct order
443 std::vector<std::string>::const_iterator fileIter = filenames.begin();
444 while (fileIter != filenames.end()) {
445 std::string tmp((*fileIter));
446 deleteFile(tmp);
447 fileIter++;
448 }
449 }
450 return "OK";
451 }
452
453
454 }}} // namespaces
455
456
457 // #include <string>
458 using namespace std;
459
#ifdef _WIN32
#  define DIR_SEP '\\'
#  define DIR_SEP_STR "\\"
#  include <io.h>
#else
#  define DIR_SEP '/'
#  define DIR_SEP_STR "/"
#  include <unistd.h>
#endif

/**
 * Appends one path component to a path.
 *
 * A directory separator is inserted if the path is not empty and does not
 * already end with one.
 *
 * @param fnpath the path to extend (modified in place)
 * @param part   the component to append; a null pointer leaves fnpath unchanged
 */
static void add_fn_part(std::string &fnpath, const char *part)
{
   if (part == 0)
      return;
   if (!fnpath.empty() && fnpath[fnpath.size() - 1] != DIR_SEP)
      fnpath += DIR_SEP_STR;
   fnpath += part;
}

/**
 * Creates all missing directories on the way to the given file.
 *
 * Every leading part of fnfull which ends in front of a directory separator
 * is treated as a directory and created if it does not exist. The text
 * behind the last separator is taken as the file name and is not created.
 * Creation is best effort: a failure is not reported here.
 *
 * @param fnfull complete file name including the file itself
 */
static void create_directorys(const std::string &fnfull)
{
   std::string::size_type searchFrom = 0;

#  ifdef _WIN32
   // skip a drive specification like "C:"
   if (fnfull.size() > 1 && fnfull[1] == ':')
      searchFrom = 2;
#  endif

   for (;;) {
      const std::string::size_type sepPos = fnfull.find(DIR_SEP, searchFrom);
      if (sepPos == std::string::npos)
         break;
      searchFrom = sepPos + 1;

      if (sepPos == 0)
         continue; // leading separator: the root directory, nothing to create

      const std::string directory(fnfull, 0, sepPos);
      if (access(directory.c_str(), 0) != 0) { // mode 0: check for existence only
#        ifdef _WIN32
         CreateDirectory(directory.c_str(), (LPSECURITY_ATTRIBUTES)0);
#        else
         // Request all permission bits, the umask of the process removes the
         // unwanted ones. A mode of 0 creates a directory nobody may enter,
         // so nested directories and the file itself could not be created.
         mkdir(directory.c_str(), (mode_t)0777);
#        endif
      }
   }
}
519
520
521 #ifdef _XMLBLASTER_CLASSTEST
522
523 #include <util/Timestamp.h>
524 #include <util/thread/ThreadImpl.h>
525
526 using namespace std;
527 using namespace org::xmlBlaster::client;
528 using namespace org::xmlBlaster::util;
529
530 int main(int args, char* argv[])
531 {
532 // Init the XML platform
533 try
534 {
535 Global& glob = Global::getInstance();
536 glob.initialize(args, argv);
537 I_Log& log = glob.getLog("org.xmlBlaster.client");
538
539 std::string dirName("/home/michele/testWriteFile");
540 std::string tmpDirName("/home/michele/testWriteFile/tmp");
541 std::string lockExtention(".lck");
542 bool overwrite = true;
543 bool keepDumpFiles = false;
544 org::xmlBlaster::contrib::FileWriterCallback callback(glob, dirName, tmpDirName, lockExtention, overwrite, keepDumpFiles);
545
546 std::string test1("dummy.dat.128");
547 std::string test2("dummy.dat.");
548 std::cout << "Value: " << callback.extractNumberPostfixFromFile(test1, test2) << std::endl;
549
550 std::string filename("dummy.dat");
551 std::string sep(".");
552 std::vector<std::string> filenames;
553 callback.getChunkFilenames(filename, sep, filenames);
554 std::vector<std::string>::const_iterator iter = filenames.begin();
555 while (iter != filenames.end()) {
556 std::cout << (*iter) << std::endl;
557 iter++;
558 }
559
560 log.info("PROG", "The program ends");
561 }
562 catch (XmlBlasterException &ex) {
563 std::cout << ex.toXml() << std::endl;
564 // std::cout << ex.getRawMessage() << std::endl;
565 }
566 return 0;
567 }
568
569 #endif
syntax highlighted by Code2HTML, v. 0.9.1