#!/usr/bin/env python import sys import os try: import json except ImportError: import simplejson as json import urllib2 from twisted.internet import reactor from twisted.internet import defer from twisted.internet.defer import inlineCallbacks from twisted.internet import protocol from twisted.protocols import basic from twisted.python import log from twisted.application import service from magnet.protocol import ClientCreator from magnet.protocol import LogProtocol from cpe.scheduler.deploy import DeployBoard, ProvisionerClient from cpe.scheduler.events import EventBoard, EventClient WORK_CONSUMER_TYPE = "work_consumer" WORK_PRODUCER_TYPE = "work_producer" BROKER_TYPE = "sensor_aggregator" # @todo supposed to get these from attribute store DEPLOY_TYPES = [WORK_CONSUMER_TYPE, WORK_PRODUCER_TYPE, BROKER_TYPE] CLOUD_NAME = "nimbus" CONFIG_URL = "http://amoeba.ucsd.edu/cpe/datastore/aminames" CPE_BROKER = "amoeba.ucsd.edu" def get(url): resource = url req = urllib2.Request(resource) return urllib2.urlopen(req).read() dataLogger = log.LogPublisher() dataLog = dataLogger.msg class TestPolicy(object): def __init__(self, eventBoard, deployBoard, deployClient): """ Policies have three APIs to work with: @param eventBoard provides notification of sensor inputs @param deployBoard provides state and notification of provisioning environment @param deployClient provides functionality to start/stop instances """ self.eventBoard = eventBoard self.deployBoard = deployBoard self.deployClient = deployClient self.broker_data = {"bootstrap" : "role=%s CPE_BROKER=%s" %(BROKER_TYPE, CPE_BROKER)} # these are filled in by the broker launch callback self.consumer_data = {} self.producer_data = {} self.queueSizeThreshold = 100 self.queueSizeLowThreshold = 80 self.minWorkConsumers = 1 self.maxWorkConsumers = 15 self.minWorkProducers = 1 self.maxWorkProducers = 5 def setup(self): self.eventBoard.registerCallback("queue_size", self.queueSizeCallback) brokerDeploy = self.deployClient.getDeploy(BROKER_TYPE) brokerDeploy.registerStartCallback(self.brokerStartedCallback) self.deployClient.startInstances(BROKER_TYPE, 1, self.broker_data) def queueSizeCallback(self, event): log.msg("got queue size event: "+ str(event)) try: size = int(event["size"]) except ValueError: log.msg("got invalid queue_size event") return self.logState(size) deploy = self.deployClient.getDeploy(WORK_CONSUMER_TYPE) if (size >= self.queueSizeThreshold): if deploy.getNumberPending() == 0: if self.maxWorkConsumers > deploy.getNumberRunning(): log.msg("over threshold, starting an instance") self.deployClient.startInstances(WORK_CONSUMER_TYPE, 1, self.consumer_data) else: log.msg("over threshold but the max number of instances are already running") else: log.msg("over threshold but there are instances already starting up") elif (size < self.queueSizeLowThreshold): if deploy.getNumberTerminating() == 0: if self.minWorkConsumers < deploy.getNumberRunning(): log.msg("under threshold, terminating an instance") inst = [deploy.running.keys()[0]] self.deployClient.stopInstances(inst) else: log.msg("under low threshold but the min number of instances are running") else: log.msg("under low threshold but there are instances already terminating") def brokerStartedCallback(self, instance): """ Prototype: assume this is the first and only broker first starting up...start up a work producer and a work consumer. """ log.msg("broker started. starting initial work consumer and work producer instances") self.consumer_data = {"bootstrap" : "role=%s WORK_BROKER=%s" % (WORK_CONSUMER_TYPE, instance.host)} self.producer_data = {"bootstrap" : "role=%s WORK_BROKER=%s" % (WORK_PRODUCER_TYPE, instance.host)} self.deployClient.startInstances(WORK_PRODUCER_TYPE, 1, self.producer_data) self.deployClient.startInstances(WORK_CONSUMER_TYPE, 1, self.consumer_data) def brokerStoppedCallback(self, instance): """ If the broker goes down, stop all producers and consumers. """ def workproducerStartedCallback(self, instance): """ Do something when a work producer starts """ def workproducerStoppedCallback(self, instance): """ Make sure the min number of producers are running if the broker is running. """ def workconsumerStartedCallback(self, instance): """ Do somethinf if a work consumer starts """ def workconsumerStoppedCallback(self, instance): """ Make sure the min number of work consumers are running if the broker is running. """ def logState(self, queue_size): deploy = self.deployClient.getDeploy(WORK_CONSUMER_TYPE) state = {"queue_size" : queue_size, "running" : deploy.getNumberRunning(), "pending" : deploy.getNumberPending(), "terminating" : deploy.getNumberTerminating()} dataLog(json.dumps(state)) @inlineCallbacks def main(): from magnet.preactor import Preactor preactor = yield Preactor() deployBoard = DeployBoard(deployTypes=DEPLOY_TYPES, clouds=[CLOUD_NAME]) provClientCreator = ClientCreator(reactor, preactor, ProvisionerClient, deployBoard, CLOUD_NAME) provClient = yield provClientCreator.connectMS('actions-ec2') eventBoard = EventBoard() # uh there is no sensor aggregator service to attach to right now. # let's fake it for now! # Update: Now there is! No more of that old faking! Now, new faking! eventClientCreator = ClientCreator(reactor, preactor, EventClient, eventBoard) # eventClient = yield eventClientCreator.connectWorkConsumer('events') eventClient = yield eventClientCreator.connectSimpleConsumer('events') policy = TestPolicy(eventBoard=eventBoard, deployBoard=deployBoard, deployClient=provClient) policy.setup() log_context = "scheduler" LogProtocol.log_context = log_context log_client_creator = ClientCreator(reactor, preactor, LogProtocol) log_client = yield log_client_creator.connectSimpleProducer('log') log.addObserver(log_client.sendLog) data_log_client_creator = ClientCreator(reactor, preactor, LogProtocol) data_log_client = yield log_client_creator.connectSimpleProducer('cpe-data') dataLogger.addObserver(data_log_client.sendLog) application = service.Application('scheduler') main()