Django:使用django-logpipe进行Kafka整合

django-logpipe库

——该库充当用于在Django应用程序和服务之间移动数据的通用管道。它建立在Boto3Apache Kafkakafka-pythonDjango REST Framework之上。
安装:pip install django-logpipe

将logpipe添加到已安装的应用中:
INSTALLED_APPS = [ 
    ... 
    'logpipe',
    ... 
]

将连接设置添加到settings.py文件。配置Kafka是这样的:

LOGPIPE = { 
    #必需设置
    'OFFSET_BACKEND':'logpipe.backend.kafka.ModelOffsetStore',
    'CONSUMER_BACKEND':'logpipe.backend.kafka.Consumer',
    'PRODUCER_BACKEND':'logpipe.backend.kafka.Producer',
    'KAFKA_BOOTSTRAP_SERVERS ':[ 
        'kafka:9092' 
    ],
    'KAFKA_CONSUMER_KWARGS':{ 
        'group_id':'django-logpipe',
    },

    #可选设置
    #'KAFKA_SEND_TIMEOUT':10,
    #'KAFKA_MAX_SEND_RETRIES':0,
    #'MIN_MESSAGE_LAG_MS':0 ,
    #'DEFAULT_FORMAT':'json',
}

运行迁移python manage.py migrate logpipe
。这将创建用于存储Kafka日志位置偏移的模型:

用法

串行器

使用logpipe发送或接收消息的第一步是定义序列化程序。logpipe的序列化程序有一些规则:

  1. 必须是rest_framework.serializers.Serializer的子类或实现模仿rest_framework.serializers.Serializer的接口的类。
  2. 必须在类上定义MESSAGE_TYPE属性。该值应该是一个字符串,它定义唯一定义其主题/流中的数据类型。
  3. 必须在类上定义VERSION属性。该值应为表示模式版本号的单调整数。
  4. 必须有KEY_FIELD在类上定义的属性,表示要用作消息键的字段的名称。消息密钥由Kafka在执行日志压缩时使用,并由Kinesis用作分片分区键。对于不需要密钥的主题,可以省略该属性。
  5. 如果序列化程序将用于传入消息,则应实现类方法lookup_instance(cls,** kwargs)。在实例化序列化程序之前,将直接使用消息数据作为关键字参数调用此类方法。它应该查找并返回相关对象(如果存在),以便在初始化期间将其传递给序列化程序的实例参数。如果还没有对象存在(消息表示新对象),则应返回None。
    下面是一个示例Django模型及其序列化程序。
from django.db import models
from rest_framework import serializers
import uuid

class Person(models.Model):
    uuid = models.UUIDField(default=uuid.uuid4, unique=True)
    first_name = models.CharField(max_length=200)
    last_name = models.CharField(max_length=200)

class PersonSerializer(serializers.ModelSerializer):
    MESSAGE_TYPE = 'person'
    VERSION = 1
    KEY_FIELD = 'uuid'

    class Meta:
        model = Person
        fields = ['uuid', 'first_name', 'last_name']

    @classmethod
    def lookup_instance(cls, uuid, **kwargs):
        try:
            return Person.objects.get(uuid=uuid)
        except models.Person.DoesNotExist:
            pass
发送消息

一旦存在序列化程序,就可以通过创建Producer对象并调用send方法向Kafka发送消息。

from logpipe import Producer
joe = Person.objects.create(first_name='Joe', last_name='Schmoe')
producer = Producer('people', PersonSerializer)
producer.send(joe)

上面的示例代码将以下消息发送到名为people的Kafka topic 。

json:{
    "type":"person",
    "version":1,"message":{
        "first_name":"Joe",
        "last_name":"Schmoe",
        "uuid":"xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"
    }
}
接收消息

为了处理传入的消息,可以重用相同的模型和序列化器。我们只需要实例化一个Consumer对象。

# Watch for messages, but timeout after 1000ms of no messages
consumer = Consumer('people', consumer_timeout_ms=1000)
consumer.register(PersonSerializer)
consumer.run()

# Watch for messages and block forever
consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.run()

Consumer对象使用Django REST Framework的内置保存,创建和更新方法来应用消息。如果消息没有直接绑定到Django模型,可以跳过定义的lookup_instance类方法并覆盖save方法来自定义逻辑。
如果在单个topic或stream中有多个数据类型,则可以通过向Consumer注册多个序列化程序来使用它们。

consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()

还可以通过为每种消息类型版本定义序列化程序并将其全部注册到Consumer来支持多种不兼容的消息类型。

consumer = Consumer('people')
consumer.register(PersonSerializerVersion1)
consumer.register(PersonSerializerVersion2)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()

如果有多个stream或topic,需要为每一个都创建一个Consumer,并使用MultiConsumer查看这些stream或topic。

from logpipe import MultiConsumer
people_consumer = Consumer('people')
people_consumer.register(PersonSerializer)
places_consumer = Consumer('places')
places_consumer.register(PlaceSerializer)
multi = MultiConsumer(people_consumer, places_consumer)

# Watch for 'people' and 'places' topics indefinitely
multi.run()

最后,可以通过build_kafka_consumer管理命令中的构建来自动注册和运行Consumer。

# myapp/apps.py
from django.apps import AppConfig
from logpipe import Consumer, register_consumer

class MyAppConfig(AppConfig):
    name = 'myapp'

# Register consumers with logpipe
@register_consumer
def build_person_consumer():
    consumer = Consumer('people')
    consumer.register(PersonSerializer)
    return consumer

使用register_consumer装饰器可以根据需要注册尽可能多的Consumer和topic。然后运行run_kafka_consumer命令以循环方式自动处理所有Consumer的消息。python manage.py run_kafka_consumer

处理架构更改

使用每个序列化程序类所需的VERSION属性处理架构更改。发送时,生产者在消息数据中包含模式版本号。然后,当消费者收到消息时,它会查找具有匹配版本号的寄存器序列化器。如果未找到具有匹配版本号的序列化程序,则会引发logpipe.exceptions.UnknownMessageVersionError异常。

要执行向后不兼容的架构更改,应执行以下步骤。

更新使用者代码以了解新架构版本。将生产者代码更新为发送新架构版本。经过一段时间后(当确定Kafka中仍然不存在旧版本消息时),请删除与旧架构版本相关的代码。
例如,如果我们想要在上面定义的Person模型上需要一个电子邮件字段,那么第一步是更新消费者以了解新字段:

class Person(models.Model):
    uuid = models.UUIDField(default = uuid.uuid4,unique = True)
    first_name = models.CharField(max_length = 200)
    last_name = models.CharField(max_length = 200)
    email = models.EmailField( max_length = 200,null = True)


class PersonSerializerV1(serializers.ModelSerializer):
    MESSAGE_TYPE = 'person'
    VERSION = 1
    KEY_FIELD = 'uuid'class 
    class Meta:
        model = Person 
        fields = ['uuid','first_name','last_name'] 


class PersonSerializerV2(PersonSerializerV1):
    MESSAGE_TYPE ='person'
    VERSION = 2 
    class Meta(PersonSerializerV1.META):
        fields = ['uuid','first_name','last_name','email'] 

consumer = Consumer('people',consumer_timeout_ms = 1000)
consumer.register(PersonSerializerV1)
consumer.register(PersonSerializerV2)

消费者现在将使用适当的序列化程序来显示消息版本。其次,我们需要将生产者代码更新为使用模式版本2:

producer = Producer('people', PersonSerializerV2)

最后,在删除所有旧版本1消息(通过日志压缩)之后,可以从代码库中删除PersonSerializerV1类。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352