#!/usr/bin/env python

"""
@file persister.py
@author Paul Hubbard
@date 8/20/09
@brief This is the AMQP persister, that listens to the broker and persists
datasets to local netcdf files.
@package ooidx.persister Handles saving DAP data to disk as netcdf files,
for subsequent use as cache material. It's a pure AMQP listener, though it
does send messages for attribute store updates and dataset events. Current
design calls for it to be paired with a cache instance on the same
filesystem.
"""

import sys
import time
import logging
import uuid
from urlparse import urlsplit, urlunsplit

import simplejson as json

from pydap.model import BaseType, SequenceType
from pydap.proxy import ArrayProxy, SequenceProxy, VariableProxy
from pydap.parsers.dds import DDSParser
from pydap.parsers.das import DASParser
from pydap.xdr import DapUnpacker
from pydap.lib import walk, fix_slice, parse_qs, fix_shn
from pydap.responses import netcdf

from twisted.web import client
from twisted.internet import protocol, reactor
from twisted.internet.defer import inlineCallbacks

from magnet.preactor import Preactor
from magnet.protocol import ClientCreator as MCC

from ooici.protocol import Message, ApplicationProtocol
from attribute_store.client import ASClient
from ooidx.common import routing_keys, simpleSend, generate_local_filename, rewrite_url
from ooidx.pub_sub import PubSubClient

import cgitb
cgitb.enable(format='text')


def _log_deferred_failure(failure, description):
    """
    Errback that reports a failed Deferred through the logging module.

    Several Deferreds in this module are fire-and-forget. Without an
    errback, their failures would go unhandled instead of appearing in
    the persister log.

    @param failure twisted Failure delivered to the errback
    @param description Short human-readable name of the failed operation
    @retval None, so the failure is consumed and does not propagate
    """
    logging.error('%s failed: %s', description, failure.getTraceback())
    return None


