manager.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. import re
  2. from IPy import IP
  3. from django import forms
  4. from django.db import models, connection
  5. from django.db.models import sql, query
  6. from django.db.models.query_utils import QueryWrapper
  7. from django.utils.encoding import force_unicode
  8. from django.utils.safestring import mark_safe
  9. NET_OPERATORS = {
  10. 'lt': '<',
  11. 'lte': '<=',
  12. 'exact': '=',
  13. 'iexact': '=',
  14. 'gte': '>=',
  15. 'gt': '>',
  16. 'contains': "ILIKE",
  17. 'startswith': "ILIKE",
  18. 'endswith': "ILIKE",
  19. 'regex': '~*',
  20. 'icontains': "ILIKE",
  21. 'istartswith': "ILIKE",
  22. 'iendswith': "ILIKE",
  23. 'iregex': '~*',
  24. 'net_contained': '<<',
  25. 'net_contained_or_equal': '<<=',
  26. 'net_contains': '>>',
  27. 'net_contains_or_equals': '>>=',
  28. }
  29. NET_TEXT_OPERATORS = ['ILIKE', '~*']
  30. class NetQuery(sql.Query):
  31. query_terms = sql.Query.query_terms.copy()
  32. query_terms.update(NET_OPERATORS)
  33. def add_filter(self, (filter_string, value), *args, **kwargs):
  34. # IP(...) == '' fails so make sure to force to string while we can
  35. if isinstance(value, IP):
  36. value = unicode(value)
  37. return super(NetQuery, self).add_filter(
  38. (filter_string, value), *args, **kwargs)
  39. class NetWhere(sql.where.WhereNode):
  40. def make_atom(self, child, qn):
  41. table_alias, name, db_type, lookup_type, value_annot, params = child
  42. if db_type not in ['inet', 'cidr']:
  43. return super(NetWhere, self).make_atom(child, qn)
  44. if table_alias:
  45. field_sql = '%s.%s' % (qn(table_alias), qn(name))
  46. else:
  47. field_sql = qn(name)
  48. if NET_OPERATORS.get(lookup_type, '') in NET_TEXT_OPERATORS:
  49. if db_type == 'inet':
  50. field_sql = 'HOST(%s)' % field_sql
  51. else:
  52. field_sql = 'TEXT(%s)' % field_sql
  53. if isinstance(params, QueryWrapper):
  54. extra, params = params.data
  55. else:
  56. extra = ''
  57. if lookup_type in NET_OPERATORS:
  58. return ('%s %s %%s %s' % (field_sql, NET_OPERATORS[lookup_type], extra), params)
  59. elif lookup_type == 'in':
  60. if not value_annot:
  61. raise sql.datastructures.EmptyResultSet
  62. if extra:
  63. return ('%s IN %s' % (field_sql, extra), params)
  64. return ('%s IN (%s)' % (field_sql, ', '.join(['%s'] * len(params))), params)
  65. elif lookup_type == 'range':
  66. return ('%s BETWEEN %%s and %%s' % field_sql, params)
  67. elif lookup_type == 'isnull':
  68. return ('%s IS %sNULL' % (field_sql, (not value_annot and 'NOT ' or '')), params)
  69. raise ValueError('Invalid lookup type "%s"' % lookup_type)
  70. class NetManger(models.Manager):
  71. use_for_related_fields = True
  72. def get_query_set(self):
  73. q = NetQuery(self.model, connection, NetWhere)
  74. return query.QuerySet(self.model, q)
  75. class NetInput(forms.Widget):
  76. input_type = 'text'
  77. def render(self, name, value, attrs=None):
  78. # Default forms.Widget compares value != '' which breaks IP...
  79. if value is None: value = ''
  80. final_attrs = self.build_attrs(attrs, type=self.input_type, name=name)
  81. if value:
  82. final_attrs['value'] = force_unicode(value)
  83. return mark_safe(u'<input%s />' % forms.util.flatatt(final_attrs))
  84. class NetAddressFormField(forms.Field):
  85. widget = NetInput
  86. default_error_messages = {
  87. 'invalid': u'Enter a valid IP Address.',
  88. }
  89. def __init__(self, *args, **kwargs):
  90. super(NetAddressFormField, self).__init__(*args, **kwargs)
  91. def clean(self, value):
  92. super(NetAddressFormField, self).clean(value)
  93. if value in (None, ''):
  94. return None
  95. if isinstance(value, IP):
  96. return value
  97. try:
  98. return IP(value)
  99. except ValueError, e:
  100. raise forms.ValidationError(e)
  101. mac_re = re.compile(r'^(([A-F0-9]{2}:){5}[A-F0-9]{2})$')
  102. class MACAddressFormField(forms.RegexField):
  103. default_error_messages = {
  104. 'invalid': u'Enter a valid MAC address.',
  105. }
  106. def __init__(self, *args, **kwargs):
  107. super(MACAddressFormField, self).__init__(mac_re, *args, **kwargs)
  108. class _NetAddressField(models.Field):
  109. empty_strings_allowed = False
  110. def __init__(self, *args, **kwargs):
  111. kwargs['max_length'] = self.max_length
  112. super(_NetAddressField, self).__init__(*args, **kwargs)
  113. def to_python(self, value):
  114. if not value:
  115. value = None
  116. if value is None:
  117. return value
  118. return IP(value)
  119. def get_db_prep_value(self, value):
  120. if value is None:
  121. return value
  122. return unicode(self.to_python(value))
  123. def get_db_prep_lookup(self, lookup_type, value):
  124. if value is None:
  125. return value
  126. if (lookup_type in NET_OPERATORS and
  127. NET_OPERATORS[lookup_type] not in NET_TEXT_OPERATORS):
  128. return [self.get_db_prep_value(value)]
  129. return super(_NetAddressField, self).get_db_prep_lookup(
  130. lookup_type, value)
  131. def formfield(self, **kwargs):
  132. defaults = {'form_class': NetAddressFormField}
  133. defaults.update(kwargs)
  134. return super(_NetAddressField, self).formfield(**defaults)
  135. class InetAddressField(_NetAddressField):
  136. description = "PostgreSQL INET field"
  137. max_length = 39
  138. __metaclass__ = models.SubfieldBase
  139. def db_type(self):
  140. return 'inet'
  141. class CidrAddressField(_NetAddressField):
  142. description = "PostgreSQL CIDR field"
  143. max_length = 43
  144. __metaclass__ = models.SubfieldBase
  145. def db_type(self):
  146. return 'cidr'
  147. class MACAddressField(models.Field):
  148. description = "PostgreSQL MACADDR field"
  149. def __init__(self, *args, **kwargs):
  150. kwargs['max_length'] = 17
  151. super(MACAddressField, self).__init__(*args, **kwargs)
  152. def db_type(self):
  153. return 'macaddr'
  154. def formfield(self, **kwargs):
  155. defaults = {'form_class': MACAddressFormField}
  156. defaults.update(kwargs)
  157. return super(MACAddressField, self).formfield(**defaults)
  158. # ---- TESTS ----
