Skip to main content

Apache Kafka

Type: bindings.kafka

Status: stable

Reference: https://docs.dapr.io/reference/components-reference/supported-bindings/kafka/

Example

apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
name: <name>
spec:
type: bindings.kafka
version: v1
metadata:
# Authentication type. This must be set to "password" for this authentication profile.
- name: authType
value: "password"
# The SASL authentication mechanism to use.
- name: saslMechanism
value: "PLAINTEXT"
# The SASL password.
- name: saslPassword
value: "mypassword"
# The SASL username.
- name: saslUsername
value: "myuser"
# A comma-separated list of Kafka brokers.
- name: brokers
value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
# Indicates the direction of the binding component.
- name: direction
value: "input,output"
# The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely. (Optional)
#- name: clientConnectionKeepAliveInterval
# value: "0"
# The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration. (Optional)
#- name: clientConnectionTopicMetadataRefreshInterval
# value: "9m"
# A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. (Optional)
#- name: clientID
# value: "sarama"
# Disables consumer retry by setting this to "false". (Optional)
#- name: consumeRetryEnabled
# value: "false"
# The interval between retries when attempting to consume topics. (Optional)
#- name: consumeRetryInterval
# value: "100ms"
# A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. (Optional)
#- name: consumerGroup
# value: "group1"
# Disable TLS for transport security. This is potentially insecure and not recommended for use in production. (Optional)
#- name: disableTls
# value: "false"
# The interval between heartbeats to the consumer coordinator. (Optional)
#- name: heartbeatInterval
# value: "3s"
# The initial offset to use if no offset was previously committed. (Optional)
#- name: initialOffset
# value: "newest"
# The maximum size in bytes allowed for a single Kafka message. (Optional)
#- name: maxMessageBytes
# value: "1024"
# The topic to publish to. (Optional)
#- name: publishTopic
# value: "mytopic"
# Specifies a custom route for incoming events. (Optional)
#- name: route
# value: "/custom-path"
# Enables caching for schemas. (Optional)
#- name: schemaCachingEnabled
# value: "true"
# The TTL for schema caching when publishing a message with latest schema available. (Optional)
#- name: schemaLatestVersionCacheTTL
# value: "5m"
# The Schema Registry credentials API Key. (Optional)
#- name: schemaRegistryAPIKey
# value: "XYAXXAZ"
# The Schema Registry credentials API Secret. (Optional)
#- name: schemaRegistryAPISecret
# value: "ABCDEFGMEADFF"
# The Schema Registry URL. (Optional)
#- name: schemaRegistryURL
# value: "http://mycompany.com:8081"
# The maximum time between heartbeats before the consumer is considered inactive and will timeout. (Optional)
#- name: sessionTimeout
# value: "10s"
# Skip TLS verification. This is potentially insecure and not recommended for use in production. (Optional)
#- name: skipVerify
# value: "false"
# A comma-separated list of topics to subscribe to. (Optional)
#- name: topics
# value: "mytopic1,topic2"
# Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka. (Optional)
#- name: version
# value: "2.0.0.0"

Binding information

Input Binding: yes

Output Binding: yes

Output Binding operations:

  • create: Publish a new message in the topic.

Authentication profiles

Available authentication profiles:

  • OIDC Authentication

  • SASL Authentication

  • mTLS Authentication

  • No Authentication

  • AWS IAM

OIDC Authentication

Authenticate using OpenID Connect.

authType (string)

Required - Authentication type. This must be set to "oidc" for this authentication profile.

Example value: oidc

Allowed values:

  • oidc

oidcClientID (string)

Required - The OAuth2 client ID that has been provisioned in the identity provider.

Example value: my-client-id

oidcClientSecret (string)

Required - The OAuth2 client secret that has been provisioned in the identity provider.

Example value: KeFg23!

oidcTokenEndpoint (string)

Required - URL of the OAuth2 identity provider access token endpoint.

Example value: https://identity.example.com/v1/token

oidcExtensions (string)

String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.

Example value: {"cluster":"kafka","poolid":"kafkapool"}

oidcScopes (string)

Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Although not required, this field is recommended.

Default value: openid

Example value: openid,kafka-prod

SASL Authentication

Authenticate using SASL.

authType (string)

Required - Authentication type. This must be set to "password" for this authentication profile.

Example value: password

Allowed values:

  • password

saslMechanism (string)

Required - The SASL authentication mechanism to use.

Default value: PLAINTEXT

Example value: SHA-512

Allowed values:

  • SHA-512

  • SHA-256

  • PLAINTEXT

saslPassword (string)

