|
@@ -143,7 +143,7 @@ class SubscriptionManager:
|
|
|
this group, instance pair. This includes wildcard subscriptions."""
|
|
|
target = (group, instance)
|
|
|
partone = self.find_sub(group, instance)
|
|
|
- parttwo = self.find_sub(group, "*")
|
|
|
+ parttwo = self.find_sub(group, CC_INSTANCE_WILDCARD)
|
|
|
return list(set(partone + parttwo))
|
|
|
|
|
|
class MsgQ:
|
|
@@ -429,19 +429,19 @@ class MsgQ:
|
|
|
"""Process a single command. This will split out into one of the
|
|
|
other functions."""
|
|
|
logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
|
|
|
- cmd = routing["type"]
|
|
|
- if cmd == 'send':
|
|
|
+ cmd = routing[CC_HEADER_TYPE]
|
|
|
+ if cmd == CC_COMMAND_SEND:
|
|
|
self.process_command_send(sock, routing, data)
|
|
|
- elif cmd == 'subscribe':
|
|
|
+ elif cmd == CC_COMMAND_SUBSCRIBE:
|
|
|
self.process_command_subscribe(sock, routing, data)
|
|
|
- elif cmd == 'unsubscribe':
|
|
|
+ elif cmd == CC_COMMAND_UNSUBSCRIBE:
|
|
|
self.process_command_unsubscribe(sock, routing, data)
|
|
|
- elif cmd == 'getlname':
|
|
|
+ elif cmd == CC_COMMAND_GET_LNAME:
|
|
|
self.process_command_getlname(sock, routing, data)
|
|
|
- elif cmd == 'ping':
|
|
|
+ elif cmd == CC_COMMAND_PING:
|
|
|
# Command for testing purposes
|
|
|
self.process_command_ping(sock, routing, data)
|
|
|
- elif cmd == 'stop':
|
|
|
+ elif cmd == CC_COMMAND_STOP:
|
|
|
self.stop()
|
|
|
else:
|
|
|
logger.error(MSGQ_INVALID_CMD, cmd)
|
|
@@ -570,11 +570,12 @@ class MsgQ:
|
|
|
return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
|
|
|
|
|
|
def process_command_ping(self, sock, routing, data):
|
|
|
- self.sendmsg(sock, { "type" : "pong" }, data)
|
|
|
+ self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_PONG }, data)
|
|
|
|
|
|
def process_command_getlname(self, sock, routing, data):
|
|
|
lname = [ k for k, v in self.lnames.items() if v == sock ][0]
|
|
|
- self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
|
|
|
+ self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_GET_LNAME },
|
|
|
+ { CC_PAYLOAD_LNAME : lname })
|
|
|
|
|
|
def process_command_send(self, sock, routing, data):
|
|
|
group = routing[CC_HEADER_GROUP]
|
|
@@ -638,15 +639,15 @@ class MsgQ:
|
|
|
self.send_prepared_msg(sock, errmsg)
|
|
|
|
|
|
def process_command_subscribe(self, sock, routing, data):
|
|
|
- group = routing["group"]
|
|
|
- instance = routing["instance"]
|
|
|
+ group = routing[CC_HEADER_GROUP]
|
|
|
+ instance = routing[CC_HEADER_INSTANCE]
|
|
|
if group == None or instance == None:
|
|
|
return # ignore invalid packets entirely
|
|
|
self.subs.subscribe(group, instance, sock)
|
|
|
|
|
|
def process_command_unsubscribe(self, sock, routing, data):
|
|
|
- group = routing["group"]
|
|
|
- instance = routing["instance"]
|
|
|
+ group = routing[CC_HEADER_GROUP]
|
|
|
+ instance = routing[CC_HEADER_INSTANCE]
|
|
|
if group == None or instance == None:
|
|
|
return # ignore invalid packets entirely
|
|
|
self.subs.unsubscribe(group, instance, sock)
|