# Doctest fixture: a model with one required InetAddressField.  The docstring
# is an executable doctest; its expected output must stay exactly as written.
class InetTestModel(models.Model):
    '''
    >>> cursor = connection.cursor()
    >>> InetTestModel(inet='10.0.0.1').save()
    >>> InetTestModel(inet=IP('10.0.0.1')).save()
    >>> InetTestModel(inet='').save()
    Traceback (most recent call last):
        ...
    IntegrityError: null value in column "inet" violates not-null constraint
    <BLANKLINE>
    >>> cursor.execute('ROLLBACK')
    >>> InetTestModel(inet='az').save()
    Traceback (most recent call last):
        ...
    ValueError: invalid literal for int() with base 10: 'az'
    >>> InetTestModel(inet=None).save()
    Traceback (most recent call last):
        ...
    IntegrityError: null value in column "inet" violates not-null constraint
    <BLANKLINE>
    >>> cursor.execute('ROLLBACK')
    >>> InetTestModel().save()
    Traceback (most recent call last):
        ...
    IntegrityError: null value in column "inet" violates not-null constraint
    <BLANKLINE>
    >>> cursor.execute('ROLLBACK')
    >>> InetTestModel.objects.filter(inet='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" = %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__exact='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" = %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__iexact='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" = %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__net_contains='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" >> %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__in=['10.0.0.1', '10.0.0.2']).query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" IN (%s, %s)', (u'10.0.0.1', u'10.0.0.2'))
    >>> InetTestModel.objects.filter(inet__gt='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" > %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__gte='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" >= %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__lt='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" < %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__lte='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" <= %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__startswith='10.').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ILIKE %s ', (u'10.%',))
    >>> InetTestModel.objects.filter(inet__istartswith='10.').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ILIKE %s ', (u'10.%',))
    >>> InetTestModel.objects.filter(inet__endswith='.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ILIKE %s ', (u'%.1',))
    >>> InetTestModel.objects.filter(inet__iendswith='.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ILIKE %s ', (u'%.1',))
    >>> InetTestModel.objects.filter(inet__range=('10.0.0.1', '10.0.0.10')).query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" BETWEEN %s and %s', (u'10.0.0.1', u'10.0.0.10'))
    >>> InetTestModel.objects.filter(inet__year=1).query.as_sql()
    Traceback (most recent call last):
        ...
    ValueError: Invalid lookup type "year"
    >>> InetTestModel.objects.filter(inet__month=1).query.as_sql()
    Traceback (most recent call last):
        ...
    ValueError: Invalid lookup type "month"
    >>> InetTestModel.objects.filter(inet__day=1).query.as_sql()
    Traceback (most recent call last):
        ...
    ValueError: Invalid lookup type "day"
    >>> InetTestModel.objects.filter(inet__isnull=True).query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" IS NULL', ())
    >>> InetTestModel.objects.filter(inet__isnull=False).query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" IS NOT NULL', ())
    >>> InetTestModel.objects.filter(inet__search='10').query.as_sql()
    Traceback (most recent call last):
        ...
    ValueError: Invalid lookup type "search"
    >>> InetTestModel.objects.filter(inet__regex=u'10').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ~* %s ', (u'10',))
    >>> InetTestModel.objects.filter(inet__iregex=u'10').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE HOST("inet"."inet") ~* %s ', (u'10',))
    >>> InetTestModel.objects.filter(inet__net_contains_or_equals='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" >>= %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__net_contained='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" << %s ', (u'10.0.0.1',))
    >>> InetTestModel.objects.filter(inet__net_contained_or_equal='10.0.0.1').query.as_sql()
    ('SELECT "inet"."id", "inet"."inet" FROM "inet" WHERE "inet"."inet" <<= %s ', (u'10.0.0.1',))
    '''
    # Required column: the doctests above expect a not-null violation for
    # '', None and a missing value.
    inet = InetAddressField()
    # The as_sql() doctests above go through this manager, so the SQL they
    # show is produced by NetQuery and NetWhere.
    objects = NetManger()

    class Meta:
        # Table name that appears in the expected SQL above.
        db_table = 'inet'
