# manager.py
  1. from IPy import IP
  2. from django.core.exceptions import ValidationError
  3. from django.db import models, connection
  4. from django.db.models import sql, query
  5. NET_TERMS = {
  6. 'inet_lt': '<',
  7. 'inet_lte': '<=',
  8. 'inet_exact': '=',
  9. 'inet_gte': '>=',
  10. 'inet_gt': '>',
  11. 'inet_not': '<>',
  12. 'inet_is_contained': '<<',
  13. 'inet_is_contained_or_equal': '<<=',
  14. 'inet_contains': '>>',
  15. 'inet_contains': '>>=',
  16. }
  17. class NetQuery(sql.Query):
  18. query_terms = sql.Query.query_terms.copy()
  19. query_terms.update(NET_TERMS)
  20. def add_filter(self, (filter_string, value), *args, **kwargs):
  21. if isinstance(value, IP):
  22. value = unicode(value)
  23. return super(NetQuery, self).add_filter((filter_string, value), *args, **kwargs)
  24. class NetWhere(sql.where.WhereNode):
  25. def make_atom(self, child, qn):
  26. table_alias, name, db_type, lookup_type, value_annot, params = child
  27. if db_type in ['cidr', 'inet'] and lookup_type in NET_TERMS:
  28. return ('%s.%s %s inet %%s' % (table_alias, name, NET_TERMS[lookup_type]), params)
  29. return super(NetWhere, self).make_atom(child, qn)
  30. class NetManger(models.Manager):
  31. def get_query_set(self):
  32. q = NetQuery(self.model, connection, NetWhere)
  33. return query.QuerySet(self.model, q)
  34. class _NetAddressField(models.Field):
  35. def to_python(self, value):
  36. if not value:
  37. return None
  38. try:
  39. return IP(value)
  40. except ValueError, e:
  41. raise ValidationError(e)
  42. def get_db_prep_value(self, value):
  43. return unicode(self.to_python(value))
  44. def get_db_prep_lookup(self, lookup_type, value):
  45. value = unicode(value)
  46. if lookup_type in INET_TERMS:
  47. return [value]
  48. return super(_NetAddressField, self).get_db_prep_lookup(lookup_type, value)
  49. class InetAddressField(_NetAddressField):
  50. description = "Postgresql inet field"
  51. __metaclass__ = models.SubfieldBase
  52. def db_type(self):
  53. return 'inet'
  54. class CidrAddressField(_NetAddressField):
  55. description = "Postgresql cidr field"
  56. __metaclass__ = models.SubfieldBase
  57. def db_type(self):
  58. return 'cidr'
  59. class MACAddressField(models.Field):
  60. description = "Postgresql macaddr field"
  61. def __init__(self, *args, **kwargs):
  62. kwargs['max_length'] = 17
  63. super(MACAddressField, self).__init__(*args, **kwargs)
  64. def db_type(self):
  65. return 'macaddr'
  66. class Foo(models.Model):
  67. inet = InetAddressField()
  68. test = CidrAddressField()
  69. mac = MACAddressField()
  70. objects = NetManger()