SparkStreaming - Kryo框架的关键功能

1.概述

Kryo是一个Java序列化框架。本文将尝试着探索Kryo框架的关键功能,并用示例展示出来。

2. Maven依赖

<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>4.0.1</version>
</dependency>

可以在Maven Central上找到最新版本。

3. Kryo基础

Kryo的工作方式,以及如何使用它来序列化和反序列化对象。

3.1 介绍

该框架将Kryo类作为其所有功能的主要入口点。

此类协调序列化过程,并将将对象转换为字节形式。字节准备好后,将使用Output对象将它们写入流。这样,它们可以存储在文件,数据库中或通过网络传输。稍后,当需要该对象时,将使用Input实例读取这些字节并将其解码为Java对象。

3.2 序列化对象

首先初始化测试用例的一些变量:

@Before
public void init() {
    kryo = new Kryo();
    output = new Output(new FileOutputStream("file.dat"));
    input = new Input(new FileInputStream("file.dat"));
}

使用Kryo写和读对象:

@Test
public void givenObject_whenSerializing_thenReadCorrectly() {
    Object someObject = "Some string";
 
    kryo.writeClassAndObject(output, someObject);
    output.close();
 
    Object theObject = kryo.readClassAndObject(input);
    input.close();
 
    assertEquals(theObject, "Some string");
}

请注意对close()方法的调用。这是必需的,因为OutputInput类分别继承自OutputStreamInputStream

序列化多个对象同样简单:

@Test
public void givenObjects_whenSerializing_thenReadCorrectly() {
    String someString = "Multiple Objects";
    Date someDate = new Date(915170400000L);
 
    kryo.writeObject(output, someString);
    kryo.writeObject(output, someDate);
    output.close();
 
    String readString = kryo.readObject(input, String.class);
    Date readDate = kryo.readObject(input, Date.class);
    input.close();
 
    assertEquals(readString, "Multiple Objects");
    assertEquals(readDate.getTime(), 915170400000L);
}

请注意,我们正在将适当的类传递给readObject()方法,这使我们的代码无需转换。

4.序列化器

在本节中,我们将显示哪些序列化器已经可用,然后我们将创建自己的序列化器。

4.1 默认序列化器

当Kryo序列化一个对象时,它将创建一个先前注册的Serializer类的实例,以将其转换为字节。这些称为默认序列化器,无需我们任何设置即可使用。

该库已经提供了几个处理列表,映射,枚举等的序列化器。如果找不到给定类的序列化器,则使用FieldSerializer,它可以处理几乎任何类型的对象。

让我们看看它的样子。首先,让我们创建一个Person类:

public class Person {
    private String name = "John Doe";
    private int age = 18;
    private Date birthDate = new Date(933191282821L);
 
    // standard constructors, getters, and setters
}

现在,让我们从此类中编写一个对象,然后将其读回:

@Test
public void givenPerson_whenSerializing_thenReadCorrectly() {
    Person person = new Person();
 
    kryo.writeObject(output, person);
    output.close();
 
    Person readPerson = kryo.readObject(input, Person.class);
    input.close();
 
    assertEquals(readPerson.getName(), "John Doe");
}

注意,由于FieldSerializer是为我们自动创建的,因此无需指定任何内容来序列化Person对象。

4.2 自定义序列化器

如果需要对序列化过程进行更多控制,则有两个选择;我们可以编写自己的Serializer类,并在Kryo中注册它,或者让该类自行处理序列化。

为了演示第一个选项,让我们创建一个扩展Serializer的类:

public class PersonSerializer extends Serializer<Person> {
 
    public void write(Kryo kryo, Output output, Person object) {
        output.writeString(object.getName());
        output.writeLong(object.getBirthDate().getTime());
    }
 
    public Person read(Kryo kryo, Input input, Class<Person> type) {
        Person person = new Person();
        person.setName(input.readString());
        long birthDate = input.readLong();
        person.setBirthDate(new Date(birthDate));
        person.setAge(calculateAge(birthDate));
        return person;
    }
 
    private int calculateAge(long birthDate) {
        // Some custom logic
        return 18;
    }
}

现在,让我们对其进行测试:

@Test
public void givenPerson_whenUsingCustomSerializer_thenReadCorrectly() {
    Person person = new Person();
    person.setAge(0);
     
    kryo.register(Person.class, new PersonSerializer());
    kryo.writeObject(output, person);
    output.close();
 
    Person readPerson = kryo.readObject(input, Person.class);
    input.close();
 
    assertEquals(readPerson.getName(), "John Doe");
    assertEquals(readPerson.getAge(), 18);
}

请注意,即使我们之前将年龄字段设置为0,年龄字段也等于18。

我们还可以使用@DefaultSerializer注释让Kryo知道我们每次需要处理Person对象时都想使用PersonSerializer。这有助于避免调用register()方法:

@DefaultSerializer(PersonSerializer.class)
public class Person implements KryoSerializable {
    // ...
}

对于第二个选项,让我们修改Person类以扩展KryoSerializable接口:

public class Person implements KryoSerializable {
    // ...
 
    public void write(Kryo kryo, Output output) {
        output.writeString(name);
        // ...
    }
 
    public void read(Kryo kryo, Input input) {
        name = input.readString();
        // ...
    }
}

由于此选项的测试用例等于前一个,因此此处不包括。但是,您可以在本文的源代码中找到它。

4.3 Java序列化器

在零星的情况下,Kryo将无法序列化一个类。如果发生这种情况,并且不能编写自定义序列化程序,则可以使用JavaSerializer使用标准的Java序列化机制。这就要求该类照常实现Serializable接口。

这是一个使用上述序列化器的示例:

public class ComplexObject implements Serializable {
    private String name = "Bael";
     
    // standard getters and setters
}
@Test
public void givenJavaSerializable_whenSerializing_thenReadCorrectly() {
    ComplexClass complexObject = new ComplexClass();
    kryo.register(ComplexClass.class, new JavaSerializer());
 
    kryo.writeObject(output, complexObject);
    output.close();
 
    ComplexClass readComplexObject = kryo.readObject(input, ComplexClass.class);
    input.close();
 
    assertEquals(readComplexObject.getName(), "Bael");
}

5.结论

以上探索了Kryo库最关键的功能。序列化了多个简单对象,并使用FieldSerializer类处理自定义对象。我们还创建了一个自定义序列化程序,并演示了如何在需要时退回到标准Java序列化机制。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。