manager.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. from IPy import IP
  2. from django.db import models, connection
  3. from django.db.models import sql, query
  4. from django.db.models.query_utils import QueryWrapper
  5. NET_OPERATORS = {
  6. 'lt': '<',
  7. 'lte': '<=',
  8. 'exact': '=',
  9. 'iexact': '=',
  10. 'gte': '>=',
  11. 'gt': '>',
  12. 'contains': "ILIKE",
  13. 'startswith': "ILIKE",
  14. 'endswith': "ILIKE",
  15. 'regex': '~*',
  16. 'icontains': "ILIKE",
  17. 'istartswith': "ILIKE",
  18. 'iendswith': "ILIKE",
  19. 'iregex': '~*',
  20. 'net_contained': '<<',
  21. 'net_contained_or_equal': '<<=',
  22. 'net_contains': '>>',
  23. 'net_contains_or_equals': '>>=',
  24. }
  25. NET_TEXT_OPERATORS = ['ILIKE', '~*']
  26. class NetQuery(sql.Query):
  27. query_terms = sql.Query.query_terms.copy()
  28. query_terms.update(NET_OPERATORS)
  29. def add_filter(self, (filter_string, value), *args, **kwargs):
  30. # IP(...) == '' fails so make sure to force to string while we can
  31. if isinstance(value, IP):
  32. value = unicode(value)
  33. return super(NetQuery, self).add_filter(
  34. (filter_string, value), *args, **kwargs)
  35. class NetWhere(sql.where.WhereNode):
  36. def make_atom(self, child, qn):
  37. table_alias, name, db_type, lookup_type, value_annot, params = child
  38. if db_type not in ['inet', 'cidr']:
  39. return super(NetWhere, self).make_atom(child, qn)
  40. if table_alias:
  41. field_sql = '%s.%s' % (qn(table_alias), qn(name))
  42. else:
  43. field_sql = qn(name)
  44. if NET_OPERATORS.get(lookup_type, '') in NET_TEXT_OPERATORS:
  45. if db_type == 'inet':
  46. field_sql = 'HOST(%s)' % field_sql
  47. else:
  48. field_sql = 'TEXT(%s)' % field_sql
  49. if isinstance(params, QueryWrapper):
  50. extra, params = params.data
  51. else:
  52. extra = ''
  53. if lookup_type in NET_OPERATORS:
  54. return ('%s %s %%s %s' % (field_sql, NET_OPERATORS[lookup_type], extra), params)
  55. elif lookup_type == 'in':
  56. if not value_annot:
  57. raise sql.datastructures.EmptyResultSet
  58. if extra:
  59. return ('%s IN %s' % (field_sql, extra), params)
  60. return ('%s IN (%s)' % (field_sql, ', '.join(['%s'] * len(params))), params)
  61. elif lookup_type == 'range':
  62. return ('%s BETWEEN %%s and %%s' % field_sql, params)
  63. elif lookup_type == 'isnull':
  64. return ('%s IS %sNULL' % (field_sql, (not value_annot and 'NOT ' or '')), params)
  65. raise ValueError('Invalid lookup type "%s"' % lookup_type)
  66. class NetManger(models.Manager):
  67. use_for_related_fields = True
  68. def get_query_set(self):
  69. q = NetQuery(self.model, connection, NetWhere)
  70. return query.QuerySet(self.model, q)
  71. class _NetAddressField(models.Field):
  72. empty_strings_allowed = False
  73. def __init__(self, *args, **kwargs):
  74. kwargs['max_length'] = self.max_length
  75. super(_NetAddressField, self).__init__(*args, **kwargs)
  76. def to_python(self, value):
  77. if not value:
  78. value = None
  79. if value is None:
  80. return value
  81. return IP(value)
  82. def get_db_prep_value(self, value):
  83. if value is None:
  84. return value
  85. return unicode(self.to_python(value))
  86. def get_db_prep_lookup(self, lookup_type, value):
  87. if value is None:
  88. return value
  89. if (lookup_type in NET_OPERATORS and
  90. NET_OPERATORS[lookup_type] not in NET_TEXT_OPERATORS):
  91. return [self.get_db_prep_value(value)]
  92. return super(_NetAddressField, self).get_db_prep_lookup(
  93. lookup_type, value)
  94. class InetAddressField(_NetAddressField):
  95. description = "PostgreSQL INET field"
  96. max_length = 39
  97. __metaclass__ = models.SubfieldBase
  98. def db_type(self):
  99. return 'inet'
  100. class CidrAddressField(_NetAddressField):
  101. description = "PostgreSQL CIDR field"
  102. max_length = 43
  103. __metaclass__ = models.SubfieldBase
  104. def db_type(self):
  105. return 'cidr'
  106. class MACAddressField(models.Field):
  107. # FIXME does this need proper validation?
  108. description = "PostgreSQL MACADDR field"
  109. def __init__(self, *args, **kwargs):
  110. kwargs['max_length'] = 17
  111. super(MACAddressField, self).__init__(*args, **kwargs)
  112. def db_type(self):
  113. return 'macaddr'
  114. class InetTestModel(models.Model):
  115. '''
  116. >>> cursor = connection.cursor()
  117. >>> InetTestModel(inet='10.0.0.1').save()
  118. >>> InetTestModel(inet=IP('10.0.0.1')).save()
  119. >>> InetTestModel(inet='').save()
  120. Traceback (most recent call last):
  121. ...
  122. IntegrityError: null value in column "inet" violates not-null constraint
  123. <BLANKLINE>
  124. >>> cursor.execute('ROLLBACK')
  125. >>> InetTestModel(inet='az').save()
  126. Traceback (most recent call last):
  127. ...
  128. ValueError: invalid literal for int() with base 10: 'az'
  129. >>> InetTestModel(inet=None).save()
  130. Traceback (most recent call last):
  131. ...
  132. IntegrityError: null value in column "inet" violates not-null constraint
  133. <BLANKLINE>
  134. >>> cursor.execute('ROLLBACK')
  135. >>> InetTestModel().save()
  136. Traceback (most recent call last):
  137. ...
  138. IntegrityError: null value in column "inet" violates not-null constraint
  139. <BLANKLINE>
  140. >>> cursor.execute('ROLLBACK')
  141. >>> InetTestModel.objects.filter(inet='10.0.0.1').query.as_sql()
  142. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" = %s ', (u'10.0.0.1',))
  143. >>> InetTestModel.objects.filter(inet__exact='10.0.0.1').query.as_sql()
  144. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" = %s ', (u'10.0.0.1',))
  145. >>> InetTestModel.objects.filter(inet__iexact='10.0.0.1').query.as_sql()
  146. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" = %s ', (u'10.0.0.1',))
  147. >>> InetTestModel.objects.filter(inet__net_contains='10.0.0.1').query.as_sql()
  148. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" >> %s ', (u'10.0.0.1',))
  149. >>> InetTestModel.objects.filter(inet__in=['10.0.0.1', '10.0.0.2']).query.as_sql()
  150. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" IN (%s, %s)', (u'10.0.0.1', u'10.0.0.2'))
  151. >>> InetTestModel.objects.filter(inet__gt='10.0.0.1').query.as_sql()
  152. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" > %s ', (u'10.0.0.1',))
  153. >>> InetTestModel.objects.filter(inet__gte='10.0.0.1').query.as_sql()
  154. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" >= %s ', (u'10.0.0.1',))
  155. >>> InetTestModel.objects.filter(inet__lt='10.0.0.1').query.as_sql()
  156. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" < %s ', (u'10.0.0.1',))
  157. >>> InetTestModel.objects.filter(inet__lte='10.0.0.1').query.as_sql()
  158. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" <= %s ', (u'10.0.0.1',))
  159. >>> InetTestModel.objects.filter(inet__startswith='10.').query.as_sql()
  160. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ILIKE %s ', (u'10.%',))
  161. >>> InetTestModel.objects.filter(inet__istartswith='10.').query.as_sql()
  162. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ILIKE %s ', (u'10.%',))
  163. >>> InetTestModel.objects.filter(inet__endswith='.1').query.as_sql()
  164. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ILIKE %s ', (u'%.1',))
  165. >>> InetTestModel.objects.filter(inet__iendswith='.1').query.as_sql()
  166. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ILIKE %s ', (u'%.1',))
  167. >>> InetTestModel.objects.filter(inet__range=('10.0.0.1', '10.0.0.10')).query.as_sql()
  168. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" BETWEEN %s and %s', (u'10.0.0.1', u'10.0.0.10'))
  169. >>> InetTestModel.objects.filter(inet__year=1).query.as_sql()
  170. Traceback (most recent call last):
  171. ...
  172. ValueError: Invalid lookup type "year"
  173. >>> InetTestModel.objects.filter(inet__month=1).query.as_sql()
  174. Traceback (most recent call last):
  175. ...
  176. ValueError: Invalid lookup type "month"
  177. >>> InetTestModel.objects.filter(inet__day=1).query.as_sql()
  178. Traceback (most recent call last):
  179. ...
  180. ValueError: Invalid lookup type "day"
  181. >>> InetTestModel.objects.filter(inet__isnull=True).query.as_sql()
  182. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" IS NULL', ())
  183. >>> InetTestModel.objects.filter(inet__isnull=False).query.as_sql()
  184. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" IS NOT NULL', ())
  185. >>> InetTestModel.objects.filter(inet__search='10').query.as_sql()
  186. Traceback (most recent call last):
  187. ...
  188. ValueError: Invalid lookup type "search"
  189. >>> InetTestModel.objects.filter(inet__regex=u'10').query.as_sql()
  190. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ~* %s ', (u'10',))
  191. >>> InetTestModel.objects.filter(inet__iregex=u'10').query.as_sql()
  192. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ~* %s ', (u'10',))
  193. >>> InetTestModel.objects.filter(inet__net_contains_or_equals='10.0.0.1').query.as_sql()
  194. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" >>= %s ', (u'10.0.0.1',))
  195. >>> InetTestModel.objects.filter(inet__net_contained='10.0.0.1').query.as_sql()
  196. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" << %s ', (u'10.0.0.1',))
  197. >>> InetTestModel.objects.filter(inet__net_contained_or_equal='10.0.0.1').query.as_sql()
  198. ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" <<= %s ', (u'10.0.0.1',))
  199. '''
  200. inet = InetAddressField()
  201. objects = NetManger()
  202. class Meta:
  203. db_table = 'inet'
  204. class NullInetTestModel(models.Model):
  205. '''
  206. >>> NullInetTestModel(inet='10.0.0.1').save()
  207. >>> NullInetTestModel(inet=IP('10.0.0.1')).save()
  208. >>> NullInetTestModel(inet='').save()
  209. >>> NullInetTestModel(inet=None).save()
  210. >>> NullInetTestModel().save()
  211. '''
  212. inet = InetAddressField(null=True)
  213. objects = NetManger()
  214. class Meta:
  215. db_table = 'nullinet'
  216. class CidrTestModel(models.Model):
  217. '''
  218. >>> CidrTestModel.objects.filter(cidr='10.0.0.1').query.as_sql()
  219. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" = %s ', (u'10.0.0.1',))
  220. >>> CidrTestModel.objects.filter(cidr__exact='10.0.0.1').query.as_sql()
  221. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" = %s ', (u'10.0.0.1',))
  222. >>> CidrTestModel.objects.filter(cidr__iexact='10.0.0.1').query.as_sql()
  223. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" = %s ', (u'10.0.0.1',))
  224. >>> CidrTestModel.objects.filter(cidr__net_contains='10.0.0.1').query.as_sql()
  225. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" >> %s ', (u'10.0.0.1',))
  226. >>> CidrTestModel.objects.filter(cidr__in=['10.0.0.1', '10.0.0.2']).query.as_sql()
  227. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" IN (%s, %s)', (u'10.0.0.1', u'10.0.0.2'))
  228. >>> CidrTestModel.objects.filter(cidr__gt='10.0.0.1').query.as_sql()
  229. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" > %s ', (u'10.0.0.1',))
  230. >>> CidrTestModel.objects.filter(cidr__gte='10.0.0.1').query.as_sql()
  231. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" >= %s ', (u'10.0.0.1',))
  232. >>> CidrTestModel.objects.filter(cidr__lt='10.0.0.1').query.as_sql()
  233. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" < %s ', (u'10.0.0.1',))
  234. >>> CidrTestModel.objects.filter(cidr__lte='10.0.0.1').query.as_sql()
  235. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" <= %s ', (u'10.0.0.1',))
  236. >>> CidrTestModel.objects.filter(cidr__startswith='10.').query.as_sql()
  237. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ILIKE %s ', (u'10.%',))
  238. >>> CidrTestModel.objects.filter(cidr__istartswith='10.').query.as_sql()
  239. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ILIKE %s ', (u'10.%',))
  240. >>> CidrTestModel.objects.filter(cidr__endswith='.1').query.as_sql()
  241. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ILIKE %s ', (u'%.1',))
  242. >>> CidrTestModel.objects.filter(cidr__iendswith='.1').query.as_sql()
  243. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ILIKE %s ', (u'%.1',))
  244. >>> CidrTestModel.objects.filter(cidr__range=('10.0.0.1', '10.0.0.10')).query.as_sql()
  245. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" BETWEEN %s and %s', (u'10.0.0.1', u'10.0.0.10'))
  246. >>> CidrTestModel.objects.filter(cidr__year=1).query.as_sql()
  247. Traceback (most recent call last):
  248. ...
  249. ValueError: Invalid lookup type "year"
  250. >>> CidrTestModel.objects.filter(cidr__month=1).query.as_sql()
  251. Traceback (most recent call last):
  252. ...
  253. ValueError: Invalid lookup type "month"
  254. >>> CidrTestModel.objects.filter(cidr__day=1).query.as_sql()
  255. Traceback (most recent call last):
  256. ...
  257. ValueError: Invalid lookup type "day"
  258. >>> CidrTestModel.objects.filter(cidr__isnull=True).query.as_sql()
  259. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" IS NULL', ())
  260. >>> CidrTestModel.objects.filter(cidr__isnull=False).query.as_sql()
  261. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" IS NOT NULL', ())
  262. >>> CidrTestModel.objects.filter(cidr__search='10').query.as_sql()
  263. Traceback (most recent call last):
  264. ...
  265. ValueError: Invalid lookup type "search"
  266. >>> CidrTestModel.objects.filter(cidr__regex=u'10').query.as_sql()
  267. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ~* %s ', (u'10',))
  268. >>> CidrTestModel.objects.filter(cidr__iregex=u'10').query.as_sql()
  269. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ~* %s ', (u'10',))
  270. >>> CidrTestModel.objects.filter(cidr__net_contains_or_equals='10.0.0.1').query.as_sql()
  271. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" >>= %s ', (u'10.0.0.1',))
  272. >>> CidrTestModel.objects.filter(cidr__net_contained='10.0.0.1').query.as_sql()
  273. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" << %s ', (u'10.0.0.1',))
  274. >>> CidrTestModel.objects.filter(cidr__net_contained_or_equal='10.0.0.1').query.as_sql()
  275. ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" <<= %s ', (u'10.0.0.1',))
  276. '''
  277. cidr = CidrAddressField()
  278. objects = NetManger()
  279. class Meta:
  280. db_table = 'cidr'
  281. class MACTestModel(models.Model):
  282. mac = MACAddressField(null=True)
  283. objects = NetManger()
  284. class Meta:
  285. db_table = 'mac'