manager.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. from IPy import IP
  2. from django.core.exceptions import ValidationError
  3. from django.db import models, connection
  4. from django.db.models import sql, query
  5. from django.db.models.query_utils import QueryWrapper
  6. from django.utils.translation import ugettext_lazy
  7. NET_OPERATORS = {
  8. 'lt': '<',
  9. 'lte': '<=',
  10. 'exact': '=',
  11. 'iexact': '=',
  12. 'gte': '>=',
  13. 'gt': '>',
  14. 'contains': "ILIKE",
  15. 'startswith': "ILIKE",
  16. 'endswith': "ILIKE",
  17. 'regex': '~*',
  18. 'icontains': "ILIKE",
  19. 'istartswith': "ILIKE",
  20. 'iendswith': "ILIKE",
  21. 'iregex': '~*',
  22. 'net_contained': '<<',
  23. 'net_contained_or_equal': '<<=',
  24. 'net_contains': '>>',
  25. 'net_contains_or_equals': '>>=',
  26. }
  27. NET_TEXT_OPERATORS = ['ILIKE', '~*']
  28. class NetQuery(sql.Query):
  29. query_terms = sql.Query.query_terms.copy()
  30. query_terms.update(NET_OPERATORS)
  31. def add_filter(self, (filter_string, value), *args, **kwargs):
  32. # IP(...) == '' fails so make sure to force to string while we can
  33. if isinstance(value, IP):
  34. value = unicode(value)
  35. return super(NetQuery, self).add_filter(
  36. (filter_string, value), *args, **kwargs)
  37. class NetWhere(sql.where.WhereNode):
  38. def make_atom(self, child, qn):
  39. table_alias, name, db_type, lookup_type, value_annot, params = child
  40. if db_type not in ['inet', 'cidr']:
  41. return super(NetWhere, self).make_atom(child, qn)
  42. if table_alias:
  43. field_sql = '%s.%s' % (qn(table_alias), qn(name))
  44. else:
  45. field_sql = qn(name)
  46. if NET_OPERATORS.get(lookup_type, '') in NET_TEXT_OPERATORS:
  47. field_sql = 'HOST(%s)' % field_sql
  48. if isinstance(params, QueryWrapper):
  49. extra, params = params.data
  50. else:
  51. extra = ''
  52. if lookup_type in NET_OPERATORS:
  53. return ('%s %s %%s %s' % (field_sql, NET_OPERATORS[lookup_type], extra), params)
  54. elif lookup_type == 'in':
  55. if not value_annot:
  56. raise sql.datastructures.EmptyResultSet
  57. if extra:
  58. return ('%s IN %s' % (field_sql, extra), params)
  59. return ('%s IN (%s)' % (field_sql, ', '.join(['%s'] * len(params))), params)
  60. elif lookup_type == 'range':
  61. return ('%s BETWEEN %%s and %%s' % field_sql, params)
  62. elif lookup_type == 'isnull':
  63. return ('%s IS %sNULL' % (field_sql, (not value_annot and 'NOT ' or '')), params)
  64. raise ValueError('Invalid lookup type "%s"' % lookup_type)
  65. class NetManger(models.Manager):
  66. use_for_related_fields = True
  67. def get_query_set(self):
  68. q = NetQuery(self.model, connection, NetWhere)
  69. return query.QuerySet(self.model, q)
