1 /*------------------------------------------------------------------------------
  2 Name:      FileWriterCallback.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 
  7 #include <contrib/FileWriterCallback.h>
  8 #include <util/Global.h>
  9 #include <util/lexical_cast.h>
 10 #include <util/Timestamp.h>
 11 #include <util/dispatch/DispatchManager.h>
 12 #include <util/parser/ParserFactory.h>
 13 #include <cstdio>
 14 #include <cstring>
 15 #if defined(_WINDOWS)
 16 #   ifdef WIN32_LEAN_AND_MEAN
 17 #      undef WIN32_LEAN_AND_MEAN
 18 #   endif
 19 #   define WIN32_LEAN_AND_MEAN 1
 20 #   include <windows.h>
 21 #else
 22 #   include <dirent.h>
 23 #   include <sys/stat.h>  //mkdir()
 24 #endif
 25 #include <fstream>
 26 
 27 static void create_directorys(const std::string &fnfull);
 28 static void add_fn_part(std::string &fnpath, const char *part);
 29 
 30 namespace org { namespace xmlBlaster { namespace contrib {
 31 
 32 using namespace std;
 33 using namespace org::xmlBlaster::util;
 34 using namespace org::xmlBlaster::util::qos;
 35 using namespace org::xmlBlaster::util::dispatch;
 36 using namespace org::xmlBlaster::util::dispatch;
 37 using namespace org::xmlBlaster::util::qos::storage;
 38 using namespace org::xmlBlaster::util::qos::address;
 39 using namespace org::xmlBlaster::authentication;
 40 using namespace org::xmlBlaster::client::protocol;
 41 using namespace org::xmlBlaster::client::key;
 42 using namespace org::xmlBlaster::client::qos;
 43 
 44 FileWriterCallback::FileWriterCallback(org::xmlBlaster::util::Global &global, std::string &dirName, std::string &tmpDirName, std::string &lockExtention, bool overwrite, bool keepDumpFiles)
 45    : ME("FileWriterCallback"),
 46      global_(global),
 47           BUF_SIZE(300000),
 48      dirName_(dirName),
 49      lockExtention_(lockExtention),
 50      overwrite_(overwrite),
 51      tmpDirName_(tmpDirName),
 52      keepDumpFiles_(keepDumpFiles),
 53      directory_(dirName),
 54      tmpDirectory_(tmpDirName),
 55      log_(global.getLog("org.xmlBlaster.contrib"))
 56 {
 57         bufSize_ = 120000;
 58         buf_ = new char[bufSize_];
 59 
 60         try {
 61                 // test creation of files in the directory and temporary directory and throw an exception if it is not possible to
 62                 // create a file
 63                 std::string completeFilename;
 64                 completeFilename.append(dirName).append(FILE_SEP).append("__checkCreation.dat");
 65                 std::ofstream out(completeFilename.c_str());
 66                 if (!out.is_open()) {
 67                         std::string txt("can not open files in directory '" + dirName + "'");
 68                         std::string location(ME + "::" + ME);
 69                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
 70                 }
 71         out.close();
 72                 if (::remove(completeFilename.c_str()) != 0) {
 73                         std::string location(ME + "::" + ME);
 74                         std::string txt("can not remove open files in directory '" + dirName + "'");
 75                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
 76                 }
 77                 
 78                 std::string completeTmpFilename;
 79                 completeTmpFilename.append(tmpDirName).append(FILE_SEP).append("__checkTmpCreation.dat");
 80                 std::ofstream outTmp(completeTmpFilename.c_str());
 81                 if (!outTmp.is_open()) {
 82                         std::string txt("can not open files in temporary directory '" + tmpDirName + "'");
 83                         std::string location(ME + "::" + ME);
 84                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
 85                 }
 86         outTmp.close();
 87                 if (::remove(completeTmpFilename.c_str()) != 0) {
 88                         std::string location(ME + "::" + ME);
 89                         std::string txt("can not remove open files in temporary directory '" + tmpDirName + "'");
 90                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
 91                 }
 92         }
 93         catch (XmlBlasterException &ex) {
 94                 throw ex;
 95         }
 96         catch (exception &ex) {
 97                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::" + ME, ex.what());
 98         }
 99         catch (...) {
100                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::" + ME, "unknown exception");
101         }
102                 
103 }
104 
105 FileWriterCallback::~FileWriterCallback()
106 {
107         delete[] buf_;
108 }
109 
110 
111 void FileWriterCallback::storeChunk(std::string &tmpDir, std::string &fileName, long chunkNumber, std::string &sep, bool overwrite, const char *content, long length) 
112 {
113 
114 //   private static void storeChunk(File tmpDir, String fileName, long chunkNumber, char sep, boolean overwrite, InputStream is) throws Exception {
115 
116         std::string completeFileName;
117         completeFileName.append(tmpDir).append(FILE_SEP).append(fileName).append(sep).append(lexical_cast<std::string>(chunkNumber));
118 
119         if (!overwrite) {
120                 std::ofstream file(completeFileName.c_str(), ios::in | ios::binary);
121                 if (file.is_open()) {
122                         std::string txt("file '" + completeFileName + "' exists already and 'overwrite' is set to 'true', can not continue until you manually remove this file");
123                         std::string location(ME + "::storeChunk");
124                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
125                 }
126                 file.close();
127         }
128         
129         std::ofstream file(completeFileName.c_str(), ios::out | ios::binary);
130         if (!file.is_open()) {
131                 std::string txt("chunk '" + lexical_cast<std::string>(chunkNumber) + "' for file '" + completeFileName + "' could not be written: check the write permissions on the directory");
132                 std::string location(ME + "::storeChunk");
133                 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
134         }
135         
136         try {
137                 // log.info(ME, "storing file '" + completeFileName + "'");
138                 file.write(content, length);
139       file.close();
140    }
141         catch (exception &ex) {
142                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", ex.what());
143         }
144         catch (...) {
145                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", "unknown exception");
146         }
147                 
148 }
149 
150 long FileWriterCallback::extractNumberPostfixFromFile(std::string &filename, std::string &prefix) 
151 {
152         if (filename.length() < 1)
153                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::extractNumberPostfixFromFile", "The filename is empty");
154 
155         int prefixLength = prefix.length();
156         if (filename.find(prefix) == 0) {
157         std::string postfix = filename.substr(prefixLength);
158       if (postfix.length() < 1)
159         return -1L;
160                 try {
161                         return lexical_cast<long>(postfix);
162                 }
163                 catch (...) {
164                         return -1L;
165                 }
166         }
167         return -1L;
168 }
169 
170 using namespace std;
171 void FileWriterCallback::getdir(std::string &dir, std::string &prefix, vector<string> &files)
172 {
173 #ifdef _WIN32
174         // return all files in dir, prefix?
175         HANDLE          hFile = NULL;   /*  Find file handle */
176         WIN32_FIND_DATA FileData;       /*  Find file info structure */
177 
178         char buf[256];
179         int  buf_len = 255;
180         if (!GetCurrentDirectory(buf_len,buf)) { return; } // error !
181 
182         SetCurrentDirectory(dir.c_str()); // ev. \ /  handling?
183 
184         hFile = FindFirstFile( "*", &FileData );
185         if ( hFile == INVALID_HANDLE_VALUE) return;
186         while (1) {
187                 if (!(FileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) &&  std::string(FileData.cFileName).find_first_of(prefix) == 0)
188                         files.push_back(FileData.cFileName);
189                 if ( !FindNextFile( hFile, &FileData) ) break;
190         }
191         FindClose(hFile);
192 
193         SetCurrentDirectory(buf);
194 #else
195         DIR *dp;
196         struct dirent *dirp;
197         if((dp  = opendir(dir.c_str())) == NULL) {
198                 std::string txt("could not retrieve files from directory '" + dir + "'");
199                 std::string location(ME + "::getDir");
200                 throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
201    }
202         while ((dirp = readdir(dp)) != NULL &&  std::string(dirp->d_name).find_first_of(prefix) == 0) {
203                 files.push_back(string(dirp->d_name));
204         }
205         closedir(dp);
206 #endif
207 }
208 
209 void FileWriterCallback::getChunkFilenames(std::string &fileName, std::string &sep, std::vector<std::string> &filenames) 
210 {
211         // scan for all chunks:
212         std::string prefix("");
213         prefix.append(fileName).append(sep);
214 
215         std::vector<std::string> files;
216         getdir(tmpDirectory_, prefix, files);
217         
218         std::vector<std::string>::iterator iter = files.begin();
219         std::map<int, std::string> fileMap;
220 
221         while (iter != files.end()) {
222                 long postfix = extractNumberPostfixFromFile(*iter, prefix);
223                 if (postfix > -1L)
224                         fileMap.insert(pair<const int, std::string>(postfix, *iter));
225                 // else
226                 //      log.warning("");
227                 iter++;
228         }
229         std::map<int, std::string>::const_iterator mapIter = fileMap.begin();
230         while (mapIter != fileMap.end()) {
231                 std::string tmp((*mapIter).second);
232                 filenames.insert(filenames.end(), tmp);
233                 mapIter++;
234         }
235 }
236 
237 void FileWriterCallback::putAllChunksTogether(std::string &fileName, std::string &subDir, long expectedChunks, const char *buf, long bufSize, bool isCompleteMsg) 
238 {
239         log_.info(ME + "::putAllChunksTogether", "file='" + fileName + "' expectedChunks='" + lexical_cast<std::string>(expectedChunks) + "'");
240         std::string completeFileName(directory_);
241 
242         if (subDir != "") {
243                 add_fn_part(completeFileName, subDir.c_str());
244                 add_fn_part(completeFileName, fileName.c_str());
245 #               ifdef _WIN32
246                 for (int i = 0; i < completeFileName.size() ; i++ ) {
247                   if ( completeFileName[i] == '/' ) completeFileName[i] = '\\';
248                 }
249 #               endif
250                 // check if directories exists, if not create, argument must include the filename
251                 create_directorys(completeFileName);
252         }
253         else {
254                 completeFileName.append(FILE_SEP).append(fileName);
255         }
256         
257         if (!overwrite_) {
258                 std::ofstream file(completeFileName.c_str(), ios::in | ios::binary);
259                 if (file.is_open()) {
260                         std::string txt("file '" + completeFileName + "' exists already and 'overwrite' is set to 'true', can not continue until you manually remove this file");
261                         std::string location(ME + "::putAllChunksTogether");
262                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
263                 }
264                 file.close();
265         }
266 
267         try {
268                 // first create the lock file
269                 std::string lockName;
270                 lockName.append(completeFileName).append(lockExtention_);
271                 fstream lock(lockName.c_str(), ios::out);
272                 lock << "lock" << std::endl;
273                 lock.close();
274 
275                 std::ofstream file(completeFileName.c_str(), ios::out | ios::binary);
276                 if (!file.is_open()) {
277                         std::string txt("file '" + completeFileName + "' could not be opended");
278                         std::string location(ME + "::putAllChunksTogether");
279                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
280                 }
281 
282                 std::vector<std::string> filenames;
283                 if (expectedChunks > 0) { // the last one is already added at the end directly
284                         std::string sep(".");
285                         getChunkFilenames(fileName, sep, filenames); // retrieves the chunks in correct order
286                         long length = filenames.size();
287                         if (length > expectedChunks)
288                                 log_.warn(ME, "Too many chunks belonging to '" + fileName + "' are found. They are '" + lexical_cast<std::string>(filenames.size()) + "' but should be '" + lexical_cast<std::string>(expectedChunks) + "'");
289                         else if (length < expectedChunks) {
290                                 std::string tmp;
291                                 fstream fileTmp(completeFileName.c_str());
292                                 if (fileTmp.is_open()) {  // check if file exists
293                                         log_.warn(ME, "The number of chunks is '" + lexical_cast<std::string>(filenames.size()) + "' which is less than the expected '" + lexical_cast<std::string>(expectedChunks) + "' but the file '" + completeFileName + "' exists. So we will use the exisiting file (the chunks where probably already deleted)");
294                                         ::remove(lockName.c_str());
295                                         return;
296                                 }
297                                 else {
298                                         std::string txt("Too few chunks belonging to '" + fileName + "' are found. They are '" + lexical_cast<std::string>(filenames.size()) + "' but should be '" + lexical_cast<std::string>(expectedChunks) + "'");
299                                         std::string location(ME + "::putAllChunksTogether");
300                                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
301                                 }
302                         }
303                 }
304                 std::vector<std::string>::const_iterator iter = filenames.begin();
305                 // to first create the file (without adding anything since it is closed after each tmp file)
306                 fstream tmp(completeFileName.c_str(), ios::out | ios::binary);
307                 tmp.close();
308                 
309                 while (iter != filenames.end()) {
310                         fstream dest(completeFileName.c_str(), ios::out | ios::app | ios::binary);
311                         std::string inName("");
312                         inName.append(tmpDirectory_).append(FILE_SEP).append(*iter);
313                         fstream in(inName.c_str(), ios::in | ios::binary);
314                         if (!in)
315                                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", "could not read the content of the chunk (exception when opening the file): " + inName + "'");
316                         if (!in.is_open())
317                                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", "the file is not open for reading");
318                         
319                         int numRead = 1;
320                         while ( !in.eof() ) {
321                                 in.read(buf_, bufSize_);
322                                 numRead = in.gcount();
323                                 if (numRead > 0)
324                                         dest.write(buf_, numRead);
325                         }
326                         in.close();
327                         dest.close();
328                         iter++;
329                 }
330 
331                 fstream dest(completeFileName.c_str(), ios::out | ios::app | ios::binary);
332                 dest.write(buf, bufSize);
333                 dest.close();
334 
335                 // clean up all chunks since complete file created
336       if (!isCompleteMsg && ! keepDumpFiles_) {
337                         iter = filenames.begin();
338                         while (iter != filenames.end()) {
339                                 std::string fileToDelete("");
340                                 fileToDelete.append(tmpDirectory_).append(FILE_SEP).append(*iter);
341                                 deleteFile(fileToDelete);
342                                 iter++;
343                         }
344       }
345       
346                 if (::remove(lockName.c_str()) != 0) {
347                         std::string location(ME + "::putAllChunksTogether");
348                         std::string txt("can not remove lock file '" + lockName + "'");
349                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
350                 }
351    }
352         catch (XmlBlasterException &ex) {
353                 throw ex;
354         }
355         catch (exception &ex) {
356                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", ex.what());
357         }
358         catch (...) {
359                 throw XmlBlasterException(USER_ILLEGALARGUMENT, ME + "::storeChunk", "unknown exception");
360         }
361                 
362 }
363 
364 bool FileWriterCallback::deleteFile(std::string &file) 
365 {
366         return ::remove(file.c_str()) != 0;
367 }
368 
369 
370 
371 std::string FileWriterCallback::update(const std::string&,
372                        org::xmlBlaster::client::key::UpdateKey &updateKey,
373                        const unsigned char *content, long contentSize,
374                        org::xmlBlaster::client::qos::UpdateQos &updateQos) 
375 {
376 
377         std::map<std::string, org::xmlBlaster::util::qos::ClientProperty> props =
378                 updateQos.getClientProperties();
379         
380         std::string filename("");
381         std::string exMsg("");
382         long chunkCount = 0;
383         std::string subDir;
384         bool isLastMsg = true;
385         std::string topic = updateKey.getOid();
386 
387         std::map<std::string, org::xmlBlaster::util::qos::ClientProperty>::const_iterator iter;
388         
389         iter = props.find(Constants::FILENAME_ATTR);
390         if (iter != props.end())
391                 filename = ((*iter).second).getStringValue();
392         else {
393                 iter = props.find(Constants::TIMESTAMP_ATTR);           
394                 if (iter != props.end()) {
395                         std::string timestamp = ((*iter).second).getStringValue();
396                         filename.append("xbl").append(timestamp).append(".msg");
397                 }
398                 else {
399                         std::string txt("update: the message '" + topic + "' should contain either the filename or the timestamp in the properties, but none was found. Can not create a filename to store the data on.");
400                         std::string location(ME + "::putAllChunksTogether");
401                         throw XmlBlasterException(USER_ILLEGALARGUMENT, location.c_str(), txt.c_str());
402         }
403         }
404         
405         iter = props.find("_subdir"); // or define Constants::SUBDIR_ATTR in Constants.h .cpp
406         if (iter != props.end())
407                 subDir = ((*iter).second).getStringValue();
408         
409         iter = props.find(Constants::JMSX_GROUP_SEQ);           
410         if (iter != props.end()) {
411                 isLastMsg = false;
412                 chunkCount = lexical_cast<int>(((*iter).second).getStringValue());
413                 iter = props.find(Constants::JMSX_GROUP_EOF);
414                 if (iter != props.end()) {
415                         isLastMsg = lexical_cast<bool>(((*iter).second).getStringValue());
416                         iter = props.find(Constants::JMSX_GROUP_EX);
417                         if (iter != props.end())
418                                 exMsg = ((*iter).second).getStringValue();
419                 }
420         }
421         else
422                 isLastMsg = true;
423         
424         if (filename.length() < 1) {
425                 filename = topic;
426       log_.warn(ME, "The message did not contain any filename nor timestamp. Will write to '" + filename + "'");
427         }
428    log_.trace(ME, "storing file '" + filename + "' on directory '" + directory_ + "'");
429 
430         bool isCompleteMsg = isLastMsg && chunkCount == 0L;
431         if (exMsg.length() < 1) { // no exception
432                 if (isLastMsg)
433                         putAllChunksTogether(filename, subDir, chunkCount, (const char*)content, contentSize, isCompleteMsg);
434                 else {
435                         std::string sep(".");
436                         storeChunk(tmpDirectory_, filename, chunkCount, sep, overwrite_, (const char*)content, contentSize);
437                 }
438         }
439    else if (!isCompleteMsg) { // clean up old chunks
440         std::vector<std::string> filenames;
441         std::string sep(".");
442                 getChunkFilenames(filename, sep, filenames); // retrieves the chunks in correct order
443                 std::vector<std::string>::const_iterator fileIter = filenames.begin();
444                 while (fileIter != filenames.end()) {
445                         std::string tmp((*fileIter));
446                         deleteFile(tmp);
447                         fileIter++;
448                 }
449    }
450         return "OK";
451 }
452                        
453 
454 }}} // namespaces
455 
456 
457 // #include <string>
458 using namespace std;
459 
460 #ifdef _WIN32
461 #       define DIR_SEP '\\'
462 #       define DIR_SEP_STR "\\"
463 #       include <io.h>
464 #else
465 #       define DIR_SEP '/'
466 #       define DIR_SEP_STR "/"
467 #       include <unistd.h>
468 #endif
469 
470 static void add_fn_part(string &fnpath, const char *part)
471 {
472         int len = fnpath.size();
473         if (len) {
474                 if (fnpath[len-1] != DIR_SEP) fnpath += DIR_SEP_STR;
475         }
476         fnpath += part;
477 }
478 
479 static void create_directorys(const std::string &fnfull)
480 {
481         char    *p,*pn,*buf;
482         int     part_len;
483         int   len = strlen(fnfull.c_str())+1;
484         char  *fn = new char[len];
485 
486         strcpy(fn, fnfull.c_str());
487         buf = new char[len];
488         p = fn;
489 
490 #       ifdef _WIN32
491                 if ( *(p+1) == ':' ) p += 2;
492 #       endif
493 
494         while ((pn = strchr(p,DIR_SEP)) != (char *)0) {
495 
496                 part_len = pn - fn;
497 
498                 if (!part_len) {
499                         p = pn + 1;
500                         continue;
501                 }
502 
503                 strncpy(buf,fn, part_len + 1);
504                 buf[part_len] = 0;
505 
506                 if (access(buf,4)) {
507 #               ifdef _WIN32
508                         CreateDirectory(buf,(LPSECURITY_ATTRIBUTES)0);
509 #               else
510                         mkdir(buf, (mode_t)0); /* umask setzt mode */
511 #               endif
512                 }
513                 p = pn + 1;
514         }
515 
516         delete [] fn;
517         delete [] buf;
518 }
519 
520 
521 #ifdef _XMLBLASTER_CLASSTEST
522 
523 #include <util/Timestamp.h>
524 #include <util/thread/ThreadImpl.h>
525 
526 using namespace std;
527 using namespace org::xmlBlaster::client;
528 using namespace org::xmlBlaster::util;
529 
530 int main(int args, char* argv[])
531 {
532     // Init the XML platform
533     try
534     {
535             Global& glob = Global::getInstance();
536        glob.initialize(args, argv);
537        I_Log& log = glob.getLog("org.xmlBlaster.client");
538 
539        std::string dirName("/home/michele/testWriteFile");
540        std::string tmpDirName("/home/michele/testWriteFile/tmp");
541        std::string lockExtention(".lck");
542        bool overwrite = true;
543        bool keepDumpFiles = false;
544        org::xmlBlaster::contrib::FileWriterCallback callback(glob, dirName, tmpDirName, lockExtention, overwrite, keepDumpFiles);
545        
546        std::string test1("dummy.dat.128");
547        std::string test2("dummy.dat.");
548        std::cout << "Value: " << callback.extractNumberPostfixFromFile(test1, test2) << std::endl;
549        
550        std::string filename("dummy.dat");
551        std::string sep(".");
552        std::vector<std::string> filenames;
553                  callback.getChunkFilenames(filename, sep, filenames);
554                  std::vector<std::string>::const_iterator iter = filenames.begin();
555                  while (iter != filenames.end()) {
556                         std::cout << (*iter) << std::endl;
557                         iter++;
558                  }
559        
560        log.info("PROG", "The program ends");
561    }
562    catch (XmlBlasterException &ex) {
563       std::cout << ex.toXml() << std::endl;
564       // std::cout << ex.getRawMessage() << std::endl;
565    }
566    return 0;
567 }
568 
569 #endif


syntax highlighted by Code2HTML, v. 0.9.1