Skip to main content

Apache Kafka

Type: pubsub.kafka

Status: stable

Reference: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/

Example

apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
name: <name>
spec:
type: pubsub.kafka
version: v1
metadata:
# Authentication type. This must be set to "password" for this authentication profile.
- name: authType
value: "password"
# The SASL authentication mechanism to use.
- name: saslMechanism
value: "PLAINTEXT"
# The SASL password.
- name: saslPassword
value: "mypassword"
# The SASL username.
- name: saslUsername
value: "myuser"
# A comma-separated list of Kafka brokers.
- name: brokers
value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
# The number of events to buffer in internal and external channels. (Optional)
#- name: channelBufferSize
# value: "256"
# The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely. (Optional)
#- name: clientConnectionKeepAliveInterval
# value: "0"
# The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration. (Optional)
#- name: clientConnectionTopicMetadataRefreshInterval
# value: "9m"
# A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. (Optional)
#- name: clientID
# value: "sarama"
# Disables consumer retry by setting this to "false". (Optional)
#- name: consumeRetryEnabled
# value: "false"
# The interval between retries when attempting to consume topics. (Optional)
#- name: consumeRetryInterval
# value: "100ms"
# The default number of message bytes to fetch from the broker in each request. (Optional)
#- name: consumerFetchDefault
# value: "1048576"
# The minimum number of message bytes to fetch in a request. (Optional)
#- name: consumerFetchMin
# value: "1"
# A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. (Optional)
#- name: consumerGroup
# value: "group1"
# Disable TLS for transport security. This is potentially insecure and not recommended for use in production. (Optional)
#- name: disableTls
# value: "false"
# The interval between heartbeats to the consumer coordinator. (Optional)
#- name: heartbeatInterval
# value: "3s"
# The initial offset to use if no offset was previously committed. (Optional)
#- name: initialOffset
# value: "newest"
# The maximum size in bytes allowed for a single Kafka message. (Optional)
#- name: maxMessageBytes
# value: "1024"
# Enables caching for schemas. (Optional)
#- name: schemaCachingEnabled
# value: "true"
# The TTL for schema caching when publishing a message with latest schema available. (Optional)
#- name: schemaLatestVersionCacheTTL
# value: "5m"
# The Schema Registry credentials API Key. (Optional)
#- name: schemaRegistryAPIKey
# value: "XYAXXAZ"
# The Schema Registry credentials API Secret. (Optional)
#- name: schemaRegistryAPISecret
# value: "ABCDEFGMEADFF"
# The Schema Registry URL. (Optional)
#- name: schemaRegistryURL
# value: "http://mycompany.com:8081"
# The maximum time between heartbeats before the consumer is considered inactive and will timeout. (Optional)
#- name: sessionTimeout
# value: "10s"
# Skip TLS verification. This is potentially insecure and not recommended for use in production. (Optional)
#- name: skipVerify
# value: "false"
# Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka. (Optional)
#- name: version
# value: "2.0.0.0"

Authentication profiles

Available authentication profiles:

  • OIDC Authentication

  • SASL Authentication

  • mTLS Authentication

  • No Authentication

  • AWS IAM

OIDC Authentication

Authenticate using OpenID Connect.

authType (string)

Required - Authentication type. This must be set to "oidc" for this authentication profile.

Example value: oidc

Allowed values:

  • oidc

oidcClientID (string)

Required - The OAuth2 client ID that has been provisioned in the identity provider.

Example value: my-client-id

oidcClientSecret (string)

Required - The OAuth2 client secret that has been provisioned in the identity provider.

Example value: KeFg23!

oidcTokenEndpoint (string)

Required - URL of the OAuth2 identity provider access token endpoint.

Example value: https://identity.example.com/v1/token

oidcExtensions (string)

String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.

Example value: {"cluster":"kafka","poolid":"kafkapool"}

oidcScopes (string)

Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Although not required, this field is recommended.

Default value: openid

Example value: openid,kafka-prod

SASL Authentication

Authenticate using SASL.

authType (string)

Required - Authentication type. This must be set to "password" for this authentication profile.

Example value: password

Allowed values:

  • password

saslMechanism (string)

Required - The SASL authentication mechanism to use.

Default value: PLAINTEXT

Example value: SHA-512

