Kafka

Apache Kafka is an open-source distributed streaming platform used for real-time data transmission. Messages are exchanged between producers and consumers: producers write messages to a Kafka topic, and consumers read them in real time.

General Information

When a Kafka broker is provided to the user for publishing their data, our Kafka service is configured to support SCRAM (Salted Challenge Response Authentication Mechanism) with the SHA-512 hash function and SimpleAclAuthorizer, which uses Apache ZooKeeper to store all the ACLs. ACLs (Access Control Lists) define users' permissions on Kafka topics: for example, one user may have read access to a specific topic, while another may have write access.

When our service subscribes to a user-provided Kafka broker, it is configured to support no authentication, the PLAIN authentication mechanism, or SCRAM with the SHA-256 or SHA-512 hash functions. The user is required to fill in the fields of the ExternalKafkaCredentialsDTO class below, except for the username/password fields, which are only required for PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512.

export class ExternalKafkaCredentialsDTO {
    username: string;     // only required for PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
    password: string;     // only required for PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
    topic: string;        // topic to subscribe to
    groupId: string;      // consumer group id
    url: string;          // broker connection URL
    saslMechanism: any;   // no authentication, PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
}
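
For illustration, a filled-in payload for a SCRAM-SHA-512 connection might look like the sketch below. All values are placeholders, and the exact strings accepted by the saslMechanism field are assumptions.

// Illustrative payload only; field values are placeholders.
const credentials: ExternalKafkaCredentialsDTO = {
    username: "alice",
    password: "s3cr3t",
    topic: "sensor-data",
    groupId: "sensor-consumers",
    url: "broker.example.com:9092",
    saslMechanism: "SCRAM-SHA-512",   // assumed spelling; "PLAIN" and "SCRAM-SHA-256" are the other options
};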

Available Services

createUser

Creates a user with a random 8-character password.

Param Type Description
username string The username of the user

resetPassword

Resets a user's password.

Same parameters as the createUser service.

deleteUser

Deletes a user from ZooKeeper.

Same parameters as the createUser service.

createTopic

Creates a Kafka topic.

Param Type Description
topic string The name of the kafka topic

deleteTopic

Deletes a kafka topic.

Same parameters as the createTopic service.

giveWritePermission

Grants a user write access to a specific topic.

Param Type Description
username string The username of the user
topic string Topic name

giveReadPermission

Grants a user read access to a specific topic and group.

Param Type Description
username string The username of the user
topic string Topic name
group string Group name
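
The sketch below shows how the services above might compose when provisioning a new user with full access to a topic. The kafkaService handle and the async call style are illustrative assumptions; only the service names and their parameters come from this page.

// Hypothetical usage sketch; kafkaService is an assumed handle to this service.
await kafkaService.createUser("alice");            // "alice" receives a random 8-character password
await kafkaService.createTopic("sensor-data");
await kafkaService.giveWritePermission("alice", "sensor-data");
await kafkaService.giveReadPermission("alice", "sensor-data", "sensor-consumers");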

removeWritePermission

Removes a user's write access to a specific topic.

Same parameters as the giveWritePermission service.

removeReadPermission

Removes a user's read access to a specific topic and group.

Same parameters as the giveReadPermission service.

getTopics

Lists all the topics available on the Kafka server.

getUsers

Lists all the users available in ZooKeeper.

getTopicPermissions

Lists all the permissions of a topic.

Param Type Description
topic string Topic name

getGroupPermissions

Lists all the permissions of a group.

Param Type Description
group string Group name

getUserPermissions

Lists all the permissions of a user.

Param Type Description
username string The username of the user

deleteTopicPermissions

Deletes all the permissions of a topic.

Param Type Description
topic string Topic name

deleteGroupPermissions

Deletes all the permissions of a group.

Param Type Description
group string Group name

deleteUserPermissions

Deletes all the permissions of a user.

Param Type Description
username string The username of the user

testCredentialsAndCreateSample

When our service subscribes to a user-provided Kafka broker, it attempts to establish a connection to that broker using the data provided by the user. A new Kafka client is created with the kafkajs==1.15.0 library, which then connects by subscribing to the user-provided topic. If the connection is established successfully, the consumer consumes messages from the topic in order to create a data sample.

Param Type Description
data ExternalKafkaCredentialsDTO User provided data (further described in the General Information section)
fileType string The type of the file (.json/.xml)
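
The sketch below outlines this flow with kafkajs, assuming SCRAM-SHA-512 was selected. The client/consumer structure follows the description above, but the clientId, the function name, and the sample-building details are assumptions.

import { Kafka } from "kafkajs";

// Hypothetical sketch of the credentials test, not the service's actual implementation.
async function testConnection(data: ExternalKafkaCredentialsDTO): Promise<void> {
    const kafka = new Kafka({
        clientId: "credentials-test",     // assumption: any client id works here
        brokers: [data.url],
        sasl: {                           // omit this block for "no authentication"
            mechanism: "scram-sha-512",   // or "plain" / "scram-sha-256", per data.saslMechanism
            username: data.username,
            password: data.password,
        },
    });

    const consumer = kafka.consumer({ groupId: data.groupId });
    await consumer.connect();             // fails here if the URL or credentials are wrong
    await consumer.subscribe({ topic: data.topic, fromBeginning: true });

    await consumer.run({
        eachMessage: async ({ message }) => {
            // A consumed message is what the service would turn into a data sample.
            console.log(message.value?.toString());
            await consumer.disconnect();
        },
    });
}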

Examples

Python library used: kafka-python==2.0.1

Kafka Producer

import json
from kafka import KafkaProducer

KAFKA_URL = "<CONNECTION_URL>"

producer = KafkaProducer(
    bootstrap_servers=[KAFKA_URL],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<USERNAME>",
    sasl_plain_password="<PASSWORD>",
    value_serializer=lambda m: json.dumps(m).encode("utf8"),
    # value_serializer=str.encode,
)

producer.send("<TOPIC>", {"test": "message"})
producer.flush()  # block until the message is actually delivered before the script exits

The Python script above creates a Kafka producer and sends a simple JSON message to a topic. Replace all words in < > with your own credentials and details. To send an XML message, use the alternative serializer (commented out).

Kafka Consumer

import json
from kafka import KafkaConsumer

KAFKA_URL = "<CONNECTION_URL>"

consumer = KafkaConsumer(
    "<TOPIC>",
    group_id="<GROUP_ID>",
    bootstrap_servers=[KAFKA_URL],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<USERNAME>",
    sasl_plain_password="<PASSWORD>",
    value_deserializer=lambda m: json.loads(m.decode("utf8")),
    # value_deserializer=lambda m: m.decode("utf8"),
)

for message in consumer:
    print(message)

The Python script above creates a Kafka consumer and reads JSON messages from a topic. Replace all words in < > with your own credentials and details. To read XML messages, use the alternative deserializer (commented out).