# FIXME: add form fields etc.?
#   - a regexp field for MAC addresses
#   - IP() wrapped in try/except for IP and CIDR values
  73. class _NetAddressField(models.Field):
  74. # FIXME init empty object
  75. # FIXME null and blank handling needs to be done right.
  76. def to_python(self, value):
  77. if value is None:
  78. return value
  79. try:
  80. return IP(value)
  81. except ValueError, e:
  82. raise ValidationError(e)
  83. def get_db_prep_value(self, value):
  84. # FIXME does this need to respect null and blank?
  85. # FIXME does not handle __in
  86. if value is None:
  87. return value
  88. return unicode(self.to_python(value))
  89. def get_db_prep_lookup(self, lookup_type, value):
  90. if value is None:
  91. return value
  92. if (lookup_type in NET_OPERATORS and
  93. NET_OPERATORS[lookup_type] not in NET_TEXT_OPERATORS):
  94. return [self.get_db_prep_value(value)]
  95. return super(_NetAddressField, self).get_db_prep_lookup(
  96. lookup_type, value)
  97. class InetAddressField(_NetAddressField):
  98. description = "PostgreSQL INET field"
  99. __metaclass__ = models.SubfieldBase
  100. def db_type(self):
  101. return 'inet'
  102. class CidrAddressField(_NetAddressField):
  103. description = "PostgreSQL CIDR field"
  104. __metaclass__ = models.SubfieldBase
  105. def db_type(self):
  106. return 'cidr'
  107. class MACAddressField(models.Field):
  108. # FIXME does this need proper validation?
  109. description = "PostgreSQL MACADDR field"
  110. def __init__(self, *args, **kwargs):
  111. kwargs['max_length'] = 17
  112. super(MACAddressField, self).__init__(*args, **kwargs)
  113. def db_type(self):
  114. return 'macaddr'
class InetTestModel(models.Model):
    '''
    Doctests for the SQL produced by lookups on an ``InetAddressField``.

    Each example builds a filter through the ``NetManger`` manager and checks
    the ``(sql, params)`` pair returned by ``query.as_sql()``. Text lookups
    are expected to wrap the column in ``HOST(...)``; date lookups and
    ``search`` are expected to raise ``ValueError``.

    NOTE(review): the expected table name is ``foo_inettestmodel``, so these
    examples presumably only pass when the model lives in an app labelled
    ``foo`` -- confirm.

    >>> InetTestModel.objects.filter(inet='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__exact='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__iexact='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" = %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__net_contains='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >> %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__in=['10.0.0.1', '10.0.0.2']).query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IN (%s, %s)', (u'10.0.0.1', u'10.0.0.2'))
    >>> InetTestModel.objects.filter(inet__gt='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" > %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__gte='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >= %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__lt='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" < %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__lte='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" <= %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__startswith='10.').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ILIKE %s ', (u'10.%',))
    >>> InetTestModel.objects.filter(inet__istartswith='10.').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ILIKE %s ', (u'10.%',))
    >>> InetTestModel.objects.filter(inet__endswith='.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ILIKE %s ', (u'%.1',))
    >>> InetTestModel.objects.filter(inet__iendswith='.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ILIKE %s ', (u'%.1',))
    >>> InetTestModel.objects.filter(inet__range=('10.0.0.1', '10.0.0.10')).query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" BETWEEN %s and %s', (u'10.0.0.1', u'10.0.0.10'))
    >>> InetTestModel.objects.filter(inet__year=1).query.as_sql()
    Traceback (most recent call last):
    ...
    ValueError: Invalid lookup type "year"
    >>> InetTestModel.objects.filter(inet__month=1).query.as_sql()
    Traceback (most recent call last):
    ...
    ValueError: Invalid lookup type "month"
    >>> InetTestModel.objects.filter(inet__day=1).query.as_sql()
    Traceback (most recent call last):
    ...
    ValueError: Invalid lookup type "day"
    >>> InetTestModel.objects.filter(inet__isnull=True).query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IS NULL', ())
    >>> InetTestModel.objects.filter(inet__isnull=False).query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" IS NOT NULL', ())
    >>> InetTestModel.objects.filter(inet__search='10').query.as_sql()
    Traceback (most recent call last):
    ...
    ValueError: Invalid lookup type "search"
    >>> InetTestModel.objects.filter(inet__regex=u'10').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ~* %s ', (u'10',))
    >>> InetTestModel.objects.filter(inet__iregex=u'10').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE HOST("foo_inettestmodel"."inet") ~* %s ', (u'10',))
    >>> InetTestModel.objects.filter(inet__net_contains_or_equals='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" >>= %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__net_contained='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" << %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__net_contained_or_equal='10.0.0.1').query.as_sql()
    ('SELECT "foo_inettestmodel"."id", "foo_inettestmodel"."inet" FROM "foo_inettestmodel" WHERE "foo_inettestmodel"."inet" <<= %s ', (u'10.0.0.1',))
    '''
    # The column the doctests above filter on.
    inet = InetAddressField()
    # Manager that builds the NetQuery/NetWhere-based querysets under test.
    objects = NetManger()
