# buffer.py (3.2 KB)
  1. # coding: utf-8
  2. from __future__ import unicode_literals, absolute_import
  3. import re
  4. import textwrap
  5. import logging
  6. log = logging.getLogger(__name__)
  7. class LineBuffer(object):
  8. r"""
  9. Buffer bytes read in from a connection and serve complete lines back.
  10. >>> b = LineBuffer()
  11. >>> len(b)
  12. 0
  13. >>> b.feed(b'foo\nbar')
  14. >>> len(b)
  15. 7
  16. >>> list(b.lines()) == [b'foo']
  17. True
  18. >>> len(b)
  19. 3
  20. >>> b.feed(b'bar\r\nbaz\n')
  21. >>> list(b.lines()) == [b'barbar', b'baz']
  22. True
  23. >>> len(b)
  24. 0
  25. The buffer will not perform any decoding.
  26. >>> b.feed(b'Ol\xe9\n')
  27. >>> list(b.lines()) == [b'Ol\xe9']
  28. True
  29. The LineBuffer should also act as an iterable.
  30. >>> b.feed(b'iterate\nthis\n')
  31. >>> for line, expected in zip(b, [b'iterate', b'this']):
  32. ... assert line == expected
  33. """
  34. line_sep_exp = re.compile(b'\r?\n')
  35. def __init__(self):
  36. self.buffer = b''
  37. def feed(self, bytes):
  38. self.buffer += bytes
  39. def lines(self):
  40. lines = self.line_sep_exp.split(self.buffer)
  41. # save the last, unfinished, possibly empty line
  42. self.buffer = lines.pop()
  43. return iter(lines)
  44. def __iter__(self):
  45. return self.lines()
  46. def __len__(self):
  47. return len(self.buffer)
  48. class DecodingLineBuffer(LineBuffer):
  49. r"""
  50. Like LineBuffer, but decode the output (default assumes UTF-8).
  51. >>> utf8_word = b'Ol\xc3\xa9'
  52. >>> b = DecodingLineBuffer()
  53. >>> b.feed(b'bar\r\nbaz\n' + utf8_word + b'\n')
  54. >>> list(b.lines())
  55. ['bar', 'baz', 'Ol\xe9']
  56. >>> len(b)
  57. 0
  58. Some clients will feed latin-1 or other encodings. If your client should
  59. support docoding from these clients (and not raise a UnicodeDecodeError),
  60. set errors='replace':
  61. >>> b = DecodingLineBuffer()
  62. >>> b.errors = 'replace'
  63. >>> b.feed(b'Ol\xe9\n')
  64. >>> list(b.lines()) == ['Ol\ufffd']
  65. True
  66. >>> b = DecodingLineBuffer()
  67. >>> b.feed(b'Ol\xe9\n')
  68. >>> list(b.lines())
  69. Traceback (most recent call last):
  70. ...
  71. UnicodeDecodeError: ...
  72. """
  73. encoding = 'utf-8'
  74. errors = 'strict'
  75. def lines(self):
  76. for line in super(DecodingLineBuffer, self).lines():
  77. try:
  78. yield line.decode(self.encoding, self.errors)
  79. except UnicodeDecodeError:
  80. self.handle_exception()
  81. def handle_exception(self):
  82. msg = textwrap.dedent("""
  83. Unknown encoding encountered. See 'Decoding Input'
  84. in https://pypi.python.org/pypi/irc for details.
  85. """)
  86. log.warning(msg)
  87. raise
  88. class LenientDecodingLineBuffer(LineBuffer):
  89. r"""
  90. Like LineBuffer, but decode the output. First try UTF-8 and if that
  91. fails, use latin-1, which decodes all byte strings.
  92. >>> b = LenientDecodingLineBuffer()
  93. >>> utf8_word = b'Ol\xc3\xa9'
  94. >>> b.feed(utf8_word + b'\n')
  95. >>> b.feed(b'Ol\xe9\n')
  96. >>> list(b.lines())
  97. ['Ol\xe9', 'Ol\xe9']
  98. """
  99. def lines(self):
  100. for line in super(LenientDecodingLineBuffer, self).lines():
  101. try:
  102. yield line.decode('utf-8', 'strict')
  103. except UnicodeDecodeError:
  104. yield line.decode('latin-1')