class PersisterProtocol(ApplicationProtocol):
    """
    New interface to persister, refactoring using OOICI AppProto and Message.
    Instead of vectoring based on first token, use the (required) 'command'
    header to route the messages.

    Current commands/protocol:

    * get_stats  Return per-instance stats, uptime, etc. Currently replies
                 with a '500' not-implemented payload.
    * ping       Return a short '200' payload, used for liveness checks.
    * anything else (e.g. dap_dataset)
                 Treated as a DAP-format dataset delivery. Required headers:
                 'source_url' with the original URL, plus json-encoded 'das'
                 and 'dds' headers (save_dataset json-decodes both). The
                 payload is the raw DODS response: DDS text, a 'Data:'
                 separator line, then the XDR-encoded data.
    """

    def messageReceived(self, msg):
        """
        Dispatch an incoming message on its 'command' header.

        'get_stats' and 'ping' get an immediate reply. Any other command is
        handled as a dataset delivery:
          - if the source_url/das/dds headers are present, the dataset is
            saved, marked 'cached' in the attribute store, and a new-dataset
            event is sent;
          - if any step of that fails, the dataset is marked 'error';
          - if the headers are missing, the message is dropped with a warning.

        @param msg Incoming ooici Message
        """
        logging.debug('keys in dataset: %s', msg.getHeaderKeys())
        try:
            cmd = msg.getHeader('command')
        except KeyError:
            logging.error('Missing command field in message')
            self.sendMessage(Message(payload='400 Missing command parameter'))
            return

        if cmd == 'get_stats':
            logging.debug('Got stats request')
            self.sendMessage(Message(payload='500 Stats not implmented'))
            return

        if cmd == 'ping':
            logging.debug('Got a ping request')
            self.sendMessage(Message(payload='200 Requester OK'))
            return

        logging.debug('Checking for das/dds headers...')
        try:
            src = msg.getHeader('source_url')
            das = msg.getHeader('das')
            dds = msg.getHeader('dds')
        except KeyError:
            logging.warning('No dataset headers found, ignoring message.')
            return

        logging.debug('DAS snippet: %s', das[:80])
        logging.debug('DDS snippet: %s', dds[:80])

        logging.info('Dataset headers found, attempting to persist dataset')
        try:
            self.save_dataset(msg)
            logging.info('save complete, updating attribute store')
            # The status update runs asynchronously, so its failures cannot
            # reach the except clause below; log them via an errback instead.
            d = self.set_dset_status_cached(src)
            d.addErrback(_log_deferred_failure,
                         'Marking dataset "%s" as cached' % src)
            logging.info('generating event for new dataset')
            self.generate_new_ds_event(src)
            logging.info('dataset "%s" persist complete' % src)
        except Exception:
            # Any synchronous failure while saving or notifying is logged and
            # the dataset is flagged as errored instead of propagating into
            # the protocol layer.
            logging.exception('Error saving dataset, attempting to flag in attribute store')
            d = self.set_dset_status_error(src)
            d.addErrback(_log_deferred_failure,
                         'Marking dataset "%s" as error' % src)

    def save_dataset(self, msg):
        """
        Given a single-message dataset, persist it to disk as a netcdf file.

        Notes:
         * The filename is derived from the dataset URL via
           generate_local_filename.
         * The 'das' and 'dds' headers are json-decoded before parsing;
           'source_url' is the original dataset URL.
         * The payload is split at the first '\\nData:\\n'. Only the part
           after it (the XDR data) is unpacked.
         * Exceptions propagate; the caller (messageReceived) catches them.

        @param msg Message carrying source_url/das/dds headers and a DODS payload
        @raise ValueError if the payload has no 'Data:' separator
        @note Relies on a single message fitting in memory comfortably.
        """
        source_url = msg.getHeader('source_url')

        # Metadata is json-encoded to preserve multi-line content
        dds = json.loads(msg.getHeader('dds'))
        das = json.loads(msg.getHeader('das'))
        logging.debug('dds: %s', dds)
        logging.debug('das: %s', das)

        # Tell pydap to create data structures from metadata
        dataset = DDSParser(dds).parse()
        dataset = DASParser(das, dataset).parse()

        # Tag global attributes with cache info.
        # @todo Design decision - what goes into per-file metadata?
        # @note This is purely OOI code - not pydap at all.
        # setdefault: a DAS without an NC_GLOBAL container used to raise
        # KeyError here and abort the whole save.
        global_attrs = dataset.attributes.setdefault('NC_GLOBAL', {})
        global_attrs['ooi-download-timestamp'] = time.time()
        global_attrs['ooi-source-url'] = source_url

        # Back to pydap code - this block is from open_url in client.py.
        # Remove any projections from the url, leaving selections.
        scheme, netloc, path, query, fragment = urlsplit(source_url)
        projection, selection = parse_qs(query)
        url = urlunsplit((scheme, netloc, path, '&'.join(selection), fragment))

        # Set data to a Proxy object for BaseType and SequenceType. These
        # variables can then be sliced to retrieve the data on-the-fly.
        for var in walk(dataset, BaseType):
            var.data = ArrayProxy(var.id, url, var.shape)
        for var in walk(dataset, SequenceType):
            var.data = SequenceProxy(var.id, url)

        # Apply the corresponding slices.
        projection = fix_shn(projection, dataset)
        for var in projection:
            target = dataset
            while var:
                token, slice_ = var.pop(0)
                target = target[token]
                if slice_ and isinstance(target.data, VariableProxy):
                    shape = getattr(target, 'shape', (sys.maxint,))
                    target.data._slice = fix_slice(slice_, shape)

        # This block is from open_dods in client.py: everything after the
        # 'Data:' marker is the XDR-encoded data; the DDS text before it is
        # not needed because the 'dds' header was already parsed above.
        parts = msg.payload.split('\nData:\n', 1)
        if len(parts) != 2:
            raise ValueError('DODS payload for %s has no "Data:" separator'
                             % source_url)
        xdrdata = parts[1]
        dataset.data = DapUnpacker(xdrdata, dataset).getvalue()

        # OK, ready to save.
        filename = generate_local_filename(source_url)
        logging.info('Saving %s to filename %s' % (source_url, filename))
        netcdf.save(dataset, filename)

    def generate_new_ds_event(self, source_url):
        """
        Send a 'new_dataset' event message to the notification routing key.

        The event carries the original source_url and an 'ooi_url', which is
        source_url rewritten to host 'localhost:8001'. The userid/auth_token
        headers are fixed placeholder values.

        @param source_url Original URL of the dataset that was persisted
        """
        evt = Message()
        evt.setHeader('command', 'event')
        evt.setHeader('event_type', 'new_dataset')
        evt.setHeader('event_id', str(uuid.uuid4())[:16])
        evt.setHeader('source_url', source_url)
        evt.setHeader('ooi_url', rewrite_url(source_url, newHostname='localhost:8001'))
        evt.setHeader('userid', 'otto niemand')
        evt.setHeader('auth_token', 'nonesuch')
        evt.payload = ''
        simpleSend(self.factory.preactor, evt, routing_keys['notification'])

    def get_cache_address(self):
        """
        Return our 'address' such that cached datasets are marked with same.
        Used to decide which cache server/service to talk to. Not fully
        understood yet; currently a fixed string.
        """
        return 'dx_cache_1'

    @inlineCallbacks
    def set_dset_status_cached(self, dset):
        """Mark dataset `dset` as 'cached' in the attribute store (Deferred)."""
        yield self.set_dset_status(dset, 'cached')

    @inlineCallbacks
    def set_dset_status_error(self, dset):
        """Mark dataset `dset` as 'error' in the attribute store (Deferred)."""
        yield self.set_dset_status(dset, 'error')

    @inlineCallbacks
    def set_dset_status(self, dset, status_str):
        """
        Set dataset status in the attribute store.

        Writes key 'dataset:<dset>' with value '<status_str>:<cache address>'
        over a fresh attribute-store connection, which is closed afterwards.

        @param dset Dataset source URL, used as the key suffix
        @param status_str Status word, e.g. 'cached' or 'error'
        @retval Deferred that fires once the write has completed
        """
        status = status_str + ':' + self.get_cache_address()
        logging.info('Updating attribute store to status "%s"' % status)
        creator = MCC(reactor, self.factory.preactor, ASClient)
        asc = yield creator.connectMS(routing_keys['attribute_store'])
        try:
            yield asc.write('dataset:%s' % dset, status)
        finally:
            # One connection is opened per status update, so close it here;
            # otherwise every persisted dataset would leave a client open.
            asc.transport.loseConnection()