class CidrTestModel(models.Model):
    '''
    Doctests for the SQL produced by lookups on a ``CidrAddressField``.

    Each example builds a filter through the ``NetManger`` manager and checks
    the ``(sql, params)`` pair returned by ``query.as_sql()``. Text lookups
    are expected to wrap the column in ``TEXT(...)``; date lookups and
    ``search`` are expected to raise ``ValueError``.

    NOTE(review): the expected table name is ``foo_cidrtestmodel``, so these
    examples presumably only pass when the model lives in an app labelled
    ``foo`` -- confirm.

    >>> CidrTestModel.objects.filter(cidr='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" = %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__exact='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" = %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__iexact='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" = %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__net_contains='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" >> %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__in=['10.0.0.1', '10.0.0.2']).query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" IN (%s, %s)', (u'10.0.0.1', u'10.0.0.2'))
    >>> CidrTestModel.objects.filter(cidr__gt='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" > %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__gte='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" >= %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__lt='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" < %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__lte='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" <= %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__startswith='10.').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE TEXT("foo_cidrtestmodel"."cidr") ILIKE %s ', (u'10.%',))
    >>> CidrTestModel.objects.filter(cidr__istartswith='10.').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE TEXT("foo_cidrtestmodel"."cidr") ILIKE %s ', (u'10.%',))
    >>> CidrTestModel.objects.filter(cidr__endswith='.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE TEXT("foo_cidrtestmodel"."cidr") ILIKE %s ', (u'%.1',))
    >>> CidrTestModel.objects.filter(cidr__iendswith='.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE TEXT("foo_cidrtestmodel"."cidr") ILIKE %s ', (u'%.1',))
    >>> CidrTestModel.objects.filter(cidr__range=('10.0.0.1', '10.0.0.10')).query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" BETWEEN %s and %s', (u'10.0.0.1', u'10.0.0.10'))
    >>> CidrTestModel.objects.filter(cidr__year=1).query.as_sql()
    Traceback (most recent call last):
    ...
    ValueError: Invalid lookup type "year"
    >>> CidrTestModel.objects.filter(cidr__month=1).query.as_sql()
    Traceback (most recent call last):
    ...
    ValueError: Invalid lookup type "month"
    >>> CidrTestModel.objects.filter(cidr__day=1).query.as_sql()
    Traceback (most recent call last):
    ...
    ValueError: Invalid lookup type "day"
    >>> CidrTestModel.objects.filter(cidr__isnull=True).query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" IS NULL', ())
    >>> CidrTestModel.objects.filter(cidr__isnull=False).query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" IS NOT NULL', ())
    >>> CidrTestModel.objects.filter(cidr__search='10').query.as_sql()
    Traceback (most recent call last):
    ...
    ValueError: Invalid lookup type "search"
    >>> CidrTestModel.objects.filter(cidr__regex=u'10').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE TEXT("foo_cidrtestmodel"."cidr") ~* %s ', (u'10',))
    >>> CidrTestModel.objects.filter(cidr__iregex=u'10').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE TEXT("foo_cidrtestmodel"."cidr") ~* %s ', (u'10',))
    >>> CidrTestModel.objects.filter(cidr__net_contains_or_equals='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" >>= %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__net_contained='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" << %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__net_contained_or_equal='10.0.0.1').query.as_sql()
    ('SELECT "foo_cidrtestmodel"."id", "foo_cidrtestmodel"."cidr" FROM "foo_cidrtestmodel" WHERE "foo_cidrtestmodel"."cidr" <<= %s ', (u'10.0.0.1',))
    '''
    # The column the doctests above filter on.
    cidr = CidrAddressField()
    # Manager that builds the NetQuery/NetWhere-based querysets under test.
    objects = NetManger()
  241. class MACTestModel(models.Model):
  242. mac = MACAddressField(null=True)
  243. objects = NetManger()