Pulsar 2.5.0 之 Schema 进阶

Pulsar 2.5.0 之 Schema 进阶

官网原文标题《Pulsar Schema》
翻译时间:2020-02-15
官网原文地址:http://pulsar.apache.org/docs/en/schema-understand/ 
译者:了解schema 类型与应用场景、 schema版本管理、schema工作流程。

schema 基本概念

  1. SchemaInfo

    ​ SchemaInfo 是定义schema一种数据结构.

    ​ SchemaInfo是基于每个主题存储和执行的,不能存储在命名空间或租户级别.

    ​ SchemaInfo包含以下字段:

字段 说明
name Schema 名称(发现和topic名称相同)
type Schema 类型,类型常用的JSON格式,其他详见官方文档
schema Schema 数据是一个由 8 位无符号字节和模式类型特定组成的序列,这一属性可能是内部使用
schemaDefinition 具体业务数据结构
properties 存放用户自定义属性

示例:schemaInfo是一个JSON字符串

{
    "name": "test-string-schema",
    "type": "STRING",
    "schema": "",
    "properties": {}
}

支持多种模式类型,主要分为两类

  • 原始类型
  • 复杂类型

Pulsar 支持的原始类型如下

Primitive Type Description
BOOLEAN A binary value
INT8 A 8-bit signed integer
INT16 A 16-bit signed integer
INT32 A 32-bit signed integer
INT64 A 64-bit signed integer
FLOAT A single precision (32-bit) IEEE 754 floating-point number
DOUBLE A double-precision (64-bit) IEEE 754 floating-point number
BYTES A sequence of 8-bit unsigned bytes
STRING A Unicode character sequence
TIMESTAMP (DATE, TIME) A logic type represents a specific instant in time with millisecond precision. It stores the number of milliseconds since January 1, 1970, 00:00:00 GMT as an INT64 value

对于原始类型,Pulsar不在SchemaInfo中存储任何模式数据。SchemaInfo中的类型用于确定如何序列化和反序列化数据。

一些基本模式实现可以使用属性来存储特定于实现的可调设置。例如,字符串架构可以使用属性存储编码字符集以序列化和反序列化字符串。

Pulsar schema types 和语言特定原始类型之间的转换如下

Schema Type Java Type Python Type Go Type
BOOLEAN boolean bool bool
INT8 byte int8
INT16 short int16
INT32 int int32
INT64 long int64
FLOAT float float float32
DOUBLE double float float64
BYTES byte[], ByteBuffer, ByteBuf bytes []byte
STRING string str string
TIMESTAMP java.sql.Timestamp
TIME java.sql.Time
DATE java.util.Date

此示例演示如何使用string schema。

使用string schema创建生产者并发送消息

Producer producer = client.newProducer(Schema.STRING).create();
producer.newMessage().value("Hello Pulsar!").send();

使用string schema 创建消费者并接收消息

Consumer consumer = client.newConsumer(Schema.STRING).create();
consumer.receive();
复杂类型

目前支持的复杂类型如下

Complex Type Description
keyvalue 表示键/值对的复杂类型
struct 表示 AVRO, JSON, and Protobuf.类型
key-value (键值对)

对于 SchemaInfokeyvalue schema ,Pulsar将键模式的SchemaInfo和值模式的SchemaInfo存储在一起。

Pulsar提供了两种方法来编码消息中的键/值对

  • INLINE

键/值对将在消息有效负载中一起编码

  • SEPARATED

    键/将在消息键中编码,值将在消息有效负载中编码

用户可以在构造keyvalue schema时选择编码类型

示例

这个例子展示了如何构造一个键/值模式,然后使用它来生成和使用消息。

使用INLINE编码类型来构造 key/value schema

Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.INLINE
);

使用SEPARATED编码类型来构造 key/value schema

Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);

使用 key/value schema 发送消息

Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);

Producer<KeyValue<Integer, String>> producer 
= client.newProducer(kvSchema)
    .topic(TOPIC)
    .create();

final int key = 100;
final String value = "value-100";

