session.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. # Copyright (C) 2010,2011 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. # This module is a mock-up class of isc.cc.session
  16. import sys
  17. import socket
  18. # set a dummy lname
  19. _TEST_LNAME = '123abc@xxxx'
  20. class Queue():
  21. def __init__(self, msg=None, env={}):
  22. self.msg = msg
  23. self.env = env
  24. def dump(self):
  25. return { 'msg': self.msg, 'env': self.env }
  26. class SessionError(Exception):
  27. pass
  28. class SessionTimeout(Exception):
  29. pass
  30. class Session:
  31. def __init__(self, socket_file=None, verbose=False):
  32. self._lname = _TEST_LNAME
  33. self.message_queue = []
  34. self.old_message_queue = []
  35. try:
  36. self._socket = socket.socket()
  37. except socket.error as se:
  38. raise SessionError(se)
  39. self.verbose = verbose
  40. @property
  41. def lname(self):
  42. return self._lname
  43. def close(self):
  44. self._socket.close()
  45. def _clear_queues(self):
  46. while len(self.message_queue) > 0:
  47. self.dequeue()
  48. def _next_sequence(self, que=None):
  49. return len(self.message_queue)
  50. def enqueue(self, msg=None, env={}):
  51. if self._socket._closed:
  52. raise SessionError("Session has been closed.")
  53. seq = self._next_sequence()
  54. env.update({"seq": 0}) # fixed here
  55. que = Queue(msg=msg, env=env)
  56. self.message_queue.append(que)
  57. if self.verbose:
  58. sys.stdout.write("[Session] enqueue: " + str(que.dump()) + "\n")
  59. return seq
  60. def dequeue(self):
  61. if self._socket._closed:
  62. raise SessionError("Session has been closed.")
  63. que = None
  64. try:
  65. que = self.message_queue.pop(0) # always pop at index 0
  66. self.old_message_queue.append(que)
  67. except IndexError:
  68. que = Queue()
  69. if self.verbose:
  70. sys.stdout.write("[Session] dequeue: " + str(que.dump()) + "\n")
  71. return que
  72. def get_queue(self, seq=None):
  73. if self._socket._closed:
  74. raise SessionError("Session has been closed.")
  75. if seq is None:
  76. seq = len(self.message_queue) - 1
  77. que = None
  78. try:
  79. que = self.message_queue[seq]
  80. except IndexError:
  81. raise IndexError
  82. que = Queue()
  83. if self.verbose:
  84. sys.stdout.write("[Session] get_queue: " + str(que.dump()) + "\n")
  85. return que
  86. def group_sendmsg(self, msg, group, instance="*", to="*"):
  87. return self.enqueue(msg=msg, env={
  88. "type": "send",
  89. "from": self._lname,
  90. "to": to,
  91. "group": group,
  92. "instance": instance })
  93. def group_recvmsg(self, nonblock=True, seq=0):
  94. que = self.dequeue()
  95. return que.msg, que.env
  96. def group_reply(self, routing, msg):
  97. return self.enqueue(msg=msg, env={
  98. "type": "send",
  99. "from": self._lname,
  100. "to": routing["from"],
  101. "group": routing["group"],
  102. "instance": routing["instance"],
  103. "reply": routing["seq"] })
  104. def get_message(self, group, to='*'):
  105. if self._socket._closed:
  106. raise SessionError("Session has been closed.")
  107. que = Queue()
  108. for q in self.message_queue:
  109. if q.env['group'] == group:
  110. self.message_queue.remove(q)
  111. self.old_message_queue.append(q)
  112. que = q
  113. if self.verbose:
  114. sys.stdout.write("[Session] get_message: " + str(que.dump()) + "\n")
  115. return q.msg
  116. def group_subscribe(self, group, instance = "*"):
  117. if self._socket._closed:
  118. raise SessionError("Session has been closed.")
  119. def group_unsubscribe(self, group, instance = "*"):
  120. if self._socket._closed:
  121. raise SessionError("Session has been closed.")