...
ID | IEP-97 | ||||||||
Author | Anton Vinogradov | ||||||||
Sponsor | |||||||||
Created |
| ||||||||
Status |
|
...
A possible solution is to transform the byte arrays produced during the marshalling/unmarshalling phase. This covers both layers: messaging (network) and storage (in-memory + persistence).
GridBinaryMarshaller already transforms objects to bytes.
All we need is to transform and wrap these bytes: the idea is to transform the given array in some way and add a special prefix, GridBinaryMarshaller#TRANSFORMED == -3, at the beginning to make it distinguishable from untransformed data.
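The wrapping step can be sketched as follows. This is a minimal illustration, not the Ignite implementation: the class and method names are hypothetical, only the marker value -3 comes from the text above, and the sketch assumes that untransformed marshalled data never starts with that byte.

```java
import java.util.Arrays;

/** Hypothetical helper: only the marker value (-3) is taken from the proposal. */
final class PrefixWrapSketch {
    /** Mirrors GridBinaryMarshaller#TRANSFORMED as described in the text. */
    static final byte TRANSFORMED = -3;

    /** Prepends the marker to bytes that have already been transformed. */
    static byte[] wrap(byte[] transformed) {
        byte[] res = new byte[transformed.length + 1];

        res[0] = TRANSFORMED;
        System.arraycopy(transformed, 0, res, 1, transformed.length);

        return res;
    }

    /** Tells whether the array starts with the marker. */
    static boolean isWrapped(byte[] bytes) {
        return bytes.length > 0 && bytes[0] == TRANSFORMED;
    }

    /** Strips the marker, or returns the same array when the marker is absent. */
    static byte[] unwrap(byte[] bytes) {
        return isWrapped(bytes) ? Arrays.copyOfRange(bytes, 1, bytes.length) : bytes;
    }
}
```

A reader of the bytes only has to look at the first byte to decide whether restoration is needed, so untransformed data pays no extra cost.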
All we need is to cover all CacheObjects.
...
Most of them have the following structure:
...
```java
// Marshals the value, then transforms the resulting bytes if a transformer is configured.
protected byte[] valueBytesFromValue(CacheObjectValueContext ctx) throws IgniteCheckedException {
    byte[] bytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);

    return CacheObjectTransformerUtils.transformIfNecessary(bytes, ctx);
}

// Restores the original bytes if they were transformed, then unmarshals the value.
protected Object valueFromValueBytes(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException {
    byte[] bytes = CacheObjectTransformerUtils.restoreIfNecessary(valBytes, ctx);

    return ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, ldr);
}

public void prepareMarshal(CacheObjectValueContext ctx) throws IgniteCheckedException {
    if (valBytes == null)
        valBytes = valueBytesFromValue(ctx);
}

public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException {
    if (val == null)
        val = valueFromValueBytes(ctx, ldr);
}
```
...
```java
private Object obj; // Deserialized value. Value converted to the Java class instance.
private byte[] arr; // Serialized bytes. Value!
```
(De)serialization is similar to (un)marshalling: it is a process of obtaining a Java class instance from bytes or vice versa, but it happens at different times and in different code layers.
...
Fortunately, BinaryObjectImpl requires no marshalling: serialization already generates bytes that can be used as marshalled bytes.
...
```java
private Object obj; // Deserialized value. Value converted to the Java class instance.
private byte[] arr; // Serialized bytes. Value!
private byte[] valBytes; // Marshalled value bytes.
```
...
It's not possible to simply replace arr with valBytes because, unlike in CacheObjectImpl, for example, arr is not just marshalled bytes. It is the object's value, which is required, for example, to provide hashCode/schemaId/typeId/objectField, and we must keep it as is.
So, BinaryObjectImpl requires valBytes to/from arr conversion:
```java
// Restores the serialized array from the (possibly transformed) marshalled bytes.
private byte[] arrayFromValueBytes(CacheObjectValueContext ctx) {
    return CacheObjectTransformerUtils.restoreIfNecessary(valBytes, ctx);
}

// Produces marshalled bytes from the serialized array, transforming them if necessary.
private byte[] valueBytesFromArray(CacheObjectValueContext ctx) {
    return CacheObjectTransformerUtils.transformIfNecessary(arr, start, arr.length, ctx);
}

public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException {
    if (arr == null)
        arr = arrayFromValueBytes(ctx);
}

public void prepareMarshal(CacheObjectValueContext ctx) {
    if (valBytes == null)
        valBytes = valueBytesFromArray(ctx);
}
```
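The arr to/from valBytes conversion can be illustrated with a self-contained sketch. It is only an assumption about how such "if necessary" helpers might behave, not the code of the utility class used above: the class name, the method shapes and the use of java.util.function.Function as a stand-in for the transformer are all hypothetical. A transformer that returns null means "keep the data untransformed".

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.Function;

/** Hypothetical sketch of the "if necessary" helpers; not the Ignite utility class. */
final class SliceTransformSketch {
    /** Marker value taken from the proposal (GridBinaryMarshaller#TRANSFORMED). */
    static final byte TRANSFORMED = -3;

    /** Transforms a slice of {@code arr}; falls back to a plain copy when the transformer returns null. */
    static byte[] transformIfNecessary(byte[] arr, int off, int len, Function<ByteBuffer, ByteBuffer> transformer) {
        ByteBuffer res = transformer.apply(ByteBuffer.wrap(arr, off, len));

        if (res == null)
            return Arrays.copyOfRange(arr, off, off + len); // Untransformed bytes, no marker.

        return toArray(res); // Expected to start with the TRANSFORMED marker.
    }

    /** Restores the data only when it starts with the marker; otherwise returns the same array. */
    static byte[] restoreIfNecessary(byte[] valBytes, Function<ByteBuffer, ByteBuffer> restorer) {
        if (valBytes.length == 0 || valBytes[0] != TRANSFORMED)
            return valBytes;

        // The restorer receives the data positioned right after the marker.
        return toArray(restorer.apply(ByteBuffer.wrap(valBytes, 1, valBytes.length - 1)));
    }

    /** Copies the remaining bytes of the buffer into a new array. */
    private static byte[] toArray(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];

        buf.get(out);

        return out;
    }
}
```

In this sketch arr itself is never modified, which matches the requirement to keep the object's value as is while valBytes carries the transformed form.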
...
Some customers may want to encrypt the data, some to compress it, while some just keep it as is.
So, we must provide a simple way to append any transformation.
The simplest way is to define an interface that customers can implement:
```java
public interface CacheObjectTransformerManager extends GridCacheSharedManager {
    /**
     * Transforms the data.
     *
     * @param original Original data.
     * @return Transformed data (started with {@link GridBinaryMarshaller#TRANSFORMED} when restorable)
     * or {@code null} when transformation is not possible/suitable.
     */
    public @Nullable ByteBuffer transform(ByteBuffer original);

    /**
     * Restores the data.
     *
     * @param transformed Transformed data.
     * @return Restored data.
     */
    public ByteBuffer restore(ByteBuffer transformed);
}
```
This API accounts for the overhead needed to store transformed data and can work with byte data at custom offsets, which is necessary to guarantee performance.
Every customer may implement this interface in a proper way if necessary and specify it via plugin configuration:
```java
IgniteConfiguration getConfiguration() {
    IgniteConfiguration cfg = ...

    cfg.setPluginProviders(new XXXPluginProvider()); // Which provides some XXXCacheObjectTransformerManager()

    return cfg;
}
```
...
```java
class CompressionTransformer extends CacheObjectTransformerAdapter {
    protected ByteBuffer transform(ByteBuffer original) throws IgniteCheckedException {
        int overhead = 5; // Transformed flag + length.

        int origSize = original.remaining();
        int lim = origSize - overhead;

        if (lim <= 0)
            return null; // Compression is not profitable.

        ByteBuffer compressed = byteBuffer(overhead + (int)Zstd.compressBound(origSize));

        compressed.put(TRANSFORMED);
        compressed.putInt(origSize);

        int size = Zstd.compress(compressed, original, 1);

        if (size >= lim)
            return null; // Compression is not profitable.

        compressed.flip();

        return compressed;
    }

    protected ByteBuffer restore(ByteBuffer transformed, int length) {
        ByteBuffer restored = byteBuffer(transformed.getInt()); // Original size is stored after the flag.

        Zstd.decompress(restored, transformed);

        restored.flip();

        return restored;
    }
}
```
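The same "flag + original length + payload" layout can be tried without Ignite or Zstd on the classpath. The sketch below swaps in java.util.zip (Deflater/Inflater) for Zstd and uses heap buffers. The class name and static method shapes are hypothetical, and restore here expects the buffer to be positioned right after the flag, which is an assumption of this sketch.

```java
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical stand-alone sketch; java.util.zip is used here instead of Zstd. */
final class DeflateTransformerSketch {
    static final byte TRANSFORMED = -3; // Marker value from the proposal.
    static final int OVERHEAD = 5;      // Transformed flag + original length.

    /** Returns the transformed data, or null when compression is not profitable. */
    static ByteBuffer transform(ByteBuffer original) {
        int origSize = original.remaining();
        int lim = origSize - OVERHEAD;

        if (lim <= 0)
            return null;

        byte[] src = new byte[origSize];
        original.duplicate().get(src); // Leaves the caller's position untouched.

        Deflater deflater = new Deflater(Deflater.BEST_SPEED);
        deflater.setInput(src);
        deflater.finish();

        byte[] dst = new byte[lim];
        int size = deflater.deflate(dst);
        boolean fits = deflater.finished(); // False when the output did not fit into lim bytes.
        deflater.end();

        if (!fits || size >= lim)
            return null;

        ByteBuffer res = ByteBuffer.allocate(OVERHEAD + size);
        res.put(TRANSFORMED).putInt(origSize).put(dst, 0, size);
        res.flip();

        return res;
    }

    /** Restores the data; the buffer must be positioned right after the flag. */
    static ByteBuffer restore(ByteBuffer transformed) {
        int origSize = transformed.getInt();

        byte[] src = new byte[transformed.remaining()];
        transformed.get(src);

        Inflater inflater = new Inflater();
        inflater.setInput(src);

        byte[] dst = new byte[origSize];
        int off = 0;

        try {
            while (off < origSize) {
                int n = inflater.inflate(dst, off, origSize - off);

                if (n == 0)
                    break;

                off += n;
            }
        }
        catch (DataFormatException e) {
            throw new IllegalStateException("Corrupted data", e);
        }
        finally {
            inflater.end();
        }

        if (off != origSize)
            throw new IllegalStateException("Unexpected restored size: " + off);

        return ByteBuffer.wrap(dst);
    }
}
```

Storing the original length right after the flag lets restore allocate the target buffer once, and returning null keeps small or incompressible values untransformed.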
...
```java
class EncryptionTransformer extends CacheObjectTransformerAdapter {
    private static final int SHIFT = 42; // Secret!

    protected ByteBuffer transform(ByteBuffer original) throws IgniteCheckedException {
        ByteBuffer transformed = byteBuffer(original.remaining() + 1); // Same capacity plus one byte for the TRANSFORMED flag.

        transformed.put(TRANSFORMED);

        while (original.hasRemaining())
            transformed.put((byte)(original.get() + SHIFT));

        transformed.flip();

        return transformed;
    }

    protected ByteBuffer restore(ByteBuffer transformed, int length) {
        ByteBuffer restored = byteBuffer(transformed.remaining()); // Same size.

        while (transformed.hasRemaining())
            restored.put((byte)(transformed.get() - SHIFT));

        restored.flip();

        return restored;
    }
}
```
...