
Apache Kafka Client with Protobuf


Apache Kafka has been the leading distributed event streaming platform for a long time.

Commonly referred to simply as Kafka, Apache Kafka is a free and open-source distributed event streaming platform developed to handle large-scale, real-time data streams. Examples of such data include stock prices, exchange rates, and so on. This kind of data requires frequent, near real-time updates so that consumer applications can read it and react as quickly as possible. This is where tools such as Apache Kafka come in.

Kafka was initially developed by the LinkedIn team and later open-sourced in 2011. One of the major problems that Kafka solves is handling extremely large volumes of data in real time. It does this by leveraging a distributed architecture and a set of APIs that make it easy to scale the platform out horizontally.

For Kafka to transfer data across a Kafka cluster, the data needs to be encoded and serialized into a specific structure. This allows the consumer and producer applications to agree on a common format and structure.

Apache Kafka supports various serialization formats such as JSON, Avro, and Protobuf. JSON is the most popular and the simplest, since it organizes the data in a plain, human-readable format, which makes it an excellent choice for testing and development purposes.

In this tutorial, we will learn how to set up a simple Apache Kafka client using the Protobuf serialization format. Before getting started, note that this tutorial assumes familiarity with Kafka consumer/producer functionality, the basics of working with Protocol Buffers, and Java development.

What Is Data Serialization in Kafka?

Let us start at the very beginning.

Data serialization in Kafka refers to converting structured data into a format that can be transmitted as a single unit, also known as a Kafka message, across a Kafka cluster.

Apache Kafka uses a binary protocol for communication between consumer and producer applications. Hence, the messages exchanged between applications must be serialized into a format that can be sent over the network and understood by every endpoint in the cluster.

Because the data in a Kafka cluster is in a uniform format, consumers and producers can easily communicate and pass data around without being constrained by the programming language or technology stack behind them.

This enables heterogeneous systems to communicate seamlessly without complex integration work.

What Are Protocol Buffers?

Protocol Buffers, commonly referred to as Protobuf, is a language-neutral, platform-independent, extensible format for serializing structured data. You can think of it as XML, but built to be more extensible, smaller, faster, and simpler.

Protocol Buffers let us define how our data is structured. The Protobuf compiler then produces generated source code that allows us to read and write the structured data from various data streams and in various programming languages.

Installing the Protocol Buffer Compiler

The first step is installing the Protocol Buffer compiler on your target machine. You can check the official documentation to learn how to set it up for your platform.

This tutorial demonstrates how to install the Protobuf compiler on Ubuntu.

Start by updating the system repositories:
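
$ sudo apt update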

Next, run the apt command to install the Protobuf compiler as shown in the following:

$ sudo apt install protobuf-compiler -y

Next, verify the installed Protocol Buffer compiler with the following command:
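
$ protoc --version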

The command prints the installed Protocol Buffer compiler version in a format similar to the following, where x.y.z is the version shipped by your distribution:
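
libprotoc x.y.z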

Once installed, we can proceed to develop our Kafka producer and consumer applications.

Setting Up the Kafka Schema Registry

You must have the Kafka Schema Registry installed and configured in your Kafka cluster. For simplicity, we will not discuss how to do that in this tutorial. Instead, you can check the following link to learn more:

https://docs.confluent.io/platform/current/installation/installing_cp/deb-ubuntu.html#c3-short

Defining the Protobuf Message

In this step, we need to define our Protocol Buffer message schema. We do this by creating a file that ends in ".proto".

Start by setting up the project directory:
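
$ mkdir -p ~/kafka_proto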

Navigate into the target directory and create a new file:

$ cd ~/kafka_proto && touch schema.proto

Next, edit the "schema.proto" file and add the message definition as shown in the following:

syntax = "proto3";

package com.example.kafka;

message Product {
  string name = 1;
  string description = 2;
  float price = 3;
  repeated string categories = 4;
  bool in_stock = 5;
  string manufacturer = 6;
  string sku = 7;
}

Let us break down each field and what it entails. In the previous schema, we have the following fields:

  1. Name – This field defines the name of the product as a string.
  2. Description – Similarly, the description field lets us store the product's description. It also holds string data.
  3. Price – The price field holds the product's price as a floating-point value.
  4. Categories – This defines the categories to which the product belongs. Since a product can belong to multiple categories, the data type is a repeated string (an array of strings).
  5. In_stock – This field stores the availability state of the product as a Boolean.
  6. Manufacturer – The name of the manufacturer as a string value.
  7. Sku – Finally, the sku field stores the stock-keeping unit number of the product as a string.

Compiling the Protocol Buffer Schema

Once we have defined the previous schema, we must compile it into Java classes using the Protocol Buffer compiler. We can do this with the protoc command syntax as shown in the following:

protoc --java_out=<output_directory> <input_file>.proto

The full command is as follows:

$ protoc --java_out=./Java schema.proto

Note: Make sure to create the Java directory before running the previous command:
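
$ mkdir -p ./Java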

Once the command has executed, you should see a "Schema.java" file which contains the Java definition of your Protocol Buffer schema.

Assuming the package declared in the schema, the generated file is placed under a directory tree that matches the package name. An example listing is shown in the following:
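
$ ls ./Java/com/example/kafka/
Schema.java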

NOTE: You should not edit the generated file. Instead, if you need to make changes, edit the ".proto" file and recompile it.

Creating a Kafka Producer

In the next step, we create a Kafka producer using Java. This post uses the Confluent Java client, since it supports Protocol Buffer serialization and deserialization.

In this case, we need to create a Java project with the required dependencies. You can use Maven to add the dependencies.
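
As a rough sketch, the relevant entries in the project's "pom.xml" might look like the following. The Confluent serializer lives in the Confluent Maven repository, and the version numbers here are only placeholders, so adjust them to match your Kafka cluster and Protobuf compiler:

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-protobuf-serializer</artifactId>
    <version>7.4.0</version>
  </dependency>
  <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.22.3</version>
  </dependency>
</dependencies>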

Next, create the file that will hold the source code for the Kafka producer. Finally, edit the producer source file and add the code as shown:

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.example.kafka.Schema.Product;
import java.util.Properties;

public class ProductProducer {

    public static void main(String[] args) {
        // Producer properties
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");

        // Create a Kafka producer
        Producer<String, Product> producer = new KafkaProducer<>(props);

        // Create a new Product object
        Product product = Product.newBuilder()
                .setName("Apple MacBook Air")
                .setDescription("M1 version of the Apple MacBook Air in gold color...")
                .setPrice(999.99f)
                .addCategories("laptops")
                .addCategories("apple")
                .setInStock(true)
                .setManufacturer("Apple Inc")
                .setSku("M12021")
                .build();

        // Send the object to the "products" topic in Kafka
        ProducerRecord<String, Product> record = new ProducerRecord<>("products", product.getSku(), product);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Message sent successfully to topic " + metadata.topic() +
                            " partition " + metadata.partition() + " at offset " + metadata.offset());
                } else {
                    System.err.println("Failed to send message to topic " + metadata.topic() + ": " + exception.getMessage());
                }
            }
        });

        // Flush and close the producer
        producer.flush();
        producer.close();
    }
}

In the previous example, we use the KafkaProtobufSerializer from the Confluent Schema Registry serializers to serialize the Product object.

NOTE: You must have the "products" topic already created in your cluster.

Once done, we can compile and run the producer application to write the message to the Kafka topic.
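
As a minimal sketch, assuming the project is managed with Maven and the generated Protobuf sources are on the build path, compiling and running the producer could look like this:

$ mvn compile
$ mvn exec:java -Dexec.mainClass="ProductProducer"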

Consuming the Protobuf Messages

To read the messages that are written to the topic by the producer, we need to set up a consumer application capable of deserializing the data.

Start by creating the file that stores the source code for your Kafka consumer app. Then, edit the file and add the source code as shown in the following:

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.example.kafka.Schema.Product;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ProductConsumer {

    public static void main(String[] args) {
        // Set up consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        props.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Product.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "product-consumer");

        // Create a Kafka consumer
        KafkaConsumer<String, Product> consumer = new KafkaConsumer<>(props);

        // Subscribe to the "products" topic
        consumer.subscribe(Collections.singletonList("products"));

        // Continuously poll for new messages
        while (true) {
            ConsumerRecords<String, Product> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, Product> record : records) {
                Product product = record.value();
                System.out.printf("Received message with sku '%s' and name '%s'%n", product.getSku(), product.getName());
            }
        }
    }
}

In the example consumer application, we use the KafkaProtobufDeserializer to deserialize the Product messages from the cluster. To continuously read the messages from the topic, we call the poll function in a loop to fetch new messages from the cluster.

We can then compile the consumer application and read the messages from the cluster.
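
Assuming the same Maven setup as the producer, running the consumer might look like this:

$ mvn compile exec:java -Dexec.mainClass="ProductConsumer"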

Conclusion

This post explored the basics of setting up a simple Kafka producer and consumer application that lets you write messages to and read them from a Kafka cluster using Protocol Buffers.

Although this tutorial gets your foot in the door with Protocol Buffers in Kafka, it is still a basic application that you can extend with more functionality.
