views.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. from __future__ import unicode_literals
  2. import base64
  3. from Crypto.PublicKey import RSA
  4. from rest_framework.exceptions import ValidationError
  5. from rest_framework.permissions import IsAuthenticated
  6. from rest_framework.response import Response
  7. from rest_framework.viewsets import ModelViewSet, ViewSet
  8. from django.http import HttpResponseBadRequest
  9. from secrets import filters
  10. from secrets.exceptions import InvalidKey
  11. from secrets.models import Secret, SecretRole, SessionKey, UserKey
  12. from utilities.api import FieldChoicesViewSet, WritableSerializerMixin
  13. from . import serializers
  14. ERR_USERKEY_MISSING = "No UserKey found for the current user."
  15. ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
  16. ERR_PRIVKEY_MISSING = "Private key was not provided."
  17. ERR_PRIVKEY_INVALID = "Invalid private key."
  18. #
  19. # Field choices
  20. #
  21. class SecretsFieldChoicesViewSet(FieldChoicesViewSet):
  22. fields = ()
  23. #
  24. # Secret Roles
  25. #
  26. class SecretRoleViewSet(ModelViewSet):
  27. queryset = SecretRole.objects.all()
  28. serializer_class = serializers.SecretRoleSerializer
  29. permission_classes = [IsAuthenticated]
  30. filter_class = filters.SecretRoleFilter
  31. #
  32. # Secrets
  33. #
  34. class SecretViewSet(WritableSerializerMixin, ModelViewSet):
  35. queryset = Secret.objects.select_related(
  36. 'device__primary_ip4', 'device__primary_ip6', 'role',
  37. ).prefetch_related(
  38. 'role__users', 'role__groups',
  39. )
  40. serializer_class = serializers.SecretSerializer
  41. write_serializer_class = serializers.WritableSecretSerializer
  42. filter_class = filters.SecretFilter
  43. master_key = None
  44. def _get_encrypted_fields(self, serializer):
  45. """
  46. Since we can't call encrypt() on the serializer like we can on the Secret model, we need to calculate the
  47. ciphertext and hash values by encrypting a dummy copy. These can be passed to the serializer's save() method.
  48. """
  49. s = Secret(plaintext=serializer.validated_data['plaintext'])
  50. s.encrypt(self.master_key)
  51. return ({
  52. 'ciphertext': s.ciphertext,
  53. 'hash': s.hash,
  54. })
  55. def initial(self, request, *args, **kwargs):
  56. super(SecretViewSet, self).initial(request, *args, **kwargs)
  57. if request.user.is_authenticated():
  58. # Read session key from HTTP cookie or header if it has been provided. The session key must be provided in
  59. # order to encrypt/decrypt secrets.
  60. if 'session_key' in request.COOKIES:
  61. session_key = base64.b64decode(request.COOKIES['session_key'])
  62. elif 'HTTP_X_SESSION_KEY' in request.META:
  63. session_key = base64.b64decode(request.META['HTTP_X_SESSION_KEY'])
  64. else:
  65. session_key = None
  66. # We can't encrypt secret plaintext without a session key.
  67. if self.action in ['create', 'update'] and session_key is None:
  68. raise ValidationError("A session key must be provided when creating or updating secrets.")
  69. # Attempt to retrieve the master key for encryption/decryption if a session key has been provided.
  70. if session_key is not None:
  71. try:
  72. sk = SessionKey.objects.get(userkey__user=request.user)
  73. self.master_key = sk.get_master_key(session_key)
  74. except (SessionKey.DoesNotExist, InvalidKey):
  75. raise ValidationError("Invalid session key.")
  76. def retrieve(self, request, *args, **kwargs):
  77. secret = self.get_object()
  78. # Attempt to decrypt the secret if the master key is known
  79. if self.master_key is not None:
  80. secret.decrypt(self.master_key)
  81. serializer = self.get_serializer(secret)
  82. return Response(serializer.data)
  83. def list(self, request, *args, **kwargs):
  84. queryset = self.filter_queryset(self.get_queryset())
  85. page = self.paginate_queryset(queryset)
  86. if page is not None:
  87. # Attempt to decrypt all secrets if the master key is known
  88. if self.master_key is not None:
  89. secrets = []
  90. for secret in page:
  91. secret.decrypt(self.master_key)
  92. secrets.append(secret)
  93. serializer = self.get_serializer(secrets, many=True)
  94. else:
  95. serializer = self.get_serializer(page, many=True)
  96. return self.get_paginated_response(serializer.data)
  97. serializer = self.get_serializer(queryset, many=True)
  98. return Response(serializer.data)
  99. def perform_create(self, serializer):
  100. serializer.save(**self._get_encrypted_fields(serializer))
  101. def perform_update(self, serializer):
  102. serializer.save(**self._get_encrypted_fields(serializer))
  103. class GetSessionKeyViewSet(ViewSet):
  104. """
  105. Retrieve a temporary session key to use for encrypting and decrypting secrets via the API. The user's private RSA
  106. key is POSTed with the name `private_key`. An example:
  107. curl -v -X POST -H "Authorization: Token <token>" -H "Accept: application/json; indent=4" \\
  108. --data-urlencode "private_key@<filename>" https://netbox/api/secrets/get-session-key/
  109. This request will yield a base64-encoded session key to be included in an `X-Session-Key` header in future requests:
  110. {
  111. "session_key": "+8t4SI6XikgVmB5+/urhozx9O5qCQANyOk1MNe6taRf="
  112. }
  113. This endpoint accepts one optional parameter: `preserve_key`. If True and a session key exists, the existing session
  114. key will be returned instead of a new one.
  115. """
  116. permission_classes = [IsAuthenticated]
  117. def create(self, request):
  118. # Read private key
  119. private_key = request.POST.get('private_key', None)
  120. if private_key is None:
  121. return HttpResponseBadRequest(ERR_PRIVKEY_MISSING)
  122. # Validate user key
  123. try:
  124. user_key = UserKey.objects.get(user=request.user)
  125. except UserKey.DoesNotExist:
  126. return HttpResponseBadRequest(ERR_USERKEY_MISSING)
  127. if not user_key.is_active():
  128. return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
  129. # Validate private key
  130. master_key = user_key.get_master_key(private_key)
  131. if master_key is None:
  132. return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
  133. try:
  134. current_session_key = SessionKey.objects.get(userkey__user_id=request.user.pk)
  135. except SessionKey.DoesNotExist:
  136. current_session_key = None
  137. if current_session_key and request.GET.get('preserve_key', False):
  138. # Retrieve the existing session key
  139. key = current_session_key.get_session_key(master_key)
  140. else:
  141. # Create a new SessionKey
  142. SessionKey.objects.filter(userkey__user=request.user).delete()
  143. sk = SessionKey(userkey=user_key)
  144. sk.save(master_key=master_key)
  145. key = sk.key
  146. # Encode the key using base64. (b64decode() returns a bytestring under Python 3.)
  147. encoded_key = base64.b64encode(key).decode()
  148. # Craft the response
  149. response = Response({
  150. 'session_key': encoded_key,
  151. })
  152. # If token authentication is not in use, assign the session key as a cookie
  153. if request.auth is None:
  154. response.set_cookie('session_key', value=encoded_key)
  155. return response
  156. class GenerateRSAKeyPairViewSet(ViewSet):
  157. """
  158. This endpoint can be used to generate a new RSA key pair. The keys are returned in PEM format.
  159. {
  160. "public_key": "<public key>",
  161. "private_key": "<private key>"
  162. }
  163. """
  164. permission_classes = [IsAuthenticated]
  165. def list(self, request):
  166. # Determine what size key to generate
  167. key_size = request.GET.get('key_size', 2048)
  168. if key_size not in range(2048, 4097, 256):
  169. key_size = 2048
  170. # Export RSA private and public keys in PEM format
  171. key = RSA.generate(key_size)
  172. private_key = key.exportKey('PEM')
  173. public_key = key.publickey().exportKey('PEM')
  174. return Response({
  175. 'private_key': private_key,
  176. 'public_key': public_key,
  177. })