Kafka
Apache Kafka is an open-source distributed streaming platform used for real-time data transmission. Messages are exchanged between producers and consumers: producers write messages to a Kafka topic, and consumers read them in real time.
General Information
When a Kafka broker is provided to the user for publishing data, our Kafka service is configured to support SCRAM (Salted Challenge Response Authentication Mechanism) with the SHA-512 hash function and SimpleAclAuthorizer, which uses Apache ZooKeeper to store all the ACLs. ACLs (Access Control Lists) are the permissions users have on Kafka topics; for example, one user may have read access to a specific topic, while a different user may have write access.
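The core idea behind SCRAM is that the server never stores the plaintext password, only a salt, an iteration count, and a key derived from them. The sketch below illustrates that salted-hash derivation with Python's standard library (a simplified illustration of the principle, not Kafka's exact key-derivation code):

```python
import hashlib
import os


def scram_stored_key(password: str, salt: bytes, iterations: int = 4096) -> bytes:
    """Derive a salted password hash the way SCRAM does (simplified):
    the server stores only the salt, iteration count, and derived key."""
    return hashlib.pbkdf2_hmac("sha512", password.encode("utf-8"), salt, iterations)


salt = os.urandom(16)
key = scram_stored_key("s3cret", salt)
print(len(key))  # SHA-512 digests are 64 bytes
```

Because the salt is random per user, two users with the same password still end up with different stored keys.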
When our service subscribes to a user-provided Kafka broker, it is configured to support either no authentication, the PLAIN authentication mechanism, or SCRAM with the SHA-256 or SHA-512 hash function. The user is required to fill in the fields of the ExternalKafkaCredentialsDTO
class below, except for the username/password fields, which are only required for PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512.
export class ExternalKafkaCredentialsDTO {
  username: string;
  password: string;
  topic: string;
  groupId: string;
  url: string;
  saslMechanism: any;
}
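The rule that username/password are required only for PLAIN and the SCRAM mechanisms can be expressed as a small validation helper. The function below is a hypothetical sketch of that dispatch (the field names match the DTO above; the `"NONE"` mechanism value and the helper itself are illustrative assumptions, not the service's actual code):

```python
# Mechanisms that require credentials, per the rule described above.
SASL_WITH_CREDENTIALS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def validate_external_credentials(data: dict) -> list:
    """Return the list of missing fields for an
    ExternalKafkaCredentialsDTO-like dict."""
    required = ["topic", "groupId", "url", "saslMechanism"]
    if data.get("saslMechanism") in SASL_WITH_CREDENTIALS:
        required += ["username", "password"]
    return [field for field in required if not data.get(field)]
```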
Available Services
createUser
Creates a user with a random password (8 characters).
Param | Type | Description |
---|---|---|
username | string | The username of the user |
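One way to generate such an 8-character random password is with Python's `secrets` module. This is an illustrative sketch; the service's actual generator (and its character set) may differ:

```python
import secrets
import string


def random_password(length: int = 8) -> str:
    """Generate a random alphanumeric password using a
    cryptographically secure source of randomness."""
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
```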
resetPassword
Resets a user's password.
Same parameters as the createUser service.
deleteUser
Deletes a user from ZooKeeper.
Same parameters as the createUser service.
createTopic
Creates a Kafka topic.
Param | Type | Description |
---|---|---|
topic | string | The name of the Kafka topic |
deleteTopic
Deletes a Kafka topic.
Same parameters as the createTopic service.
giveWritePermission
Grants a user write access on a specific topic.
Param | Type | Description |
---|---|---|
username | string | The username of the user |
topic | string | Topic name |
giveReadPermission
Grants a user read access on a specific topic and consumer group.
Param | Type | Description |
---|---|---|
username | string | The username of the user |
topic | string | Topic name |
group | string | Group name |
removeWritePermission
Removes a user's write access on a specific topic.
Same parameters as the giveWritePermission service.
removeReadPermission
Removes a user's read access on a specific topic and group.
Same parameters as the giveReadPermission service.
getTopics
Lists all the topics available on the Kafka server.
getUsers
Lists all the users available in ZooKeeper.
getTopicPermissions
Lists all the permissions of a topic.
Param | Type | Description |
---|---|---|
topic | string | Topic name |
getGroupPermissions
Lists all the permissions of a group.
Param | Type | Description |
---|---|---|
group | string | Group name |
getUserPermissions
Lists all the permissions of a user.
Param | Type | Description |
---|---|---|
username | string | The username of the user |
deleteTopicPermissions
Deletes all the permissions of a topic.
Param | Type | Description |
---|---|---|
topic | string | Topic name |
deleteGroupPermissions
Deletes all the permissions of a group.
Param | Type | Description |
---|---|---|
group | string | Group name |
deleteUserPermissions
Deletes all the permissions of a user.
Param | Type | Description |
---|---|---|
username | string | The username of the user |
testCredentialsAndCreateSample
When our service subscribes to one or more user-provided Kafka brokers, it attempts to establish a connection using the data provided by the user. This is done by creating a new Kafka client with the kafkajs==1.15.0
library and connecting a consumer subscribed to the user-provided topic. If the connection is established successfully, the consumer tries to consume messages from the topic in order to create a data sample.
Param | Type | Description |
---|---|---|
data | ExternalKafkaCredentialsDTO | User provided data (further described in the General Information section) |
fileType | string | The type of the file (.json/.xml) |
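The fileType parameter determines how raw message bytes are decoded when building the sample. The helper below is a hypothetical sketch of that dispatch, written in Python for consistency with the Examples section (the actual service uses kafkajs, and this function name is an assumption):

```python
import json


def sample_deserializer(file_type: str):
    """Pick a value deserializer for building the data sample,
    based on the fileType parameter ('.json' or '.xml')."""
    if file_type == ".json":
        return lambda raw: json.loads(raw.decode("utf8"))
    if file_type == ".xml":
        return lambda raw: raw.decode("utf8")
    raise ValueError(f"unsupported file type: {file_type}")
```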
Examples
Python library used: kafka-python==2.0.1
Kafka Producer
import json

from kafka import KafkaProducer

KAFKA_URL = "<CONNECTION_URL>"

producer = KafkaProducer(
    bootstrap_servers=[KAFKA_URL],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<USERNAME>",
    sasl_plain_password="<PASSWORD>",
    value_serializer=lambda m: json.dumps(m).encode("utf8"),
    # value_serializer=str.encode,  # use this serializer for XML messages
)

producer.send("<TOPIC>", {"test": "message"})
producer.flush()  # block until the message is actually delivered
The Python script above creates a Kafka producer and sends a simple JSON message to a topic. Replace all words in < >
with your own credentials and details. To send an XML message, use the alternative serializer shown in the comment. The final flush() call ensures the asynchronous send completes before the script exits.
Kafka Consumer
import json

from kafka import KafkaConsumer

KAFKA_URL = "<CONNECTION_URL>"

consumer = KafkaConsumer(
    "<TOPIC>",
    group_id="<GROUP_ID>",
    bootstrap_servers=[KAFKA_URL],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<USERNAME>",
    sasl_plain_password="<PASSWORD>",
    value_deserializer=lambda m: json.loads(m.decode("utf8")),
    # value_deserializer=lambda m: m.decode("utf8"),  # use for XML messages
)

for message in consumer:
    print(message)
The Python script above creates a Kafka consumer and reads JSON messages from a topic. Replace all words in < >
with your own credentials and details. To read XML messages, use the alternative deserializer shown in the comment.