Apache Kafka

Type: pubsub.kafka

Status: stable

Reference: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/

Example

apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Authentication type. This must be set to "password" for this authentication profile.
    - name: authType
      value: "password"
    # The SASL authentication mechanism to use.
    - name: saslMechanism
      value: "PLAINTEXT"
    # The SASL password.
    - name: saslPassword
      value: "mypassword"
    # The SASL username.
    - name: saslUsername
      value: "myuser"
    # A comma-separated list of Kafka brokers.
    - name: brokers
      value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
    # The number of events to buffer in internal and external channels. (Optional)
    #- name: channelBufferSize
    #  value: "256"
    # The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely. (Optional)
    #- name: clientConnectionKeepAliveInterval
    #  value: "0"
    # The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration. (Optional)
    #- name: clientConnectionTopicMetadataRefreshInterval
    #  value: "9m"
    # A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. (Optional)
    #- name: clientID
    #  value: "sarama"
    # Set this to "false" to disable consumer retry. (Optional)
    #- name: consumeRetryEnabled
    #  value: "false"
    # The interval between retries when attempting to consume topics. (Optional)
    #- name: consumeRetryInterval
    #  value: "100ms"
    # The default number of message bytes to fetch from the broker in each request. (Optional)
    #- name: consumerFetchDefault
    #  value: "1048576"
    # The minimum number of message bytes to fetch in a request. (Optional)
    #- name: consumerFetchMin
    #  value: "1"
    # A Kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. (Optional)
    #- name: consumerGroup
    #  value: "group1"
    # Disable TLS for transport security. This is potentially insecure and not recommended for use in production. (Optional)
    #- name: disableTls
    #  value: "false"
    # Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. (Optional)
    #- name: escapeHeaders
    #  value: "false"
    # The interval between heartbeats to the consumer coordinator. (Optional)
    #- name: heartbeatInterval
    #  value: "3s"
    # The initial offset to use if no offset was previously committed. (Optional)
    #- name: initialOffset
    #  value: "newest"
    # The maximum size in bytes allowed for a single Kafka message. (Optional)
    #- name: maxMessageBytes
    #  value: "1024"
    # Enables caching for schemas. (Optional)
    #- name: schemaCachingEnabled
    #  value: "true"
    # The TTL for schema caching when publishing a message with latest schema available. (Optional)
    #- name: schemaLatestVersionCacheTTL
    #  value: "5m"
    # The Schema Registry credentials API Key. (Optional)
    #- name: schemaRegistryAPIKey
    #  value: "XYAXXAZ"
    # The Schema Registry credentials API Secret. (Optional)
    #- name: schemaRegistryAPISecret
    #  value: "ABCDEFGMEADFF"
    # The Schema Registry URL. (Optional)
    #- name: schemaRegistryURL
    #  value: "http://mycompany.com:8081"
    # The maximum time between heartbeats before the consumer is considered inactive and times out. (Optional)
    #- name: sessionTimeout
    #  value: "10s"
    # Skip TLS verification. This is potentially insecure and not recommended for use in production. (Optional)
    #- name: skipVerify
    #  value: "false"
    # Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka. (Optional)
    #- name: version
    #  value: "2.0.0.0"

Authentication profiles

Available authentication profiles:

  • OIDC Authentication

  • SASL Authentication

  • mTLS Authentication

  • No Authentication

  • AWS: Access Key ID and Secret Access Key

  • AWS: Assume IAM Role

  • AWS: IAM Roles Anywhere

OIDC Authentication

Authenticate using OpenID Connect.

authType (string)

Required - Authentication type. This must be set to "oidc" for this authentication profile.

Example value: oidc

Allowed values:

  • oidc

oidcClientID (string)

Required - The OAuth2 client ID that has been provisioned in the identity provider.

Example value: my-client-id

oidcClientSecret (string)

Required - The OAuth2 client secret that has been provisioned in the identity provider.

Example value: KeFg23!

oidcTokenEndpoint (string)

Required - URL of the OAuth2 identity provider access token endpoint.

Example value: https://identity.example.com/v1/token

oidcExtensions (string)

String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.

Example value: {"cluster":"kafka","poolid":"kafkapool"}

oidcScopes (string)

Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Although not required, this field is recommended.

Default value: openid

Example value: openid,kafka-prod
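
As a rough sketch, the OIDC fields above could be combined into a component like the following. It assumes the same Component structure as the Example section and reuses the example values listed above; the broker addresses, endpoint, and credentials are placeholders, not working values.

```yaml
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Selects the OIDC authentication profile.
    - name: authType
      value: "oidc"
    # Placeholder identity provider endpoint and client credentials.
    - name: oidcTokenEndpoint
      value: "https://identity.example.com/v1/token"
    - name: oidcClientID
      value: "my-client-id"
    - name: oidcClientSecret
      value: "KeFg23!"
    # Optional, but recommended per the field description.
    - name: oidcScopes
      value: "openid,kafka-prod"
    # Optional JSON-encoded dictionary; single-quoted so the inner double quotes survive YAML parsing.
    #- name: oidcExtensions
    #  value: '{"cluster":"kafka","poolid":"kafkapool"}'
    - name: brokers
      value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
```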

SASL Authentication

Authenticate using SASL.

authType (string)

Required - Authentication type. This must be set to "password" for this authentication profile.

Example value: password

Allowed values:

  • password

saslMechanism (string)

Required - The SASL authentication mechanism to use.

Default value: PLAINTEXT

Example value: SHA-512

Allowed values:

  • SHA-512

  • SHA-256

  • PLAINTEXT

saslPassword (string)

Required - The SASL password.

Example value: mypassword

saslUsername (string)

Required - The SASL username.

Example value: myuser

mTLS Authentication

Authenticate using mTLS.

authType (string)

Required - Authentication type. This must be set to "mtls" for this authentication profile.

Example value: mtls

Allowed values:

  • mtls

caCert (string)

Required - Certificate authority certificate.

Example value:

-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----

clientCert (string)

Required - Client certificate.

Example value:

-----BEGIN CERTIFICATE-----
<base64-encoded DER>
-----END CERTIFICATE-----

clientKey (string)

Required - Client key.

Example value:

-----BEGIN RSA PRIVATE KEY-----
<base64-encoded DER>
-----END RSA PRIVATE KEY-----
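
A minimal sketch of an mTLS component, assuming the same Component structure as the Example section. The PEM bodies are the placeholders shown above, and YAML block scalars (`|`) are used here as one way to keep the line breaks of each PEM value intact.

```yaml
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Selects the mTLS authentication profile.
    - name: authType
      value: "mtls"
    # Certificate authority certificate (placeholder PEM).
    - name: caCert
      value: |
        -----BEGIN CERTIFICATE-----
        <base64-encoded DER>
        -----END CERTIFICATE-----
    # Client certificate (placeholder PEM).
    - name: clientCert
      value: |
        -----BEGIN CERTIFICATE-----
        <base64-encoded DER>
        -----END CERTIFICATE-----
    # Client private key (placeholder PEM).
    - name: clientKey
      value: |
        -----BEGIN RSA PRIVATE KEY-----
        <base64-encoded DER>
        -----END RSA PRIVATE KEY-----
    - name: brokers
      value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
```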

No Authentication

Do not perform authentication.

authType (string)

Required - Authentication type. This must be set to "none" for this authentication profile.

Example value: none

Allowed values:

  • none
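
For illustration, a component without authentication might look like the sketch below, assuming the same Component structure as the Example section. Only the two required fields are set; the broker addresses are placeholders.

```yaml
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Selects the profile that performs no authentication.
    - name: authType
      value: "none"
    - name: brokers
      value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
```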

AWS: Access Key ID and Secret Access Key

Authenticate using an Access Key ID and Secret Access Key included in the metadata.

authType (string)

Required - Authentication type. This must be set to "awsiam" for this authentication profile.

Example value: awsiam

Allowed values:

  • awsiam

accessKey

AWS access key associated with an IAM account.

Example value: AKIAIOSFODNN7EXAMPLE

awsAccessKey (string)

AWS access key associated with an IAM account. This field is kept for backwards compatibility and will be deprecated as of Dapr 1.17. Use 'accessKey' instead. If both fields are set, the 'accessKey' value is used.

Example value: AKIAIOSFODNN7EXAMPLE

awsIamRoleArn (string)

IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. This field is kept for backwards compatibility and will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. If both fields are set, the 'assumeRoleArn' value is used.

Example value: arn:aws:iam::123456789:role/mskRole

awsRegion (string)

The AWS Region where the AWS resource is deployed. This field is kept for backwards compatibility and will be deprecated as of Dapr 1.17. Use 'region' instead.

Example value: us-east-1

awsSecretKey (string)

The secret key associated with the access key. This field is kept for backwards compatibility and will be deprecated as of Dapr 1.17. Use 'secretKey' instead. If both fields are set, the 'secretKey' value is used.

Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

awsSessionToken (string)

AWS session token to use. A session token is only required if you are using temporary security credentials. This field is kept for backwards compatibility and will be deprecated as of Dapr 1.17. Use 'sessionToken' instead. If both fields are set, the 'sessionToken' value is used.

Example value: TOKEN

awsStsSessionName (string)

The session name for assuming a role. This field is kept for backwards compatibility and will be deprecated as of Dapr 1.17. Use 'sessionName' instead. If both fields are set, the 'sessionName' value is used.

Default value: DaprDefaultSession

Example value: MyAppSession

region (string)

The AWS Region where the AWS resource is deployed. This will be marked required in Dapr 1.17.

Example value: us-east-1

secretKey

The secret key associated with the access key.

Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

sessionToken (string)

AWS session token to use. A session token is only required if you are using temporary security credentials.

Example value: TOKEN
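
A hedged sketch of this profile using the non-deprecated field names ('region', 'accessKey', 'secretKey', 'sessionToken'), assuming the same Component structure as the Example section. The key values are the documentation placeholders shown above, and the broker address is a hypothetical placeholder.

```yaml
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Selects AWS IAM authentication.
    - name: authType
      value: "awsiam"
    - name: region
      value: "us-east-1"
    # Placeholder credentials copied from the example values above.
    - name: accessKey
      value: "AKIAIOSFODNN7EXAMPLE"
    - name: secretKey
      value: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    # Only needed with temporary security credentials.
    #- name: sessionToken
    #  value: "TOKEN"
    # Hypothetical broker address.
    - name: brokers
      value: "mycompany.com:9092"
```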

AWS: Assume IAM Role

Assume a specific IAM role. Note: This is only supported for Kafka and PostgreSQL.

authType (string)

Required - Authentication type. This must be set to "awsiam" for this authentication profile.

Example value: awsiam

Allowed values:

  • awsiam

region (string)

Required - The AWS Region where the AWS resource is deployed.

Example value: us-east-1

assumeRoleArn (string)

IAM role that has access to the AWS resource. This is another option to authenticate with MSK and RDS Aurora aside from the AWS Credentials. This will be marked required in Dapr 1.17.

Example value: arn:aws:iam::123456789:role/mskRole

awsIamRoleArn (string)

IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. This field is kept for backwards compatibility and will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. If both fields are set, the 'assumeRoleArn' value is used.

Example value: arn:aws:iam::123456789:role/mskRole

awsStsSessionName (string)

The session name for assuming a role. This field is kept for backwards compatibility and will be deprecated as of Dapr 1.17. Use 'sessionName' instead. If both fields are set, the 'sessionName' value is used.

Default value: DaprDefaultSession

Example value: MyAppSession

sessionName (string)

The session name for assuming a role.

Default value: DaprDefaultSession

Example value: MyAppSession
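
As a sketch, assuming the same Component structure as the Example section, an assume-role configuration could use the newer 'assumeRoleArn' and 'sessionName' fields rather than their deprecated 'aws'-prefixed counterparts. The role ARN is the example value from above and the broker address is a hypothetical placeholder.

```yaml
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Selects AWS IAM authentication.
    - name: authType
      value: "awsiam"
    - name: region
      value: "us-east-1"
    # IAM role to assume (example value from this page).
    - name: assumeRoleArn
      value: "arn:aws:iam::123456789:role/mskRole"
    # Optional; DaprDefaultSession is used when omitted.
    - name: sessionName
      value: "MyAppSession"
    # Hypothetical broker address.
    - name: brokers
      value: "mycompany.com:9092"
```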

AWS: IAM Roles Anywhere

Use AWS IAM Roles Anywhere to establish trust between your AWS account and Diagrid.

assumeRoleArn

Required - ARN of the AWS IAM role to assume in the trusting AWS account.

Example value: arn:aws:iam::012345678910:role/exampleIAMRoleName

authType (string)

Required - Authentication type. This must be set to "awsiam" for this authentication profile.

Example value: awsiam

Allowed values:

  • awsiam

trustAnchorArn

Required - ARN of the AWS Trust Anchor in the AWS account granting trust to the Dapr Certificate Authority.

Example value: arn:aws:rolesanywhere:us-west-1:012345678910:trust-anchor/01234568-0123-0123-0123-012345678901

trustProfileArn

Required - ARN of the AWS IAM Profile in the trusting AWS account.

Example value: arn:aws:rolesanywhere:us-west-1:012345678910:profile/01234568-0123-0123-0123-012345678901
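
Putting the four required fields together, an IAM Roles Anywhere component might look like the sketch below. It assumes the same Component structure as the Example section; the ARNs are the illustrative values from this section and the broker address is a hypothetical placeholder.

```yaml
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Selects AWS IAM authentication.
    - name: authType
      value: "awsiam"
    # Role to assume in the trusting AWS account. IAM ARNs leave the region segment empty.
    - name: assumeRoleArn
      value: "arn:aws:iam::012345678910:role/exampleIAMRoleName"
    # Trust anchor that grants trust to the Dapr Certificate Authority.
    - name: trustAnchorArn
      value: "arn:aws:rolesanywhere:us-west-1:012345678910:trust-anchor/01234568-0123-0123-0123-012345678901"
    # Roles Anywhere profile in the trusting AWS account.
    - name: trustProfileArn
      value: "arn:aws:rolesanywhere:us-west-1:012345678910:profile/01234568-0123-0123-0123-012345678901"
    # Hypothetical broker address.
    - name: brokers
      value: "mycompany.com:9092"
```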

Metadata

brokers (string)

Required - A comma-separated list of Kafka brokers.

Example value: mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093

channelBufferSize (number)

The number of events to buffer in internal and external channels.

Default value: 256

Example value: 128

clientConnectionKeepAliveInterval (duration)

The max amount of time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely.

Default value: 0

Example value: 4m

clientConnectionTopicMetadataRefreshInterval (duration)

The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration.

Default value: 9m

Example value: 4m

clientID (string)

A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes.

Default value: sarama

Example value: my-dapr-app

consumeRetryEnabled (bool)

Set this to "false" to disable consumer retry.

Default value: false

Example value: true

consumeRetryInterval (duration)

The interval between retries when attempting to consume topics.

Default value: 100ms

Example value: 200ms

consumerFetchDefault (number)

The default number of message bytes to fetch from the broker in each request.

Default value: 1048576

Example value: 2097152

consumerFetchMin (number)

The minimum number of message bytes to fetch in a request.

Default value: 1

Example value: 4

consumerGroup (string)

A Kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic.

Example value: group1

disableTls (bool)

Disable TLS for transport security. This is potentially insecure and not recommended for use in production.

Default value: false

Example value: true

escapeHeaders (bool)

Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers.

Default value: false

Example value: true

heartbeatInterval (duration)

The interval between heartbeats to the consumer coordinator.

Default value: 3s

Example value: 5s

initialOffset (string)

The initial offset to use if no offset was previously committed.

Default value: newest

Example value: oldest

Allowed values:

  • newest

  • oldest

maxMessageBytes (number)

The maximum size in bytes allowed for a single Kafka message.

Default value: 1024

Example value: 2048

schemaCachingEnabled (bool)

Enables caching for schemas.

Default value: true

Example value: true

schemaLatestVersionCacheTTL (duration)

The TTL for schema caching when publishing a message with latest schema available.

Default value: 5m

Example value: 5m

schemaRegistryAPIKey (string)

The Schema Registry credentials API Key.

Example value: XYAXXAZ

schemaRegistryAPISecret (string)

The Schema Registry credentials API Secret.

Example value: ABCDEFGMEADFF

schemaRegistryURL (string)

The Schema Registry URL.

Example value: http://mycompany.com:8081

sessionTimeout (duration)

The maximum time between heartbeats before the consumer is considered inactive and times out.

Default value: 10s

Example value: 20s

skipVerify (bool)

Skip TLS verification. This is potentially insecure and not recommended for use in production.

Default value: false

Example value: true

version (string)

Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka.

Default value: 2.0.0.0

Example value: 0.10.2.0
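
To illustrate how the optional metadata fields combine, the sketch below sets a consumer group, starts from the oldest offset when none is committed, and adjusts the heartbeat and session timing. It assumes the same Component structure as the Example section and uses the "none" authentication profile only to keep the example short; all values are the example values from this section.

```yaml
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: authType
      value: "none"
    - name: brokers
      value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
    # Each record is delivered to one consumer within this group.
    - name: consumerGroup
      value: "group1"
    # Read from the beginning of the topic if no offset was previously committed.
    - name: initialOffset
      value: "oldest"
    # Durations use Go duration syntax, such as "5s" or "4m".
    - name: heartbeatInterval
      value: "5s"
    - name: sessionTimeout
      value: "20s"
```

Here the heartbeat interval is kept well below the session timeout so that several heartbeats fit within one session window.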