#!/usr/bin/env python

"""
@file controller.py
@author Paul Hubbard
@date 8/19/2009
@brief DX cache controller prototype: Orchestration, routing and intelligence.
@note ApplicationProtocol, http status codes, as simple as possible.
@package ooidx.controller Cache controller
"""

from twisted.internet import reactor, protocol
from twisted.internet.defer import inlineCallbacks, returnValue
from attribute_store.client import ASClient
from ooidx.fetcher import FetcherClient
from ooidx.cache import CacheClient
from ooidx.common import routing_keys, parseMsg, baseDapUrl
from magnet.preactor import Preactor
from magnet.protocol import ClientCreator as MCC
from ooici.protocol import Message, ApplicationProtocol
from ooidx.common import RequestResponseAP, dumpMessage

import simplejson as json
import uuid
import time
import logging


class ControllerClient(RequestResponseAP):
    """
    Client-side interface class, presents an RPC interface on top of messaging.
    Now refactored to use AppProto instead of lineReceiver.
    Returns AppProtocol messages!

    Every public method builds a Message whose 'command' header names the
    controller operation, and returns whatever makeRequest() returns for it.
    """
    def dap_get(self, dsetUrl):
        """Send a 'dap_get' request for dsetUrl to the controller."""
        msg = self._gen_msg('dap_get', url=dsetUrl)
        return self.makeRequest(msg)

    def _gen_msg(self, command, url=None, sendTo=None):
        """
        Convenience routine to construct an outgoing message.

        @param command Value of the 'command' header.
        @param url Optional 'url' header value; omitted when falsy.
        @param sendTo Optional 'send_to' header value; omitted when falsy.
        @retval A new Message with the requested headers set.
        """
        msg = Message()
        msg.setHeader('command', command)
        if url:
            msg.setHeader('url', url)
        if sendTo:
            msg.setHeader('send_to', sendTo)
        return msg

    def stats_get(self):
        """Send a 'get_statistics' request to the controller."""
        msg = self._gen_msg('get_statistics')
        return self.makeRequest(msg)

    def dap_register(self, dsetUrl):
        """Send a 'dap_register' request for dsetUrl to the controller."""
        msg = self._gen_msg('dap_register', url=dsetUrl)
        return self.makeRequest(msg)

    def dap_purge(self, dsetUrl):
        """Send a 'dap_purge' request for dsetUrl to the controller."""
        msg = self._gen_msg('dap_purge', url=dsetUrl)
        return self.makeRequest(msg)


