如何使用python 连接kafka 并获取数据

Python013

如何使用python 连接kafka 并获取数据,第1张

连接

kafka

的库有两种类型,一种是直接连接

kafka

的,存储

offset

的事情要自己在客户端完成。还有一种是先连接

zookeeper

然后再通过

zookeeper

获取

kafka

brokers

信息

offset

存放在

zookeeper

上面,由

zookeeper

来协调。

我现在使用

samsa

这个

highlevel

Producer示例

from

kazoo.client

import

KazooClientfrom

samsa.cluster

import

Clusterzookeeper

=

KazooClient()zookeeper.start()cluster

=

Cluster(zookeeper)topic

=

cluster.topics['topicname']topic.publish('msg')

**

Consumer示例

**

from

kazoo.client

import

KazooClientfrom

samsa.cluster

import

Clusterzookeeper

=

KazooClient()zookeeper.start()cluster

=

Cluster(zookeeper)topic

=

cluster.topics['topicname']consumer

=

topic.subscribe('groupname')for

msg

in

consumer:

print

msg

Tip

consumer

必需在

producer

kafka

topic

里面提交数据后才能连接,否则会出错。

Kafka

中一个

consumer

需要指定

groupname

groue

中保存着

offset

信息,新开启一个

group

会从

offset

0

的位置重新开始获取日志。

kafka

的配置参数中有个

partition

,默认是

1

,这个会对数据进行分区,如果多个

consumer

想连接同个

group

就必需要增加

partition

,

partition

只能大于

consumer

的数量,否则多出来的

consumer

将无法获取到数据。

# -*- coding: utf8 -*-

# __author__ = '小红帽'

# Date: 2020-05-11

"""Naval Fate.

Usage:

        py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..>--groupId=<groupId>--topic=<topic_name>--es-servers=<host:port>--index=<schema>--type=<doc>--id=<order_id>

        py_kafka_protobuf_consume.py -h | --help

        py_kafka_protobuf_consume.py --version

Options:

        -h --help                                      打印帮助信息.

        --bootstrap_servers=<host:port,host2:port2..>  kafka servers

        --groupId=<groupId>                            kafka消费组

        --topic=<topic_name>                            topic名称

        --es-servers=<host:port>                        ES 地址

        --index=<index_name>                            ES 索引

        --type=<doc>ES type

        --id=<order_id>指定id主键,快速更新

"""

import json

from kafka import KafkaConsumer

from docopt import docopt

from elasticsearch import Elasticsearch

from elasticsearch import helpers

class Kafka_consumer():

    def __init__(self,args):

        self.topic = args['--topic']

        self.bootstrapServers = args['--bootstrap-servers']

        self.groupId = args['--groupId']

        self.id = args['--id']

        self.es_host = args['--es-servers'].split(':')[0]

        self.es_port = args['--es-servers'].split(':')[1]

        self.es_index = args['--index']

        self.es_type = args['--type']

        self.consumer = KafkaConsumer(

            bootstrap_servers=self.bootstrapServers,

            group_id=self.groupId,

            enable_auto_commit = True,

            auto_commit_interval_ms=5000,

            consumer_timeout_ms=5000

        )

    def consume_data_es(self):

        while True:

            try:

                es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)

                self.consumer.subscribe([self.topic])

                actions=[]

                for message in self.consumer:

                    if message is not None:

                        query = json.loads(message.value)['data'][0]

                        action = {

                            "_index": self.es_index,

                            "_type": self.es_type,

                            "_id": json.loads(message.value)['data'][0][self.id],

                            "_source": query

                        }

                        actions.append(action)

                        if len(actions) >50:

                            helpers.bulk(client=es, actions=actions)

                            print("插入es %s 条数据" % len(actions))

                            actions = []

                if len(actions) >0:

                    helpers.bulk(client=es, actions=actions)

                    print("等待超时时间,插入es %s 条数据" % len(actions))

                    actions=[]

            except BaseException as e:

                print(e)

if __name__ == '__main__':

    arguments = docopt(__doc__,version='sbin 1.0')

    consumer = Kafka_consumer(arguments)

    consumer.consume_data_es()

#从kafka消费

#consumer_area = topic_area.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)

#从ZOOKEEPER消费

consumer_area = topic_area.get_balanced_consumer(

consumer_group=b'zs_download_04', # 自己命令

auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情况下,设置此变量,表示从最新的开始取

#auto_offset_reset=OffsetType.EARLIEST,

#reset_offset_on_start=True,

auto_commit_enable=True,

#auto_commit_interval_ms=1,

zookeeper_connect=ZK_LIST

)