#!/usr/bin/env python

"""
@file cache.py
@author Paul Hubbard
@date 11/23/09
@brief OOI Data Exchange cache. Serves DAP data into AMQP. Based on fetcher.py
@package ooidx.cache Does DAP->AMQP function inside the cloud
"""

import logging
import os

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import protocol

from magnet.preactor import Preactor
from magnet.protocol import ClientCreator as MCC

from attribute_store.client import ASClient
from ooidx.common import routing_keys, rewrite_url, simpleSend, generate_local_filename
from ooici.protocol import Message
from ooidx.fetcher import FetcherProtocol


class CacheClient(object):
    """
    Messaging-based client code for the cache interface. Just constructs
    and sends messages to the actual cache.
    """

    def _send_command(self, preactor, command, headers):
        """
        @brief Build a message with a 'command' header plus extra headers and
        send it to the cache routing key.
        @param preactor Preactor instance used for messaging
        @param command Value for the 'command' header
        @param headers Sequence of (name, value) pairs, set in order after 'command'
        @retval None
        """
        msg = Message()
        msg.setHeader('command', command)
        for name, value in headers:
            msg.setHeader(name, value)
        simpleSend(preactor, msg, routing_keys['cache'])

    def get_single_url(self, preactor, src_url, send_to):
        """
        @brief Request a single http document be sent to a listener
        @param preactor Preactor instance
        @param src_url Source URL address
        @param send_to Destination in broker
        @retval None
        """
        self._send_command(preactor, 'get_single_url',
                           [('url', src_url), ('send_to', send_to)])

    def purge_dataset(self, preactor, src_url):
        """
        @brief Purge a dataset from the system. This means to delete the
        disk (netcdf) file and also remove it from the attribute store.
        @note Two operations means a window of incoherency while the delete
        happens.
        @param preactor Pocket reactor, used to send the purge request to the cache
        @param src_url Original URL of dataset to delete
        @retval None
        """
        self._send_command(preactor, 'purge_dataset', [('url', src_url)])


class CacheProtocol(FetcherProtocol):
    """
    Specialization of the fetcher protocol - handles get_single_url and
    purge_dataset.

    Derives heavily from the fetcher, and just overrides the inner
    get_document method to rewrite the URL from canonical to local form.
    Everything else is reused.

    @note Fetches are redirected to the host in rewrite_hostname
    (default 'ooici.net:8001'), not to localhost.
    """

    # Host[:port] that canonical dataset URLs are rewritten to before fetching.
    # A class attribute so a deployment or subclass can override it.
    # @todo This just rewrites back to amoeba.... need Hyrax
    rewrite_hostname = 'ooici.net:8001'

    @inlineCallbacks
    def purge_dataset(self, dsUrl):
        """
        @brief Delete a cached dataset: remove its local file, then remove
        its 'dataset:<url>' entry from the attribute store.
        @note Both steps are best-effort; failures are logged, not raised.
        @param dsUrl Original (canonical) URL of the dataset
        @retval Deferred that fires with None after both steps have run
        """
        localName = generate_local_filename(dsUrl)
        logging.info('About to delete "%s", original URL "%s"', localName, dsUrl)

        try:
            logging.debug('Removing file')
            os.remove(localName)
            logging.debug('File deleted OK')
        except Exception:
            # Best-effort: a missing/undeletable file must not prevent the
            # attribute-store cleanup below.
            logging.exception('Error deleting file, ignoring')

        try:
            logging.info('Deleting from attribute store')
            cc = MCC(reactor, self.factory.preactor, ASClient)
            asc = yield cc.connectMS(routing_keys['attribute_store'])
            yield asc.delete('dataset:%s' % dsUrl)
            logging.info('Deleted OK from attribute store.')
        except Exception:
            # Under inlineCallbacks a failed Deferred is re-raised at the
            # yield as its underlying exception, so Exception covers it
            # without also trapping GeneratorExit/KeyboardInterrupt.
            logging.exception('Error deleting from attribute store!')
            logging.critical('Attribute store may have bad status for "%s"', dsUrl)

    @inlineCallbacks
    def get_document(self, dsUrl):
        """
        @brief Fetch a single dataset URL from the rewritten host and return
        the fetched result.
        @note payload shows up as a unicode string and must be converted to
        ASCII for the twisted web client.
        @note We parse the hostname out of the URL because the HTTP request
        will fail if the hostname isn't set.
        @param dsUrl Canonical URL of the dataset
        @retval Deferred firing with whatever the inherited _get_page returns
        """
        srcUrl = dsUrl.encode('ascii')
        # Rewrite the canonical URL to point at the configured serving host.
        newUrl = rewrite_url(srcUrl, newHostname=self.rewrite_hostname)
        logging.debug('Starting fetch of "%s" from "%s"', srcUrl, newUrl)
        # _get_page is inherited from the fetcher, leave as-is
        msg = yield self._get_page(newUrl)
        # @todo error handling on get_page?
        logging.debug('Fetch completed OK.')
        returnValue(msg)


class CPFactory(protocol.ClientFactory):
    """Protocol factory class for CacheProtocol."""

    protocol = CacheProtocol

    # Messaging handle; assigned by set_preactor() and read by
    # CacheProtocol.purge_dataset via self.factory.preactor.
    preactor = None

    def set_preactor(self, preactor):
        """
        @brief Store the preactor that protocol instances use for messaging.
        @param preactor Preactor instance
        @retval None
        """
        self.preactor = preactor

    def get_address(self):
        """
        Returns a static address for this instance of the cache software.
        @todo Get a 'real' amqp address from messaging system, i.e. Magnet
        """
        return 'dx_cache_1'


@inlineCallbacks
def main():
    """
    @brief Configure logging, connect a Preactor and register a CPFactory as
    the consumer for the cache routing key.
    @retval Deferred that fires once the consumer has been registered
    """
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s [%(funcName)s] %(message)s')
    preactor = yield Preactor()
    f = CPFactory()
    f.set_preactor(preactor)
    preactor.connectSimpleConsumer(routing_keys['cache'], f)
    logging.info('Cache front end running OK')


def _startup_failed(failure):
    """
    @brief Errback for main(): log the failure and shut the reactor down
    instead of leaving a process that idles with no consumer attached.
    @param failure twisted Failure describing the startup error
    @retval None
    """
    logging.error('Cache startup failed:\n%s', failure.getTraceback())
    # callWhenRunning is safe whether or not reactor.run() has started yet
    # (main() may fail synchronously, before the reactor is running).
    reactor.callWhenRunning(reactor.stop)


if __name__ == '__main__':
    main().addErrback(_startup_failed)
    reactor.run()