This article is day 25 of the MicroAd Advent Calendar 2024 and the Distributed computing (Apache Spark, Hadoop, Kafka, ...) Advent Calendar 2024.
December 25th has already come and gone, but this is the day-25 article. If I say it's day 25, it's day 25.
There are various reasons for the delay, but let's get on with the main topic.^1
This time I'll cover Flink, something I had long been watching out of the corner of my eye while thinking "wouldn't Spark Structured Streaming be good enough?"
Spark grew out of the batch world, where the range of data to process is fixed, and expanded into the streaming world, where data keeps flowing in, through the lens of micro-batching. Flink went the opposite way: it expanded its coverage from streaming toward batch. Both camps keep honing themselves toward a bright(?) future where batch and streaming are both fully covered.
This article touches on Flink news first, then actually spins up a Kubernetes cluster quickly with RKE2,^2 builds a Flink environment on it, and launches a sample job. I'll err on the side of extra asides along the way. If your company sits behind a proxy, you're covered too.
Prerequisites (a.k.a. this article's environment)
- Ubuntu v22 or later
- RKE2 (Kubernetes v1.31)
- Apache Flink v1.20
- Apache Flink Kubernetes Operator v1.10
- Prerequisites (a.k.a. this article's environment)
- The latest on Flink and v2.0
- Getting started with Flink on Kubernetes using RKE2
- Closing
The latest on Flink and v2.0
Looking back at Flink this year
This turned out to be a year when I talked about Flink nonstop.
Being a Flink conference it naturally leans pro-Flink, but Flink Forward 2024 had the following interesting session.
It explains the differences between Spark and Flink, and the content is fascinating.^3
The speaker also followed up afterwards with the following article.
Beyond latency, the stream-first focus and the implementation's strong awareness of Kafka stood out; recalling the pain points of Spark Structured Streaming, it really made me feel that Flink is worth a serious look.
The same speaker also talked at Current 2024, Confluent's annual event, about event-time processing with Flink SQL; those slides are well worth reading too.
Flink has long had deep ties to Iceberg, but something intriguing called Fluss, a "Streaming Lakehouse", has also appeared. Fluss has apparently joined the ASF as well.
Fluss: Unified Streaming Storage For Next-Generation Data Analytics
I'm curious to see how it develops.
Also, Ververica,^4 the author of that article, donated Flink CDC to the Apache Software Foundation as Flink CDC v3 in January 2024. The slides below nicely trace the evolution that led to Flink CDC v3. Writing CDC configuration in YAML is simple and pleasant (a bit Embulk-like, perhaps?).
Blog posts and other material on Flink CDC are collected below.
For a more detailed explanation, the following article is a good reference.
If you can do CDC with nothing but YAML, it's easy to roll out across teams, so it's definitely attractive.
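To give a feel for the YAML-based configuration, a Flink CDC v3 pipeline definition looks roughly like this. Hostnames, credentials, and the Doris sink below are placeholders following the shape of the Flink CDC quickstart, not a working setup; check the official docs for the exact options.

```yaml
# mysql-to-doris.yaml — sketch of a Flink CDC v3 pipeline definition.
# All endpoint/credential values are placeholders.
source:
  type: mysql
  hostname: mysql.example.com
  port: 3306
  username: flink_cdc
  password: "********"
  tables: app_db.\.*        # capture every table in app_db

sink:
  type: doris
  fenodes: doris-fe.example.com:8030
  username: root
  password: "********"

pipeline:
  name: Sync app_db from MySQL to Doris
  parallelism: 2
```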
The Flink v2.0 story
Apache Flink 1.20 was released this past August. And, as shown below, eight years have passed since the release of Flink 1.0; Flink v2.0 came out as a preview in October, and the community seems to be aiming for GA toward the end of 2024 (though it looks likely to slip past New Year's).
Breaking Changes
Flink v2.0 ships quite a few backward-incompatible changes, as befits a major upgrade.
- Removed APIs
- Changed APIs
- Configuration file changes
  - The legacy flink-conf.yaml is retired in favor of config.yaml, a new configuration file conforming to the YAML standard
  - Deprecated configuration keys are removed
- State compatibility
  - The Checkpoint/Savepoint format changes, and 100% compatibility between the 1.x line and 2.0 is not guaranteed
  - A migration tool is reportedly under development in the community
- End of Java 8 support
- Removal of Per-Job mode
  - The legacy per-job cluster mode, long deprecated, is gone as of 2.0
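To illustrate the configuration-file change above: the legacy flink-conf.yaml only allowed flat dotted keys, while the new config.yaml is standard YAML, so keys can be nested. A minimal sketch (the values are arbitrary examples):

```yaml
# config.yaml — the same settings that flink-conf.yaml expressed as
#   jobmanager.memory.process.size: 1600m
#   taskmanager.numberOfTaskSlots: 2
# can now be written as standard nested YAML:
jobmanager:
  memory:
    process:
      size: 1600m
taskmanager:
  numberOfTaskSlots: 2
```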
Also, the community is prioritizing work to bring the following major connectors onto the new APIs: Kafka, Paimon, JDBC, and Elasticsearch. Other connectors are slated to follow, and committers willing to join the development are reportedly being recruited.
So, if you're starting fresh with Flink v2.0, it's worth being mindful to use the following.
- Standardize on the Java DataStream API, batch processing included
- Make use of the Table API / SQL
- For custom connectors and similar implementations, use the new Source / Sink V2 interfaces
- When you need user-defined table sources or sinks, use DynamicTableSource and DynamicTableSink
- Instead of the old type-definition classes (e.g. TableSchema, TableColumn, Types), use Schema, Column, and DataTypes
Streaming enhancements: Disaggregated State Management
Another streaming enhancement on the roadmap is Disaggregated State Management.
Flink has traditionally achieved high performance by tightly coupling compute nodes with state kept on local disk. That coupling, however, imposes the following constraints.
- Compute resources and storage capacity cannot scale independently
- In container environments, the CPU and I/O load spikes caused by checkpointing are hard to predict
- It gets in the way of making full use of cloud storage (S3, OSS, etc.)
- Rescaling and recovery times balloon once state grows huge
Disaggregated State Management appears as the answer to these.
Its centerpiece is "ForStDB" (For Streaming DB).
- A mechanism that detaches state from Flink tasks and reads/writes it directly against cloud storage (S3, OSS, HDFS, etc.)
- Reduces dependence on local disk, eliminating the capacity-limit problem
- Removes the need to download and redistribute huge state at checkpoint or rescale time, so big speedups are expected
That said, when using cloud storage, the crux is how to overcome storage latency.
The following efforts seem to address this.
- Using a DFS with higher latency than local disk raises performance concerns
- An async execution mode is being introduced to decouple state I/O from computation
- A "Hybrid Cache" is being developed so local disk can serve as a cache where needed
As for implementation status:
- Multiple Flink Improvement Proposals (FLIPs) have been drafted in the community, and implementation is proceeding in stages
- A preview build is already out, so some of the features can be tried
ForStDB is covered in detail in the following, but there is plenty I still don't understand, so if anyone can explain it to me 🙏
- Enabling Flink's Cloud-Native Future: Introducing ForStDB in Flink 2.0
- ForStDB preview release: https://github.com/ververica/ForSt/releases/tag/v0.1.2-beta
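Out of curiosity, here is what enabling it might look like in a Flink 2.0 preview config.yaml. To be clear, every key below is my assumption pieced together from the preview materials, not a verified configuration, and may well change before GA:

```yaml
# Sketch only — key names are assumptions based on the Flink 2.0
# preview and the ForStDB announcement, not a verified configuration.
state.backend.type: forst                          # disaggregated state backend (preview)
state.backend.forst.remote-dir: s3://bucket/forst  # hypothetical remote state directory
```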
Materialized Table
With a materialized table, you declare a query together with a data-freshness target (FRESHNESS), and the Flink engine automatically infers the table schema and builds a continuously updating data pipeline for you.
The feature already landed in Flink 1.20 as an MVP (minimum viable product), and Flink 2.0 is expected to strengthen it further, especially on the operations side.
Its big selling point is that it treats batch and streaming seamlessly, enabling continuous data transformation with SQL alone.
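To make that concrete, a materialized-table definition in Flink SQL looks roughly like the following. The table and column names are made up for illustration; the FRESHNESS clause follows the Flink 1.20 materialized-table syntax.

```sql
-- Keep a continuously refreshed aggregate at most 3 minutes stale.
-- Flink infers the schema of dwd_orders from the SELECT below and
-- schedules a refresh pipeline that satisfies the FRESHNESS target.
CREATE MATERIALIZED TABLE dwd_orders
FRESHNESS = INTERVAL '3' MINUTE
AS
SELECT
  o.order_id,
  o.user_id,
  o.order_amount,
  p.payment_status
FROM orders AS o
LEFT JOIN payments AS p ON o.order_id = p.order_id;
```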
Adaptive Batch Execution
Dynamic optimization of both the logical and physical plans is planned. Based on runtime results collected during batch execution (e.g. cardinality information and data distribution), the first wave of optimization strategies will reportedly optimize broadcast joins and skewed joins dynamically.
Streaming Lakehouse
As summarized below, Flink 2.0 tightens the integration with Apache Paimon (formerly Flink Table Store):
- SQL plan optimization: Flink SQL plan optimization advances by exploiting Paimon's rich merge engines
- Faster lookup joins: exploiting bucket information promises more efficient data reads
- Support for new Flink 2.0 features such as Materialized Table, Adaptive Batch Execution, and Speculative Execution: combined with Paimon, real-time updates, queries, and optimization across the whole lakehouse become seamless
On top of that, add Fluss and columnar streaming storage becomes possible; once the planned Kafka protocol compatibility arrives, we get a world where you query stream data directly, instead of the usual consume-from-Kafka, sink-to-the-data-lake, load-into-tables routine. That would be fantastic.
By now you're probably itching to get hands-on, so let's get things running.
Flink has an official Kubernetes Operator, so we'll use it to build the environment.
With the Flink Kubernetes Operator, you define a FlinkDeployment resource in a manifest and apply it, and a Flink cluster starts up.
Unfortunately it does not support Flink v2.0 yet, so we'll target the latest GA release, Flink v1.20.
Getting started with Flink on Kubernetes using RKE2
Building the Kubernetes cluster with RKE2
Since this is only a trial, we'll skip HA on the Kubernetes side and keep the topology simple.
One Control Plane node, plus three nodes running the actual workloads such as the Flink Operator and Flink jobs.
Assume the following FQDNs.

```
# Control Plane
yassan-cp01.internal

# Nodes
yassan-node01.internal
yassan-node02.internal
yassan-node03.internal
```
Also, we'll use RKE2 rather than kubeadm. I covered RKE2 in a past advent calendar, so please give that a read as well.
First, prepare four nodes with the OS already set up. For node requirements, see the following.
Building the Control Plane (RKE2 Server Node)
Let's build the Control Plane first.
On the Control Plane node, run the following to install RKE2.

```shell
# curl -x http://proxy.example.com:8080 -sfL https://get.rke2.io | INSTALL_RKE2_CHANNEL="v1.31" INSTALL_RKE2_TYPE="server" sh -
```

※ If you don't need a proxy, drop `-x http://proxy.example.com:8080`.
Next, if you're behind a proxy, create the following.^5

```shell
tee /etc/default/rke2-server <<EOF
HTTP_PROXY=${HTTP_PROXY}
HTTPS_PROXY=${HTTP_PROXY}
NO_PROXY="${NO_PROXY},.svc,.local"
EOF
```
Next, create the configuration file /etc/rancher/rke2/config.yaml as follows.

```yaml
# Generate a random string with something like `openssl rand -base64 32`
token: SYfnuawdPNXGEqnGPWGIM/WJy3wYV1Pixd2gRwL7hTA=
cluster-domain: yassan.local
# Not strictly needed, but we connect directly with no LB in front, so add these
tls-san:
  - yassan-cp01.internal
  - yassan.local
# Network & CNI
cni: cilium
```

After that, just start the service and wait.

```shell
# systemctl daemon-reload
# systemctl enable --now rke2-server.service
# journalctl -ef -u rke2-server.service
```
Building the Nodes (RKE2 Agent Nodes)
Next, build the three nodes that hang off the Control Plane we just made, following the same procedure.
Run the following on each node to install RKE2.

```shell
# curl -sfL https://get.rke2.io | INSTALL_RKE2_CHANNEL="v1.31" INSTALL_RKE2_TYPE="agent" sh -
```

As with the Control Plane, if you're behind a proxy, run the following.

```shell
tee /etc/default/rke2-agent <<EOF
HTTP_PROXY=${HTTP_PROXY}
HTTPS_PROXY=${HTTP_PROXY}
NO_PROXY="${NO_PROXY},.svc,.local"
EOF
```
Create the node-side configuration file /etc/rancher/rke2/config.yaml as follows.

```yaml
server: https://yassan-cp01.internal:9345
token: SYfnuawdPNXGEqnGPWGIM/WJy3wYV1Pixd2gRwL7hTA=
```

Then just start the service and wait.

```shell
# systemctl daemon-reload
# systemctl enable --now rke2-agent.service
# journalctl -u rke2-agent -f
```
That's it for the build. Afterwards, run kubectl get node on any suitable node, and if it succeeds you're OK.

```shell
export KUBECONFIG=/etc/rancher/rke2/rke2.yaml
/var/lib/rancher/rke2/bin/kubectl get nodes
```

The following collection of commands is also handy.
RKE2 commands
Preparing the kubeconfig
Set it up using the following as a reference.
/etc/rancher/rke2/rke2.yaml on yassan-cp01.internal holds the credentials, so use it as a reference and add entries like the following to the kubeconfig on the machine outside the Kubernetes cluster that you'll be working from.

```yaml
- cluster:
    certificate-authority-data: LS0tLS1CRUd(snip)
    server: https://yassan-cp01.internal:6443
  name: yassan.local
- context:
    cluster: yassan.local
    user: yassan.local
  name: yassan.local
- name: yassan.local
  user:
    client-certificate-data: LS0tLS1CR(snip)
    client-key-data: LS0tLS1CRUdJTiBFQ(snip)
```

Then switch your kubectl context, and once kubectl get nodes works, you're done.
Setting up the Flink Kubernetes Operator
Next, set things up following the Flink Kubernetes Operator Quick Start.
The namespaces are as follows.
- Flink Kubernetes Operator: flink-operator
- Flink jobs: flink-a
All the work from here on is done from a machine outside the Kubernetes cluster.
Installing cert-manager
First, install cert-manager so the webhook component can be added.

```shell
$ kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
```
Changing the image for S3 support
We want to use S3-compatible storage buckets (s3a://), so add the Hadoop S3 File Systems plugin to Flink's container image.
It's exactly as the docs describe, so use the following repository as a reference.
The image is pushed, so if building it is a hassle, feel free to use mine.
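For reference, the gist of such an image is small: Flink ships the S3 filesystem as an opt-in plugin under $FLINK_HOME/opt, so the Dockerfile mostly just moves it into the plugins directory. A minimal sketch (the base image tag and jar version are assumptions and must match each other):

```dockerfile
FROM flink:1.20.0-scala_2.12-java11

# Flink distributes filesystem plugins under $FLINK_HOME/opt;
# enabling one means copying it into its own folder under plugins/.
RUN mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop" && \
    cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.20.0.jar" \
       "$FLINK_HOME/plugins/s3-fs-hadoop/"
```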
Adding the S3 credentials
Flink logs and the like are written to S3-compatible storage, so add the S3 credentials as a Secret in the flink-operator and flink-a namespaces.

```yaml
---
# secret-s3-iam.yaml
apiVersion: v1
kind: Secret
metadata:
  name: s3-iam
type: Opaque
data:
  AWS_ACCESS_KEY_ID: Q(snip)
  AWS_SECRET_ACCESS_KEY: K(snip)
```

```shell
$ kubectl -n flink-operator apply -f secret-s3-iam.yaml
$ kubectl -n flink-a apply -f secret-s3-iam.yaml
```
Installing the Flink Kubernetes Operator
As the documentation shows, you could simply run `helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/`, but I want to see the chart's structure and work out which values to use (and there's a fair bit I don't quite follow yet), so I download the Helm chart and install from it instead.

```shell
$ curl -f -sL --retry 3 "https://github.com/apache/flink-kubernetes-operator/archive/refs/tags/release-1.10.0.tar.gz" | tar xz
```

The Helm chart is at flink-kubernetes-operator-release-1.10.0/helm/flink-kubernetes-operator.
I set the Helm chart values as follows.
```diff
❯ diff original-values.yaml modified-values.yaml
23a6,8
> watchNamespaces:
>   - "flink-a"
>   - "flink-b"
26c11
<   repository: ghcr.io/apache/flink-kubernetes-operator
---
>   repository: docker.io/yassan/flink-operator
28,30c13,14
<   tag: "c703255"
<   # If image digest is set then it takes precedence and the image tag will be ignored
<   # digest: ""
---
>   tag: "1.10.0-1"
>
47c31
<   create: false
---
>   create: true
76a61,66
>     - name: "HTTP_PROXY"
>       value: "http://proxy.example.com:8080"
>     - name: "HTTPS_PROXY"
>       value: "http://proxy.example.com:8080"
>     - name: "NO_PROXY"
>       value: "localh(snip)"
79a70,71
>     - secretRef:
>         name: s3-iam
165a158,166
>
>     fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider
>     s3.endpoint: http://minio:9000
>     s3.path.style.access: true
>
>     high-availability.type: kubernetes
>     high-availability.storageDir: s3a://flink-test/recovery
>
>     jobmanager.archive.fs.dir: s3a://flink-test/flink-history
```

👇 The full values.yaml 👇
```yaml
---
watchNamespaces:
  - "flink-a"
  - "flink-b"

image:
  repository: docker.io/yassan/flink-operator
  pullPolicy: IfNotPresent
  tag: "1.10.0-1"

imagePullSecrets: []

replicas: 1

strategy:
  type: Recreate

rbac:
  create: true
  nodesRule:
    create: true
  operatorRole:
    create: true
    name: "flink-operator"
  operatorRoleBinding:
    create: true
    name: "flink-operator-role-binding"
  jobRole:
    create: true
    name: "flink"
  jobRoleBinding:
    create: true
    name: "flink-role-binding"

operatorPod:
  priorityClassName: null
  annotations: {}
  labels: {}
  env:
    - name: "HTTP_PROXY"
      value: "http://proxy.example.com:8080"
    - name: "HTTPS_PROXY"
      value: "http://proxy.example.com:8080"
    - name: "NO_PROXY"
      value: "localhost,127.0.0.1,(snip)"
  envFrom:
    - secretRef:
        name: s3-iam
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []
  resources: {}
  webhook:
    resources: {}

operatorServiceAccount:
  create: true
  annotations: {}
  name: "flink-operator"

jobServiceAccount:
  create: true
  annotations:
    "helm.sh/resource-policy": keep
  name: "flink"

operatorVolumeMounts:
  create: false
  data:
    - name: flink-artifacts
      mountPath: /opt/flink/artifacts

operatorVolumes:
  create: false
  data:
    - name: flink-artifacts
      hostPath:
        path: /tmp/flink/artifacts
        type: DirectoryOrCreate

podSecurityContext:
  runAsUser: 9999
  runAsGroup: 9999

operatorSecurityContext: {}

webhookSecurityContext: {}

webhook:
  create: true
  keystore:
    useDefaultPassword: true

defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Flink Config Overrides
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.observer.progress-check.interval: 5 s

    # The S3 credentials come from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY, hence the following
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider
    s3.endpoint: http://minio:9000
    s3.path.style.access: true

    high-availability.type: kubernetes
    high-availability.storageDir: s3a://flink-test/recovery

    jobmanager.archive.fs.dir: s3a://flink-test/flink-history
  log4j-operator.properties: |+
    # Flink Operator Logging Overrides
    # rootLogger.level = DEBUG
    # logger.operator.name = org.apache.flink.kubernetes.operator
    # logger.operator.level = DEBUG
  log4j-console.properties: |+
    # Flink Deployment Logging Overrides
    # rootLogger.level = DEBUG

metrics:
  port:

nameOverride: ""
fullnameOverride: ""

jvmArgs:
  webhook: ""
  operator: ""

operatorHealth:
  port: 8085
  livenessProbe:
    periodSeconds: 10
    initialDelaySeconds: 30
  startupProbe:
    failureThreshold: 30
    periodSeconds: 10

postStart: {}

tls:
  create: false
  secretName: flink-operator-cert
  secretKeyRef:
    name: operator-certificate-password
    key: password
```
Then install following these steps.

```shell
$ export FLINK_OP_NS=flink-operator
$ export DIR_HELM_CHARTS=/home/yassan/flink/flink-kubernetes-operator-release-1.10.0/helm/flink-kubernetes-operator
$ export DIR_HELM_VAL=/home/yassan/flink

$ kubectl create ns ${FLINK_OP_NS}

# Render the manifest (for review)
$ helm template ${FLINK_OP_NS} ${DIR_HELM_CHARTS} -n ${FLINK_OP_NS} -f ${DIR_HELM_VAL}/values_flink-operator.yaml > ${DIR_HELM_VAL}/manifest_${FLINK_OP_NS}_$(date '+%Y-%m-%d_%H%M%S').yaml

# Dry-run the install
$ helm install ${FLINK_OP_NS} ${DIR_HELM_CHARTS} -n ${FLINK_OP_NS} -f ${DIR_HELM_VAL}/values_flink-operator.yaml --dry-run 2>&1 | tee ${DIR_HELM_VAL}/${FLINK_OP_NS}_dry_$(date '+%Y-%m-%d_%H%M%S').yaml

# Install (first time)
$ helm install ${FLINK_OP_NS} ${DIR_HELM_CHARTS} -n ${FLINK_OP_NS} -f ${DIR_HELM_VAL}/values_flink-operator.yaml 2>&1 | tee ${DIR_HELM_VAL}/${FLINK_OP_NS}_$(date '+%Y-%m-%d_%H%M%S').yaml

# Install (second time and later)
$ helm upgrade ${FLINK_OP_NS} ${DIR_HELM_CHARTS} -n ${FLINK_OP_NS} -f ${DIR_HELM_VAL}/values_flink-operator.yaml 2>&1 | tee ${DIR_HELM_VAL}/${FLINK_OP_NS}_$(date '+%Y-%m-%d_%H%M%S').yaml

# Uninstall
$ helm delete ${FLINK_OP_NS} -n ${FLINK_OP_NS}

# Check
$ helm ls --all-namespaces
```
Running a Flink job
Using the following as a reference, let's run a sample Flink job.
The procedure: create a FlinkDeployment resource and apply it. That's all.
```yaml
---
# basic.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: docker.io/yassan/flink:1.20.0-1
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    rest.flamegraph.enabled: "true"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    podTemplate:
      spec:
        containers:
          - name: flink-main-container
            env:
              - name: "HTTP_PROXY"
                value: "http://proxy.example.com:8080"
              - name: "HTTPS_PROXY"
                value: "http://proxy.example.com:8080"
              - name: "NO_PROXY"
                value: "localhost,(snip)"
            envFrom:
              - secretRef:
                  name: s3-iam
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    podTemplate:
      spec:
        containers:
          - name: flink-main-container
            env:
              - name: "HTTP_PROXY"
                value: "http://proxy.example.com:8080"
              - name: "HTTPS_PROXY"
                value: "http://proxy.example.com:8080"
              - name: "NO_PROXY"
                value: "localhost,(snip)"
            envFrom:
              - secretRef:
                  name: s3-iam
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
```
Once the manifest is ready, apply it to push the FlinkDeployment resource to the Kubernetes cluster.

```shell
$ kubectl -n flink-a apply -f basic.yaml
```

The Flink Kubernetes Operator detects it and, for that Flink job, starts a dedicated Job Manager and Task Manager that run the workload.
Since we set upgradeMode: stateless here, every configuration change restarts the job from a clean slate: the simplest possible job, not something aimed at production use. There are of course upgradeModes designed with production in mind, but that's for another time.
As for the Flink cluster itself, there are two modes: Application Mode, where each Flink job gets its own cluster as in this article, and Session Mode, where a Job Manager is started once and shared to run Flink jobs. The picture is as in the figure below (I won't touch the middle one, as it's deprecated).
There are also samples at the link below, so if you're curious, looking through them should give you the idea.
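As a taste of Session Mode, submitting a job to an existing session cluster uses a separate, smaller resource. A minimal sketch (the deployment name and jar URL are placeholders; the shape follows the operator's FlinkSessionJob examples):

```yaml
---
# session-job.yaml — sketch; assumes a session-mode FlinkDeployment
# named "basic-session-cluster" already exists in the namespace.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
spec:
  deploymentName: basic-session-cluster
  job:
    # For session jobs the jar is fetched remotely rather than baked
    # into the image, so an http(s):// URI is typical here.
    jarURI: https://example.com/artifacts/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
```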
Back to the running Flink job.
You can watch the runtime logs with:

```shell
$ kubectl -n flink-a logs -f deploy/basic-example
```

You can also view the Flink Dashboard of the running Flink job with:

```shell
$ kubectl -n flink-a port-forward svc/basic-example-rest 8081
```

If you have production in mind, Ingress support is available, so use that instead.
Ingress | Apache Flink Kubernetes Operator
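For reference, the operator lets you declare this directly on the FlinkDeployment. A minimal sketch (the hostname and class name are placeholders; the {{name}}/{{namespace}} template placeholders follow the operator's Ingress documentation):

```yaml
# Fragment of a FlinkDeployment spec exposing the Flink UI via Ingress.
# Hostname domain and ingress class are placeholders.
spec:
  ingress:
    template: "{{name}}.{{namespace}}.flink.example.com"
    className: "nginx"
```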
When you no longer need it, delete it with:

```shell
$ kubectl -n flink-a delete flinkdeployment/basic-example
```
Closing
So, how was it?
I haven't really tuned Flink's configuration at all yet, and the S3 needed for persisting state is now in place, so I'll keep experimenting and, given the chance, write a sequel.
(Flink Kubernetes Operator and Flink HA, monitoring, alerting, upgrades, resource control for Flink jobs, and so on.)
That wraps up day 25, the final day, of the MicroAd Advent Calendar 2024 and the Distributed computing (Apache Spark, Hadoop, Kafka, ...) Advent Calendar 2024.
1. Excuses: while researching, things got interesting and the digging ate a surprising amount of time. The original plan was also to use ZooKage to stand up Ozone and HDFS on a Rancher Desktop environment, prepare S3 buckets on HDFS, and then install Flink on top. But between the pile of browser tabs for writing the article and the desktop Kubernetes environment, my machine hit its limits and ran out of memory; I added swap to try to push through, but it was no good, and one thing led to another. ZooKage really does let you spin up an environment locally with almost no effort, so I'd love for everyone to use it: comment out what you don't need in kustomization.yaml, run ./bin/up, and it just comes up. Note, however, that it assumes a desktop environment and its PVCs use local host paths directly, so it won't work as-is outside desktop environments such as Docker Desktop or Rancher Desktop. With the browser constantly in the mix, you'll want 32 or 64 GB of memory; having the browser gobble up your RAM is painful. zookage.github.io ↩
2. RKE2 is an official Kubernetes distribution recognized by the CNCF as a Certified Kubernetes Distribution. Once you know the flow, it spins up easily and is very handy (K3s installation is quite similar). Rancher is not required, and being a SUSE product does not mean it only works on SUSE's OS, so feel free to use it with confidence. I also wrote about installing it in my 2022 advent calendar article, so do have a look. yassan.hatenablog.jp ↩
3. Some of the other session videos from Flink Forward 2024 are published below, so I'm hoping this session will show up there too. youtube.com ↩
4. Ververica is the current company name of data Artisans, which Alibaba acquired. ↩
5. See the documentation: docs.rke2.io ↩