#!/usr/bin/env python """ @file ooidx/distributor.py @author Paul Hubbard @date 12-15-0 @package ooidx.distributor User-level fanout exchange service for data exchange. @brief Pull address lists from attribute store, send message to everyone on the list. """ import logging import simplejson as json from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks from twisted.internet import protocol from magnet.preactor import Preactor from ooidx.common import routing_keys, simpleSend from ooici.protocol import ApplicationProtocol class DistributorProtocol(ApplicationProtocol): """ Implements the distributor. Protocol: Header 'command' must contain 'send_to_list' Header 'dest_list' should be a json-encoded list of destination addresses, which cannot be empty. @note Stateless and simple, just a placeholder for the real AMQP-based mcast. """ def messageReceived(self, msg): logging.debug(msg.getHeaderKeys()) try: cmd = msg.getHeader('command') except KeyError: logging.error('Missing command field in message') return if cmd == 'send_to_list': logging.info('Got broadcast request') self.send_to_list(msg) return # Unknown commands caught here logging.warn('Unknown command ' + cmd) def send_to_list(self, msg): #Send-to list is json-encoded, unpack. try: dest_list = json.loads(msg.getHeader('dest_list')) logging.debug('list is ' + str(dest_list)) except: logging.exception('Error parsing destination list!') return for send_to in dest_list: logging.debug('Sending to %s...' % send_to) """ Just send the same message - receivers should not even notice the extra dest_list header. """ simpleSend(self.factory.preactor, msg, send_to) logging.debug('Send completed') logging.info('Broadcast complete') class DistributorProtocolFactory(protocol.ClientFactory): """ Protocol factor for the distributor; as is the common ooidx pattern all it does is set the protocol pointer and save a reference to the pocket reactor. """ def set_preactor(self, preactor): self.preactor = preactor protocol = DistributorProtocol @inlineCallbacks def main(): logging.basicConfig(level=logging.DEBUG, \ format='%(asctime)s %(levelname)s [%(funcName)s] %(message)s') preactor = yield Preactor() f = DistributorProtocolFactory() f.set_preactor(preactor) preactor.connectSimpleConsumer(routing_keys['distributor'], f) logging.info('Distributor running OK') if __name__ == '__main__': main() reactor.run()