123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- # coding: utf-8
- from __future__ import unicode_literals, absolute_import
- import re
- import textwrap
- import logging
- log = logging.getLogger(__name__)
class LineBuffer(object):
    r"""
    Accumulate raw bytes from a connection and hand back whole lines.

    Data is appended with :meth:`feed`. Each call to :meth:`lines` splits
    the accumulated data on ``\n`` or ``\r\n``, returns the complete lines
    (separators removed) and keeps any trailing, unterminated fragment
    buffered until more data arrives. ``len()`` reports how many bytes are
    still pending.

    >>> b = LineBuffer()
    >>> len(b)
    0

    >>> b.feed(b'foo\nbar')
    >>> len(b)
    7
    >>> list(b.lines()) == [b'foo']
    True
    >>> len(b)
    3

    >>> b.feed(b'bar\r\nbaz\n')
    >>> list(b.lines()) == [b'barbar', b'baz']
    True
    >>> len(b)
    0

    No decoding is performed; lines are returned as bytes.

    >>> b.feed(b'Ol\xe9\n')
    >>> list(b.lines()) == [b'Ol\xe9']
    True

    Iterating over the buffer is the same as calling ``lines()``.

    >>> b.feed(b'iterate\nthis\n')
    >>> for line, expected in zip(b, [b'iterate', b'this']):
    ...     assert line == expected
    """

    # Matches a line terminator: LF, optionally preceded by CR.
    line_sep_exp = re.compile(b'\r?\n')

    def __init__(self):
        # Bytes received so far that have not been returned as a line.
        self.buffer = b''

    def feed(self, bytes):
        """Append *bytes* to the pending data."""
        self.buffer += bytes

    def lines(self):
        """
        Return an iterator over the complete lines currently buffered.

        The buffer is updated immediately: afterwards it holds only the
        (possibly empty) fragment following the last separator.
        """
        pieces = self.line_sep_exp.split(self.buffer)
        # The final piece has no terminator yet, so it stays buffered.
        self.buffer = pieces[-1]
        return iter(pieces[:-1])

    def __iter__(self):
        """Iterate over the complete lines; equivalent to ``lines()``."""
        return self.lines()

    def __len__(self):
        """Return the number of bytes still waiting in the buffer."""
        return len(self.buffer)
class DecodingLineBuffer(LineBuffer):
    r"""
    A LineBuffer whose lines are decoded to text before being returned.

    Decoding uses the ``encoding`` and ``errors`` attributes, which default
    to UTF-8 and ``'strict'``.

    >>> utf8_word = b'Ol\xc3\xa9'
    >>> b = DecodingLineBuffer()
    >>> b.feed(b'bar\r\nbaz\n' + utf8_word + b'\n')
    >>> list(b.lines()) == ['bar', 'baz', 'Ol\xe9']
    True
    >>> len(b)
    0

    Some clients send latin-1 or other encodings. To support decoding
    input from such clients without raising a UnicodeDecodeError, set
    ``errors = 'replace'``:

    >>> b = DecodingLineBuffer()
    >>> b.errors = 'replace'
    >>> b.feed(b'Ol\xe9\n')
    >>> list(b.lines()) == ['Ol\ufffd']
    True

    With the default strict handling, undecodable input raises:

    >>> b = DecodingLineBuffer()
    >>> b.feed(b'Ol\xe9\n')
    >>> list(b.lines())
    Traceback (most recent call last):
    ...
    UnicodeDecodeError: ...
    """

    # Codec name and error-handling scheme passed to ``bytes.decode``.
    encoding = 'utf-8'
    errors = 'strict'

    def lines(self):
        """
        Yield each complete buffered line, decoded to text.

        This is a generator, so the underlying buffer is only consumed
        once iteration starts. A UnicodeDecodeError is routed through
        ``handle_exception``.
        """
        raw_lines = super(DecodingLineBuffer, self).lines()
        for raw in raw_lines:
            try:
                yield raw.decode(self.encoding, self.errors)
            except UnicodeDecodeError:
                self.handle_exception()

    def handle_exception(self):
        """
        Log a warning about the undecodable input and re-raise.

        Uses a bare ``raise``, so it must be called while an exception is
        being handled (as ``lines`` does from its ``except`` clause).
        """
        notice = textwrap.dedent("""
            Unknown encoding encountered. See 'Decoding Input'
            in https://pypi.python.org/pypi/irc for details.
            """)
        log.warning(notice)
        raise
class LenientDecodingLineBuffer(LineBuffer):
    r"""
    A LineBuffer that decodes its lines, tolerating non-UTF-8 input.

    Each line is first decoded as strict UTF-8; if that fails, it is
    decoded as latin-1 instead, which accepts every byte string.

    >>> b = LenientDecodingLineBuffer()
    >>> utf8_word = b'Ol\xc3\xa9'
    >>> b.feed(utf8_word + b'\n')
    >>> b.feed(b'Ol\xe9\n')
    >>> list(b.lines()) == ['Ol\xe9', 'Ol\xe9']
    True
    """

    def lines(self):
        """
        Yield each complete buffered line as text.

        UTF-8 is tried first, with latin-1 as the per-line fallback.
        """
        raw_lines = super(LenientDecodingLineBuffer, self).lines()
        for raw in raw_lines:
            try:
                yield raw.decode('utf-8', 'strict')
            except UnicodeDecodeError:
                # latin-1 maps every byte value, so this cannot fail.
                yield raw.decode('latin-1')
|