class CacheControllerProtocol(ApplicationProtocol):
    """
    DX cache controller protocol. The brains of the outfit, messaging-driven.

    Incoming messages are dispatched on their 'command' header; see
    messageReceived() for the supported commands. Statistics are kept on
    the factory so they are shared across protocol instances.
    """
    def nmr(self, msg):
        """
        Alternative, reflection-based message handler.

        Commands are sent to a matching internal method named
        'cmd_<command>', which uses self.msg to extract what it needs.
        Unknown commands are logged and otherwise ignored.
        """
        # this is silenced if log level is INFO or higher.
        dumpMessage(msg)

        # Look for must-exist header
        try:
            cmd = msg.getHeader('command')
        except Exception:
            logging.exception('Missing command header, ignoring')
            return

        method_name = 'cmd_' + cmd
        # Resolve the handler by its computed name (not an attribute
        # literally called 'method_name').
        handler = getattr(self, method_name, None)
        if handler is not None:
            self.msg = msg
            logging.debug('command vector %s found' % cmd)
            handler()
        else:
            logging.error('Command "%s" not found' % cmd)

        logging.debug('End of message handler')

    def messageReceived(self, msg):
        """
        Primary message handler: dispatch on the 'command' header.

        Supported commands are dap_get, dap_register and dap_purge (each
        needs a 'url' header) and get_statistics. Anything else is answered
        with '404 Command not found'; a url-requiring command without a url
        header is answered with '400 Missing url header'.
        """
        # this is silenced if log level is INFO or higher.
        dumpMessage(msg)

        # Look for must-exist header
        try:
            cmd = msg.getHeader('command')
        except Exception:
            logging.exception('Missing command header, ignoring')
            return

        # The url header is optional at this point (get_statistics has none),
        # so default it rather than leaving the name unbound.
        dsetUrl = None
        try:
            dsetUrl = msg.getHeader('url')
        except Exception:
            pass

        # Dispatch
        if cmd in ('dap_get', 'dap_register', 'dap_purge') and dsetUrl is None:
            logging.error('Command "%s" requires a url header' % cmd)
            self.sendMessage(Message(payload='400 Missing url header'))
        elif cmd == 'dap_get':
            self.dap_get(dsetUrl)
        elif cmd == 'dap_register':
            self.dap_register(dsetUrl)
        elif cmd == 'dap_purge':
            self.cache_purge_dataset(dsetUrl)
        elif cmd == 'get_statistics':
            reply = Message(payload=self.factory.get_statistics())
            self.sendMessage(reply)
        else:
            reply = Message(payload='404 Command not found')
            self.sendMessage(reply)

        logging.debug('End of message handler')

    ########################

    def dap_register(self, dsetUrl):
        """
        Register a dataset: ask the fetcher to download it and reply '200 OK'.

        The 'dap_registers' counter is incremented by fetcher_dap_get_dset(),
        so it is deliberately not incremented a second time here.
        """
        logging.debug('Registering dataset "%s"' % dsetUrl)
        self.fetcher_dap_get_dset(dsetUrl)
        self.sendMessage(Message(payload='200 OK'))

    @inlineCallbacks
    def dap_get(self, dsetUrl):
        """
        Proxy is asking for DAP data. We reply to proxy with a routing key and
        orchestrate the transfer of data with fetcher (or cache) and persister.

        The attribute store is queried for the dataset's base URL. A non-200
        reply is treated as a cache miss: the single URL is fetched right away
        and a fetch of the whole dataset is started. Otherwise the stored
        value is parsed as 'status:address' and the request is routed to the
        cache ('cached') or re-fetched from the source (anything else).
        """
        baseUrl = baseDapUrl(dsetUrl)
        logging.debug('Querying AS for "%s"' % baseUrl)

        # is it cached already?
        reply = yield self.query_attr_store(baseUrl)
        rc, contents = parseMsg(reply)

        if rc != 200:
            # fetcher_dap_get_single() records the cache miss, so it is not
            # counted here as well.
            logging.info('Cache miss on dataset "%s". Start single fetch first...'
                         % baseUrl)
            self.fetcher_dap_get_single(dsetUrl)
            logging.debug('Starting dataset fetch of "%s"' % baseUrl)
            self.fetcher_dap_get_dset(baseUrl)
        else:
            # Parse value into status:address. Split on the first colon only,
            # so an address that itself contains ':' stays intact; a value
            # without a colon yields an empty address and an unknown status.
            status, _, address = contents[0].partition(':')
            logging.info('Dataset "%s", status "%s", address: "%s"'
                         % (baseUrl, status, address))

            # Statuses seen so far: 'no-cache', 'in-progress', 'cached', 'error'.
            if status == 'cached':
                # Gotta tell the cache to send it to the user/subscriber.
                # @todo address specific cache instance!
                logging.debug('Sending request to the cache at ' + address)
                self.cache_get_single(dsetUrl)
            elif status == 'in-progress':
                logging.warning('Transfer already in progress, re-fetching single')
                self.fetcher_dap_get_single(dsetUrl)
            else:
                # Status is error or unknown, re-fetch as less-efficient failsafe.
                logging.error('Unknown cache status "%s", re-fetching' % status)
                self.fetcher_dap_get_single(dsetUrl)

    def fetcher_dap_get_dset(self, dsetUrl):
        """
        Tells fetcher to download a dataset.

        Increments the factory's 'dap_registers' counter.
        """
        self.factory._dataset_register()
        fc = FetcherClient()
        fc.dap_get_dataset(self.factory.preactor, dsetUrl)

    def fetcher_dap_get_single(self, dsetUrl):
        """
        Tell fetcher to get a single URL, using a temp address.

        Records a cache miss and a 'dap_gets' access, sends a freshly
        generated routing key back to the peer, and hands that key to the
        fetcher along with the URL.
        """
        self.factory._cache_miss()
        self.factory._stats_increment('dap_gets')
        # @todo Create transient data pipe/address from fetcher to proxy
        rkey = self.gen_send_temp_address()
        fc = FetcherClient()
        fc.get_single_url(self.factory.preactor, dsetUrl, rkey)

    def cache_get_single(self, url):
        """
        Ask the cache to deliver a single URL, using a temp address.

        Records a cache hit and sends a freshly generated routing key back to
        the peer before handing it to the cache client.
        """
        self.factory._cache_hit()
        # @todo Create transient data pipe/address from fetcher to proxy
        rkey = self.gen_send_temp_address()
        cc = CacheClient()
        cc.get_single_url(self.factory.preactor, url, rkey)

    def cache_purge_dataset(self, url):
        """Ask the cache to purge a dataset; counts a 'dap_purges' event."""
        self.factory._dataset_purge()
        cc = CacheClient()
        cc.purge_dataset(self.factory.preactor, url)

    @inlineCallbacks
    def query_attr_store(self, dsetUrl):
        """
        Read the attribute-store key 'dataset:<base url of dsetUrl>'.

        A new connection to the attribute store is opened for each query and
        is closed again even if the read fails.

        @retval Deferred firing with the attribute store's reply.
        """
        self.factory._as_queries()
        cc = MCC(reactor, self.factory.preactor, ASClient)
        asc = yield cc.connectMS(routing_keys['attribute_store'])
        try:
            ans = yield asc.read('dataset:%s' % baseDapUrl(dsetUrl))
        finally:
            asc.transport.loseConnection()
        returnValue(ans)

    def gen_send_temp_address(self):
        """
        Assuming a connection, generate and send a temporary routing address.

        The address is a random UUID string, sent back as the payload of a
        message with header rc=200.

        @retval The generated routing key.
        """
        rkey = str(uuid.uuid4())
        logging.debug('Sending address "%s" back to proxy' % rkey)
        msg = Message(payload=rkey)
        msg.setHeader('rc', 200)
        self.sendMessage(msg)
        return rkey


