manager.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. from IPy import IP
  2. from django.core.exceptions import ValidationError
  3. from django.utils.translation import ugettext_lazy
  4. from django.db import models, connection
  5. from django.db.models import sql, query
  6. # FIXME decide if we should use custom lookup names instead of overrides.
  7. # FIXME decide if other "standard" lookups should be disabled
  8. # FIXME test "standard" lookups
  9. # FIXME decide if HOST() cast should be ignored (done by backend)
  10. NET_TERMS = {
  11. 'lt': '<',
  12. 'lte': '<=',
  13. 'exact': '=',
  14. 'gte': '>=',
  15. 'gt': '>',
  16. 'contained': '<<',
  17. 'contained_or_equal': '<<=',
  18. 'contains': '>>',
  19. 'contains_or_equals': '>>=',
  20. }
  21. NET_MAPPING = {
  22. 'iexact': 'exact',
  23. 'icontains': 'contains',
  24. 'istartswith': 'startswith',
  25. 'iendswith': 'endswith',
  26. 'iregex': 'regexp',
  27. }
  28. class NetQuery(sql.Query):
  29. query_terms = sql.Query.query_terms.copy()
  30. query_terms.update(NET_TERMS)
  31. def add_filter(self, (filter_string, value), *args, **kwargs):
  32. # IP(...) == '' fails so make sure to force to string while we can
  33. if isinstance(value, IP):
  34. value = unicode(value)
  35. return super(NetQuery, self).add_filter(
  36. (filter_string, value), *args, **kwargs)
  37. class NetWhere(sql.where.WhereNode):
  38. def make_atom(self, child, qn):
  39. table_alias, name, db_type, lookup_type, value_annot, params = child
  40. if lookup_type in NET_MAPPING:
  41. return self.make_atom((table_alias, name, db_type,
  42. NET_MAPPING[lookup_type], value_annot, params), qn)
  43. if db_type in ['cidr', 'inet'] and lookup_type in NET_TERMS:
  44. lookup = '%s.%s %s %%s' % (table_alias, name, NET_TERMS[lookup_type])
  45. return (lookup, params)
  46. return super(NetWhere, self).make_atom(child, qn)
  47. class NetManger(models.Manager):
  48. use_for_related_fields = True
  49. def get_query_set(self):
  50. q = NetQuery(self.model, connection, NetWhere)
  51. return query.QuerySet(self.model, q)
  52. # FIXME formfields etc?
  53. # - regexp field for mac
  54. # - IP try catch for ip and cidr
  55. class _NetAddressField(models.Field):
  56. # FIXME init empty object
  57. # FIXME null and blank handling needs to be done right.
  58. def to_python(self, value):
  59. if value is None:
  60. return value
  61. try:
  62. return IP(value)
  63. except ValueError, e:
  64. raise ValidationError(e)
  65. def get_db_prep_value(self, value):
  66. # FIXME does this need to respect null and blank?
  67. # FIXME does not handle __in
  68. if value is None:
  69. return value
  70. return unicode(self.to_python(value))
  71. def get_db_prep_lookup(self, lookup_type, value):
  72. if value is None:
  73. return value
  74. if lookup_type in NET_MAPPING:
  75. return self.get_db_prep_lookup(
  76. NET_MAPPING[lookup_type], value)
  77. if lookup_type in NET_TERMS:
  78. return [unicode(value)]
  79. return super(_NetAddressField, self).get_db_prep_lookup(
  80. lookup_type, unicode(value))
  81. class InetAddressField(_NetAddressField):
  82. description = "PostgreSQL INET field"
  83. __metaclass__ = models.SubfieldBase
  84. def db_type(self):
  85. return 'inet'
  86. class CidrAddressField(_NetAddressField):
  87. description = "PostgreSQL CIDR field"
  88. __metaclass__ = models.SubfieldBase
  89. def db_type(self):
  90. return 'cidr'
  91. class MACAddressField(models.Field):
  92. # FIXME does this need proper validation?
  93. description = "PostgreSQL MACADDR field"
  94. def __init__(self, *args, **kwargs):
  95. kwargs['max_length'] = 17
  96. super(MACAddressField, self).__init__(*args, **kwargs)
  97. def db_type(self):
  98. return 'macaddr'
  99. class InetTestModel(models.Model):
  100. '''
  101. >>> InetTestModel.objects.filter(inet='10.0.0.1').query.as_sql()
  102. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet = %s', (u'10.0.0.1',))
  103. >>> InetTestModel.objects.filter(inet__exact='10.0.0.1').query.as_sql()
  104. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet = %s', (u'10.0.0.1',))
  105. >>> InetTestModel.objects.filter(inet__iexact='10.0.0.1').query.as_sql()
  106. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet = %s', (u'10.0.0.1',))
  107. >>> InetTestModel.objects.filter(inet__contains='10.0.0.1').query.as_sql()
  108. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet >> %s', (u'10.0.0.1',))
  109. >>> InetTestModel.objects.filter(inet__icontains='10.0.0.1').query.as_sql()
  110. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet >> %s', (u'10.0.0.1',))
  111. in
  112. >>> InetTestModel.objects.filter(inet__gt='10.0.0.1').query.as_sql()
  113. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet > %s', (u'10.0.0.1',))
  114. >>> InetTestModel.objects.filter(inet__gte='10.0.0.1').query.as_sql()
  115. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet >= %s', (u'10.0.0.1',))
  116. >>> InetTestModel.objects.filter(inet__lt='10.0.0.1').query.as_sql()
  117. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet < %s', (u'10.0.0.1',))
  118. >>> InetTestModel.objects.filter(inet__lte='10.0.0.1').query.as_sql()
  119. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet <= %s', (u'10.0.0.1',))
  120. startswith
  121. istartswith
  122. endswith
  123. iendswith
  124. range
  125. year
  126. month
  127. day
  128. isnull
  129. search
  130. regex
  131. iregex
  132. >>> InetTestModel.objects.filter(inet__contains_or_equals='10.0.0.1').query.as_sql()
  133. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet >>= %s', (u'10.0.0.1',))
  134. >>> InetTestModel.objects.filter(inet__contained='10.0.0.1').query.as_sql()
  135. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet << %s', (u'10.0.0.1',))
  136. >>> InetTestModel.objects.filter(inet__contained_or_equal='10.0.0.1').query.as_sql()
  137. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE foo_inettestmodel.inet <<= %s', (u'10.0.0.1',))
  138. '''
  139. inet = InetAddressField()
  140. objects = NetManger()
  141. class CidrTestModel(models.Model):
  142. '''
  143. >>> CidrTestModel.objects.filter(cidr='10.0.0.1').query.as_sql()
  144. ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE foo_cidrtestmodel.cidr = %s', (u'10.0.0.1',))
  145. '''
  146. cidr = CidrAddressField()
  147. objects = NetManger()
  148. class MACTestModel(models.Model):
  149. mac = MACAddressField(null=True)
  150. objects = NetManger()