Allowed values:

  • SHA-512

  • SHA-256

  • PLAINTEXT

saslPassword (string)

Required - The SASL password.

Example value: mypassword

saslUsername (string)

Required - The SASL username.

Example value: myuser

mTLS Authentication

Authenticate using mTLS.

authType (string)

Required - Authentication type. This must be set to "mtls" for this authentication profile.

Example value: mtls

Allowed values:

  • mtls

caCert (string)

Required - Certificate authority certificate.

Example value:

-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----

clientCert (string)

Required - Client certificate.

Example value:

-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----

clientKey (string)

Required - Client key.

Example value:

-----BEGIN RSA PRIVATE KEY-----
<base64-encoded DER>
-----END RSA PRIVATE KEY-----

No Authentication

Do not perform authentication.

authType (string)

Required - Authentication type. This must be set to "none" for this authentication profile.

Example value: none

Allowed values:

  • none

AWS IAM

Authenticate using AWS IAM credentials or role for AWS MSK

authType (string)

Required - Authentication type. This must be set to "awsiam" for this authentication profile.

Example value: awsiam

Allowed values:

  • awsiam

awsAccessKey (string)

Required - AWS access key associated with an IAM account.

Example value: AKIAIOSFODNN7EXAMPLE

awsIamRoleArn (string)

Required - IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.

Example value: arn:aws:iam::123456789:role/mskRole

awsRegion (string)

Required - The AWS Region where the MSK Kafka broker is deployed to.

Example value: us-east-1

awsSecretKey (string)

Required - The secret key associated with the access key.

Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

awsSessionToken (string)

AWS session token to use. A session token is only required if you are using temporary security credentials.

Example value: TOKEN

awsStsSessionName (string)

Represents the session name for assuming a role.

Default value: MSKSASLDefaultSession

Example value: MyAppSession

Metadata

brokers (string)

Required - A comma-separated list of Kafka brokers.

Example value: mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093

channelBufferSize (number)

The number of events to buffer in internal and external channels.

Default value: 256

Example value: 128

clientConnectionKeepAliveInterval (duration)

The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely.

Default value: 0

Example value: 4m

clientConnectionTopicMetadataRefreshInterval (duration)

The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration.

Default value: 9m

Example value: 4m

clientID (string)

A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.

Default value: sarama

Example value: my-dapr-app

consumeRetryEnabled (bool)

Disables consumer retry by setting this to "false".

Default value: false

Example value: true

consumeRetryInterval (duration)

The interval between retries when attempting to consume topics.

Default value: 100ms

Example value: 200ms

consumerFetchDefault (number)

The default number of message bytes to fetch from the broker in each request.

Default value: 1048576

Example value: 2097152

consumerFetchMin (number)

The minimum number of message bytes to fetch in a request.

Default value: 1

Example value: 4

consumerGroup (string)

A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic.

Example value: group1

disableTls (bool)

Disable TLS for transport security. This is potentially insecure and not recommended for use in production.

Default value: false

Example value: true

heartbeatInterval (duration)

The interval between heartbeats to the consumer coordinator.

Default value: 3s

Example value: 5s

initialOffset (string)

The initial offset to use if no offset was previously committed.

Default value: newest

Example value: oldest

Allowed values:

  • newest

  • oldest

maxMessageBytes (number)

The maximum size in bytes allowed for a single Kafka message.

Default value: 1024

Example value: 2048

schemaCachingEnabled (bool)

Enables caching for schemas.

Default value: true

Example value: true

schemaLatestVersionCacheTTL (duration)

The TTL for schema caching when publishing a message with latest schema available.

Default value: 5m

Example value: 5m

schemaRegistryAPIKey (string)

The Schema Registry credentials API Key.

Example value: XYAXXAZ

schemaRegistryAPISecret (string)

The Schema Registry credentials API Secret.

Example value: ABCDEFGMEADFF

schemaRegistryURL (string)

The Schema Registry URL.

Example value: http://mycompany.com:8081

sessionTimeout (duration)

The maximum time between heartbeats before the consumer is considered inactive and will timeout.

Default value: 10s

Example value: 20s

skipVerify (bool)

Skip TLS verification. This is potentially insecure and not recommended for use in production.

Default value: false

Example value: true

version (string)

Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka.

Default value: 2.0.0.0

Example value: 0.10.2.0