Browse Source

Introduced ability to decrypt secrets by sending the user's private key in an HTTP header

Jeremy Stretch 8 years ago
parent
commit
2408d78f47
1 changed files with 55 additions and 81 deletions
  1. 55 81
      netbox/secrets/api/views.py

+ 55 - 81
netbox/secrets/api/views.py

@@ -1,11 +1,9 @@
+import base64
 from Crypto.PublicKey import RSA
 
-from django.shortcuts import get_object_or_404
+from django.http import HttpResponseBadRequest
 
-from rest_framework import generics
-from rest_framework import status
 from rest_framework.authentication import BasicAuthentication, SessionAuthentication
-from rest_framework.exceptions import PermissionDenied
 from rest_framework.permissions import IsAuthenticated
 from rest_framework.renderers import JSONRenderer
 from rest_framework.response import Response
@@ -39,8 +37,8 @@ class SecretRoleViewSet(ModelViewSet):
 # Secrets
 #
 
-# TODO: Need to implement custom create() and update() methods to handle secret encryption, and custom list() and
-# retrieve() methods to handle decryption.
+# TODO: Need to implement custom create() and update() methods to handle secret encryption.
+# TODO: Figure out a better way of transmitting the private key
 class SecretViewSet(WritableSerializerMixin, ModelViewSet):
     queryset = Secret.objects.select_related(
         'device__primary_ip4', 'device__primary_ip6', 'role',
@@ -56,97 +54,73 @@ class SecretViewSet(WritableSerializerMixin, ModelViewSet):
     authentication_classes = [BasicAuthentication, SessionAuthentication]
     permission_classes = [IsAuthenticated]
 
+    # The user's private key must be sent as a header (X-Private-Key). To overcome limitations around sending line
+    # breaks in a header field, the key must be base64-encoded and stripped of all whitespace. This is a temporary
+    # kludge until a more elegant approach can be devised.
+    def _get_private_key(self, request):
+        private_key_b64 = request.META.get('HTTP_X_PRIVATE_KEY', None)
+        if private_key_b64 is None:
+            return None
+        # TODO: Handle invalid encoding
+        return base64.b64decode(private_key_b64)
 
-# TODO: Delete
-class SecretListView(generics.GenericAPIView):
-    """
-    List secrets (filterable). If a private key is POSTed, attempt to decrypt each Secret.
-    """
-    queryset = Secret.objects.select_related('device__primary_ip4', 'device__primary_ip6', 'role')\
-        .prefetch_related('role__users', 'role__groups')
-    serializer_class = serializers.SecretSerializer
-    filter_class = SecretFilter
-    renderer_classes = [FormlessBrowsableAPIRenderer, JSONRenderer, FreeRADIUSClientsRenderer]
-    permission_classes = [IsAuthenticated]
+    def retrieve(self, request, *args, **kwargs):
+        private_key = self._get_private_key(request)
+        secret = self.get_object()
 
-    def get(self, request, private_key=None):
-        queryset = self.filter_queryset(self.get_queryset())
+        # Attempt to unlock the Secret if a private key was provided
+        if private_key is not None:
 
-        # Attempt to decrypt each Secret if a private key was provided.
-        if private_key:
             try:
-                uk = UserKey.objects.get(user=request.user)
+                user_key = UserKey.objects.get(user=request.user)
             except UserKey.DoesNotExist:
-                return Response(
-                    {'error': ERR_USERKEY_MISSING},
-                    status=status.HTTP_400_BAD_REQUEST
-                )
-            if not uk.is_active():
-                return Response(
-                    {'error': ERR_USERKEY_INACTIVE},
-                    status=status.HTTP_400_BAD_REQUEST
-                )
-            master_key = uk.get_master_key(private_key)
-            if master_key is not None:
-                for s in queryset:
-                    if s.decryptable_by(request.user):
-                        s.decrypt(master_key)
-            else:
-                return Response(
-                    {'error': ERR_PRIVKEY_INVALID},
-                    status=status.HTTP_400_BAD_REQUEST
-                )
+                return HttpResponseBadRequest(ERR_USERKEY_MISSING)
+            if not user_key.is_active():
+                return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
 
-        serializer = self.get_serializer(queryset, many=True)
-        return Response(serializer.data)
+            master_key = user_key.get_master_key(private_key)
+            if master_key is None:
+                return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
 
-    def post(self, request):
-        return self.get(request, private_key=request.POST.get('private_key'))
+            secret.decrypt(master_key)
 
+        serializer = self.get_serializer(secret)
+        return Response(serializer.data)
 
-# TODO: Delete
-class SecretDetailView(generics.GenericAPIView):
-    """
-    Retrieve a single Secret. If a private key is POSTed, attempt to decrypt the Secret.
-    """
-    queryset = Secret.objects.select_related('device__primary_ip4', 'device__primary_ip6', 'role')\
-        .prefetch_related('role__users', 'role__groups')
-    serializer_class = serializers.SecretSerializer
-    renderer_classes = [FormlessBrowsableAPIRenderer, JSONRenderer, FreeRADIUSClientsRenderer]
-    permission_classes = [IsAuthenticated]
+    def list(self, request, *args, **kwargs):
+        private_key = self._get_private_key(request)
+        queryset = self.filter_queryset(self.get_queryset())
 
-    def get(self, request, pk, private_key=None):
-        secret = get_object_or_404(Secret, pk=pk)
+        # Attempt to unlock the Secrets if a private key was provided
+        master_key = None
+        if private_key is not None:
 
-        # Attempt to decrypt the Secret if a private key was provided.
-        if private_key:
             try:
-                uk = UserKey.objects.get(user=request.user)
+                user_key = UserKey.objects.get(user=request.user)
             except UserKey.DoesNotExist:
-                return Response(
-                    {'error': ERR_USERKEY_MISSING},
-                    status=status.HTTP_400_BAD_REQUEST
-                )
-            if not uk.is_active():
-                return Response(
-                    {'error': ERR_USERKEY_INACTIVE},
-                    status=status.HTTP_400_BAD_REQUEST
-                )
-            if not secret.decryptable_by(request.user):
-                raise PermissionDenied(detail="You do not have permission to decrypt this secret.")
-            master_key = uk.get_master_key(private_key)
+                return HttpResponseBadRequest(ERR_USERKEY_MISSING)
+            if not user_key.is_active():
+                return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
+
+            master_key = user_key.get_master_key(private_key)
             if master_key is None:
-                return Response(
-                    {'error': ERR_PRIVKEY_INVALID},
-                    status=status.HTTP_400_BAD_REQUEST
-                )
-            secret.decrypt(master_key)
+                return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
 
-        serializer = self.get_serializer(secret)
-        return Response(serializer.data)
+        # Pagination
+        page = self.paginate_queryset(queryset)
+        if page is not None:
+            secrets = []
+            if master_key is not None:
+                for secret in page:
+                    secret.decrypt(master_key)
+                    secrets.append(secret)
+                serializer = self.get_serializer(secrets, many=True)
+            else:
+                serializer = self.get_serializer(page, many=True)
+            return self.get_paginated_response(serializer.data)
 
-    def post(self, request, pk):
-        return self.get(request, pk, private_key=request.POST.get('private_key'))
+        serializer = self.get_serializer(queryset, many=True)
+        return Response(serializer.data)
 
 
 class RSAKeyGeneratorView(APIView):