// send the key/value message
producer.newMessage()
.value(new KeyValue<>(key, value))
.send();

使用 key/value schema 接收消息

Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);

Consumer<KeyValue<Integer, String>> consumer 
= client.newConsumer(kvSchema)
    ...
    .topic(TOPIC)
    .subscriptionName(SubscriptionName).subscribe();

// receive key/value pair
Message<KeyValue<Integer, String>> msg = consumer.receive();
KeyValue<Integer, String> kv = msg.getValue();
struct

Pulsar使用Avro规范来定义的struct模式

  • 使用相同的工具管理 schema 定义
  • 使用不同的序列化/反序列化方法处理数据

struct schema 可以使用两种方法:

  • static
  • generic

static

预定义结构模式,它可以是Java中的POJO、Go中的struct或Avro或Protobuf工具生成的类。

示例

创建一个User类来定义发送到topic的消息

public class User {
    String name;
    int age;
}

使用 struct schema 创建一个生产者来发送消息

Producer producer 
= client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage().value(
User.builder().userName("pulsar-user").userId(1L)
.build()).send();

使用 struct schema 创建一个消费者来接收消息

Consumer consumer 
= client.newConsumer(Schema.AVRO(User.class)).create();
User user = consumer.receive();

generic

某些应用你无法预先定义数据结构,你可以用如下方法来定义 schema 和访问数据。可以使用GenericSchemaBuilder定义 struct schema ,使用GenericRecordBuilder生成泛型结构,并将消息使用到GenericRecord中。

使用 RecordSchemaBuilder 构建一个 schema

RecordSchemaBuilder recordSchemaBuilder 
= SchemaBuilder.record("schemaName");
recordSchemaBuilder.field("intField").type(SchemaType.INT32);
SchemaInfo schemaInfo 
= recordSchemaBuilder.build(SchemaType.AVRO);

Producer producer 
=client.newProducer(Schema.generic(schemaInfo))
.create();

使用 RecordBuilder 构建数据记录

producer.newMessage().value(schema.newRecordBuilder()
            .set("intField", 32)
            .build()).send();
AUTO schema

如果您事先不知道Pulsar主题的模式类型,可以使用AUTO schema来生成

Auto Schema Type Description
AUTO_PRODUCE This is useful for transferring data from a producer to a Pulsar topic that has a schema.
AUTO_CONSUME This is useful for transferring data from a Pulsar topic that has a schema to a consumer.

AUTO_PRODUCE

AUTO_PRODUCE schema 帮助生产者验证生产者发送的字节是否与主题的 schema 兼容

需求描述

  • 你需要处理一个来自 Kafka topic k 消息
  • 你有一个 Pulsar topic P, 但是你不清楚他的 schema type.
  • 你的应用需要从kafka topic K 读取消息,然后 写入到Pulsar topic P

在这种情况下,可以使用AUTO_PRODUCE验证K生成的字节是否可以发送到P

Produce pulsarProducer
 = client.newProducer(Schema.AUTO_PRODUCE())
    …
    .create();

byte[] kafkaMessageBytes = … ; 

pulsarProducer.produce(kafkaMessageBytes);
…

AUTO_CONSUME

AUTO_CONSUME schema 帮助Pulsar主题验证Pulsar主题发送的字节是否与使用者兼容,即Pulsar主题使用从代理端检索的SchemaInfo将消息反序列化为语言特定的对象。AUTO_CONSUME 仅支持AVRO和JSON模式。它将消息反序列化为GenericRecord.

需求描述

  • 你有一个 Pulsar topic P.
  • 你的消费端 (例如 MySQL) 需要从 topic P 读取消息
  • 你的应用读取来自 P 的消息,然后将读取的消息写入到 MySQL.

在这种情况下,可以使用AUTO-CONSUME验证P生成的字节是否可以发送到MySQL

Consumer pulsarConsumer 
= client.newConsumer(Schema.AUTO_CONSUME())
    …
    .subscribe();

Message msg = consumer.receive() ; 
GenericRecord record = msg.getValue();
…

