kafka конспект однако
------------------------ part 1 ---------------------
schema evolution 1:
avro - в сообщении содержится схема данных.
предоставляет API для сериализации данных соответственно схеме и десериализации по схеме.
exactly once processing:
тут проблема в том что нет контроля над тем какой partition будет назначен. есть две стратегии:
range:
round robin:
rebalance listener:
случай когда consumer group coordinator запустил rebalance и consumer, обработавший часть сообщений хочет commit offset.
чтобы это сделать надо знать:
how to commit a particular offset?
how to know that a rebalance is triggered?
ConsumerRebalanceListener
onPartitionsRevoked - будет вызван перед переключением partition.
onPartitionsAssigned - будет вызвано после назначения partition и перед передачей сообщений.
offset management:
current offset:
kafka broker у себя хранит указатель на на отправленные сообщения клонкретному consumer. maintain offset from which send next banch of messages to consumer on poll().
commited offset:
kafka broker у себя хранит offset который consumer подтвердил что обработал. - нужно если partition rebalance - чтобы заного отправить consumer'у( может уже новому consumer) может уже отправлявшиеся ранее сообщения( current offset уже сдвинут).
auto commit:
enable.auto.commit = true - тогда consumer's offset будет периодически передаваться в фоне.
auto.commit.interval.ms - через какое время в фоне передавать consumer offset to kafka broker.
может привести к повторной отправке уже принятых сообщений, когда например partition rebalance - тогда на новый consumer будут отправлены сообщения с commited offset.
manual commit:
enable.auto.commit =
commit synch: blocking & retry on error
commit async: callback & not retry - почему not retry: напрмер, consumer один коммит отослал что обработал 75 сообщений, и тут rebalance и 5 секунд ожидается ответ тк это async и дальше получил следующие сообщения от брокера и обработал их и посылает уже уведомление брокеру что уже обработал предыдущие 75 + 25 итого 100 и это уведомление брокеру дошло.
тогда уведомление о 75 не должно быть повторно отправлено( retry) - потому что в следющем уведомлении предыдущие обработанные сообщения уже включены( 75 + новы 25 = commit 100).
если в while consumer.commitAsync() то consumer.commitSync() нужно вызвать в final перед consumer.close().
creating consumer:
consumer params:
group.id - не нужно нигде настраивать - просто указать имя - дальше все автоматом. если не указать группу то будет один обрабатывать всех topic partitions.
несколько групп и отдельных consumer могут одновременно читать сообщения из топика.
consumer может подписаться сразу на несколько топиков( через указание списка или через регулярку). consumer.subscribe(Arrays.asList(topicName)) - говорит брокеру что этот consumer хочет читать сообщения из этих топиков.
consumer.poll(100 - timeout - это если нет сообщений return из poll через 100 ms).
как бы правило что цикл while(true) {обработка сообщений из poll} должен выполняться меньше чем за 3 секунды, иначе можно:
heartbeat.interval.ms
session.timeout.ms
иначе group coordinator решит что consumer мертв и будет rebalance.
если нужно через часы обрабатывать накопленные сообщения то без while(true)
consumer groups:
в обсчем и целом: внутри 1 application могут быть consumer groups.
1 consumer блокирует 1 partition на чтение внутри совей группы приемников. если partition 5 штук то 6му consumer будет нечего читать.
нельзя больше consumers чем partitions.
если какие-то consumer вылетают и добавляются - то group coordinator( на каком-то из broker живет):
когда 1й consumer хочет присоединиться к consumer group то идет к group coordinator - и становится лидером consumer group. остальные становятся member.
group coordinator - manage a list of group members. - все присоединения и отсоединения к группе проходят через group coordinator - начинает( тут все consumer перестают читать сообщения из своих partition) rebalance between consumers and partitions посылая запрос consumer group leader - executes the rebalance activity( назначает приемникам партишины и передает отношение group coordinator), потом group coordinator говорит приемникам какие теперь у них партишины.
producer configs:
acks: типа на сколько реплик обязано скопироваться( leader считается за одного). это перед тем как отправить Producer RecordMetadata ответ дождаться скопированностей.
0: Producer ваще не ждет RecordMetadata от брокера. быстро. retries отключаются. reacordMetadata.offset always will be set to -1.
1: leader of Partition должны записать себе в лог сообщение и отослать RecordMetadata to Producer.
ежели leader отвалился то retry на другого выбранного leader.
ежели leader сразу после отправки recordMeatadata ляжет, при том что другие partition не скопировали сообщение - тогда сообщение потеряется.
all/-1: все partition скопирую и тогда только ok.
retries: сколько раз Producer будет повторять пересылать сообщение, после получения ошибки от broker.
retry.backoff.ms: время между попытками.
max.in.flight.requests.per.connection: когда async produce тогда сколько сообщений без полученного ответа может отослаться - потом блокируется это соединение.
если были ошибки и есть retries тогда на повторах посылок будет сбит первоначальный порядок отсылки сообщений.
поэтому ежели надо чтобы порядок всегда сохранялся тогда либо synch producer либо max.in.flight.requests.per.connection = 1 - то есть одно сообщение отправили и ждем ответа.
buffer.memory
compression.type
batch.size
linger.ms
client.id
max.request.size
custom partitioner:
hash - for the same keys it's will be always the same hash.
but hash can be the same for the different keys.
default partitioner find the partition number to send a message through using number of partitions => if num of partitions changed than keys will can go to another partitions.
callback & aknowledgment:
3 ways to send message:
* fire & forget: producer send message and don't care whether message received or not.
* synchronous send: producer send a message and wait until response.
on failure will be exception.
block on every message until it will consumed or exception.
it's can be adopted to situation when every message must be received.
Future res = producer.send(record);
res.get(); // blocking operation
@ya @todo вопрос: что если 1 receiver принял сообщение а есть еще другие которые тоже обязательно должны получить сообщение?
* asynchronous send: non-blocking;
provide callback(metadata, exception?=null) that calls anyway after receive acknowledgement. callback будет по любому вызван.
on exception data can be saved for later investigation.
max.in.flight.requests.per.connection - how many messages can be send to
the server without receiving responses.
@todo @ya как это настроить? от чего зависит - там потоки или что?
producer workflow:
ProducerRecord(topic, partition, timestamp, key, val):
partition:
kafka comes with default partitioner that define partition where send the message
on key parameter:
if producer send huge amount of messages with the same key( hash from the key) then all messages will be send to the same partition.
but key is optional - that's why if key is apsent than default partitioner will равномерно spare messages between partitions using round robin algo.
but it's can be set exact partition for message to be send. it's comes with the message kafka receive.
после выбора конкретного partition сообщения/ record добавляется в ОЗУ буфер и потом откправляется как batch:
парметры:
размер batch
max time to wait new messages before send batch.
после отправки batch with messages/records to kafka broker whether it's return RecordMetadata(offset, timestamp, etc) or recoverible error( producer can repeat batch( amount and period configurable)) or if it's non recoverible error doesn't retry send batch.
timestamp: kafka gives timestamp to every message.
it's time when the message reach a broker.
if provided in constructor - it's set to a time when a message( record) created.
как решается на какой брокер послать данные в kafka cluster?
как решается на како partition послать данные в топике?
пул приемников данных в очередь из разных источников- это серверы kafka( brokers).
пул потребителей( consumer group) сервера потребители данных из очереди. не более 2х consumer на 1 broker - это чтобы не было дублирования чтения.
broker: это серверы kafka
topic раскидывается на разных брокеров, через деление 1 topic на Y x partition
каждый partition может быть реплицирован в реальнмо времени на несколько brokers( ).
partition это горизонтальное масштабирование большого объема данных пишущихся в topic, когда данные не помещаются на 1 комп( broker); это пронумерованная FIFO
replication factor - на сколько серверов копировать каждый partition
обычно 1 брокер на 1 сервер
каждый partition узнает о других через zookeeper
есть главная partition с которой берут данные бэкапы
leader partititon общается с consumer и producer
bootstrap.servers:
list of kafka brokers
***********************
***********************
***********************
***********************
----------------------- part 2 ---------------------------------------
zookeeper - БД распределенная заточенная под быстрое чтение. юзается kafkoy как конфиг, если нода отвалилась то мол какой правильный стейт сейчас. для консенсуса - либо 1 либо 3 инстанса zookeeper серверов. split brain - когда четное колво zookeperov то например 2 в одной сети и 2 в другой сети и оба думают что они главные. поэтому нечетное количество zookeperov рекомендуется.
broker - лазят к zookeeper за конфигом где что кто. брокеров несколько чтобы данные в них на диске дублировались на другие серверы.
много топиков на 1 брокере падает скорость.
кафка кластер это zookeeper + brokers
producer - можно передать не всех brokerov. пишет в топик у брокеров.
consumer - pollit данные из брокеров а не кафка отправляет сообщения - это для скорости так сделано - что кафка не отправляет сама а у нее запрашивают. consumerov может быть несколько в одном приложении если сделан пул потоков. может быть несколько приложений которые получают сообщения из одного топика - consumeri можно объединить в группы для чтения из топика.
топик это файл(.log) на диске куда дописываются байты сообщений. еще есть файл где хранятся оффсеты(.index) с какого байта начинается какое сообщение. и еще один файл с timestamp сообщений(.timestamp). producer может установить timestamp сообщения либо кафка сама установит timestamp. поэтому если корректные timestamp то можно запрашивать сообщения с timestampa.
апи для удаления записей нет - то есть кафка подходит для продьюсеров fire and forget потому что удалить сообщение нельзя. поэтому у топика при создании можно указать retention time и retention size. consumer не узнает если закончилось место на диске в брокере если такое случится.
с помощью kafki можно сделать паттерн event sourcing - это значит есть последовательность событий которую можно перечитать для восстановления конечного состояния. но по времени кафка ограничивает хранение лога. время хранения выбирается из того как быстро consumer прочитает сообщения, с учетом времени на перезапуск после падения consumera.
kafka гарантирует порядок сообщений в порядке получения.
кафка хранит оффсеты( какое последнее прочитал сообщение) для consumerov в топике __consumer_offset. consumer может сказать kafke что переключи мой оффсет на определенный оффсет. но нельзя producerom сказать kafke дай мне сообщение номер х потому что в kafkу нет random access( потому что тогда бы было непоследовательное чтение с диска и было бы медленно) то есть можно сказать что поставь мой оффсет на 10 сообщений назад и я их перечитаю.
у consumer есть auto.offset.reset = earliest/latest - это куда поставить оффсет для этого consumera когда долго не подключался либо когда первый раз consumer подключается. то есть оффсеты могут протухать со временем если consumer редко подключается. если сменить consumer id то сработает auto.offset.reset настройка - но это делать не надо.
timestamp сообщения не влияет на порядок в логе. если послать сообщение с будущим timestamp то оно будет в логе пока не наступит это будущее и потом не сработает retention time. и все сообщения с нормельным timestamp записанные в лог после сообщения с timestampom из будущего будут храниться в логе до наступления этого будущего потому что кафка не random access чистит по retention time а последовательно чистит лог. поэтому лучше timestamp по бизнес логике записывать в само тело сообщения а не юзать timestamp сообщения.
kafka при удалении не удаляет в файле лога а удаляет чанк - чанк это часть лога. лог пишется в виде чанков в связки лог файл + index файл + timestamp файл.
__consumer_offset для группы consumerov только 1.
кафка в лог пишет байты поэтому хорошо бы сказать каким конвертером( string это сериалайзер) из типа например avro или prtobuf перевести в байты.
в json сериалайзере можно юзать json схемы если объект в jave поменялся а json еще со старыми полями.
partitioning - это чтобы в группе consumerov каждый consumer не получал сообщение другого consumera в этой группе. partitioning - это разбить topic на скажем 3 равносильные части которые сами по себе реализуются через лог файл + index файл + timestamp файл. более того для ускорения разные брокеры могут хранить разные партишны.
у producer есть 3 стратегии в какую партицию писать:
1) round robin (key = null)- то есть по кругу писать по сообщению в каждую партицию.
key нужен producer чтобы понять в какую партицию слать msg.
если key null то юзается round robin. - можно настроить чтобы срабатывал конкретный partition.
2) producer руками ставим партицию
3) when key != null => partition by hash(murmur2 функция ): вычисляется hash, потом делится на кол-во партиций и выячисляется в какую партицию послать.
Если консюмеров меньше в группе то какие-то консюмеры будут читать из нескольких партиций. Если консюмеров в группе больше партиций то лишние буду idle то есть простаивать. поэтому через партиции контролируется параллелизм консюмеров в группе. количество партишнов в топике это максимальное кол-во консюмеров в группе которое можно создать. можно добавлять партиции к топику, убирать нельзя. если консюмеров в группе меньше партиций в топике то кафка не гарантирует порядок при чтении консюмером читающим из нескольких партиций. можно как-то настроить чтобы кафка чанки читала по очереди из разных партиция для одного консюмера. тут можно с помощью одного и того же ключа делать так чтобы сообщение попадало в одну и ту же партицию - для случая чтобы принудительно сделать четкую последовательность для потребителя.
например если в один и тот же топик пуляются после обработки разными сервисами например заказы в интернет-магазине то можно сделать один и тот же key для всех этих сообщений и тогда будет гарантирована последовательность. ключ null лучше не ставить хотя будет последовательно - чтобы потом можно было отследить изменения формуляра.( можно кстати в redis пулять данные чтобы понять последнее ли сообщение по этому ключу сейчас получено в консюмере!( например key: {count, time(чтобы по времени сравнивать)})). если key один и тот же то если несколько консюмеров читают один формуляр то не будет race condition в бд потому что один и тот же консюмер будет работать с конкретным формуляром по ключу.
для отказоустойчивости можно по топику на брокера. и еще можно партиции топика разносить по брокерам - partition assignment при старте кафки. кафка назначит брокера лидером для партиции. можно указывать чтобы какой-то брокер не был никому лидером.
продюсер пойдет к какому-то брокеру узнать кто лидер - знания о лидерах хранятся в zookeeper.
запись может потеряться в момент когда продюсер спросил брокера кто лидер - лидер взял из своего кеша( можно настроить период обновления из zookeepera) знание о лидере и начал пулять мессаджи в лидера а лидер в это время упал. отсылка хоть и через sendAsync все равно сама отсылка она блокирующая - пока там все разрулится кто лидер( для скорости это кешируется).
при создании топика ставится колво партиций и колво репликаций. нет смысла делать реплик больше чем брокеров тк будут брокеры дубликаты хранить.
реплики бывают "пассивные" - которые сами запрашивают обновления. и "активные" insyncReplic то есть лидер сам шлет репликам данные. min.insync.replicas( 1 это только в лидера, 2 это лидер и еще один брокер и тд) это параметр минимального значения insync реплик для топика или кластера. кафка сама назначит кто будет insync и кто будет "пассивной" репликой.
так же есть у publishera режим записи - 0(fire and forget)|1( by default: leader aknowledge)|-1( это aknowledge что все реплики обновились( только min.insync.replicas без "пассивных")). тут если -1 и например min.insync.replicas = 3 и записались 2 и 3й брокер упал то пока не произойдет восстановления брокеров паблишер будет заблочен и получит ошибку. поэтому ставят на 1 меньше колва реплик потому что если упала активная то пассивная станет активной и будет дальше норм работать.
у продюсера есть настройки размера буфера и таймаут по которому отправляется незаполненный буфер.
у консюмера - как получать - по одному или пачкой.
как aknowledge сообщений сразу или не сразу и тд.
KafkaTampleat<String( by default: что получаем ), String( by default: что отправляем )>
header это кастомный мета payload например ts, type чтобы правильно распарсить сообщение например.
spring.kafka.bootstrap-servers: список брокеров
++++++++++++++++++++++ 2 ++++++++++++++
kafka rest это rest api к кафке
kafka-manager интерфейс к kafke . consumer - total lag - так можно следить насколько консюмер отстает от продюсера.
zookeeperov тоже для отказоустойчивости можно несколько
если типа абстрагироваться от кафки чтобы типа потом можно было заменить то отправлять через org.springframework.messaging.Message
gatling kafka performance testing
kafka это CQRS то есть типа получать ивенты и если шо можно прочитать с начала очереди сообщения. например чтобы восстановить стейт.
Комментарии
Отправить комментарий