pssecret-server/pssecret_server/utils.py

68 lines
1.7 KiB
Python
Raw Permalink Normal View History

from functools import lru_cache
from uuid import uuid4
from cryptography.fernet import Fernet
from redis.asyncio import Redis
from redis.exceptions import ResponseError
from redis.typing import ResponseT
2025-01-01 19:01:10 +00:00
from pssecret_server.models import Secret
def encrypt_secret(data: Secret, fernet: Fernet) -> Secret:
encrypted = fernet.encrypt(data.data.encode()).decode()
return Secret(data=encrypted)
def decrypt_secret(secret: bytes, fernet: Fernet) -> bytes:
return fernet.decrypt(secret)
async def get_new_key(redis: Redis) -> str:
"""Returns free Redis key"""
while True:
new_key = str(uuid4())
if not await redis.exists(new_key):
return new_key
async def save_secret(data: Secret, redis: Redis) -> str:
"""Save passed data, returns retrieval key"""
new_key = await get_new_key(redis)
await redis.setex(new_key, 60 * 60 * 24, data.data)
return new_key
@lru_cache
async def _is_getdel_available(redis: Redis) -> bool:
"""Checks the availability of GETDEL command on the Redis server instance
GETDEL is not available in Redis prior to version 6.2
"""
try:
await redis.getdel("test:getdel:availability")
except ResponseError:
return False
return True
async def getdel(redis: Redis, key: str) -> ResponseT:
"""Gets the value of key and deletes the key
Depending on the capabilities of Redis server this function
will either call GETDEL command, either first call GETSET with empty string
and DEL right after that.
"""
result: ResponseT
if await _is_getdel_available(redis):
result = await redis.getdel(key)
else:
result = await redis.getset(key, "")
await redis.delete(key)
return result