|
@@ -1,5 +1,7 @@
|
|
|
import base64
|
|
|
+from Crypto.Cipher import XOR
|
|
|
from Crypto.PublicKey import RSA
|
|
|
+import os
|
|
|
|
|
|
from django.http import HttpResponseBadRequest
|
|
|
|
|
@@ -20,6 +22,7 @@ from . import serializers
|
|
|
|
|
|
ERR_USERKEY_MISSING = "No UserKey found for the current user."
|
|
|
ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
|
|
|
+ERR_PRIVKEY_MISSING = "Private key was not provided."
|
|
|
ERR_PRIVKEY_INVALID = "Invalid private key."
|
|
|
|
|
|
|
|
@@ -38,7 +41,6 @@ class SecretRoleViewSet(ModelViewSet):
|
|
|
#
|
|
|
|
|
|
# TODO: Need to implement custom create() and update() methods to handle secret encryption.
|
|
|
-# TODO: Figure out a better way of transmitting the private key
|
|
|
class SecretViewSet(WritableSerializerMixin, ModelViewSet):
|
|
|
queryset = Secret.objects.select_related(
|
|
|
'device__primary_ip4', 'device__primary_ip6', 'role',
|
|
@@ -54,58 +56,33 @@ class SecretViewSet(WritableSerializerMixin, ModelViewSet):
|
|
|
authentication_classes = [BasicAuthentication, SessionAuthentication]
|
|
|
permission_classes = [IsAuthenticated]
|
|
|
|
|
|
- # The user's private key must be sent as a header (X-Private-Key). To overcome limitations around sending line
|
|
|
- # breaks in a header field, the key must be base64-encoded and stripped of all whitespace. This is a temporary
|
|
|
- # kludge until a more elegant approach can be devised.
|
|
|
- def _get_private_key(self, request):
|
|
|
- private_key_b64 = request.META.get('HTTP_X_PRIVATE_KEY', None)
|
|
|
- if private_key_b64 is None:
|
|
|
+ def _get_master_key(self, request):
|
|
|
+ cached_key = request.session.get('cached_key', None)
|
|
|
+ session_key = request.COOKIES.get('session_key', None)
|
|
|
+ if cached_key is None or session_key is None:
|
|
|
return None
|
|
|
- # TODO: Handle invalid encoding
|
|
|
- return base64.b64decode(private_key_b64)
|
|
|
|
|
|
- def retrieve(self, request, *args, **kwargs):
|
|
|
- private_key = self._get_private_key(request)
|
|
|
- secret = self.get_object()
|
|
|
-
|
|
|
- # Attempt to unlock the Secret if a private key was provided
|
|
|
- if private_key is not None:
|
|
|
+ cached_key = base64.b64decode(cached_key)
|
|
|
+ session_key = base64.b64decode(session_key)
|
|
|
|
|
|
- try:
|
|
|
- user_key = UserKey.objects.get(user=request.user)
|
|
|
- except UserKey.DoesNotExist:
|
|
|
- return HttpResponseBadRequest(ERR_USERKEY_MISSING)
|
|
|
- if not user_key.is_active():
|
|
|
- return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
|
|
|
+ xor = XOR.new(session_key)
|
|
|
+ master_key = xor.encrypt(cached_key)
|
|
|
+ return master_key
|
|
|
|
|
|
- master_key = user_key.get_master_key(private_key)
|
|
|
- if master_key is None:
|
|
|
- return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
|
|
|
+ def retrieve(self, request, *args, **kwargs):
|
|
|
+ master_key = self._get_master_key(request)
|
|
|
+ secret = self.get_object()
|
|
|
|
|
|
+ if master_key is not None:
|
|
|
secret.decrypt(master_key)
|
|
|
|
|
|
serializer = self.get_serializer(secret)
|
|
|
return Response(serializer.data)
|
|
|
|
|
|
def list(self, request, *args, **kwargs):
|
|
|
- private_key = self._get_private_key(request)
|
|
|
+ master_key = self._get_master_key(request)
|
|
|
queryset = self.filter_queryset(self.get_queryset())
|
|
|
|
|
|
- # Attempt to unlock the Secrets if a private key was provided
|
|
|
- master_key = None
|
|
|
- if private_key is not None:
|
|
|
-
|
|
|
- try:
|
|
|
- user_key = UserKey.objects.get(user=request.user)
|
|
|
- except UserKey.DoesNotExist:
|
|
|
- return HttpResponseBadRequest(ERR_USERKEY_MISSING)
|
|
|
- if not user_key.is_active():
|
|
|
- return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
|
|
|
-
|
|
|
- master_key = user_key.get_master_key(private_key)
|
|
|
- if master_key is None:
|
|
|
- return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
|
|
|
-
|
|
|
# Pagination
|
|
|
page = self.paginate_queryset(queryset)
|
|
|
if page is not None:
|
|
@@ -145,3 +122,44 @@ class RSAKeyGeneratorView(APIView):
|
|
|
'private_key': private_key,
|
|
|
'public_key': public_key,
|
|
|
})
|
|
|
+
|
|
|
+
|
|
|
+class GetSessionKey(APIView):
|
|
|
+ """
|
|
|
+ Cache an encrypted copy of the master key derived from the submitted private key.
|
|
|
+ """
|
|
|
+ permission_classes = [IsAuthenticated]
|
|
|
+
|
|
|
+ def post(self, request):
|
|
|
+
|
|
|
+ # Read private key
|
|
|
+ private_key = request.POST.get('private_key', None)
|
|
|
+ if private_key is None:
|
|
|
+ return HttpResponseBadRequest(ERR_PRIVKEY_MISSING)
|
|
|
+
|
|
|
+ # Validate user key
|
|
|
+ try:
|
|
|
+ user_key = UserKey.objects.get(user=request.user)
|
|
|
+ except UserKey.DoesNotExist:
|
|
|
+ return HttpResponseBadRequest(ERR_USERKEY_MISSING)
|
|
|
+ if not user_key.is_active():
|
|
|
+ return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
|
|
|
+
|
|
|
+ # Validate private key
|
|
|
+ master_key = user_key.get_master_key(private_key)
|
|
|
+ if master_key is None:
|
|
|
+ return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
|
|
|
+
|
|
|
+ # Generate a random 256-bit encryption key
|
|
|
+ session_key = os.urandom(32)
|
|
|
+ xor = XOR.new(session_key)
|
|
|
+ cached_key = xor.encrypt(master_key)
|
|
|
+
|
|
|
+ # Save XORed copy of the master key
|
|
|
+ request.session['cached_key'] = base64.b64encode(cached_key)
|
|
|
+
|
|
|
+ response = Response({
|
|
|
+ 'session_key': base64.b64encode(session_key),
|
|
|
+ })
|
|
|
+ response.set_cookie('session_key', base64.b64encode(session_key))
|
|
|
+ return response
|