Required - The SASL password.

Example value: mypassword

saslUsername (string)

Required - The SASL username.

Example value: myuser

mTLS Authentication

Authenticate using mTLS.

authType (string)

Required - Authentication type. This must be set to "mtls" for this authentication profile.

Example value: mtls

Allowed values:

  • mtls

caCert (string)

Required - Certificate authority certificate.

Example value:

-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----

clientCert (string)

Required - Client certificate.

Example value:

-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----

clientKey (string)

Required - Client key.

Example value:

-----BEGIN RSA PRIVATE KEY-----
<base64-encoded DER>
-----END RSA PRIVATE KEY-----

No Authentication

Do not perform authentication.

authType (string)

Required - Authentication type. This must be set to "none" for this authentication profile.

Example value: none

Allowed values:

  • none

AWS IAM

Authenticate using AWS IAM credentials or role for AWS MSK

authType (string)

Required - Authentication type. This must be set to "awsiam" for this authentication profile.

Example value: awsiam

Allowed values:

  • awsiam

awsAccessKey (string)

Required - AWS access key associated with an IAM account.

Example value: AKIAIOSFODNN7EXAMPLE

awsIamRoleArn (string)

Required - IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.

Example value: arn:aws:iam::123456789:role/mskRole

awsRegion (string)

Required - The AWS Region where the MSK Kafka broker is deployed to.

Example value: us-east-1

awsSecretKey (string)

Required - The secret key associated with the access key.

Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

awsSessionToken (string)

AWS session token to use. A session token is only required if you are using temporary security credentials.

Example value: TOKEN

awsStsSessionName (string)

Represents the session name for assuming a role.

Default value: MSKSASLDefaultSession

Example value: MyAppSession

Metadata

brokers (string)

Required - A comma-separated list of Kafka brokers. Used for input binding direction. Used for output binding direction.

Example value: mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093

direction (string)

Required - Indicates the direction of the binding component.

Example value: input,output

Allowed values:

  • input

  • output

  • input,output

Documentation: https://docs.dapr.io/reference/api/bindings_api/#binding-direction-optional

clientConnectionKeepAliveInterval (duration)

The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely.

Default value: 0

Example value: 4m

clientConnectionTopicMetadataRefreshInterval (duration)

The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration.

Default value: 9m

Example value: 4m

clientID (string)

A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.

Default value: sarama

Example value: my-dapr-app

consumeRetryEnabled (bool)

Disables consumer retry by setting this to "false".

Default value: false

Example value: true

consumeRetryInterval (duration)

The interval between retries when attempting to consume topics.

Default value: 100ms

Example value: 200ms

consumerGroup (string)

A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. Used for input binding direction.

Example value: group1

disableTls (bool)

Disable TLS for transport security. This is potentially insecure and not recommended for use in production.

Default value: false

Example value: true

heartbeatInterval (duration)

The interval between heartbeats to the consumer coordinator.

Default value: 3s

Example value: 5s

initialOffset (string)

The initial offset to use if no offset was previously committed. Used for input binding direction.

Default value: newest

Example value: oldest

Allowed values:

  • newest

  • oldest

maxMessageBytes (number)

The maximum size in bytes allowed for a single Kafka message.

Default value: 1024

Example value: 2048

publishTopic (string)

The topic to publish to. Used for output binding direction.

Example value: mytopic

route (string)

Specifies a custom route for incoming events.

Example value: /custom-path

Documentation: https://docs.dapr.io/developing-applications/building-blocks/bindings/howto-triggers/#specify-a-custom-route

schemaCachingEnabled (bool)

Enables caching for schemas.

Default value: true

Example value: true

schemaLatestVersionCacheTTL (duration)

The TTL for schema caching when publishing a message with latest schema available.

Default value: 5m

Example value: 5m

schemaRegistryAPIKey (string)

The Schema Registry credentials API Key.

Example value: XYAXXAZ

schemaRegistryAPISecret (string)

The Schema Registry credentials API Secret.

Example value: ABCDEFGMEADFF

schemaRegistryURL (string)

The Schema Registry URL.

Example value: http://mycompany.com:8081

sessionTimeout (duration)

The maximum time between heartbeats before the consumer is considered inactive and will timeout.

Default value: 10s

Example value: 20s

skipVerify (bool)

Skip TLS verification. This is potentially insecure and not recommended for use in production.

Default value: false

Example value: true

topics (string)

A comma-separated list of topics to subscribe to. Used for input binding direction.

Example value: mytopic1,topic2

version (string)

Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka.

Default value: 2.0.0.0

Example value: 0.10.2.0