class CCFactory(protocol.ClientFactory):
    """
    Protocol factory for cache controller. Also maintains cross-instance
    usage statistics.
    """
    def initialize(self, preactor):
        """
        Required init call - save preactor for later use in ClientCreator
        calls. Also overloaded to add in init calls.
        """
        self.preactor = preactor
        self._init_stats()

    def get_statistics(self):
        """
        Returns json-formatted data about uptime, accesses and such.
        Mainly for the web management front-end.

        Refreshes 'uptime' (whole seconds since _init_stats) before encoding.
        """
        self.stats['uptime'] = int(time.time() - self.stats['startTime'])
        return json.dumps(self.stats)

    def _init_stats(self):
        """Reset all counters and record the start time."""
        logging.debug('Initializing statistics')
        self.stats = {}
        self.stats['startTime'] = time.time()
        self.stats['uptime'] = 0
        self.stats['accesses'] = 0
        self.stats['cacheHits'] = 0
        self.stats['cacheMisses'] = 0
        self.stats['dap_gets'] = 0
        self.stats['dap_registers'] = 0
        self.stats['dap_purges'] = 0
        self.stats['as_queries'] = 0
        # @todo Pull version number from setup.py
        # @todo Brainstorm other metrics to track and return
        self.stats['version'] = '0.1'

    def _as_queries(self):
        """Count one attribute-store query."""
        self._stats_increment('as_queries')

    def _stats_increment(self, which):
        """Add one to the counter named which (KeyError if unknown)."""
        self.stats[which] += 1

    def _dataset_register(self):
        """Count one dataset registration/fetch."""
        self._stats_increment('dap_registers')

    def _dataset_purge(self):
        """Count one dataset purge."""
        self._stats_increment('dap_purges')

    def _dap_access(self, which):
        """Count one access, attributed to the counter named which."""
        self._stats_increment('accesses')
        self._stats_increment(which)

    def _cache_hit(self):
        """Count one access served from the cache."""
        self._dap_access('cacheHits')

    def _cache_miss(self):
        """Count one access that missed the cache."""
        self._dap_access('cacheMisses')

    def _stats_dump(self):
        """Log every statistic at DEBUG level."""
        # Values are ints, floats and strings, so format rather than
        # concatenate (str + int raises TypeError).
        for field, value in self.stats.items():
            logging.debug('%s : %s' % (field, value))

    protocol = CacheControllerProtocol


@inlineCallbacks
def main():
    """
    Configure logging, create a Preactor, and listen on the controller
    routing key with a freshly initialized CCFactory.
    """
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s [%(funcName)s] %(message)s')
    preactor = yield Preactor()
    f = CCFactory()
    f.initialize(preactor)
    preactor.listenMS(routing_keys['controller'], f)
    logging.info('Cache controller running OK')


if __name__ == '__main__':
    main()
    reactor.run()