manager.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. from IPy import IP
  2. from django.core.exceptions import ValidationError
  3. from django.utils.translation import ugettext_lazy
  4. from django.db import models, connection
  5. from django.db.models import sql, query
  6. NET_TERMS = {
  7. 'lt': '%s < %%s',
  8. 'lte': '%s <= %%s',
  9. 'exact': '%s = %%s',
  10. 'gte': '%s >= %%s',
  11. 'gt': '%s > %%s',
  12. 'net_contained': '%s << %%s',
  13. 'net_contained_or_equal': '%s <<= %%s',
  14. 'net_contains': '%s >> %%s',
  15. 'net_contains_or_equals': '%s >>= %%s',
  16. }
  17. NET_TERMS_MAPPING = {
  18. 'iexact': 'exact',
  19. 'icontains': 'contains',
  20. 'istartswith': 'startswith',
  21. 'iendswith': 'endswith',
  22. 'iregex': 'regex',
  23. }
  24. class NetQuery(sql.Query):
  25. query_terms = sql.Query.query_terms.copy()
  26. query_terms.update(NET_TERMS)
  27. def add_filter(self, (filter_string, value), *args, **kwargs):
  28. # IP(...) == '' fails so make sure to force to string while we can
  29. if isinstance(value, IP):
  30. value = unicode(value)
  31. return super(NetQuery, self).add_filter(
  32. (filter_string, value), *args, **kwargs)
  33. class NetWhere(sql.where.WhereNode):
  34. def make_atom(self, child, qn):
  35. table_alias, name, db_type, lookup_type, value_annot, params = child
  36. field_sql = '%s.%s' % (qn(table_alias), qn(name))
  37. if db_type not in ['inet', 'cidr']:
  38. return super(NetWhere, self).make_atom(child, qn)
  39. elif lookup_type in NET_TERMS_MAPPING:
  40. lookup_type = NET_TERMS_MAPPING[lookup_type]
  41. child = (table_alias, name, db_type, lookup_type, value_annot, params)
  42. return self.make_atom(child, qn)
  43. elif lookup_type in NET_TERMS:
  44. return (NET_TERMS[lookup_type] % field_sql, params)
  45. elif lookup_type == 'in':
  46. return ('%s IN (%s)' % (field_sql, ', '.join(['%s'] * len(params))), params)
  47. elif lookup_type == 'range':
  48. return ('%s BETWEEN %%s and %%s' % (field_sql), params)
  49. elif lookup_type == 'isnull':
  50. return ('%s IS %sNULL' % (field_sql, (not value_annot and 'NOT ' or '')), params)
  51. else:
  52. return super(NetWhere, self).make_atom(child, qn)
  53. class NetManger(models.Manager):
  54. use_for_related_fields = True
  55. def get_query_set(self):
  56. q = NetQuery(self.model, connection, NetWhere)
  57. return query.QuerySet(self.model, q)
  58. # FIXME formfields etc?
  59. # - regexp field for mac
  60. # - IP try catch for ip and cidr
  61. class _NetAddressField(models.Field):
  62. # FIXME init empty object
  63. # FIXME null and blank handling needs to be done right.
  64. def to_python(self, value):
  65. if value is None:
  66. return value
  67. try:
  68. return IP(value)
  69. except ValueError, e:
  70. raise ValidationError(e)
  71. def get_db_prep_value(self, value):
  72. # FIXME does this need to respect null and blank?
  73. # FIXME does not handle __in
  74. if value is None:
  75. return value
  76. return unicode(self.to_python(value))
  77. def get_db_prep_lookup(self, lookup_type, value):
  78. if value is None:
  79. return value
  80. if lookup_type in ['year', 'month', 'day']:
  81. raise ValueError('Invalid lookup type "%s"' % lookup_type)
  82. if lookup_type in NET_TERMS_MAPPING:
  83. return self.get_db_prep_lookup(
  84. NET_TERMS_MAPPING[lookup_type], value)
  85. if lookup_type in NET_TERMS:
  86. return [self.get_db_prep_value(value)]
  87. return super(_NetAddressField, self).get_db_prep_lookup(
  88. lookup_type, value)
  89. class InetAddressField(_NetAddressField):
  90. description = "PostgreSQL INET field"
  91. __metaclass__ = models.SubfieldBase
  92. def db_type(self):
  93. return 'inet'
  94. class CidrAddressField(_NetAddressField):
  95. description = "PostgreSQL CIDR field"
  96. __metaclass__ = models.SubfieldBase
  97. def db_type(self):
  98. return 'cidr'
  99. class MACAddressField(models.Field):
  100. # FIXME does this need proper validation?
  101. description = "PostgreSQL MACADDR field"
  102. def __init__(self, *args, **kwargs):
  103. kwargs['max_length'] = 17
  104. super(MACAddressField, self).__init__(*args, **kwargs)
  105. def db_type(self):
  106. return 'macaddr'
  107. class InetTestModel(models.Model):
  108. '''
  109. >>> InetTestModel.objects.filter(inet='10.0.0.1').query.as_sql()
  110. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s', (u'10.0.0.1',))
  111. >>> InetTestModel.objects.filter(inet__exact='10.0.0.1').query.as_sql()
  112. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s', (u'10.0.0.1',))
  113. >>> InetTestModel.objects.filter(inet__iexact='10.0.0.1').query.as_sql()
  114. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s', (u'10.0.0.1',))
  115. >>> InetTestModel.objects.filter(inet__net_contains='10.0.0.1').query.as_sql()
  116. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >> %s', (u'10.0.0.1',))
  117. >>> InetTestModel.objects.filter(inet__in=['10.0.0.1', '10.0.0.2']).query.as_sql()
  118. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IN (%s, %s)', (u'10.0.0.1', u'10.0.0.2'))
  119. >>> InetTestModel.objects.filter(inet__gt='10.0.0.1').query.as_sql()
  120. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" > %s', (u'10.0.0.1',))
  121. >>> InetTestModel.objects.filter(inet__gte='10.0.0.1').query.as_sql()
  122. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >= %s', (u'10.0.0.1',))
  123. >>> InetTestModel.objects.filter(inet__lt='10.0.0.1').query.as_sql()
  124. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" < %s', (u'10.0.0.1',))
  125. >>> InetTestModel.objects.filter(inet__lte='10.0.0.1').query.as_sql()
  126. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" <= %s', (u'10.0.0.1',))
  127. >>> InetTestModel.objects.filter(inet__startswith='10.').query.as_sql()
  128. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet")::text LIKE %s ', (u'10.%',))
  129. >>> InetTestModel.objects.filter(inet__istartswith='10.').query.as_sql()
  130. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet")::text LIKE %s ', (u'10.%',))
  131. >>> InetTestModel.objects.filter(inet__endswith='.1').query.as_sql()
  132. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet")::text LIKE %s ', (u'%.1',))
  133. >>> InetTestModel.objects.filter(inet__iendswith='.1').query.as_sql()
  134. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet")::text LIKE %s ', (u'%.1',))
  135. >>> InetTestModel.objects.filter(inet__range=('10.0.0.1', '10.0.0.10')).query.as_sql()
  136. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" BETWEEN %s and %s', (u'10.0.0.1', u'10.0.0.10'))
  137. >>> InetTestModel.objects.filter(inet__year=1).query.as_sql()
  138. Traceback (most recent call last):
  139. ...
  140. ValueError: Invalid lookup type "year"
  141. >>> InetTestModel.objects.filter(inet__month=1).query.as_sql()
  142. Traceback (most recent call last):
  143. ...
  144. ValueError: Invalid lookup type "month"
  145. >>> InetTestModel.objects.filter(inet__day=1).query.as_sql()
  146. Traceback (most recent call last):
  147. ...
  148. ValueError: Invalid lookup type "day"
  149. >>> InetTestModel.objects.filter(inet__isnull=True).query.as_sql()
  150. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IS NULL', ())
  151. >>> InetTestModel.objects.filter(inet__isnull=False).query.as_sql()
  152. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IS NOT NULL', ())
  153. >>> InetTestModel.objects.filter(inet__search='10').query.as_sql()
  154. Traceback (most recent call last):
  155. ...
  156. NotImplementedError: Full-text search is not implemented for this database backend
  157. >>> InetTestModel.objects.filter(inet__regex=u'10').query.as_sql()
  158. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ~ %s ', (u'10',))
  159. >>> InetTestModel.objects.filter(inet__iregex=u'10').query.as_sql()
  160. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ~ %s ', (u'10',))
  161. >>> InetTestModel.objects.filter(inet__net_contains_or_equals='10.0.0.1').query.as_sql()
  162. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >>= %s', (u'10.0.0.1',))
  163. >>> InetTestModel.objects.filter(inet__net_contained='10.0.0.1').query.as_sql()
  164. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" << %s', (u'10.0.0.1',))
  165. >>> InetTestModel.objects.filter(inet__net_contained_or_equal='10.0.0.1').query.as_sql()
  166. ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" <<= %s', (u'10.0.0.1',))
  167. '''
  168. inet = InetAddressField()
  169. objects = NetManger()
  170. class CidrTestModel(models.Model):
  171. '''
  172. >>> CidrTestModel.objects.filter(cidr='10.0.0.1').query.as_sql()
  173. ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" = %s', (u'10.0.0.1',))
  174. '''
  175. cidr = CidrAddressField()
  176. objects = NetManger()
  177. class MACTestModel(models.Model):
  178. mac = MACAddressField(null=True)
  179. objects = NetManger()