class DapClient(client.HTTPPageGetter):
    """
    Subclass HTTPClient and stub out the portions that transmit. We just
    receive. Used for receiving pages from persister.
    """

    def connectionMade(self):
        client.HTTPPageGetter.connectionMade(self)
        # Expose the transport on the factory so its owner can reach it.
        self.factory.transport = self.transport

    def sendCommand(self, command, path):
        pass

    def sendHeader(self, name, value):
        pass

    def endHeaders(self):
        pass


class DapClientFactory(client.HTTPClientFactory):
    """
    HTTP client factory that builds receive-only DapClient protocols.

    http://twistedmatrix.com/trac/browser/tags/releases/twisted-8.2.0/twisted/web/client.py
    """
    protocol = DapClient


class PersisterProtoFactory(protocol.ClientFactory):
    """
    Client factory that builds a PersisterProtocol per connection and holds
    the shared Preactor that the protocol uses for outgoing messages.
    """
    protocol = PersisterProtocol

    @inlineCallbacks
    def startUp(self, preactor):
        """
        New distributor requires us to submit a wildcard subscription so that
        we'll automatically get all the datasets sent to us. For a first pass,
        just subscribe using our ooidx.common.routing_keys address.

        Stores the preactor on the factory (PersisterProtocol reads it via
        self.factory.preactor), subscribes, then closes the pub/sub
        connection.

        @param preactor Preactor instance shared by the whole persister
        @retval Deferred that fires once the subscription has been made
        """
        self.preactor = preactor
        creator = MCC(reactor, preactor, PubSubClient)
        mc = yield creator.connectMS(routing_keys['pub_sub'])
        try:
            yield mc.subscribe('.+?', routing_keys['persister'])
        finally:
            mc.transport.loseConnection()


@inlineCallbacks
def main():
    """
    Configure logging, connect the Preactor, subscribe, and start consuming
    on the persister routing key.

    @retval Deferred that fires once the consumer has been connected
    """
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s [%(funcName)s] %(message)s')
    preactor = yield Preactor()
    f = PersisterProtoFactory()
    # startUp is not waited on (same ordering as before), but its failures
    # are now logged instead of going unhandled.
    d = f.startUp(preactor)
    d.addErrback(_log_deferred_failure, 'Pub/sub subscription')
    preactor.connectSimpleConsumer(routing_keys['persister'], f)
    logging.info('Persister running OK')


if __name__ == '__main__':
    # Fixed startup delay before connecting.
    # NOTE(review): presumably gives the broker time to come up - confirm.
    time.sleep(5)
    startup = main()
    startup.addErrback(_log_deferred_failure, 'Persister startup')
    reactor.run()