# Source code for eventsourcing.compressor

import zlib
from abc import ABC, abstractmethod


class Compressor(ABC):
    """
    Abstract base class for objects that compress and decompress bytes.

    Concrete subclasses must implement both :meth:`compress` and
    :meth:`decompress`; the base class cannot be instantiated directly.
    """

    @abstractmethod
    def compress(self, data: bytes) -> bytes:
        """
        Compress bytes.

        :param data: the bytes to be compressed.
        :return: the compressed bytes.
        """

    @abstractmethod
    def decompress(self, data: bytes) -> bytes:
        """
        Decompress bytes.

        :param data: the bytes to be decompressed.
        :return: the decompressed bytes.
        """
class ZlibCompressor(Compressor):
    """
    Compressor implemented with the standard-library :mod:`zlib` module.
    """

    def compress(self, data: bytes) -> bytes:
        """
        Compress bytes using zlib.

        :param data: the bytes to be compressed.
        :return: the result of ``zlib.compress(data)``.
        """
        return zlib.compress(data)

    def decompress(self, data: bytes) -> bytes:
        """
        Decompress bytes using zlib.

        :param data: the bytes to be decompressed.
        :return: the result of ``zlib.decompress(data)``.
        """
        return zlib.decompress(data)