manager.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. from IPy import IP
  2. from django.core.exceptions import ValidationError
  3. from django.utils.translation import ugettext_lazy
  4. from django.db import models, connection
  5. from django.db.models import sql, query
  6. # FIXME decide if we should use custom lookup names instead of overrides.
  7. # FIXME decide if other "standard" lookups should be disabled
  8. # FIXME test "standard" lookups
  9. # FIXME decide if HOST() cast should be ignored (done by backend)
  # Lookup name -> SQL operator for the network lookups that render as a
  # plain "<column> <operator> %s" comparison.
  NET_TERMS = dict([
      ('lt', '<'),
      ('lte', '<='),
      ('exact', '='),
      ('gte', '>='),
      ('gt', '>'),
      ('contained', '<<'),
      ('contained_or_equal', '<<='),
      ('contains', '>>'),
      ('contains_or_equals', '>>='),
  ])

  # Lookups that get hand-written SQL instead of a single operator.  Only
  # membership is tested against this dict in this file, so every value is
  # None.
  NET_TERMS_SPECIAL = dict.fromkeys(('in', 'range'))

  # FIXME rethink caps with respect to IPV6, all should be insensitive...
  # Case-insensitive lookup name -> the case-sensitive lookup it is handled as.
  NET_MAPPING = dict(
      ('i' + plain_name, plain_name)
      for plain_name in ('exact', 'contains', 'startswith', 'endswith', 'regex')
  )
  class NetQuery(sql.Query):
      """Query subclass that also accepts the lookup names in NET_TERMS."""

      # Work on a copy so the base class's own table of lookup names is not
      # modified when the network lookups are added.
      query_terms = sql.Query.query_terms.copy()
      query_terms.update(NET_TERMS)

      def add_filter(self, filter_expr, *args, **kwargs):
          """Add a filter, turning an ``IP`` value into text first.

          ``filter_expr`` is a ``(filter_string, value)`` pair.  It is
          unpacked in the body rather than in the signature because
          tuple-parameter unpacking only exists in Python 2.  Remaining
          positional and keyword arguments are forwarded unchanged to the
          base class, whose return value is returned.
          """
          filter_string, value = filter_expr
          # IP(...) == '' fails so make sure to force to string while we can
          if isinstance(value, IP):
              value = unicode(value)
          return super(NetQuery, self).add_filter(
              (filter_string, value), *args, **kwargs)
  class NetWhere(sql.where.WhereNode):
      """Where node that renders network lookups on 'cidr'/'inet' columns."""

      def make_atom(self, child, qn):
          """Return the ``(sql, params)`` pair for one where-clause child.

          ``child`` is the 6-tuple ``(table_alias, name, db_type,
          lookup_type, value_annot, params)`` and ``qn`` quotes identifiers.
          Lookups listed in NET_MAPPING are re-dispatched under their mapped
          name.  For 'cidr' and 'inet' columns the NET_TERMS lookups, ``in``
          and ``range`` are rendered here; everything else is delegated to
          the base class.
          """
          table_alias, name, db_type, lookup_type, value_annot, params = child

          if lookup_type in NET_MAPPING:
              # Case-insensitive variants are rendered as their plain form.
              return self.make_atom((table_alias, name, db_type,
                  NET_MAPPING[lookup_type], value_annot, params), qn)

          if db_type in ('cidr', 'inet'):
              field_sql = '%s.%s' % (qn(table_alias), qn(name))
              if lookup_type in NET_TERMS:
                  lookup = '%s %s %%s' % (field_sql, NET_TERMS[lookup_type])
                  return (lookup, params)
              # An empty value list would render as "IN ()", which is not
              # valid SQL, so that case is left to the base implementation
              # (NOTE(review): expected to short-circuit an empty ``in`` --
              # confirm against the Django version in use).
              if lookup_type == 'in' and params:
                  placeholders = ', '.join(['%s'] * len(params))
                  return ('%s IN (%s)' % (field_sql, placeholders), params)
              if lookup_type == 'range':
                  return ('%s BETWEEN %%s and %%s' % (field_sql,), params)

          return super(NetWhere, self).make_atom(child, qn)
  class NetManger(models.Manager):
      """Manager whose querysets are built on NetQuery and NetWhere."""

      use_for_related_fields = True

      def get_query_set(self):
          """Return a QuerySet for this model backed by a NetQuery."""
          net_query = NetQuery(self.model, connection, NetWhere)
          return query.QuerySet(self.model, net_query)


  # Correctly spelled alias.  The misspelled "NetManger" stays the canonical
  # name because the models in this file (and possibly other callers)
  # reference it.
  NetManager = NetManger
  63. # FIXME formfields etc?
  64. # - regexp field for mac
  65. # - IP try catch for ip and cidr
  class _NetAddressField(models.Field):
      """Shared base for fields whose Python value is an IPy ``IP`` object."""
      # FIXME init empty object
      # FIXME null and blank handling needs to be done right.

      def to_python(self, value):
          """Convert ``value`` to an ``IP``; ``None`` is passed through.

          Raises ValidationError when ``IP()`` rejects the value with a
          ValueError.
          """
          if value is None:
              return value
          try:
              return IP(value)
          # "as" form: valid on Python 2.6+ and Python 3, unlike "except X, e".
          except ValueError as e:
              raise ValidationError(e)

      def get_db_prep_value(self, value):
          """Return the text form of ``value`` for the database, or None."""
          # FIXME does this need to respect null and blank?
          # FIXME does not handle __in
          if value is None:
              return value
          # Round-trip through to_python so invalid input is rejected there.
          return unicode(self.to_python(value))

      def get_db_prep_lookup(self, lookup_type, value):
          """Prepare ``value`` for use as query parameters of ``lookup_type``.

          ``None`` is returned unchanged.  Date-part lookups raise
          ValueError.  Lookups in NET_MAPPING are prepared as their mapped
          lookup, NET_TERMS lookups yield a one-element list holding the
          prepared value, and any other lookup is left to the base class.
          """
          if value is None:
              return value
          if lookup_type in ['year', 'month', 'day']:
              raise ValueError('Invalid lookup type "%s"' % lookup_type)
          if lookup_type in NET_MAPPING:
              return self.get_db_prep_lookup(
                  NET_MAPPING[lookup_type], value)
          if lookup_type in NET_TERMS:
              return [self.get_db_prep_value(value)]
          return super(_NetAddressField, self).get_db_prep_lookup(
              lookup_type, value)
  class InetAddressField(_NetAddressField):
      """Network address field whose column type is ``inet``."""

      __metaclass__ = models.SubfieldBase
      description = "PostgreSQL INET field"

      def db_type(self):
          """Return the database column type name."""
          return 'inet'
  class CidrAddressField(_NetAddressField):
      """Network address field whose column type is ``cidr``."""

      __metaclass__ = models.SubfieldBase
      description = "PostgreSQL CIDR field"

      def db_type(self):
          """Return the database column type name."""
          return 'cidr'
  class MACAddressField(models.Field):
      """Field whose column type is ``macaddr``."""
      # FIXME does this need proper validation?

      description = "PostgreSQL MACADDR field"

      def __init__(self, *args, **kwargs):
          """Initialise the field, always forcing ``max_length`` to 17."""
          # Any max_length supplied by the caller is overwritten.
          kwargs.update(max_length=17)
          super(MACAddressField, self).__init__(*args, **kwargs)

      def db_type(self):
          """Return the database column type name."""
          return 'macaddr'
  class InetTestModel(models.Model):
      '''
      >>> InetTestModel.objects.filter(inet='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__exact='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__iexact='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__contains='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >> %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__icontains='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >> %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__in=['10.0.0.1', '10.0.0.2']).query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IN (%s, %s)', (u'10.0.0.1', u'10.0.0.2'))
      >>> InetTestModel.objects.filter(inet__gt='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" > %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__gte='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >= %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__lt='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" < %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__lte='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" <= %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__startswith='10.').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet")::text LIKE %s ', (u'10.%',))
      >>> InetTestModel.objects.filter(inet__istartswith='10.').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet")::text LIKE %s ', (u'10.%',))
      >>> InetTestModel.objects.filter(inet__endswith='.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet")::text LIKE %s ', (u'%.1',))
      >>> InetTestModel.objects.filter(inet__iendswith='.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet")::text LIKE %s ', (u'%.1',))
      >>> InetTestModel.objects.filter(inet__range=('10.0.0.1', '10.0.0.10')).query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" BETWEEN %s and %s', (u'10.0.0.1', u'10.0.0.10'))
      >>> InetTestModel.objects.filter(inet__year=1).query.as_sql()
      Traceback (most recent call last):
          ...
      ValueError: Invalid lookup type "year"
      >>> InetTestModel.objects.filter(inet__month=1).query.as_sql()
      Traceback (most recent call last):
          ...
      ValueError: Invalid lookup type "month"
      >>> InetTestModel.objects.filter(inet__day=1).query.as_sql()
      Traceback (most recent call last):
          ...
      ValueError: Invalid lookup type "day"
      >>> InetTestModel.objects.filter(inet__isnull=True).query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IS NULL', ())
      >>> InetTestModel.objects.filter(inet__isnull=False).query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IS NOT NULL', ())
      >>> InetTestModel.objects.filter(inet__search='10').query.as_sql()
      Traceback (most recent call last):
          ...
      NotImplementedError: Full-text search is not implemented for this database backend
      >>> InetTestModel.objects.filter(inet__regex='10').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ~ %s ', (u'10',))
      >>> InetTestModel.objects.filter(inet__iregex='10').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ~ %s ', (u'10',))
      >>> InetTestModel.objects.filter(inet__contains_or_equals='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >>= %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__contained='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" << %s', (u'10.0.0.1',))
      >>> InetTestModel.objects.filter(inet__contained_or_equal='10.0.0.1').query.as_sql()
      ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" <<= %s', (u'10.0.0.1',))
      '''
      # The docstring above is a doctest: each example pins the SQL and
      # parameters generated for one lookup type on an inet column.
      # NOTE(review): the isnull=True expectation previously read
      # '"inet) IS NULL' (unbalanced quote/paren) and could never match; it
      # now mirrors the isnull=False example.  Both isnull examples should
      # be confirmed against the backend, which may wrap the column in
      # HOST() as it does in the regex examples.
      inet = InetAddressField()
      # Custom manager so filters go through NetQuery/NetWhere.
      objects = NetManger()
  # Test model with a single cidr column.  Its docstring is a doctest that
  # pins the SQL and parameters generated for a plain equality filter.
  class CidrTestModel(models.Model):
      '''
      >>> CidrTestModel.objects.filter(cidr='10.0.0.1').query.as_sql()
      ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" = %s', (u'10.0.0.1',))
      '''
      cidr = CidrAddressField()
      # Custom manager so filters go through NetQuery/NetWhere.
      objects = NetManger()
  # Test model with a single nullable macaddr column; it carries no doctests.
  class MACTestModel(models.Model):
      mac = MACAddressField(null=True)
      # Same custom manager as the other test models in this file.
      objects = NetManger()