bes  Updated for version 3.20.8
GlobalMetadataStore.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of HYrax, A C++ implementation of the OPeNDAP Data
4 // Access Protocol.
5 
6 // Copyright (c) 2018 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 #include "config.h"
26 
27 #include <fcntl.h> // for posix_advise
28 #include <unistd.h>
29 
30 #include <cerrno>
31 #include <cstring>
32 
33 #include <iostream>
34 #include <string>
35 #include <fstream>
36 #include <sstream>
37 #include <functional>
38 #include <DAS.h>
39 #include <memory>
40 #include <sys/stat.h>
41 
42 #include <DapObj.h>
43 #include <DDS.h>
44 #include <DMR.h>
45 #include <D4ParserSax2.h>
46 #include <XMLWriter.h>
47 #include <BaseTypeFactory.h>
48 #include <D4BaseTypeFactory.h>
49 
50 #include "PicoSHA2/picosha2.h"
51 
52 #include "TempFile.h"
53 #include "TheBESKeys.h"
54 #include "BESUtil.h"
55 #include "BESLog.h"
56 #include "BESContextManager.h"
57 #include "BESDebug.h"
58 #include "BESRequestHandler.h"
59 #include "BESRequestHandlerList.h"
60 #include "BESNotFoundError.h"
61 
62 #include "BESInternalError.h"
63 #include "BESInternalFatalError.h"
64 
65 #include "GlobalMetadataStore.h"
66 
67 #define DEBUG_KEY "metadata_store"
68 #define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
69 
70 #ifdef HAVE_ATEXIT
71 #define AT_EXIT(x) atexit((x))
72 #else
73 #define AT_EXIT(x)
74 #endif
75 
85 #undef SYMETRIC_ADD_RESPONSES
86 
87 #define prolog std::string("GlobalMetadataStore::").append(__func__).append("() - ")
88 
89 using namespace std;
90 using namespace libdap;
91 using namespace bes;
92 
93 static const unsigned int default_cache_size = 20; // 20 GB
94 static const string default_cache_prefix = "mds";
95 static const string default_cache_dir = ""; // I'm making the default empty so that no key == no caching. jhrg 9.26.16
96 static const string default_ledger_name = "mds_ledger.txt";
97 
98 static const string PATH_KEY = "DAP.GlobalMetadataStore.path";
99 static const string PREFIX_KEY = "DAP.GlobalMetadataStore.prefix";
100 static const string SIZE_KEY = "DAP.GlobalMetadataStore.size";
101 static const string LEDGER_KEY = "DAP.GlobalMetadataStore.ledger";
102 static const string LOCAL_TIME_KEY = "BES.LogTimeLocal";
103 
104 GlobalMetadataStore *GlobalMetadataStore::d_instance = 0;
105 bool GlobalMetadataStore::d_enabled = true;
106 
121 void GlobalMetadataStore::transfer_bytes(int fd, ostream &os)
122 {
123  static const int BUFFER_SIZE = 16*1024;
124 
125 #if _POSIX_C_SOURCE >= 200112L
126  /* Advise the kernel of our access pattern. */
127  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
128  if (status != 0)
129  ERROR_LOG(prolog << "Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
130 #endif
131 
132  char buf[BUFFER_SIZE + 1];
133 
134  while(int bytes_read = read(fd, buf, BUFFER_SIZE))
135  {
136  if(bytes_read == -1)
137  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
138  if (!bytes_read)
139  break;
140 
141  os.write(buf, bytes_read);
142  }
143 }
144 
157 void GlobalMetadataStore::insert_xml_base(int fd, ostream &os, const string &xml_base)
158 {
159  static const int BUFFER_SIZE = 1024;
160 
161 #if _POSIX_C_SOURCE >= 200112L
162  /* Advise the kernel of our access pattern. */
163  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
164  if (status != 0)
165  ERROR_LOG(prolog << "Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
166 #endif
167 
168  char buf[BUFFER_SIZE + 1];
169  size_t bytes_read = read(fd, buf, BUFFER_SIZE);
170 
171  if(bytes_read == (size_t)-1)
172  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
173 
174  if (bytes_read == 0)
175  return;
176 
177  // Every valid DMR/++ response in the MDS starts with:
178  // <?xml version="1.0" encoding="ISO‌-8859-1"?>
179  //
180  // and has one of two kinds of <Dataset...> tags
181  // 1: <Dataset xmlns="..." xml:base="file:DMR_1.xml" ... >
182  // 2: <Dataset xmlns="..." ... >
183  //
184  // Assume it is well formed and always includes the prolog,
185  // but might not use <CR> <CRLF> chars
186 
187  // transfer the prolog (<?xml version="1.0" encoding="ISO‌-8859-1"?>)
188  size_t i = 0;
189  while (buf[i++] != '>')
190  ; // 'i' now points one char past the xml prolog
191  os.write(buf, i);
192 
193  // transfer <Dataset ...> with new value for xml:base
194  size_t s = i; // start of <Dataset ...>
195  size_t j = 0;
196  char xml_base_literal[] = "xml:base";
197  while (i < bytes_read) {
198  if (buf[i] == '>') { // Found end of Dataset; no xml:base was present
199  os.write(buf + s, i - s);
200  os << " xml:base=\"" << xml_base << "\"";
201  break;
202  }
203  else if (j == sizeof(xml_base_literal) - 1) { // found 'xml:base' literal
204  os.write(buf + s, i - s); // This will include all of <Dataset... including 'xml:base'
205  while (buf[i++] != '=')
206  ; // read/discard '="..."'
207  while (buf[i++] != '"')
208  ;
209  while (buf[i++] != '"')
210  ;
211  os << "=\"" << xml_base << "\""; // write the new xml:base value
212  break;
213  }
214  else if (buf[i] == xml_base_literal[j]) {
215  ++j;
216  }
217  else {
218  j = 0;
219  }
220 
221  ++i;
222  }
223 
224  // transfer the rest
225  os.write(buf + i, bytes_read - i);
226 
227  // Now, if the response is more than 1k, use faster code to finish the tx
228  transfer_bytes(fd, os);
229 }
230 
231 unsigned long GlobalMetadataStore::get_cache_size_from_config()
232 {
233  bool found;
234  string size;
235  unsigned long size_in_megabytes = default_cache_size;
236  TheBESKeys::TheKeys()->get_value(SIZE_KEY, size, found);
237  if (found) {
238  BESDEBUG(DEBUG_KEY,
239  "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY << "=" << size << endl);
240  istringstream iss(size);
241  iss >> size_in_megabytes;
242  }
243 
244  return size_in_megabytes;
245 }
246 
247 string GlobalMetadataStore::get_cache_prefix_from_config()
248 {
249  bool found;
250  string prefix = default_cache_prefix;
251  TheBESKeys::TheKeys()->get_value(PREFIX_KEY, prefix, found);
252  if (found) {
253  BESDEBUG(DEBUG_KEY,
254  "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY << "=" << prefix << endl);
255  prefix = BESUtil::lowercase(prefix);
256  }
257 
258  return prefix;
259 }
260 
261 // If the cache prefix is the empty string, the cache is turned off.
262 string GlobalMetadataStore::get_cache_dir_from_config()
263 {
264  bool found;
265 
266  string cacheDir = default_cache_dir;
267  TheBESKeys::TheKeys()->get_value(PATH_KEY, cacheDir, found);
268  if (found) {
269  BESDEBUG(DEBUG_KEY,
270  "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<< "=" << cacheDir << endl);
271  }
272 
273  return cacheDir;
274 }
275 
290 
308 GlobalMetadataStore::get_instance(const string &cache_dir, const string &prefix, unsigned long long size)
309 {
310  if (d_enabled && d_instance == 0) {
311  d_instance = new GlobalMetadataStore(cache_dir, prefix, size); // never returns null_ptr
312  d_enabled = d_instance->cache_enabled();
313  if (!d_enabled) {
314  delete d_instance;
315  d_instance = 0;
316 
317  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
318  }
319  else {
320  AT_EXIT(delete_instance);
321 
322  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
323  }
324  }
325 
326  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
327 
328  return d_instance;
329 }
330 
338 GlobalMetadataStore::get_instance()
339 {
340  if (d_enabled && d_instance == 0) {
341  d_instance = new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
342  get_cache_size_from_config());
343  d_enabled = d_instance->cache_enabled();
344  if (!d_enabled) {
345  delete d_instance;
346  d_instance = NULL;
347  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
348  }
349  else {
350  AT_EXIT(delete_instance);
351 
352  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
353  }
354  }
355 
356  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance() - d_instance: " << (void *) d_instance << endl);
357 
358  return d_instance;
359 }
361 
365 void
366 GlobalMetadataStore::initialize()
367 {
368  bool found;
369 
370  TheBESKeys::TheKeys()->get_value(LEDGER_KEY, d_ledger_name, found);
371  if (found) {
372  BESDEBUG(DEBUG_KEY, "Located BES key " << LEDGER_KEY << "=" << d_ledger_name << endl);
373  }
374  else {
375  d_ledger_name = default_ledger_name;
376  }
377 
378  ofstream of(d_ledger_name.c_str(), ios::app);
379 
380  // By default, use UTC in the logs.
381  string local_time = "no";
382  TheBESKeys::TheKeys()->get_value(LOCAL_TIME_KEY, local_time, found);
383  d_use_local_time = (local_time == "YES" || local_time == "Yes" || local_time == "yes");
384 }
385 
401 GlobalMetadataStore::GlobalMetadataStore()
402  : BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
403 {
404  initialize();
405 }
406 
407 GlobalMetadataStore::GlobalMetadataStore(const string &cache_dir, const string &prefix,
408  unsigned long long size) : BESFileLockingCache(cache_dir, prefix, size)
409 {
410  initialize();
411 }
413 
418 static void dump_time(ostream &os, bool use_local_time)
419 {
420  time_t now;
421  time(&now);
422  char buf[sizeof "YYYY-MM-DDTHH:MM:SSzone"];
423  int status = 0;
424 
425  // From StackOverflow:
426  // This will work too, if your compiler doesn't support %F or %T:
427  // strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%S%Z", gmtime(&now));
428  //
429  // Apologies for the twisted logic - UTC is the default. Override to
430  // local time using BES.LogTimeLocal=yes in bes.conf. jhrg 11/15/17
431  if (!use_local_time)
432  status = strftime(buf, sizeof buf, "%FT%T%Z", gmtime(&now));
433  else
434  status = strftime(buf, sizeof buf, "%FT%T%Z", localtime(&now));
435 
436  if (!status)
437  ERROR_LOG(prolog << "Error getting time for Metadata Store ledger.");
438 
439  os << buf;
440 }
441 
445 void
447 {
448  // open just once, <- done SBL 11.7.19
449 
450  int fd; // value-result parameter;
451  if (get_exclusive_lock(d_ledger_name, fd)) {
452  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Ledger " << d_ledger_name << " write locked." << endl);
453  if (of) {
454  try {
455  dump_time(of, d_use_local_time);
456  of << " " << d_ledger_entry << endl;
457  VERBOSE("MDS Ledger name: '" << d_ledger_name << "', entry: '" << d_ledger_entry + "'.");
458  unlock_and_close(d_ledger_name); // closes fd
459  }
460  catch (...) {
461  unlock_and_close(d_ledger_name);
462  throw;
463  }
464  }
465  else {
466  ERROR_LOG(prolog << "Warning: Metadata store could not write to its ledger file.");
467  unlock_and_close(d_ledger_name);
468  }
469  }
470  else {
471  throw BESInternalError("Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
472  }
473 }
474 
481 string
482 GlobalMetadataStore::get_hash(const string &name)
483 {
484  if (name.empty())
485  throw BESInternalError("Empty name passed to the Metadata Store.", __FILE__, __LINE__);
486 
487  return picosha2::hash256_hex_string(name[0] == '/' ? name : "/" + name);
488 }
489 
507 {
508  if (d_dds) {
509  D4BaseTypeFactory factory;
510  DMR dmr(&factory, *d_dds);
511  XMLWriter xml;
512  dmr.print_dap4(xml);
513  os << xml.get_doc();
514  }
515  else if (d_dmr) {
516  XMLWriter xml;
517  d_dmr->print_dap4(xml);
518  os << xml.get_doc();
519  }
520  else {
521  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
522  }
523 }
524 
527  if (d_dds)
528  d_dds->print(os);
529  else if (d_dmr)
530  d_dmr->getDDS()->print(os);
531  else
532  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
533 }
534 
537  if (d_dds)
538  d_dds->print_das(os);
539  else if (d_dmr)
540  d_dmr->getDDS()->print_das(os);
541  else
542  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
543 }
544 
559 bool
560 GlobalMetadataStore::store_dap_response(StreamDAP &writer, const string &key, const string &name,
561  const string &response_name)
562 {
563  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " BEGIN " << key << endl);
564 
565  string item_name = get_cache_file_name(key, false /*mangle*/);
566 
567  int fd;
568  if (create_and_lock(item_name, fd)) {
569  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Storing " << item_name << endl);
570 
571  // Get an output stream directed at the locked cache file
572  ofstream response(item_name.c_str(), ios::out|ios::app);
573  if (!response.is_open())
574  throw BESInternalError("Could not open '" + key + "' to write the response.", __FILE__, __LINE__);
575 
576  try {
577  // for the different writers, look at the StreamDAP struct in the class
578  // definition. jhrg 2.27.18
579  writer(response); // different writers can write the DDS, DAS or DMR
580 
581  // Compute/update/maintain the cache size? This is extra work
582  // that might never be used. It also locks the cache...
583  if (!is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
584  // This enables the call to update_cache_info() below.
586 
587  unsigned long long size = update_cache_info(item_name);
588  if (!is_unlimited() && cache_too_big(size)) update_and_purge(item_name);
589  }
590 
591  unlock_and_close(item_name);
592  }
593  catch (...) {
594  // Bummer. There was a problem doing The Stuff. Now we gotta clean up.
595  response.close();
596  this->purge_file(item_name);
597  unlock_and_close(item_name);
598  throw;
599  }
600 
601  VERBOSE("Metadata store: Wrote " << response_name << " response for '" << name << "'." << endl);
602  d_ledger_entry.append(" ").append(key);
603 
604  return true;
605  }
606  else if (get_read_lock(item_name, fd)) {
607  // We found the key; didn't add this because it was already here
608  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Found " << item_name << " in the store already." << endl);
609  unlock_and_close(item_name);
610 
611  ERROR_LOG(prolog << "Metadata store: unable to store the " << response_name << " response for '" << name << "'." << endl);
612 
613  return false;
614  }
615  else {
616  throw BESInternalError("Could neither create or open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
617  }
618 }
619 
634 
646 bool
647 GlobalMetadataStore::add_responses(DDS *dds, const string &name)
648 {
649  // Start the index entry
650  d_ledger_entry = string("add DDS ").append(name);
651 
652  // I'm appending the 'dds r' string to the name before hashing so that
653  // the different hashes for the file's DDS, DAS, ..., are all very different.
654  // This will be useful if we use S3 instead of EFS for the Metadata Store.
655  //
656  // The helper() also updates the ledger string.
657  StreamDDS write_the_dds_response(dds);
658  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
659 
660  StreamDAS write_the_das_response(dds);
661  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
662 
663 #if SYMETRIC_ADD_RESPONSES
664  StreamDMR write_the_dmr_response(dds);
665  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
666 #endif
667 
668  write_ledger(); // write the index line
669 
670 #if SYMETRIC_ADD_RESPONSES
671  return (stored_dds && stored_das && stored_dmr);
672 #else
673  return (stored_dds && stored_das);
674 #endif
675 }
676 
688 bool
689 GlobalMetadataStore::add_responses(DMR *dmr, const string &name)
690 {
691  // Start the index entry
692  d_ledger_entry = string("add DMR ").append(name);
693 
694  // I'm appending the 'dds r' string to the name before hashing so that
695  // the different hashes for the file's DDS, DAS, ..., are all very different.
696  // This will be useful if we use S3 instead of EFS for the Metadata Store.
697  //
698  // The helper() also updates the ledger string.
699 #if SYMETRIC_ADD_RESPONSES
700  StreamDDS write_the_dds_response(dmr);
701  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
702 
703  StreamDAS write_the_das_response(dmr);
704  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
705 #endif
706 
707  StreamDMR write_the_dmr_response(dmr);
708  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
709 
710  write_ledger(); // write the index line
711 
712 #if SYMETRIC_ADD_RESPONSES
713  return (stored_dds && stored_das && stored_dmr);
714 #else
715  return(stored_dmr /* && stored_dmrpp */);
716 #endif
717 }
719 
733 GlobalMetadataStore::get_read_lock_helper(const string &name, const string &suffix, const string &object_name)
734 {
735  BESDEBUG(DEBUG_KEY, __func__ << "() MDS hashing name '" << name << "', '" << suffix << "'"<< endl);
736 
737  if (name.empty())
738  throw BESInternalError("An empty name string was received by "
739  "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
740 
741  string item_name = get_cache_file_name(get_hash(name + suffix), false);
742  int fd;
743  MDSReadLock lock(item_name, get_read_lock(item_name, fd), this);
744  BESDEBUG(DEBUG_KEY, __func__ << "() MDS lock for " << item_name << ": " << lock() << endl);
745 
746  if (lock())
747  INFO_LOG(prolog << "MDS Cache hit for '" << name << "' and response " << object_name << endl);
748  else
749  INFO_LOG(prolog << "MDS Cache miss for '" << name << "' and response " << object_name << endl);
750 
751  return lock;
752  }
753 
774 {
775  return get_read_lock_helper(name,"dmr_r","DMR");
776 }//end is_dmr_available(string)
777 
780 {
781  //call get_read_lock_helper
782  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(), "dmr_r", "DMR");
783  if (lock()){
784 
785  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmr_r");
786 
787  if(reload){
788  lock.clearLock();
789  return lock;
790  }//end if
791  else{
792  return lock;
793  }//end else
794 
795  }//end if(is locked)
796  else{
797  return lock;
798  }//end else
799 
800 }//end is_dmr_available(BESContainer)
801 
803 GlobalMetadataStore::is_dmr_available(const std::string &realName, const std::string &relativeName, const std::string &fileType)
804 {
805  //call get_read_lock_helper
806  MDSReadLock lock = get_read_lock_helper(relativeName,"dmr_r","DMR");
807  if (lock()){
808 
809  bool reload = is_available_helper(realName, relativeName, fileType, "dmr_r");
810 
811  if(reload){
812  lock.clearLock();
813  return lock;
814  }//end if
815  else{
816  return lock;
817  }//end else
818 
819  }//end if(is locked)
820  else{
821  return lock;
822  }//end else
823 
824 }//end is_dmr_available(string, string, string)
825 
835 {
836  return get_read_lock_helper(name,"dds_r","DDS");
837 }//end is_dds_available(string)
838 
851 {
852  //call get_read_lock_helper
853  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dds_r","DDS");
854  if (lock()){
855 
856  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dds_r");
857 
858  if(reload){
859  lock.clearLock();
860  return lock;
861  }//end if
862  else{
863  return lock;
864  }//end else
865 
866  }//end if(is locked)
867  else{
868  return lock;
869  }//end else
870 
871 }//end is_dds_available(BESContainer)
872 
882 {
883  return get_read_lock_helper(name,"das_r","DAS");
884 }//end is_das_available(string)
885 
888 {
889  //return get_read_lock_helper(name, "das_r", "DAS");
890  //call get_read_lock_helper
891  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"das_r","DAS");
892  if (lock()){
893 
894  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "das_r");
895 
896  if(reload){
897  lock.clearLock();
898  return lock;
899  }//end if
900  else{
901  return lock;
902  }//end else
903 
904  }//end if(is locked)
905  else{
906  return lock;
907  }//end else
908 
909 }//end is_das_available(BESContainer)
910 
931 {
932  return get_read_lock_helper(name,"dmrpp_r","DMR++");
933 }//end is_dmrpp_available(string)
934 
937 {
938  //return get_read_lock_helper(name, "dmrpp_r", "DMR++");
939  //call get_read_lock_helper
940  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dmrpp_r","DMR++");
941  if (lock()){
942 
943  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmrpp_r");
944 
945  if(reload){
946  lock.clearLock();
947  return lock;
948  }//end if
949  else{
950  return lock;
951  }//end else
952 
953  }//end if(is locked)
954  else{
955  return lock;
956  }//end else
957 
958 }//end is_dmrpp_available(BESContainer)
959 
970 bool
971 GlobalMetadataStore::is_available_helper(const string &realName, const string &relativeName, const string &fileType, const string &suffix)
972 {
973  //use type with find_handler() to get handler
974  BESRequestHandler *besRH = BESRequestHandlerList::TheList()->find_handler(fileType);
975 
976  //use handler.get_lmt()
977  time_t file_time = besRH->get_lmt(realName);
978 
979  //get the cache time of the handler
980  time_t cache_time = get_cache_lmt(relativeName, suffix);
981 
982  //compare file lmt and time of creation of cache
983  if (file_time > cache_time){
984  return true;
985  }//end if(file > cache)
986  else {
987  return false;
988  }//end else
989 }
990 
998 time_t
999 GlobalMetadataStore::get_cache_lmt(const string &fileName, const string &suffix)
1000 {
1001  string item_name = get_cache_file_name(get_hash(fileName + suffix), false);
1002  struct stat statbuf;
1003 
1004  if (stat(item_name.c_str(), &statbuf) == -1){
1005  throw BESNotFoundError(strerror(errno), __FILE__, __LINE__);
1006  }//end if(error)
1007 
1008  return statbuf.st_mtime;
1009 }//end get_cache_lmt()
1010 
1013 
1021 void
1022 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &object_name)
1023 {
1024  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1025  int fd; // value-result parameter;
1026  if (get_read_lock(item_name, fd)) {
1027  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1028  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1029  try {
1030  transfer_bytes(fd, os);
1031  unlock_and_close(item_name); // closes fd
1032  }
1033  catch (...) {
1034  unlock_and_close(item_name);
1035  throw;
1036  }
1037  }
1038  else {
1039  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1040  }
1041 }
1042 
1051 void
1052 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &xml_base,
1053  const string &object_name)
1054 {
1055  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1056  int fd; // value-result parameter;
1057  if (get_read_lock(item_name, fd)) {
1058  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1059  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1060  try {
1061  insert_xml_base(fd, os, xml_base);
1062 
1063  transfer_bytes(fd, os);
1064  unlock_and_close(item_name); // closes fd
1065  }
1066  catch (...) {
1067  unlock_and_close(item_name);
1068  throw;
1069  }
1070  }
1071  else {
1072  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1073  }
1074 }
1076 
1083 void
1084 GlobalMetadataStore::write_dds_response(const std::string &name, ostream &os)
1085 {
1086  write_response_helper(name, os, "dds_r", "DDS");
1087 }
1088 
1095 void
1096 GlobalMetadataStore::write_das_response(const std::string &name, ostream &os)
1097 {
1098  write_response_helper(name, os, "das_r", "DAS");
1099 }
1100 
1107 void
1108 GlobalMetadataStore::write_dmr_response(const std::string &name, ostream &os)
1109 {
1110  bool found = false;
1111  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1112  if (!found) {
1113 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1114  write_response_helper(name, os, "dmr_r", "DMR");
1115 #else
1116  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1117 #endif
1118  }
1119  else {
1120  write_response_helper(name, os, "dmr_r", xml_base, "DMR");
1121  }
1122 }
1123 
1130 void
1131 GlobalMetadataStore::write_dmrpp_response(const std::string &name, ostream &os)
1132 {
1133  bool found = false;
1134  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1135  if (!found) {
1136 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1137  write_response_helper(name, os, "dmrpp_r", "DMR++");
1138 #else
1139  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1140 #endif
1141  }
1142  else {
1143  write_response_helper(name, os, "dmrpp_r", xml_base, "DMR++");
1144  }
1145 }
1146 
1154 bool
1155 GlobalMetadataStore::remove_response_helper(const string& name, const string &suffix, const string &object_name)
1156 {
1157  string hash = get_hash(name + suffix);
1158  if (unlink(get_cache_file_name(hash, false).c_str()) == 0) {
1159  VERBOSE("Metadata store: Removed " << object_name << " response for '" << hash << "'." << endl);
1160  d_ledger_entry.append(" ").append(hash);
1161  return true;
1162  }
1163  else {
1164  ERROR_LOG(prolog << "Metadata store: unable to remove the " << object_name << " response for '" << name << "' (" << strerror(errno) << ")."<< endl);
1165  }
1166 
1167  return false;
1168 }
1169 
1176 bool
1178 {
1179  // Start the index entry
1180  d_ledger_entry = string("remove ").append(name);
1181 
1182  bool removed_dds = remove_response_helper(name, "dds_r", "DDS");
1183 
1184  bool removed_das = remove_response_helper(name, "das_r", "DAS");
1185 
1186  bool removed_dmr = remove_response_helper(name, "dmr_r", "DMR");
1187 
1188  bool removed_dmrpp = remove_response_helper(name, "dmrpp_r", "DMR++");
1189 
1190  write_ledger(); // write the index line
1191 
1192 #if SYMETRIC_ADD_RESPONSES
1193  return (removed_dds && removed_das && removed_dmr);
1194 #else
1195  return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1196 #endif
1197 }
1198 
1211 DMR *
1213 {
1214  stringstream oss;
1215  write_dmr_response(name, oss); // throws BESInternalError if not found
1216 
1217  D4BaseTypeFactory d4_btf;
1218  auto_ptr<DMR> dmr(new DMR(&d4_btf, "mds"));
1219 
1220  D4ParserSax2 parser;
1221  parser.intern(oss.str(), dmr.get());
1222 
1223  dmr->set_factory(0);
1224 
1225  return dmr.release();
1226 }
1227 
1249 DDS *
1251 {
1252  TempFile dds_tmp(get_cache_directory() + "/opendapXXXXXX");
1253 
1254  fstream dds_fs(dds_tmp.get_name().c_str(), std::fstream::out);
1255  try {
1256  write_dds_response(name, dds_fs); // throws BESInternalError if not found
1257  dds_fs.close();
1258  }
1259  catch (...) {
1260  dds_fs.close();
1261  throw;
1262  }
1263 
1264  BaseTypeFactory btf;
1265  auto_ptr<DDS> dds(new DDS(&btf));
1266  dds->parse(dds_tmp.get_name());
1267 
1268  TempFile das_tmp(get_cache_directory() + "/opendapXXXXXX");
1269  fstream das_fs(das_tmp.get_name().c_str(), std::fstream::out);
1270  try {
1271  write_das_response(name, das_fs); // throws BESInternalError if not found
1272  das_fs.close();
1273  }
1274  catch (...) {
1275  das_fs.close();
1276  throw;
1277  }
1278 
1279  auto_ptr<DAS> das(new DAS());
1280  das->parse(das_tmp.get_name());
1281 
1282  dds->transfer_attributes(das.get());
1283  dds->set_factory(0);
1284 
1285  return dds.release();
1286 }
1287 
1288 
1289 void
1290 GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das, const std::string &name) {
1291  string suffix = "das_r";
1292  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1293  int fd; // value-result parameter;
1294  if (get_read_lock(item_name, fd)) {
1295  VERBOSE("Metadata store: Cache hit: read " << " response for '" << name << "'." << endl);
1296  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1297  try {
1298  // Just generate the DAS by parsing from the file
1299  das->parse(item_name);
1300  unlock_and_close(item_name); // closes fd
1301  }
1302  catch (...) {
1303  unlock_and_close(item_name);
1304  throw;
1305  }
1306  }
1307  else {
1308  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1309  }
1310 
1311 }
1312 
A container is something that holds data. E.G., a netcdf file or a database entry.
Definition: BESContainer.h:65
std::string get_relative_name() const
Get the relative name of the object in this container.
Definition: BESContainer.h:186
std::string get_container_type() const
retrieve the type of data this container holds, such as cedar or netcdf.
Definition: BESContainer.h:232
std::string get_real_name() const
retrieve the real name for this container, such as a file name.
Definition: BESContainer.h:180
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Implementation of a caching mechanism for compressed data.
virtual void unlock_and_close(const std::string &target)
const std::string get_cache_directory()
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
error thrown if the resource requested cannot be found
virtual BESRequestHandler * find_handler(const std::string &handler_name)
find and return the specified request handler
Represents a specific data type request handler.
virtual time_t get_lmt(const std::string &name)
Get the Last modified time for.
static std::string lowercase(const std::string &s)
Definition: BESUtil.cc:200
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
Definition: TheBESKeys.cc:339
static TheBESKeys * TheKeys()
Definition: TheBESKeys.cc:71
Store the DAP metadata responses.
virtual libdap::DDS * get_dds_object(const std::string &name)
Build a DDS object from the cached Response.
virtual void write_dmr_response(const std::string &name, std::ostream &os)
Write the stored DMR response to a stream.
void write_response_helper(const std::string &name, std::ostream &os, const std::string &suffix, const std::string &object_name)
std::string get_hash(const std::string &name)
static void transfer_bytes(int fd, std::ostream &os)
virtual bool remove_responses(const std::string &name)
Remove all cached responses and objects for a granule.
virtual void write_dds_response(const std::string &name, std::ostream &os)
Write the stored DDS response to a stream.
void initialize()
Configure the ledger using LEDGER_KEY and LOCAL_TIME_KEY.
bool store_dap_response(StreamDAP &writer, const std::string &key, const std::string &name, const std::string &response_name)
static void insert_xml_base(int fd, std::ostream &os, const std::string &xml_base)
like transfer_bytes(), but adds the xml:base attribute to the DMR/++
virtual void write_das_response(const std::string &name, std::ostream &os)
Write the stored DAS response to a stream.
bool remove_response_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual MDSReadLock is_dmrpp_available(const std::string &name)
Is the DMR++ response for.
virtual MDSReadLock is_dds_available(const std::string &name)
Is the DDS response for.
virtual libdap::DMR * get_dmr_object(const std::string &name)
Build a DMR object from the cached Response.
MDSReadLock get_read_lock_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual time_t get_cache_lmt(const std::string &fileName, const std::string &suffix)
Get the last modified time for the cached object file.
virtual MDSReadLock is_dmr_available(const std::string &name)
Is the DMR response for.
virtual MDSReadLock is_das_available(const std::string &name)
Is the DAS response for.
virtual bool is_available_helper(const std::string &realName, const std::string &relativeName, const std::string &fileType, const std::string &suffix)
helper function that checks if last modified time is greater than cached file
virtual bool add_responses(libdap::DDS *dds, const std::string &name)
Add the DAP2 metadata responses using a DDS.
virtual void write_dmrpp_response(const std::string &name, std::ostream &os)
Write the stored DMR++ response to a stream.
Get a new temporary file.
Definition: TempFile.h:46
std::string get_name() const
Definition: TempFile.h:70
Unlock and close the MDS item when the ReadLock goes out of scope.
Instantiate with a DDS or DMR and use to write the DAS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DDS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DMR response.
virtual void operator()(std::ostream &os)
Use an object (DDS or DMR) to write data to the MDS.