# Doctest fixture: same shape as the required-inet model, but the column is
# nullable, so every save in the doctest is expected to succeed silently.
class NullInetTestModel(models.Model):
    '''
    >>> NullInetTestModel(inet='10.0.0.1').save()
    >>> NullInetTestModel(inet=IP('10.0.0.1')).save()
    >>> NullInetTestModel(inet='').save()
    >>> NullInetTestModel(inet=None).save()
    >>> NullInetTestModel().save()
    '''
    # null=True is what lets the '', None and missing-value saves above pass.
    inet = InetAddressField(null=True)
    objects = NetManger()

    class Meta:
        db_table = 'nullinet'
# Doctest fixture: a model with one CidrAddressField.  The docstring is an
# executable doctest; its expected output must stay exactly as written.
class CidrTestModel(models.Model):
    '''
    >>> CidrTestModel.objects.filter(cidr='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" = %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__exact='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" = %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__iexact='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" = %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__net_contains='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" >> %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__in=['10.0.0.1', '10.0.0.2']).query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" IN (%s, %s)', (u'10.0.0.1', u'10.0.0.2'))
    >>> CidrTestModel.objects.filter(cidr__gt='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" > %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__gte='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" >= %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__lt='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" < %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__lte='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" <= %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__startswith='10.').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ILIKE %s ', (u'10.%',))
    >>> CidrTestModel.objects.filter(cidr__istartswith='10.').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ILIKE %s ', (u'10.%',))
    >>> CidrTestModel.objects.filter(cidr__endswith='.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ILIKE %s ', (u'%.1',))
    >>> CidrTestModel.objects.filter(cidr__iendswith='.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ILIKE %s ', (u'%.1',))
    >>> CidrTestModel.objects.filter(cidr__range=('10.0.0.1', '10.0.0.10')).query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" BETWEEN %s and %s', (u'10.0.0.1', u'10.0.0.10'))
    >>> CidrTestModel.objects.filter(cidr__year=1).query.as_sql()
    Traceback (most recent call last):
        ...
    ValueError: Invalid lookup type "year"
    >>> CidrTestModel.objects.filter(cidr__month=1).query.as_sql()
    Traceback (most recent call last):
        ...
    ValueError: Invalid lookup type "month"
    >>> CidrTestModel.objects.filter(cidr__day=1).query.as_sql()
    Traceback (most recent call last):
        ...
    ValueError: Invalid lookup type "day"
    >>> CidrTestModel.objects.filter(cidr__isnull=True).query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" IS NULL', ())
    >>> CidrTestModel.objects.filter(cidr__isnull=False).query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" IS NOT NULL', ())
    >>> CidrTestModel.objects.filter(cidr__search='10').query.as_sql()
    Traceback (most recent call last):
        ...
    ValueError: Invalid lookup type "search"
    >>> CidrTestModel.objects.filter(cidr__regex=u'10').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ~* %s ', (u'10',))
    >>> CidrTestModel.objects.filter(cidr__iregex=u'10').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE TEXT("cidr"."cidr") ~* %s ', (u'10',))
    >>> CidrTestModel.objects.filter(cidr__net_contains_or_equals='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" >>= %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__net_contained='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" << %s ', (u'10.0.0.1',))
    >>> CidrTestModel.objects.filter(cidr__net_contained_or_equal='10.0.0.1').query.as_sql()
    ('SELECT "cidr"."id", "cidr"."cidr" FROM "cidr" WHERE "cidr"."cidr" <<= %s ', (u'10.0.0.1',))
    '''
    cidr = CidrAddressField()
    # The as_sql() doctests above go through this manager, so the SQL they
    # show (including the TEXT(...) wrapping for pattern lookups) is produced
    # by NetQuery and NetWhere.
    objects = NetManger()

    class Meta:
        # Table name that appears in the expected SQL above.
        db_table = 'cidr'
# Fixture model with a single nullable MACAddressField; it carries no
# doctests of its own.
class MACTestModel(models.Model):
    mac = MACAddressField(null=True)
    objects = NetManger()

    class Meta:
        db_table = 'mac'