feat(schema-registry): replace confluent schema registry #7930

Merged
Changes from all commits (93 commits)
df56c9b
feat(kafka): Remove Confluent Schema Registry
agilelab-tmnd1991 Jun 23, 2022
0caf750
Try to fix module in Intellij
pedro93 Nov 24, 2022
244a968
Adds kafka serde classes and modifies java code to use them
pedro93 Nov 25, 2022
bc1403f
fix checkstyle
pedro93 Nov 25, 2022
72d5821
Adds initial python work + spring bean cleanup
pedro93 Dec 2, 2022
6a9e78f
Cleanup python code
pedro93 Dec 2, 2022
f4bc825
Fix java version requirements, python linting & test datahub-client t…
pedro93 Dec 2, 2022
1f1b0cf
Change lombok dependency
pedro93 Dec 2, 2022
ee024ab
More fixes for slf4j
pedro93 Dec 2, 2022
ce0100f
Add correct kafka module & move avro dependency to lower module
pedro93 Dec 5, 2022
5dc3f4f
Adds OpenAPI definition for Confluent Schema Registry
pedro93 Dec 9, 2022
3d61276
Remove lombok from schema-registry-api module
pedro93 Dec 12, 2022
530ad1d
Make build pass
pedro93 Dec 12, 2022
a14f2b3
Adds schema registry open api implementation skeleton
pedro93 Dec 14, 2022
e5d4c1d
cleanup
pedro93 Dec 14, 2022
2e5ef0b
Implement schema registry service
pedro93 Dec 15, 2022
ac3ae5a
WIP integration test for openapi controller with full-blown kafka pro…
pedro93 Dec 15, 2022
fba25f5
Update OpenAPI spec to v7.4.0 for correct return types + update test …
pedro93 Dec 20, 2022
0d39178
Finish test for MCLs & PEs
pedro93 Dec 21, 2022
a768a15
rebase with master
pedro93 Dec 21, 2022
9dd2a18
Remove Kafka avro serde module
pedro93 Dec 21, 2022
343855e
Update Docker dependencies, make test work
pedro93 Dec 23, 2022
6c57404
Cleanup code
pedro93 Dec 23, 2022
53e1fd5
more code cleanup
pedro93 Dec 26, 2022
ff9ce9a
Fix KafkaEventConsumerFactory after rebase
pedro93 Dec 26, 2022
9dc9b7a
Fix issues with linting & metadata-ingestion setup
pedro93 Dec 26, 2022
df0a7a0
modify docker compose files
pedro93 Dec 27, 2022
d6a4e66
Fix python linters + integration test
pedro93 Dec 28, 2022
206099d
Fix mae consumer & add spring test for it
pedro93 Dec 30, 2022
e32a46c
Add openapi servlet health controller & change onBootApplicationListe…
pedro93 Jan 6, 2023
594b0f3
Move schema registry controller to it's own servlet without authentic…
pedro93 Jan 9, 2023
a547bae
Ignore schema registry in quickstart health check
pedro93 Jan 9, 2023
51d7053
Update metadata-service war to have a dedicated schema registry servlet
pedro93 Jan 10, 2023
8e70e78
Update docker compose files & docker env vars
pedro93 Jan 12, 2023
1af0d52
Make kafka setup job _schema topic change optional
pedro93 Jan 12, 2023
35cf58d
fix lint
pedro93 Jan 12, 2023
572b71f
update quickstart files
pedro93 Jan 12, 2023
19f5adc
Set kafka schema registry url
pedro93 Jan 13, 2023
3510753
Fix lint
pedro93 Jan 13, 2023
088f6c8
Fix war naming for datahub-gms start.sh in CI
pedro93 Jan 13, 2023
e637aa4
Update health check endpoint on schema registry controller
pedro93 Jan 13, 2023
64b8c5d
Remove unused imports
pedro93 Jan 13, 2023
dffef58
Add schema-registry web config for json serialization
pedro93 Jan 16, 2023
d836e94
reset kafka-connect env vars
pedro93 Jan 16, 2023
d36c374
Add kafka setup env var to use confluent registry or not
pedro93 Jan 17, 2023
29d6c21
Fix registry url in smoke tests end to end
pedro93 Jan 17, 2023
6ffe788
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 17, 2023
1f813e9
Merge branch 'master' into remove-confluent-schema-registry
hsheth2 Jan 17, 2023
98d2e86
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 17, 2023
5e62a5e
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 17, 2023
0a0c7b4
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 18, 2023
4b54c41
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 18, 2023
29ec772
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 18, 2023
92a1fb8
Remove /up endpoint
pedro93 Jan 18, 2023
406bdfa
Delete schema registry references that were commented out
pedro93 Jan 18, 2023
bd89271
Apply review comments
pedro93 Jan 18, 2023
d5d79f2
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 18, 2023
9778319
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 19, 2023
605a638
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 19, 2023
faeb09f
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 20, 2023
9caf64d
fix lint
pedro93 Jan 20, 2023
2d7632f
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 20, 2023
1ad6a57
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 21, 2023
d7c2150
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 23, 2023
f110a71
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 24, 2023
bbc3191
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 24, 2023
d23bfe4
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 24, 2023
1cf13bd
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 24, 2023
cfdacd6
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 25, 2023
e8faded
Non standalone tests
pedro93 Jan 25, 2023
a904fbd
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 25, 2023
a367868
Fix MAE spring injection
pedro93 Jan 26, 2023
fae529a
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 26, 2023
ccd2a0e
Revert ingestion scheduler change
pedro93 Jan 26, 2023
e7b264b
Fix mae spring dependency
pedro93 Jan 27, 2023
d2437e8
Fix linting and revert spark-integration changes
pedro93 Jan 27, 2023
2d2183a
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Jan 30, 2023
29ba268
Fix lint
pedro93 Jan 30, 2023
8ef28e7
Merge branch 'master' into remove-confluent-schema-registry
pedro93 Feb 9, 2023
14f924f
Fix broken build
pedro93 Feb 9, 2023
9389bfe
Update docker compose files
pedro93 Feb 9, 2023
68e3411
Merge remote-tracking branch 'upstream/master' into remove-confluent-…
shirshanka Mar 14, 2023
684fc25
fix spring dep issues
RyanHolstien Mar 15, 2023
a61db33
Merge pull request #3 from RyanHolstien/sd-remove-confluent-schema-re…
shirshanka Mar 15, 2023
39486b9
Merge branch 'master' into sd-remove-confluent-schema-registry
RyanHolstien Apr 13, 2023
56b50c8
fix typo
RyanHolstien Apr 13, 2023
6ba6a75
Merge pull request #4 from RyanHolstien/sd-remove-confluent-schema-re…
shirshanka Apr 14, 2023
1f89647
Merge remote-tracking branch 'upstream/master' into sd-remove-conflue…
shirshanka Apr 18, 2023
02cc758
fix lint
shirshanka Apr 19, 2023
63849fa
Merge remote-tracking branch 'origin/master' into sd-remove-confluent…
david-leifker Apr 28, 2023
197c498
passing build
david-leifker Apr 29, 2023
09cefdc
Fix quickstart, add environment var docs
david-leifker Apr 29, 2023
03df8e5
quickstart default schema-registry for now
david-leifker Apr 29, 2023
1 change: 1 addition & 0 deletions build.gradle
@@ -202,6 +202,7 @@ project.ext.externalDependency = [
'testContainersPostgresql':'org.testcontainers:postgresql:' + testContainersVersion,
'testContainersElasticsearch': 'org.testcontainers:elasticsearch:' + testContainersVersion,
'testContainersCassandra': 'org.testcontainers:cassandra:' + testContainersVersion,
+ 'testContainersKafka': 'org.testcontainers:kafka:' + testContainersVersion,
'typesafeConfig':'com.typesafe:config:1.4.1',
'wiremock':'com.github.tomakehurst:wiremock:2.10.0',
'zookeeper': 'org.apache.zookeeper:zookeeper:3.4.14'
1 change: 1 addition & 0 deletions docker/build.gradle
@@ -44,6 +44,7 @@ task quickstart(type: Exec, dependsOn: ':metadata-ingestion:install') {
'--version', "v${version}",
'--dump-logs-on-failure'
]

commandLine 'bash', '-c', cmd.join(" ")
}

1 change: 1 addition & 0 deletions docker/datahub-actions/env/docker.env
@@ -4,6 +4,7 @@ DATAHUB_GMS_PORT=8080

KAFKA_BOOTSTRAP_SERVER=broker:29092
SCHEMA_REGISTRY_URL=http://schema-registry:8081
+ # SCHEMA_REGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
METADATA_AUDIT_EVENT_NAME=MetadataAuditEvent_v4
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=MetadataChangeLog_Versioned_v1

1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker-without-neo4j.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=fal
EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.cassandra.env
@@ -1,6 +1,7 @@
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=fal
EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.mariadb.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_URL=jdbc:mariadb://mariadb:3306/datahub
EBEAN_DATASOURCE_DRIVER=org.mariadb.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.postgres.env
@@ -9,6 +9,7 @@ EBEAN_DATASOURCE_DRIVER=org.postgresql.Driver
# EBEAN_POSTGRES_USE_AWS_IAM_AUTH=true
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
3 changes: 2 additions & 1 deletion docker/datahub-mae-consumer/env/docker-without-neo4j.env
@@ -3,9 +3,10 @@ DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080

MAE_CONSUMER_ENABLED=true
- PE_CONSUMER_ENABLED=true
+ PE_CONSUMER_ENABLED=false
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
3 changes: 2 additions & 1 deletion docker/datahub-mae-consumer/env/docker.env
@@ -3,9 +3,10 @@ DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080

MAE_CONSUMER_ENABLED=true
- PE_CONSUMER_ENABLED=true
+ PE_CONSUMER_ENABLED=false
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-mce-consumer/Dockerfile
@@ -34,6 +34,7 @@ RUN chmod +x /datahub/datahub-mce-consumer/scripts/start.sh
FROM base as dev-install
# Dummy stage for development. Assumes code is built on your machine and mounted to this image.
# See this excellent thread https://github.com/docker/cli/issues/1134
+ COPY metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-mce-consumer/resources/entity-registry.yml

FROM ${APP_ENV}-install as final

1 change: 1 addition & 0 deletions docker/datahub-mce-consumer/env/docker-without-neo4j.env
@@ -7,6 +7,7 @@ EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=fal
EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
3 changes: 2 additions & 1 deletion docker/datahub-mce-consumer/env/docker.env
@@ -3,10 +3,11 @@ MCE_CONSUMER_ENABLED=true
EBEAN_DATASOURCE_USERNAME=datahub
EBEAN_DATASOURCE_PASSWORD=datahub
EBEAN_DATASOURCE_HOST=mysql:3306
- EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8&enabledTLSProtocols=TLSv1.2
+ EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8
EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
1 change: 1 addition & 0 deletions docker/datahub-upgrade/env/docker-without-neo4j.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver

KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/

ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
1 change: 1 addition & 0 deletions docker/datahub-upgrade/env/docker.env
@@ -6,6 +6,7 @@ EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver

KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/

ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
2 changes: 1 addition & 1 deletion docker/docker-compose-with-cassandra.yml
@@ -37,7 +37,7 @@ services:
depends_on:
- broker
ports:
- "8081:8081"
- ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081

elasticsearch:
image: elasticsearch:7.10.1
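
The schema-registry host port above is now parameterized rather than hard-coded. A user could presumably remap it by exporting the variable before bringing the stack up, with 8081 as the fallback; a hypothetical invocation:

    DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT=8082 docker compose -f docker/docker-compose-with-cassandra.yml up -d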
8 changes: 4 additions & 4 deletions docker/docker-compose.consumers.dev.yml
@@ -3,8 +3,8 @@ services:
datahub-mae-consumer:
image: linkedin/datahub-mae-consumer:debug
build:
-   context: datahub-mae-consumer
-   dockerfile: Dockerfile
+   context: ../
+   dockerfile: docker/datahub-mae-consumer/Dockerfile
args:
APP_ENV: dev
volumes:
@@ -16,8 +16,8 @@
datahub-mce-consumer:
image: linkedin/datahub-mce-consumer:debug
build:
-   context: datahub-mce-consumer
-   dockerfile: Dockerfile
+   context: ../
+   dockerfile: docker/datahub-mce-consumer/Dockerfile
args:
APP_ENV: dev
volumes:
2 changes: 1 addition & 1 deletion docker/docker-compose.tools.yml
@@ -24,7 +24,7 @@ services:
depends_on:
- zookeeper
- broker
-   - schema-registry
+   - schema-registry # -datahub-gms
- kafka-rest-proxy

kibana:
1 change: 1 addition & 0 deletions docker/kafka-rest-proxy/env/docker.env
@@ -1,4 +1,5 @@
KAFKA_REST_LISTENERS=http://0.0.0.0:8082/
KAFKA_REST_SCHEMA_REGISTRY_URL=http://schema-registry:8081/
+ # KAFKA_REST_SCHEMA_REGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
KAFKA_REST_HOST_NAME=kafka-rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS=PLAINTEXT://broker:29092
1 change: 1 addition & 0 deletions docker/kafka-setup/Dockerfile
@@ -62,6 +62,7 @@ ENV METADATA_CHANGE_PROPOSAL_TOPIC_NAME="MetadataChangeProposal_v1"
ENV FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME="FailedMetadataChangeProposal_v1"
ENV PLATFORM_EVENT_TOPIC_NAME="PlatformEvent_v1"
ENV DATAHUB_UPGRADE_HISTORY_TOPIC_NAME="DataHubUpgradeHistory_v1"
+ ENV USE_CONFLUENT_SCHEMA_REGISTRY="TRUE"

COPY docker/kafka-setup/kafka-setup.sh ./kafka-setup.sh
COPY docker/kafka-setup/kafka-config.sh ./kafka-config.sh
1 change: 1 addition & 0 deletions docker/kafka-setup/env/docker.env
@@ -1,5 +1,6 @@
KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
KAFKA_BOOTSTRAP_SERVER=broker:29092
+ USE_CONFLUENT_SCHEMA_REGISTRY=TRUE

# Configure the topics that are created by kafka-setup
# Make sure these names are consistent across the whole deployment
8 changes: 7 additions & 1 deletion docker/kafka-setup/kafka-setup.sh
@@ -141,7 +141,13 @@ echo "Topic Creation Complete."
# End Topic Creation Logic
############################################################

- kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact
+ ## If using confluent schema registry as a standalone component, then configure compact cleanup policy.
+ if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then
+   kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
+     --entity-type topics \
+     --entity-name _schemas \
+     --alter --add-config cleanup.policy=compact
+ fi

# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite
# Please see the bug report below for details
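The new guard matters because only Confluent's standalone registry stores schemas in the _schemas topic; the internal registry serves schemas from GMS instead. A sketch of opting out in docker/kafka-setup/env/docker.env when no standalone registry is deployed (any value other than TRUE skips the block):

    USE_CONFLUENT_SCHEMA_REGISTRY=FALSE
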
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose-m1.quickstart.yml
@@ -167,6 +167,7 @@ services:
- DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS:-false}
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+   - USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-linkedin/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
labels:
(file header not captured)
@@ -158,6 +158,7 @@ services:
- DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS:-false}
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+   - USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-linkedin/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
labels:
(file header not captured)
@@ -158,6 +158,7 @@ services:
- DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS:-false}
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+   - USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-linkedin/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
labels:
(file header not captured)
@@ -10,7 +10,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- MAE_CONSUMER_ENABLED=true
-   - PE_CONSUMER_ENABLED=true
+   - PE_CONSUMER_ENABLED=false
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
4 changes: 2 additions & 2 deletions docker/quickstart/docker-compose.consumers.quickstart.yml
@@ -12,7 +12,7 @@ services:
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- MAE_CONSUMER_ENABLED=true
-   - PE_CONSUMER_ENABLED=true
+   - PE_CONSUMER_ENABLED=false
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
@@ -41,7 +41,7 @@ services:
- EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver
- EBEAN_DATASOURCE_HOST=mysql:3306
- EBEAN_DATASOURCE_PASSWORD=datahub
-   - EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8&enabledTLSProtocols=TLSv1.2
+   - EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8
- EBEAN_DATASOURCE_USERNAME=datahub
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.quickstart.yml
@@ -167,6 +167,7 @@ services:
- DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS:-false}
- KAFKA_BOOTSTRAP_SERVER=broker:29092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+   - USE_CONFLUENT_SCHEMA_REGISTRY=TRUE
hostname: kafka-setup
image: ${DATAHUB_KAFKA_SETUP_IMAGE:-linkedin/datahub-kafka-setup}:${DATAHUB_VERSION:-head}
labels:
1 change: 1 addition & 0 deletions docker/schema-registry-ui/env/docker.env
@@ -1,4 +1,5 @@
SCHEMAREGISTRY_URL=http://schema-registry:8081
+ # SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/
ALLOW_GLOBAL=true
ALLOW_TRANSITIVE=true
ALLOW_DELETION=true
13 changes: 9 additions & 4 deletions docs/deploy/environment-vars.md
@@ -67,7 +67,12 @@ In general, there are **lots** of Kafka configuration environment variables for
These environment variables follow the standard Spring representation of properties as environment variables.
Simply replace the dot, `.`, with an underscore, `_`, and convert to uppercase.

- | Variable | Default | Unit/Type | Components | Description |
- |----------|---------|-----------|------------|-------------|
- | `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
- | `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+ | Variable | Default | Unit/Type | Components | Description |
+ |----------|---------|-----------|------------|-------------|
+ | `KAFKA_LISTENER_CONCURRENCY` | 1 | integer | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Number of Kafka consumer threads. Optimize throughput by matching to topic partitions. |
+ | `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` | 1048576 | bytes | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Max produced message size. Note that the topic configuration is not controlled by this variable. |
+ | `SCHEMA_REGISTRY_TYPE` | `INTERNAL` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry implementation. One of `INTERNAL` or `KAFKA` or `AWS_GLUE` |
+ | `KAFKA_SCHEMAREGISTRY_URL` | `http://localhost:8080/schema-registry/api/` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Schema registry url. Used for `INTERNAL` and `KAFKA`. The default value is for the `GMS` component. The `MCE Consumer` and `MAE Consumer` should be the `GMS` hostname and port. |
+ | `AWS_GLUE_SCHEMA_REGISTRY_REGION` | `us-east-1` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry implementation. |
+ | `AWS_GLUE_SCHEMA_REGISTRY_NAME` | `` | string | [`GMS`, `MCE Consumer`, `MAE Consumer`] | If using `AWS_GLUE` in the `SCHEMA_REGISTRY_TYPE` variable for the schema registry. |
+ | `USE_CONFLUENT_SCHEMA_REGISTRY` | `true` | boolean | [`kafka-setup`] | Enable Confluent schema registry configuration. |
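
Putting the new variables together: a minimal sketch of what a remote MCE/MAE consumer's env file might contain to target the GMS-hosted registry. The hostname is illustrative, mirroring the commented-out URLs in the compose env files above:

    SCHEMA_REGISTRY_TYPE=INTERNAL
    KAFKA_SCHEMAREGISTRY_URL=http://datahub-gms:8080/schema-registry/api/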
(file header not captured)
@@ -20,6 +20,7 @@
import io.opentelemetry.extension.annotations.WithSpan;
import java.io.IOException;
import java.util.Arrays;
+ import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
@@ -95,7 +96,7 @@ record = EventUtils.pegasusToAvroMAE(metadataAuditEvent);

@Override
@WithSpan
- public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec aspectSpec,
+ public Future<?> produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec aspectSpec,
@Nonnull final MetadataChangeLog metadataChangeLog) {
GenericRecord record;
try {
@@ -112,14 +113,14 @@ record = EventUtils.pegasusToAvroMCL(metadataChangeLog);
if (aspectSpec.isTimeseries()) {
topic = _topicConvention.getMetadataChangeLogTimeseriesTopicName();
}
- _producer.send(new ProducerRecord(topic, urn.toString(), record),
+ return _producer.send(new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MCL", urn.toString()));
}

@Override
@WithSpan
- public void produceMetadataChangeProposal(@Nonnull final Urn urn, @Nonnull final MetadataChangeProposal
-     metadataChangeProposal) {
+ public Future<?> produceMetadataChangeProposal(@Nonnull final Urn urn,
+     @Nonnull final MetadataChangeProposal metadataChangeProposal) {
GenericRecord record;

try {
@@ -133,12 +134,12 @@ record = EventUtils.pegasusToAvroMCP(metadataChangeProposal);
}

String topic = _topicConvention.getMetadataChangeProposalTopicName();
- _producer.send(new ProducerRecord(topic, urn.toString(), record),
+ return _producer.send(new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MCP", urn.toString()));
}

@Override
- public void producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event) {
+ public Future<?> producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event) {
GenericRecord record;
try {
log.debug(String.format("Converting Pegasus Event to Avro Event urn %s\nEvent: %s",
Expand All @@ -151,7 +152,7 @@ record = EventUtils.pegasusToAvroPE(event);
}

final String topic = _topicConvention.getPlatformEventTopicName();
- _producer.send(new ProducerRecord(topic, key == null ? name : key, record),
+ return _producer.send(new ProducerRecord(topic, key == null ? name : key, record),
_kafkaHealthChecker.getKafkaCallBack("Platform Event", name));
}

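Since the produce methods now return the producer's Future<?> instead of void, callers can opt into synchronous write semantics. A hypothetical caller sketch — eventProducer and the timeout are illustrative, not part of this diff:

    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    // Fire the event, then block until the broker acknowledges the send,
    // so delivery failures surface at the call site instead of only in the callback.
    Future<?> ack = eventProducer.producePlatformEvent(name, key, event);
    ack.get(10, TimeUnit.SECONDS); // ExecutionException on send failure, TimeoutException if no ack in time
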
21 changes: 21 additions & 0 deletions metadata-ingestion/src/datahub/cli/docker_cli.py
@@ -870,6 +870,27 @@ def download_compose_files(
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")
+     if kafka_setup:
+         kafka_setup_github_file = f"{base_url}/{KAFKA_SETUP_QUICKSTART_COMPOSE_FILE}"
+
+         default_kafka_compose_file = (
+             Path(DATAHUB_ROOT_FOLDER) / "quickstart/docker-compose.kafka-setup.yml"
+         )
+         with open(
+             default_kafka_compose_file, "wb"
+         ) if default_kafka_compose_file else tempfile.NamedTemporaryFile(
+             suffix=".yml", delete=False
+         ) as tmp_file:
+             path = pathlib.Path(tmp_file.name)
+             quickstart_compose_file_list.append(path)
+             click.echo(
+                 f"Fetching consumer docker-compose file {kafka_setup_github_file} from GitHub"
+             )
+             # Download the quickstart docker-compose file from GitHub.
+             quickstart_download_response = request_session.get(kafka_setup_github_file)
+             quickstart_download_response.raise_for_status()
+             tmp_file.write(quickstart_download_response.content)
+             logger.debug(f"Copied to {path}")


def valid_restore_options(
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/configuration/kafka.py
@@ -9,7 +9,7 @@ class _KafkaConnectionConfig(ConfigModel):
bootstrap: str = "localhost:9092"

# schema registry location
- schema_registry_url: str = "http://localhost:8081"
+ schema_registry_url: str = "http://localhost:8080/schema-registry/api/"

schema_registry_config: dict = Field(
default_factory=dict,
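
With the ingestion default now pointing at the internal registry, a kafka source recipe only needs schema_registry_url overridden when a standalone Confluent registry is still in use. A sketch of such a recipe, reusing the old default values (hostnames illustrative):

    source:
      type: kafka
      config:
        connection:
          bootstrap: "localhost:9092"
          schema_registry_url: "http://localhost:8081"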