Apache Kafka
Type: pubsub.kafka
Status: stable
Reference: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/
Example
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: <name>
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Authentication type. This must be set to "password" for this authentication profile.
    - name: authType
      value: "password"
    # The SASL authentication mechanism to use.
    - name: saslMechanism
      value: "PLAINTEXT"
    # The SASL password.
    - name: saslPassword
      value: "mypassword"
    # The SASL username.
    - name: saslUsername
      value: "myuser"
    # A comma-separated list of Kafka brokers.
    - name: brokers
      value: "mycompany.com:9092,dapr-kafka.myapp.svc.cluster.local:9093"
    # The number of events to buffer in internal and external channels. (Optional)
    #- name: channelBufferSize
    #  value: "256"
    # The maximum time, as a Go duration, to keep the client connection to the broker alive before closing it. A zero value (default) keeps the connection alive indefinitely. (Optional)
    #- name: clientConnectionKeepAliveInterval
    #  value: "0"
    # The interval, as a Go duration, at which the client connection's topic metadata is refreshed with the broker. (Optional)
    #- name: clientConnectionTopicMetadataRefreshInterval
    #  value: "9m"
    # A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. (Optional)
    #- name: clientID
    #  value: "sarama"
    # Enables message compression. There are five types of compression available: none, gzip, snappy, lz4, and zstd. The default is none. (Optional)
    #- name: compression
    #  value: "none"
    # Set to "false" to disable consumer retry. (Optional)
    #- name: consumeRetryEnabled
    #  value: "false"
    # The interval between retries when attempting to consume topics. (Optional)
    #- name: consumeRetryInterval
    #  value: "100ms"
    # The default number of message bytes to fetch from the broker in each request. (Optional)
    #- name: consumerFetchDefault
    #  value: "1048576"
    # The minimum number of message bytes to fetch in a request. (Optional)
    #- name: consumerFetchMin
    #  value: "1"
    # A Kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. (Optional)
    #- name: consumerGroup
    #  value: "group1"
    # The strategy to use for consumer group rebalancing. (Optional)
    #- name: consumerGroupRebalanceStrategy
    #  value: "range"
    # Disables TLS for transport security. This is potentially insecure and not recommended for use in production. (Optional)
    #- name: disableTls
    #  value: "false"
    # Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. (Optional)
    #- name: escapeHeaders
    #  value: "false"
    # A regular expression matching keys to exclude when converting between headers and metadata, to avoid unwanted downstream side effects. (Optional)
    #- name: excludeHeaderMetaRegex
    #  value: "^rawPayload|valueSchemaType$"
    # The interval between heartbeats to the consumer coordinator. (Optional)
    #- name: heartbeatInterval
    #  value: "3s"
    # The initial offset to use if no offset was previously committed. (Optional)
    #- name: initialOffset
    #  value: "newest"
    # The maximum size in bytes allowed for a single Kafka message. (Optional)
    #- name: maxMessageBytes
    #  value: "1024"
    # Enables caching for schemas. (Optional)
    #- name: schemaCachingEnabled
    #  value: "true"
    # The TTL for schema caching when publishing a message with the latest available schema. (Optional)
    #- name: schemaLatestVersionCacheTTL
    #  value: "5m"
    # The Schema Registry credentials API Key. (Optional)
    #- name: schemaRegistryAPIKey
    #  value: "XYAXXAZ"
    # The Schema Registry credentials API Secret. (Optional)
    #- name: schemaRegistryAPISecret
    #  value: "ABCDEFGMEADFF"
    # The Schema Registry URL. (Optional)
    #- name: schemaRegistryURL
    #  value: "http://mycompany.com:8081"
    # The maximum time between heartbeats before the consumer is considered inactive and times out. (Optional)
    #- name: sessionTimeout
    #  value: "10s"
    # Skips TLS verification. This is potentially insecure and not recommended for use in production. (Optional)
    #- name: skipVerify
    #  value: "false"
    # Enables Avro JSON schema for serialization. Only applicable when the subscription uses `valueSchemaType=Avro`. (Optional)
    #- name: useAvroJSON
    #  value: "false"
    # Kafka cluster version. Note that this must be set to "1.0.0" if you are using Azure Event Hubs with Kafka. (Optional)
    #- name: version
    #  value: "2.0.0.0"
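To illustrate how the optional settings are switched on, the following is a minimal sketch of the same SASL password profile with three of the optional fields uncommented. The component name `kafka-pubsub`, the single broker address, and the chosen values are illustrative assumptions only; the field names and the values themselves are taken from the example above.

```yaml
apiVersion: cra.diagrid.io/v1beta1
kind: Component
metadata:
  name: kafka-pubsub            # hypothetical component name
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Required fields for the SASL password profile.
    - name: authType
      value: "password"
    - name: saslMechanism
      value: "PLAINTEXT"
    - name: saslUsername
      value: "myuser"
    - name: saslPassword
      value: "mypassword"
    - name: brokers
      value: "mycompany.com:9092"   # placeholder broker address
    # Optional fields, enabled by removing the leading "#".
    - name: consumerGroup
      value: "group1"
    - name: compression
      value: "snappy"           # one of: none, gzip, snappy, lz4, zstd
    - name: initialOffset
      value: "newest"
```

Any optional field left out of the component falls back to its default, so only the settings that differ from the defaults need to appear.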
Authentication profiles
Available authentication profiles:
- OIDC Authentication
- SASL Authentication
- mTLS Authentication
- No Authentication
- AWS: Access Key ID and Secret Access Key
- AWS: Assume IAM Role
- AWS: IAM Roles Anywhere
OIDC Authentication
Authenticate using OpenID Connect.
authType (string)
Required - Authentication type. This must be set to "oidc" for this authentication profile.
Example value: oidc
Allowed values:
- oidc