schema version

chemaInfo存储在主题上都有一个版本。schema 版本管理主题中发生的schema更改。

使用给定SchemaInfo生成的消息被标记为schema版本,因此当Pulsar客户端使用消息时,Pulsar客户端可以使用schema版本检索相应的SchemaInfo,然后使用SchemaInfo反序列化数据。

schema的版本是连续的。schema存储发生在处理关联主题的代理中,以便可以进行版本分配。

一旦将一个版本分配/获取到/用于一个schema,该生产者生成的所有后续消息都将被标记为相应的版本。

我们通过一个例子,来讲解schema版本如何工作。 假设使用以下代码创建Pulsar Java 客户端,尝试连接到Pulsar,并开始发送消息:

PulsarClient client 
= PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Producer producer 
= client.newProducer(JSONSchema.of(SensorReading.class))
        .topic("sensor-data")
        .sendTimeout(3, TimeUnit.SECONDS)
        .create();

下表列出了尝试连接时可能出现的场景, 以及每种场景下发生了什么:

发生了什么
该Topic不存在schema 使用给定的schema创建了producer。 Schema传输到broker并被存储 (因为没有现成的schema与SensorReading schema "兼容")。 任何使用同样schema/topic的consumer可以消费sensor-datatopic中的消息。
Schema已经存在;producer使用已经被保存过的schema进行连接 Schema被传输到Pulsar broker。 Broker确认此schema是兼容的。 Broker尝试在BookKeeper存储schema,但是发现它已经存在了,所以用它来标记生产的消息。
Schema已经存在;producer使用兼容的新schema进行连接。 Producer传输schema到broker。broker发现这个schema是兼容的,随后保存这个新schema作为当前版本(使用新的版本号)。

schema如何工作

Pulsar schema在主题级别应用和实施(schema不能在命名空间或租户级别应用)。

生产者和消费者将schema上传到代理,因此Pulsar schema在生产者和消费者方面都起作用。
生产者


schema-producer.png
  1. 应用程序使用 schema 实例来构造生产者实例。

    schema实例定义了使用生产者实例生成的数据的schema。

    以AVRO为例,Pulsar从POJO类中提取schema定义并构造SchemaInfo,生产者在连接时需要将其传递给broker。

  2. 生产者使用从传入的schema实例中提取的SchemaInfo连接到broker。

  3. broker在schema存储中查找schema,以检查它是否已经是注册的schema。

  4. 如果是,broker将跳过schema验证,因为它是已知的schema,并将schema版本返回给生产者。

  5. 如果否,则broker验证是否可以在此命名空间中自动创建schema:

    • 如果isAllowAutoUpdateSchema设置为true,则可以创建schema,并且broker根据为主题定义的schema兼容性检查策略验证schema。

    • 如果isAllowAutoUpdateSchema设置为false,则无法创建schema,并且将拒绝生产者连接到broker。

消费者


schema-consumer.png
  1. 应用程序使用 schema 实例来构造消费者实例。

    schema 实例定义使用者用于解码从代理接收的消息的 schema 。

  2. 消费者使用从传入的schema实例中提取的SchemaInfo连接到broker。

  3. broker确定主题是否有其中一个(模式/数据/本地消费者和本地生产者)。

  4. 如果一个主题没有全部(模式/数据/本地使用者和本地生产者):

    • 如果isAllowAutoUpdateSchema设置为true,则使用者注册一个架构并将其连接到broker。
    • 如果isAllowAutoUpdateSchema设置为false,则拒绝使用者连接到broker。
  5. 如果主题有其中一个(模式/数据/本地使用者和本地生产者),则执行模式兼容性检查。

    • 如果schema通过兼容性检查,则使用者连接到broker。
    • 如果schema未通过兼容性检查,则拒绝使用者连接到broker。
  6. 消费者从代理接收消息。

如果消费者使用的schema支持schema版本控制(例如,AVRO架构),则消费者获取消息中标记的版本的SchemaInfo,并使用传入schema和消息中标记的schema来解码消息

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容